PIG-2831: MR-Cube implementation (Distributed cubing for holistic measures)
Sub-task of PIG-2167 (CUBE operation in Pig)

    Details

    • Type: Sub-task
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

      Description

      Implementing distributed cube materialization for holistic measures based on the MR-Cube approach described in http://arnab.org/files/mrcube.pdf.
      Primary steps involved:
      1) Identify whether the measure is holistic
      2) Determine the algebraic attribute (it can be detected automatically in some cases; if automatic detection fails, the user should hint the algebraic attribute)
      3) Modify the MRPlan to insert a sampling job that executes the naive cube algorithm and generates an annotated cube lattice (containing large-group partitioning information)
      4) Modify the plan to distribute the annotated cube lattice to all mappers using the distributed cache
      5) Execute the actual cube materialization on the full dataset
      6) Modify the MRPlan to insert a post-processing job that combines the results of the actual cube materialization job
      7) Handle OOM exceptions
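The naive cube computation in step 3 enumerates every region of the cube lattice, i.e. every subset of the dimension set. A minimal sketch of that enumeration (class and method names are illustrative, not the actual Pig implementation):

```java
import java.util.ArrayList;
import java.util.List;

public class CubeLattice {
    // Returns every dimension subset (region) of the cube lattice.
    // d dimensions yield 2^d regions, including the empty (grand total) region.
    public static List<List<String>> regions(List<String> dims) {
        List<List<String>> out = new ArrayList<>();
        int n = dims.size();
        for (int mask = 0; mask < (1 << n); mask++) {
            List<String> region = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                if ((mask & (1 << i)) != 0) {
                    region.add(dims.get(i));
                }
            }
            out.add(region);
        }
        return out;
    }

    public static void main(String[] args) {
        // 3 dimensions -> 8 regions
        System.out.println(regions(List.of("state", "city", "day")).size()); // 8
    }
}
```

The annotated lattice of step 3 attaches per-region partitioning information to exactly these subsets.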

      1. PIG-2831.9.git.patch
        287 kB
        Prasanth J
      2. PIG-2831.8.git.patch
        287 kB
        Prasanth J
      3. PIG-2831.7.git.patch
        282 kB
        Prasanth J
      4. PIG-2831.6.git.patch
        272 kB
        Prasanth J
      5. PIG-2831.5.git.patch
        272 kB
        Prasanth J
      6. PIG-2831.4.git.patch
        242 kB
        Prasanth J
      7. PIG-2831.3.git.patch
        136 kB
        Prasanth J
      8. PIG-2831.2.git.patch
        136 kB
        Prasanth J
      9. PIG-2831.1.git.patch
        132 kB
        Prasanth J

        Activity

        Prasanth J created issue -
        Prasanth J added a comment -

        Hello everyone

        With reference to the description of this issue, I am working on step 3, which involves creating a sampling job and executing the naive cube computation algorithm over the sample dataset. The requirement for this sampling job is that I should be able to select a sample size proportional to the input data size. The sampling job is needed to determine the large group sizes and to partition the large groups so that no single reducer gets overloaded.
        One thing I am stuck on is dynamically choosing the sample size. In the current implementation I am using the sample operator to load a fixed sample (10% of the data). Since the sample size is not chosen dynamically, this fixed sampling will result in oversampling for large datasets. To choose the sample size dynamically, we need to know the total number of tuples in the input dataset, but finding that is not trivial. One way is to first find the total input size and the size of one tuple in memory. The problem with this approach is that, since a tuple is a List<Object>, the reported in-memory size of a tuple is much larger than the actual size of a row in bytes. To verify this I tested with a simple dataset:

        Input file size : 319 bytes
        Actual number of rows: 13
        Number of dimensions: 5
        Schema: int, chararray, chararray, chararray, int
        Actual row size: 319/13 ~= 25 bytes
        In-memory tuple size reported: 264 bytes (~10x greater than actual size of row)

        Since the in-memory tuple size is so much higher, we cannot make a good estimate of the total number of rows in the dataset, and hence of the sample size.
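The mismatch can be seen with simple arithmetic, using the numbers from the test dataset above (the method name is illustrative):

```java
public class RowEstimate {
    // Estimate the row count as total input size divided by per-row size.
    public static long estimateRows(long inputBytes, long perRowBytes) {
        return inputBytes / perRowBytes;
    }

    public static void main(String[] args) {
        long inputBytes = 319;         // input file size from the test above
        long actualRowBytes = 25;      // ~319 / 13 rows
        long inMemoryTupleBytes = 264; // reported in-memory tuple size
        System.out.println(estimateRows(inputBytes, actualRowBytes));     // 12 (close to the real 13)
        System.out.println(estimateRows(inputBytes, inMemoryTupleBytes)); // 1  (off by ~10x)
    }
}
```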

        Other approaches:
        I looked into how PoissonSampleLoader and RandomSampleLoader work. Each takes a different approach to loading a sample dataset. PoissonSampleLoader uses the distribution of the skewed key to generate sample rows that best represent the underlying data. This loader inserts a special marker in the last tuple with the number of rows in the dataset. Since this loader is specifically meant for handling skewed keys, I cannot use it for generating my sample dataset.
        For RandomSampleLoader, we need to specify the number of samples to load beforehand so that the loader stops after loading the specified number of tuples. Since the sample size must be specified before loading, we have no means to dynamically load samples for datasets of varying size.
        Also, to use these two loaders we need to copy the entire dataset to a temp file and then load from that temp file, which costs an additional map job. I believe the reason (from what I can understand from the source) for copying the dataset to a temp file and reading it back is that the loader classes can only read the InterStorage format.

        I have listed below a few pros and cons of the different approaches.
        1) Using sample operator
        Pros:
        1 less map job compared to other loaders

        Cons:
        Reads the entire dataset to generate the sample, because the sample operator is implemented as filter + RANDOM UDF + less-than expression (sample size) after projecting the input columns.
        May result in oversampling for larger datasets

        2) RandomSampleLoader
        Pros:
        Fixed sample size (the paper linked in the description mentions that a 2M sample is good enough to represent 20B tuples, and 100K is good enough for 2B tuples; please refer to page 6 of the paper.)
        Stops reading after the sample size is reached (useful for large datasets) - not sure about this, please correct me if I am wrong.

        Cons:
        One additional map job required (including post-processing there will be 4 MR jobs, with 2 map-only jobs)
        Since a fixed sample size is used, this method is not scalable

        3) PoissonSampleLoader
        Pros:
        Dynamically determines sample size
        Can determine number of rows in dataset using special tuple

        Cons:
        One additional map job required (including post-processing there will be 4 MR jobs, with 2 map-only jobs)
        Not suitable for my use case, since the sample size generated is not proportional to the input size

        I think what I need is a hybrid loader (combining concepts from random + poisson) that dynamically loads sample tuples based on the input dataset size.

        Any thoughts about how I can generate a sample size proportional to the input data size? Or is there a way to find the number of rows in a dataset? Am I missing any other ideas for finding/estimating the number of rows?
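One possible shape for such a hybrid is sketched below, under stated assumptions: the sampling fraction, floor, and ceiling are placeholder values loosely inspired by the paper's 100K/2B and 2M/20B figures, not taken from it.

```java
public class SampleSize {
    // Clamp a fixed sampling fraction between a floor and a ceiling so that
    // small inputs are not under-sampled and huge inputs are not over-sampled.
    public static long sampleSize(long totalRows, double fraction, long min, long max) {
        long s = (long) (totalRows * fraction);
        return Math.max(min, Math.min(max, s));
    }

    public static void main(String[] args) {
        // With a 0.01% fraction, a floor of 100K and a ceiling of 2M:
        System.out.println(sampleSize(2_000_000_000L, 1e-4, 100_000L, 2_000_000L));  // 200000
        System.out.println(sampleSize(20_000_000_000L, 1e-4, 100_000L, 2_000_000L)); // 2000000 (capped)
    }
}
```

The remaining problem, as the comment says, is obtaining `totalRows` in the first place.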

        Dmitriy V. Ryaboy added a comment -

        Could you use the InputFormat / RecordReader to read in a few records, determine how much of the underlying input stream you've read, and estimate size of records on disk based on that?
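This suggestion can be sketched as follows. To stay self-contained the sketch reads lines from a string rather than going through a real InputFormat/RecordReader (which is what an actual implementation would use, asking the underlying stream for its position); the class name is illustrative.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;

public class OnDiskRowEstimator {
    // Read the first k records, measure the bytes consumed, and extrapolate
    // the total row count from the overall input size.
    public static long estimateRows(String data, long totalBytes, int k) throws Exception {
        BufferedReader r = new BufferedReader(new StringReader(data));
        long bytesRead = 0;
        int records = 0;
        String line;
        while (records < k && (line = r.readLine()) != null) {
            bytesRead += line.getBytes(StandardCharsets.UTF_8).length + 1; // +1 for the newline
            records++;
        }
        if (records == 0) return 0;
        double avgRecordBytes = (double) bytesRead / records;
        return Math.round(totalBytes / avgRecordBytes);
    }

    public static void main(String[] args) throws Exception {
        String data = "1,a\n2,b\n3,c\n4,d\n"; // 16 bytes, 4 records
        System.out.println(estimateRows(data, data.length(), 2)); // 4
    }
}
```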

        Dmitriy V. Ryaboy made changes -
        Assignee: Prasanth J [ prasanth_j ]
        Prasanth J added a comment -

        Yes, I could do that. It looks like this will need a separate map job for reading a few tuples? I think this will require tweaking the loader to emit a special tuple with the estimated number of records.

        I will try that once the end-to-end base implementation is up. For now the way I am determining the sample size is by using RandomSampleLoader: I am sampling 1000 tuples per mapper and using those samples for the naive computation and for determining the partition size. But RandomSampleLoader always returns more samples than expected - not sure if it's a bug. Once the complete implementation is done we can look into a more accurate estimate of the number of tuples. Will soon submit an intermediate patch for review.

        Also, given the in-memory size of a tuple, how can we estimate the number of tuples that a reducer can handle without spilling to disk?
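As a back-of-the-envelope answer to that last question: divide the usable reducer heap by the average in-memory tuple size. The 70% usable fraction below is an assumed placeholder (Pig's own spill behavior is governed by its memory manager), not a Pig constant.

```java
public class ReducerCapacity {
    // Rough estimate of how many tuples fit in a reducer without spilling.
    public static long tuplesPerReducer(long heapBytes, double usableFraction,
                                        long avgTupleBytes) {
        return (long) (heapBytes * usableFraction) / avgTupleBytes;
    }

    public static void main(String[] args) {
        // 1 GB heap, assumed 70% usable, 264-byte tuples (the size reported above)
        System.out.println(tuplesPerReducer(1L << 30, 0.7, 264));
    }
}
```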

        Prasanth J added a comment -

        This is an initial patch of the MR-Cube implementation for holistic measures. As of now it supports only the COUNT + DISTINCT holistic measure.
        Overall, the patch works as follows: when a CUBE operator is encountered in the logical plan, the plan is modified to insert an LOCube operator. This operator carries information about the clause of operation (CUBE/ROLLUP) and the corresponding dimensions. When the logical plan is compiled to a physical plan, the holistic measure is detected; if one is found, information about the holistic measure and the algebraic attribute is stored in the POCube operator. The main changes happen when the physical plan is compiled to the MRPlan in MRCompiler. The changes to the MRPlan are:
        1) The POCube visitor determines the sample size based on the estimated number of rows. A sample operator is inserted into the plan to get a sample dataset, upon which naive cubing is performed. The output of this MR job is an annotated cube lattice with partition factors for each region.
        2) The second MR job receives the annotated lattice through the distributed cache and performs the actual full cube materialization. UDFs for partitioning large groups and post-processing output tuples are attached to this plan.
        3) The final MR job is a post-aggregation job in which the outputs (measure values) of groups that are spread across multiple reducers are aggregated together to get the final result.

        There are some issues/improvements that I have marked in the patch as FIXMEs and TODOs that need suggestions.
        One issue I would like to point out here concerns finding the actual tuple size (NOT the in-memory size).
        To find the actual tuple size, I wrote a ReadSingleLoader which reads one tuple from the dataset and gets the number of bytes read from PigStorage. Using the number of bytes read and the overall input data size, I compute the approximate number of rows in the dataset. One issue with this approach is that if the schema contains a variable-width datatype like chararray or bytearray, the estimate will have a high error rate. To get a more accurate estimate for variable-width datatypes, we would need to know the underlying distribution of the column and load n tuples based on that distribution. It would be helpful if someone can share thoughts about a better way of finding/estimating the number of rows.
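The error mode with variable-width columns is easy to reproduce numerically: extrapolating from a single short row badly overshoots the row count. A tiny illustration (names and numbers are hypothetical):

```java
public class VariableWidthError {
    // Single-row probe: total size divided by the first row's size.
    public static long estimateFromFirstRow(long totalBytes, long firstRowBytes) {
        return totalBytes / firstRowBytes;
    }

    public static void main(String[] args) {
        // 3 rows of 10, 50, and 60 bytes = 120 bytes total.
        // Probing the first (short) row estimates 12 rows; the truth is 3.
        System.out.println(estimateFromFirstRow(120, 10)); // 12
    }
}
```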

        Review request for this patch: https://reviews.apache.org/r/6651/

        Prasanth J made changes -
        Attachment PIG-2831.1.git.patch [ 12541184 ]
        Prasanth J added a comment -

        Attaching a new patch with a small critical fix and a unit test case. Will update the same in RB as well.

        Prasanth J made changes -
        Attachment PIG-2831.2.git.patch [ 12541200 ]
        Prasanth J added a comment -

        Updated the patch with the number of reducers for the sample job set to 1. Since the sample dataset is small, it can easily be handled by a single reducer.

        Prasanth J made changes -
        Attachment PIG-2831.3.git.patch [ 12541288 ]
        Dmitriy V. Ryaboy added a comment -

        Prasanth, I'll go through it in more detail, but the biggest issue I see is the one you pointed out - the code for figuring out tuple size. Not only is the method of reading a single tuple unreliable, it is not generally applicable, and we really don't want to tie this whole thing to PigStorage.

        The reason you are getting the raw tuple size is to estimate the total number of tuples. One way to achieve this is to check whether the loader implements LoadMetadata, and try to get the number of tuples from the provided stats if it does. That should be the primary method of determining the row count, as it allows individual storage implementations to supply their own method for this approximation, and gives us all the benefits of the HCatalog work when that comes around.
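The stats-first fallback described here might look like the sketch below. The two nested interfaces are minimal local stand-ins for Pig's LoadMetadata/ResourceStatistics so the sketch compiles on its own; the real Pig interfaces take more parameters (e.g. a location and a Job for getStatistics).

```java
public class StatsFirstEstimator {
    // Local stand-ins for Pig's LoadMetadata / ResourceStatistics.
    public interface ResourceStats { Long getNumRecords(); }
    public interface LoadMetadata { ResourceStats getStatistics(); }

    // Prefer loader-provided statistics; otherwise fall back to a
    // size-based estimate (e.g. fileSize / avgOnDiskRecordSize).
    public static long numRecords(Object loader, long fallbackEstimate) {
        if (loader instanceof LoadMetadata) {
            ResourceStats stats = ((LoadMetadata) loader).getStatistics();
            if (stats != null && stats.getNumRecords() != null) {
                return stats.getNumRecords();
            }
        }
        return fallbackEstimate;
    }

    public static void main(String[] args) {
        LoadMetadata withStats = () -> () -> 42L;
        System.out.println(numRecords(withStats, 1000));    // 42 (from stats)
        System.out.println(numRecords(new Object(), 1000)); // 1000 (fallback)
    }
}
```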

        In the meantime, we still have a problem with PigStorage. How do SkewedJoin and Order currently get around this? My understanding is that they force the preceding data to be written to disk, then run a sampler job, and use memory estimates to determine how many reducers the sampled keys need to go to. Can we not use the same approach here?

        Prasanth J added a comment -

        Yes, it's true that skewed join and order-by force the data to be written to disk in a map-only job and then use PoissonSampleLoader/RandomSampleLoader respectively. PoissonSampleLoader loads n tuples from the dataset based on the join key distribution and appends a special tuple at the end carrying the number of tuples loaded. RandomSampleLoader just loads 100 tuples from each mapper. PoissonSampleLoader is definitely not applicable to our case. RandomSampleLoader could be used, but we would need to specify how many samples to load per mapper based on the overall data size. I think this method would also be unreliable, because it may lead to oversampling or undersampling, and we need to know the number of mappers before specifying the number of samples per mapper. One more disadvantage is the cost of an extra map-only job, which will be very expensive if the data size is big. I also noted that after the dataset is forcefully copied to disk, the overall size increases because of the InterStorage format.

        Performance-wise, I found the current approach of using the SAMPLE operator to be much faster. The entire sample extraction happens within a few minutes (1 min 23 s for ~100K samples from 100M tuples). It also doesn't cost an additional map job, and it saves space.

        I like the idea of the LoadMetadata approach, but until the HCatalog work is integrated we may not be able to use it.

        Dmitriy V. Ryaboy added a comment -

        I added another suggestion in the RB about a potential sampling approach that might work.

        Prasanth J added a comment -

        Code review round 1 changes added; submitting the updated patch. Updated in RB as well. Still working on the custom loader, so it is not included in this patch.

        Prasanth J made changes -
        Attachment PIG-2831.4.git.patch [ 12541738 ]
        Prasanth J added a comment -

        Hi Dmitriy,

        I have implemented the new inter storage with statistics gathering and the new sample loader as per your idea on RB. Attached is the new patch containing the following changes:
        1) Added a new RichInterStorage which implements the StoreMetadata and LoadMetadata interfaces for storing and loading statistics about the intermediate data. RichInterStorage uses RichRecordReader/RichInputFormat for reading intermediate data and RichRecordWriter/RichOutputFormat for storing it. RichRecordWriter and RichOutputFormat are the same as InterRecordWriter and InterOutputFormat; the main difference is in RichRecordReader and RichInputFormat. RichInputFormat wraps all the splits into one logical split so that only one mapper is used for loading the sample dataset.
        2) CubeSampleLoader uses the underlying RichRecordReader for getting random samples of data. RichRecordReader opens at most 100 inner splits and chooses a random split while reading each tuple.
        3) Changes to PigOutputCommitter for storing statistics. Statistics are stored at the end of every commitTask(), per output partition. RichInterStorage takes care of loading the statistics for the different partitions and aggregating them. The statistics store numberOfRows and avgInMemTupleSize for each partition (only these two values are required for holistic cubing).
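The split-capping and random-pick behavior of point 2 can be sketched as below. Names are illustrative; the real RichRecordReader works against Hadoop InputSplits rather than a plain list.

```java
import java.util.List;
import java.util.Random;

public class RandomSplitPicker {
    // Open at most `cap` splits regardless of how many the input has.
    public static int capped(int totalSplits, int cap) {
        return Math.min(totalSplits, cap);
    }

    // Pick one of the open splits uniformly at random for each tuple read.
    public static <T> T pick(List<T> openSplits, Random rng) {
        return openSplits.get(rng.nextInt(openSplits.size()));
    }

    public static void main(String[] args) {
        System.out.println(capped(450, 100)); // 100 splits opened, not 450
        System.out.println(pick(List.of("split-0", "split-1", "split-2"), new Random()));
    }
}
```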

        This patch is quite large, mainly because most of the changes (at the logical layer) are due to an old formatting issue that I fixed in this patch. Sorry about that.

        I have also updated the patch in RB. Please review it and let me know your feedback. I have also left open some of the issues from your earlier review comments that require your thoughts.

        Prasanth J made changes -
        Attachment PIG-2831.5.git.patch [ 12543590 ]
        Prasanth J added a comment -

        One more thing I forgot to mention: since statistics gathering is a runtime operation, we cannot fall back to naive cubing when we detect a small dataset. Earlier we estimated the total number of rows at compile time and, based on that estimate, chose between the mrcube approach and the naive approach. We need to provide a way for the user to disable the mrcube approach for smaller datasets, as naive cubing on a small dataset is much faster than mrcube: mrcube takes 4 MR jobs whereas naive cubing can be done in a single job. Should we provide a Pig property for enabling/disabling the mrcube approach?
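Such a switch could look like the sketch below; the property name "pig.cube.mrcube.enabled" is hypothetical, chosen only for illustration, not an existing Pig property.

```java
import java.util.Properties;

public class MrCubeToggle {
    // Read a boolean toggle from the job properties, defaulting to enabled.
    public static boolean mrCubeEnabled(Properties props) {
        return Boolean.parseBoolean(props.getProperty("pig.cube.mrcube.enabled", "true"));
    }

    public static void main(String[] args) {
        Properties p = new Properties();
        System.out.println(mrCubeEnabled(p)); // true (default)
        p.setProperty("pig.cube.mrcube.enabled", "false");
        System.out.println(mrCubeEnabled(p)); // false -> fall back to naive cubing
    }
}
```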

        Prasanth J added a comment -

        Attaching a patch that fixes GC errors in the PostProcessCube UDF and also fixes the parallelism of the sampling job. Updated RB as well.

        Prasanth J made changes -
        Attachment PIG-2831.6.git.patch [ 12543983 ]
        Prasanth J added a comment -

        Attaching a patch that adds a new NestedProxyBag, which proxies all calls to the inner bag except that, while iterating, it applies a specified filter function. This bag is used in the PostProcessCube UDF to avoid copying the databag. Updated RB.
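The lazy-filtering idea behind NestedProxyBag can be sketched without Pig's DataBag API as a wrapper that applies a predicate during iteration instead of materializing a filtered copy (names are illustrative):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

public class FilteringIterable<T> implements Iterable<T> {
    private final Iterable<T> inner;
    private final Predicate<T> filter;

    public FilteringIterable(Iterable<T> inner, Predicate<T> filter) {
        this.inner = inner;
        this.filter = filter;
    }

    @Override
    public Iterator<T> iterator() {
        Iterator<T> it = inner.iterator();
        return new Iterator<T>() {
            private T next;
            private boolean ready;

            @Override
            public boolean hasNext() {
                // Advance lazily until an element passes the filter.
                while (!ready && it.hasNext()) {
                    T candidate = it.next();
                    if (filter.test(candidate)) { next = candidate; ready = true; }
                }
                return ready;
            }

            @Override
            public T next() {
                if (!hasNext()) throw new NoSuchElementException();
                ready = false;
                return next;
            }
        };
    }

    public static void main(String[] args) {
        List<Integer> out = new ArrayList<>();
        for (int x : new FilteringIterable<>(List.of(1, 2, 3, 4, 5), n -> n % 2 == 0)) {
            out.add(x);
        }
        System.out.println(out); // [2, 4]
    }
}
```

The memory win comes from never holding a second, filtered bag in memory alongside the original.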

        Prasanth J made changes -
        Attachment PIG-2831.7.git.patch [ 12544397 ]
        Dmitriy V. Ryaboy added a comment -

        Excellent, Prasanth! Any idea how much RAM that saves us?
        Question: why did you choose to create a TupleFieldFilter function instead of using the more generic Guava Functions? Because of exception handling? You could just rethrow them as RuntimeExceptions. Also, I'm not sure why TupleFieldFilter is so specific (one field only). It could be just about anything that takes a Tuple and returns a Tuple, right?
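The rethrow suggestion can be sketched as follows. java.util.function.Function stands in for Guava's com.google.common.base.Function, and the checked exception is hypothetical (a stand-in for the checked exceptions Pig's Tuple accessors throw):

```java
import java.util.function.Function;

public class RethrowExample {
    // Hypothetical checked exception, standing in for something like
    // Pig's ExecException thrown by Tuple field accessors.
    static class CheckedAccessException extends Exception {
        CheckedAccessException(String msg) { super(msg); }
    }

    // The checked-exception-throwing operation we want to adapt.
    static String getField(String tuple) throws CheckedAccessException {
        if (tuple == null) throw new CheckedAccessException("null tuple");
        return tuple;
    }

    // Adapting it to a generic Function by rethrowing the checked
    // exception as an unchecked RuntimeException.
    static final Function<String, String> FIELD_FN = t -> {
        try {
            return getField(t);
        } catch (CheckedAccessException e) {
            throw new RuntimeException(e);
        }
    };
}
```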

        Prasanth J added a comment -

        Sorry, I should have posted this earlier.
        Below are the memory statistics for the actual full cube job with the new proxy bag vs. the older approach, running on a 10-node cluster using CDH4 (with default settings):

        Older approach:
        Spilled Records - 130,616,769
        Physical memory (bytes) snapshot - 7,162,691,584
        Virtual memory (bytes) snapshot - 27,694,501,888
        Total committed heap usage (bytes) - 4,517,134,336

        Proxy bag approach:
        Spilled Records - 130,616,769
        Physical memory (bytes) snapshot - 6,429,990,912
        Virtual memory (bytes) snapshot - 27,698,757,632
        Total committed heap usage (bytes) - 3,681,222,656 (~19% improvement)

        Tested it for 3 runs and we get approximately 19% improvement in heap usage.

        Damn, I totally forgot about the Guava transform functions!! Will update the patch. Thanks, Dmitriy, for your quick code review.

        Prasanth J added a comment -

        Also, I will update the filter function to accept a mask value instead of a field number, so that we can filter out fields based on the specified mask.

        Prasanth J added a comment -

        Updated the patch with the following changes:
        1) The TupleFieldFilter function now implements the generic Guava Function interface. It also accepts a mask, based on which the tuple fields are filtered when getNext() is called.
        2) Fixes issues with Filter followed by Cube in the mrcube case.
        3) The basic version of mrcube supports only a filtering operation ahead of the Cube operator. If any other operator (including a blocking operator) is used, it falls back to naive cubing instead of mr-cubing. This can be enhanced further once this patch is stabilized. One more optimization can be explored: when blocking operators are used ahead of the cube operator, the forced loading of input data for statistics gathering could be removed if the blocking operator used RichInterStorage instead of InterStorage.
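The mask-based filtering in (1) can be sketched roughly like this. Object[] stands in for Pig's Tuple, and the boolean-array mask representation is an assumption for illustration; the real TupleFieldFilter operates on Tuples:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Sketch of a mask-based field filter: fields whose mask entry is true
// are kept, all others are dropped from the emitted tuple.
public class MaskFieldFilter implements Function<Object[], Object[]> {
    private final boolean[] mask; // true = keep the field at this index

    public MaskFieldFilter(boolean[] mask) {
        this.mask = mask;
    }

    @Override
    public Object[] apply(Object[] tuple) {
        List<Object> kept = new ArrayList<>();
        for (int i = 0; i < tuple.length; i++) {
            if (i < mask.length && mask[i]) kept.add(tuple[i]);
        }
        return kept.toArray();
    }
}
```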

        Prasanth J made changes -
        Attachment PIG-2831.8.git.patch [ 12545367 ]
        Prasanth J added a comment -

        Updated the patch with the following changes:
        1) The partition factor algorithm is tweaked to better distribute the reducer workload.
        2) The partition factor in the PartitionLargeGroups UDF is now initialized to 0 (earlier it was 1), which generates many smaller bags (depending on the cardinality of the algebraic attribute); the earlier initialization to 1 generated a few large bags.

        The above changes also reduced the number of records/bags spilled during the full cube materialization job. In a test experiment with 3M tuples and a rollup on 3 dimensions, the following improvements were observed:
        PROACTIVE_SPILL_COUNT_RECS improved by ~34% (from 5,206,793 to 3,440,694)
        PROACTIVE_SPILL_COUNT_BAGS improved by ~54% (from 22 to 10)
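The large-group partitioning discussed above can be sketched roughly as follows. The per-reducer threshold and hash-based routing are assumptions in the spirit of the MR-Cube paper, not the exact logic of the patch:

```java
public class LargeGroupPartitioner {
    // Split a group whose size exceeds the per-reducer threshold into
    // ceil(size / threshold) partitions. A higher partition count yields
    // many smaller bags rather than a few large ones.
    public static int partitionCount(long groupSize, long threshold) {
        if (groupSize <= threshold) return 1;
        return (int) ((groupSize + threshold - 1) / threshold);
    }

    // Route a tuple to a partition by hashing the algebraic attribute, so
    // tuples sharing the attribute land together and the algebraic measure
    // can be combined in the post-process job.
    public static int partitionOf(Object algebraicAttr, int nPartitions) {
        return Math.floorMod(algebraicAttr.hashCode(), nPartitions);
    }
}
```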

        Prasanth J made changes -
        Attachment PIG-2831.9.git.patch [ 12548317 ]

          People

          • Assignee:
            Prasanth J
            Reporter:
            Prasanth J
          • Votes:
            0
            Watchers:
            4
