Pig / PIG-2824

Pushing checking number of fields into LoadFunc

    Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.9.0, 0.10.0
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      As described in PIG-1188, when users define a schema (with or without types), Pig must check the number of fields after loading data: if a record has fewer fields than the schema, it is padded with null fields, and if it has more, the extra fields are discarded.

      For a schema with types, Pig used to insert a Foreach after the loader for type casting, which also checks the number of fields. For a schema without types there was no such Foreach, so PIG-1188 inserted one solely to check the number of fields. Unfortunately, a Foreach is too expensive for this check, and ideally the check can be pushed into the loader.
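The pad-or-discard rule described above can be sketched in a few lines. This is an illustration in Python rather than Pig's Java code; the names `conform_to_schema` and `load_line` are hypothetical and do not come from Pig or from the attached patch.

```python
from typing import List, Optional, Sequence


def conform_to_schema(fields: Sequence[Optional[str]],
                      expected: int) -> List[Optional[str]]:
    """Return exactly `expected` fields: pad with None or drop extras."""
    if expected < 0:
        raise ValueError("expected field count must be non-negative")
    if len(fields) >= expected:
        # Enough or too many fields: keep only the leading ones.
        return list(fields[:expected])
    # Too few fields: pad the tail with nulls.
    return list(fields) + [None] * (expected - len(fields))


def load_line(line: str, delimiter: str, expected: int) -> List[Optional[str]]:
    """Split one delimited line and apply the field-count check in the same step."""
    return conform_to_schema(line.split(delimiter), expected)


print(load_line("1|2", "|", 3))      # ['1', '2', None]
print(load_line("1|2|3|4", "|", 3))  # ['1', '2', '3']
```

Doing the check inside the split step is the point of this issue: the record is adjusted while it is being built, so no separate per-record operator has to run afterwards.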

      1. 2824.png
        45 kB
        Jie Li
      2. 2824.patch
        60 kB
        Jie Li

        Issue Links

          Activity

          Jie Li added a comment -

          Oh yeah. Linked to PIG-2661.

          Dmitriy V. Ryaboy added a comment -

          There was a related Jira where we discussed this in more detail... Jie, do you remember which one that was?

          Alan Gates added a comment -

          Canceling patch pending response to Dmitriy's feedback.

          Dmitriy V. Ryaboy added a comment -

          Jie, that's a good catch and a nice perf improvement, but the solution seems a bit heavyweight.

          What if we instead modified POLoad to automatically perform this check, and be aware of expected schemas?
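One way to picture this alternative is a single wrapper around whatever records a loader produces, so that individual loaders need no changes. The sketch below is only an illustration in Python under that assumption; `checked_load` and its parameters are hypothetical and are not POLoad's actual API.

```python
from typing import Iterable, Iterator, List, Optional, Tuple


def checked_load(records: Iterable[List[Optional[str]]],
                 expected_fields: Optional[int]
                 ) -> Iterator[Tuple[Optional[str], ...]]:
    """Wrap any loader's record stream and enforce the declared field count.

    `expected_fields` is None when the user declared no schema, in which
    case records pass through unchanged.
    """
    for record in records:
        if expected_fields is None or len(record) == expected_fields:
            yield tuple(record)  # common case: nothing to fix
        elif len(record) > expected_fields:
            yield tuple(record[:expected_fields])  # drop extra fields
        else:
            missing = expected_fields - len(record)
            yield tuple(record) + (None,) * missing  # pad with nulls


rows = [["a"], ["a", "b", "c"], ["a", "b"]]
print(list(checked_load(rows, 2)))
# [('a', None), ('a', 'b'), ('a', 'b')]
```

Compared with a per-loader interface, this puts the check in one place, at the cost of the load operator needing to know the expected schema.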

          Jie Li added a comment -

          Also ran a comparison using TPC-H query 19:

          lineitem = load '$input/lineitem' USING PigStorage('|') as (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate,l_shipinstruct, l_shipmode, l_comment);
          
          part = load '$input/part' USING PigStorage('|') as (p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment);
          
          lpart = JOIN lineitem BY l_partkey, part by p_partkey;
          
          fltResult = FILTER lpart BY
            (
              p_brand == 'Brand#12'
              and p_container matches 'SM CASE|SM BOX|SM PACK|SM PKG'
              and l_quantity >= 1 and l_quantity <= 11
              and p_size >= 1 and p_size <= 5
              and l_shipmode matches 'AIR|AIR REG'
              and l_shipinstruct == 'DELIVER IN PERSON'
            )
            or
            (
              p_brand == 'Brand#23'
              and p_container matches 'MED BAG|MED BOX|MED PKG|MED PACK'
              and l_quantity >= 10 and l_quantity <= 20
              and p_size >= 1 and p_size <= 10
              and l_shipmode matches 'AIR|AIR REG'
              and l_shipinstruct == 'DELIVER IN PERSON'
            )
            or
            (
              p_brand == 'Brand#34'
              and p_container matches 'LG CASE|LG BOX|LG PACK|LG PKG'
              and l_quantity >= 20 and l_quantity <= 30
              and p_size >= 1 and p_size <= 15
              and l_shipmode matches 'AIR|AIR REG'
              and l_shipinstruct == 'DELIVER IN PERSON'
            );
          volume = FOREACH fltResult GENERATE l_extendedprice * (1 - l_discount);
          grpResult = GROUP volume ALL;
          revenue = FOREACH grpResult GENERATE SUM(volume);
          
          store revenue into '$output/Q19out' USING PigStorage('|');
          

          It consists of a join job which dominates the running time, and a light-weight group job. Below is the comparison of the map phase time for processing 10GB data:

          Build        Map phase time
          trunk        7m54s
          this patch   7m22s

          The improvement is less significant than in the previous mini benchmark because half of the fields are pruned, but we still see a speedup of about 30 seconds (6 to 7%).
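For readers who want to check the arithmetic, the two map phase times reported above can be converted to seconds and compared. The helper `parse_duration` is a hypothetical name written for this illustration.

```python
import re


def parse_duration(text: str) -> int:
    """Convert a string such as '7m54s' into a number of seconds."""
    match = re.fullmatch(r"(?:(\d+)m)?(?:(\d+)s)?", text)
    if not text or not match:
        raise ValueError(f"unrecognized duration: {text!r}")
    minutes, seconds = (int(g) if g else 0 for g in match.groups())
    return minutes * 60 + seconds


trunk = parse_duration("7m54s")    # 474 seconds
patched = parse_duration("7m22s")  # 442 seconds
saved = trunk - patched            # 32 seconds
percent = 100.0 * saved / trunk    # share of the trunk time that was saved

print(f"saved {saved}s ({percent:.2f}% of {trunk}s)")
# saved 32s (6.75% of 474s)
```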

          Jie Li added a comment -

          Here is the script I used:

          LineItems = LOAD '$input/lineitem' USING PigStorage('|') AS (orderkey, partkey, suppkey, linenumber, quantity, extendedprice, discount, tax, returnflag, linestatus, shipdate, commitdate, receiptdate, shipinstruct, shipmode, comment);
          
          Result = filter LineItems by 1==0; 
          
          STORE Result INTO '$output/filter';
          

          Note again we specified -t PushUpFilter to force processing Foreach before the filter, so we can observe the overhead of Foreach. With this patch, Foreach will not be inserted and we can achieve the improvement shown in 2824.png, which is about 234 seconds vs. 147 seconds for loading 10GB data.

          Jie Li added a comment -

          Attached an initial patch that introduces a new interface LoadCheckSchema (any better naming?) that defines whether the loader is able to check the number of fields, i.e. padding null fields or throwing away extra fields if necessary. If a given loader does not implement this interface it'd be assumed that it's unable to check fields, and Pig will use a FOREACH to project all fields, which can be more expensive. Initially only PigStorage implements this interface.

          Adjusted (not reverted) the unit tests modified by PIG-1188 by adding type info to the schema, so those tests won't be affected by this optimization now (or in the future if it gets disabled). Also added a few tests verifying that FOREACH is not generated if all types are bytearray.

          Any comment is appreciated.
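The opt-in design described in this comment can be sketched as an optional capability that the planner tests for. The code below is an illustration in Python, not the patch itself: only the interface name LoadCheckSchema comes from the comment, while the method `set_expected_field_count`, the two loader classes, and `needs_foreach` are hypothetical stand-ins.

```python
from abc import ABC, abstractmethod
from typing import List, Optional, Sequence


class LoadFunc(ABC):
    """Stand-in for a loader; not Pig's real LoadFunc class."""

    @abstractmethod
    def parse(self, line: str) -> List[Optional[str]]: ...


class LoadCheckSchema(ABC):
    """Optional capability: the loader pads or truncates fields itself."""

    @abstractmethod
    def set_expected_field_count(self, count: int) -> None: ...


class DelimitedLoader(LoadFunc, LoadCheckSchema):
    """Hypothetical loader that opts in to checking the field count."""

    def __init__(self, delimiter: str = "|") -> None:
        self.delimiter = delimiter
        self.expected: Optional[int] = None

    def set_expected_field_count(self, count: int) -> None:
        self.expected = count

    def parse(self, line: str) -> List[Optional[str]]:
        fields: List[Optional[str]] = list(line.split(self.delimiter))
        if self.expected is None:
            return fields
        fields = fields[:self.expected]  # drop extra fields
        return fields + [None] * (self.expected - len(fields))  # pad with nulls


class PlainLoader(LoadFunc):
    """Hypothetical loader that does not implement the capability."""

    def parse(self, line: str) -> List[Optional[str]]:
        return list(line.split(","))


def needs_foreach(loader: LoadFunc, declared_types: Sequence[str]) -> bool:
    """Decide whether a FOREACH must follow the load (simplified rule)."""
    if not declared_types:
        return False  # no schema declared, so nothing to check
    if any(t != "bytearray" for t in declared_types):
        return True   # typed fields still need a FOREACH for casting
    return not isinstance(loader, LoadCheckSchema)


print(needs_foreach(DelimitedLoader(), ["bytearray"] * 3))  # False
print(needs_foreach(PlainLoader(), ["bytearray"] * 3))      # True
```

Loaders that do not implement the capability keep the existing behavior, so third-party loaders continue to work unchanged, just without the speedup.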

          Jie Li added a comment -

          The idea would be similar to pushing projection down to the loader. We can create another interface (say, LoadCheckSchema); LoadFuncs that implement it will check the number of fields themselves, and otherwise Pig will insert a Foreach as usual.

          Jie Li added a comment -

          Attached result from a benchmark loading 10GB data that has 60 million records with 16 fields. We compare three runs: Mapreduce, Pig with schema (no types) and Pig without schema.

          For Mapreduce, an empty map function is specified.

          For Pig, in order to isolate the loading time, we apply a filter to throw out all data after loading, and also disable the PushUpFilter optimization so Foreach will be processed after data loading. Also note there is no type in the schema so there is no type casting here.

          We can see that Pig without a schema is much faster than Pig with a schema, because it avoids the Foreach that checks the number of fields.

          (We can also see the overhead of Pig compared with pure MapReduce in this case.)


            People

            • Assignee:
              Unassigned
              Reporter:
              Jie Li
            • Votes:
              0
              Watchers:
              4
