Pig
  1. Pig
  2. PIG-966 Proposed rework for LoadFunc, StoreFunc, and Slice/r interfaces
  3. PIG-1062

load-store-redesign branch: change SampleLoader and subclasses to work with new LoadFunc interface

    Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.7.0
    • Component/s: None
    • Labels:
      None
    • Hadoop Flags:
      Incompatible change, Reviewed

      Description

      This is part of the effort to implement new load store interfaces as laid out in http://wiki.apache.org/pig/LoadStoreRedesignProposal .
      PigStorage and BinStorage are now working.

      SampleLoader and its subclasses RandomSampleLoader and PoissonSampleLoader need to be changed to work with the new LoadFunc interface.
      Fixing SampleLoader and RandomSampleLoader will get order-by queries working.
      PoissonSampleLoader is used by skew join.

      1. PIG-1062.patch
        39 kB
        Thejas M Nair
      2. PIG-1062.patch.3
        40 kB
        Thejas M Nair
      3. PIG-1062.5.patch
        42 kB
        Thejas M Nair

        Activity

        Thejas M Nair created issue -
        Thejas M Nair added a comment -

        Skew join uses the total number of input tuples, in PartitionSkewedKeys.calculateReducers(..), to calculate the number of reducers.
        In the version in trunk, PoissonSampleLoader adds the on-disk size of the sampled tuple as the last column of the tuple. This is used to calculate the average size on disk in PartitionSkewedKeys. The total number of tuples is estimated as input-file-size / avg-size-of-tuple-on-disk.

        But with the new interface, the size on disk of a tuple cannot be estimated (there is no getPosition). Also, the size of the input file on disk cannot be estimated if the input is not from a file, or if the load function is passed some metadata instead of a file name.

        Ideally this information should be obtained through ResourceStatistics, as in the proposal. Since that is not available right now, here is another proposal -

        PoissonSampleLoader currently reads almost all the rows, because it tries to sample evenly spaced tuples from the split. It will now read till the last tuple, and add an additional tuple that holds the number of tuples in that split. This special tuple needs to be distinguished from the others, which are sampled tuples. I don't have a good way to do that except for having two columns, the first column holding a unique marker string and the second column holding the number of rows. Does anybody have better suggestions?

        PartitionSkewedKeys will look at all these special rows and add up the row counts to get the total number of rows.
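A minimal sketch of the two-column marker idea described above. The marker string, row layout, and class name here are hypothetical stand-ins, not the actual Pig tuple classes:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the "special last tuple" proposal: each split's sampler emits
// its sampled rows plus one marker row carrying the split's row count;
// the reducer side sums the counts from all marker rows.
public class RowCountMarker {
    // Hypothetical marker value; a real implementation would need a
    // string that cannot collide with user data.
    static final String MARKER = "\u0001pig.special.row.count";

    // Emitted at the end of a split: [MARKER, rowsInSplit]
    static List<Object> markerRow(long rowsInSplit) {
        List<Object> row = new ArrayList<>();
        row.add(MARKER);
        row.add(rowsInSplit);
        return row;
    }

    // PartitionSkewedKeys-style aggregation: add up the row counts from
    // the marker rows, ignoring ordinary sampled rows.
    static long totalRows(List<List<Object>> rows) {
        long total = 0;
        for (List<Object> row : rows) {
            if (!row.isEmpty() && MARKER.equals(row.get(0))) {
                total += ((Number) row.get(1)).longValue();
            }
        }
        return total;
    }
}
```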

        Thejas M Nair made changes -
        Field Original Value New Value
        Parent PIG-966 [ 12436093 ]
        Issue Type Task [ 3 ] Sub-task [ 7 ]
        Ying He added a comment -

        I would suggest adding the total number of tuples of a split to the last sample as a field. All other sample tuples can have this field set to NULL. Then in PartitionSkewedKey.calculateReducers, it can add up this field from all the samples to get the total number of tuples from the input.

        If we use a separate tuple with a different format to represent the total number of tuples, that would involve a bigger change. The sampling job currently adds an "all" key to all samples to group them into one bag, and then sorts the tuples by key. If tuples are of different formats, the execution plan has to be made more complex to deal with these special tuples.

        Dmitriy V. Ryaboy added a comment -

        I have ResourceStats hooked up to LogicalOperators already; I need to port the code to the new branch. This will let us take statistics, if they are available, and pass them into the PoissonSampleLoader at initialization time, so it can get the number of tuples and the average tuple size directly from Stats.

        That being said, statistics may not always be available...

        Before I go into the more fanciful suggestion below – perhaps a simple hack will do. We have counters in Hadoop. Any reason we can't just read "bytes read in map", "records read in map", "bytes written in map", "records written in map" counters directly?

        If I am overlooking something obvious, here's the "ignore counters" suggestion:

        If my understanding is correct, in PoissonSampleLoader we are interested in the average size of a tuple more than in the number of tuples – the number of tuples is just used as a way of crudely estimating the average size of a tuple on disk, which is in turn used to crudely estimate the size of a tuple in memory. The estimate is likely to be very off, by the way, if we are loading not from BinStorage but from arbitrary loadFuncs, as the underlying data, even if it is a file, might be compressed.

        Perhaps we can get the average tuple size directly, instead? We could get that in the mappers of the sampling job by recording memory usage at the first getNext() call, forcing garbage collection, buffering up K tuples, and getting memory usage again.

        We now have the following variables available to each sampling mapper in the SkewedPartitioner:

        • sample rate S (for the appropriate Poisson distribution)
        • total # of mappers, M
        • available heap size on the reducer, H
        • estimated avg size of tuple, s

        The number of tuples we want to sample is then simply T = max(10, S*H/(s*M))

        In getNext(), we can now allocate a buffer for T elements, populate it with the first T tuples, and continue scanning the partition. For every i-th next() call, we generate a random number r s.t. 0<=r<i, and if r<T we insert the new tuple into our buffer at position r. This gives us a nicely random sample of the tuples in the partition.
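The buffering scheme described above is the classic reservoir-sampling algorithm. A minimal standalone sketch, with class and method names that are illustrative rather than taken from the Pig codebase:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Reservoir sampling: keeps a uniformly random sample of size T from a
// stream whose total length is not known in advance.
public class ReservoirSampler<E> {
    private final int capacity;      // T in the comment above
    private final List<E> buffer;
    private final Random random;
    private long seen = 0;           // i: number of tuples seen so far

    public ReservoirSampler(int capacity, long seed) {
        this.capacity = capacity;
        this.buffer = new ArrayList<>(capacity);
        this.random = new Random(seed);
    }

    // Offer the next tuple from the stream.
    public void add(E item) {
        seen++;
        if (buffer.size() < capacity) {
            buffer.add(item);        // fill the first T slots directly
        } else {
            // r is uniform in [0, seen); replace only if r < T, which
            // happens with probability T/seen, keeping the sample uniform.
            long r = (long) (random.nextDouble() * seen);
            if (r < capacity) {
                buffer.set((int) r, item);
            }
        }
    }

    public List<E> sample() {
        return buffer;
    }
}
```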

        So this gets around the need for file size info on that side.

        Now, PartitionSkewedKey uses the file size / avg_tuple_disk_size to estimate total number of tuples, and uses this estimate, plus the ratio of instances of a given key in the sample to the total sample size to predict the total number of records with a given key in the input. But given the number of sampled tuples, and the sample rate, couldn't we calculate the total number of records in the original file by simply reversing the formula for determining the number of tuples to sample? If we do this, no need to append any metadata.
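Under the assumption that the sampler keeps one tuple for every 1/rate tuples scanned, the inversion described above is simple arithmetic. A sketch, not Pig code:

```java
// If the sampler keeps tuples at a known rate, the total input size can
// be back-solved from the number of samples actually taken:
//   total ~= samplesTaken / sampleRate
public class SampleArithmetic {
    // sampleRate = fraction of tuples kept, e.g. 0.001 means 1 in 1000.
    static long estimateTotalRecords(long samplesTaken, double sampleRate) {
        return Math.round(samplesTaken / sampleRate);
    }
}
```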

        Lastly, if we do want to move around metadata such as number of records in input, etc, and we don't want to use Hadoop counters, we should extend BinStorage with ResourceStats serialization, and use ResourceStatistics for this. Even if the original data might not have stats, there is no reason we can't generate these basic counts at runtime for the data we write ourselves.

        -D

        Thejas M Nair made changes -
        Assignee Thejas M Nair [ thejas ]
        Thejas M Nair added a comment -

        Dmitriy,
        I had overlooked the fact that input size of the file is being used also to calculate the number of samples. Thanks for pointing it out.

        I don't know if there are any problems with using counters directly, as long as the information is required only after the (first MapReduce) sampling phase, i.e. it could be used in PartitionSkewedKey().

        The logic in PoissonSampleLoader.computeSamples is as follows (a detailed explanation will be added soon to the sampler wiki page). The goal is to sample all keys from the first input that will need to be partitioned across multiple reducers in the join phase.
        Let us assume X tuples fit into the available memory in the reducer. Let's say we want 10 samples in each set of X tuples, with 95% confidence. Using Poisson distribution formulas, we arrive at the number 17 as the number of tuples to be sampled for every X tuples. (I don't know why the Poisson distribution is the appropriate choice.)

        The total number of tuples to be sampled cannot be calculated without knowing the total number of tuples. But what we do know is that we should sample one tuple every (X/17) tuples. To calculate X, we need the average size of a tuple in memory. Using the process memory usage is unlikely to give a good approximation of that, because (as per my understanding) calling the garbage collector is not guaranteed to free the memory used by all unused objects. Tuple.getMemorySize() can be used to get an estimate of the memory used by a tuple. The average size could be estimated and corrected as we sample more tuples.
        i.e., PoissonSampleLoader.getNext() will return every (H/s)-th tuple in the input (using H, s from the previous comment).
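A sketch of that skip-interval calculation, maintaining a running average of per-tuple memory size (in Pig this would come from Tuple.getMemorySize(); the class and field names here are illustrative):

```java
// Skip-interval estimation for PoissonSampleLoader-style sampling:
// sample one tuple every (X / 17) tuples, where X is how many tuples fit
// in the reducer's heap, estimated from a running average of tuple size.
public class SkipIntervalEstimator {
    static final int SAMPLES_PER_MEMORY_CHUNK = 17; // from the Poisson derivation above

    private final long heapBytes;     // H: memory available on the reducer
    private long tuplesSeen = 0;
    private double avgTupleBytes = 0; // s: running average of tuple memory size

    SkipIntervalEstimator(long heapBytes) {
        this.heapBytes = heapBytes;
    }

    // Fold in the memory size of one more sampled tuple.
    void observe(long tupleBytes) {
        tuplesSeen++;
        avgTupleBytes += (tupleBytes - avgTupleBytes) / tuplesSeen;
    }

    // X / 17, where X = H / s; at least 1 so scanning always makes progress.
    long skipInterval() {
        if (avgTupleBytes <= 0) return 1;
        double x = heapBytes / avgTupleBytes;
        return Math.max(1, (long) (x / SAMPLES_PER_MEMORY_CHUNK));
    }
}
```

Because the average is updated as more tuples are observed, the interval self-corrects over the course of the scan, which matches the "estimated/corrected as we sample more tuples" idea above.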

        In PartitionSkewedKey.exec(), Dmitriy's idea of using the number of samples and the sample rate (H/s) can be used to estimate the total number of tuples.

        WeightedRangePartitioner.setConf is another function using fileSize(). That needs to change as well. I haven't looked at that yet.

        Thejas M Nair added a comment -

        WeightedRangePartitioner.setConf's use of fileSize() is alright; it is checking the size of an intermediate file.

        Thejas M Nair added a comment -

        Even after the interface changes, Pig can compute the file size by adding up the size of each split (from InputSplit.getLength()). The documentation of that function in the interface does not make it clear whether this is size on disk, compressed or uncompressed, etc. Assuming it is size on disk (uncompressed), estimating the total memory it will require is a challenge; one has to make assumptions about the compression ratio and the serialization method.
        Using Tuple.getMemorySize() while sampling will give more accurate numbers for the reducer memory it will consume.

        Thejas M Nair added a comment -

        As indicated in the previous comment, I am planning to go ahead with the earlier proposal. The sample frequency would then be one tuple every ((H/s) * (1/17)) tuples.

        In PartitionSkewedKey.exec(), the number of reducers for join key k1 can be computed using (no_of_samples(k1) / 17). But the accuracy of this calculation depends on how accurate the computed average tuple size is (s in (H/s) * (1/17)). Sending a special tuple with the number of rows in the split will likely lead to a more accurate estimate of the number of reducers required.
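The per-key reducer estimate above reduces to a small calculation. A sketch; rounding up and the minimum of one reducer are my assumptions about how fractional results would be handled:

```java
// Reducer-count estimate for a skewed join key: if 17 samples correspond
// to one reducer's worth of tuples, a key that appears in N samples
// needs roughly N / 17 reducers.
public class ReducerEstimator {
    static final int SAMPLES_PER_REDUCER = 17;

    static int reducersForKey(long samplesForKey) {
        // Ceiling division, with a floor of one reducer per key.
        long reducers = (samplesForKey + SAMPLES_PER_REDUCER - 1) / SAMPLES_PER_REDUCER;
        return (int) Math.max(1, reducers);
    }
}
```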

        Sriranjan Manjunath added a comment -

        We cannot use counters since load/join will result in two jobs, the first one being the sampler. Your design looks good otherwise.

        Dmitriy V. Ryaboy added a comment -

        The sampler (in this design) reads all the data, so number of records read is total number of records in dataset, and the number of records written is total number of samples. Same for bytes. The sampler produces a histogram file, which is then used by the join task – so there is no reliance on counters there.

        Dmitriy V. Ryaboy added a comment -

        Thejas:

        sending a special tuple with number of rows in the split will likely lead to more accurate estimate of number of reducers required.

        You can get the same info from the counters without unnecessarily complicating tuple processing, imo. In fact you can use (num bytes read / num records read) to get the old calculation, and not rely on number of samples and local average size estimates.

        Thejas M Nair added a comment -

        You can get the same info from the counters without unnecessarily complicating tuple processing, imo. In fact you can use (num bytes read / num records read) to get the old calculation, and not rely on number of samples and local average size estimates.

        Yes, I agree that using counters would have been a cleaner way to get the total number of tuples in the reducer, but it looks like there is no reliable way to get the complete counter information of a map from the reduce part of the MR job.

        Dmitriy V. Ryaboy added a comment -

        It looks like ReduceContext has a getCounter() method. Am I missing a subtlety?
        http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapreduce/ReduceContext.html

        Thejas M Nair added a comment -

        Proposal for sampling in RandomSampleLoader (as well as the SampleLoader class), which is used for order-by queries -
        Problem: With the new interface, we cannot use the old approach of dividing the size of the file by the number of samples required and skipping that many bytes to get each new sample.
        Proposal: The approach proposed by Dmitriy for sampling is used -

        In getNext(), we can now allocate a buffer for T elements, populate it with the first T tuples, and continue scanning the partition. For every ith next() call, we generate a random number r s.t. 0<=r<i, and if r<T we insert the new tuple into our buffer at position r. This gives us a nicely random sample of the tuples in the partition.

        To avoid parsing all tuples, RecordReader.nextKeyValue() will be called (instead of loader.getNext()) when the current tuple is to be skipped.
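A self-contained sketch of that skip pattern, using hypothetical stand-in interfaces rather than the real Hadoop RecordReader and Pig LoadFunc, so the shape of the loop is visible:

```java
import java.util.ArrayList;
import java.util.List;

// Skip pattern: advance the underlying reader cheaply for tuples we are
// going to discard, and only pay the tuple-construction cost for the
// positions we actually sample.
public class SkippingSampler {
    // Stand-ins for RecordReader.nextKeyValue() and LoadFunc.getNext().
    interface RawReader { boolean nextKeyValue(); }
    interface TupleParser { String parseCurrent(); }

    // Return every skipInterval-th tuple, parsing only those.
    static List<String> sample(RawReader reader, TupleParser parser,
                               int skipInterval) {
        List<String> out = new ArrayList<>();
        int pos = 0;
        while (reader.nextKeyValue()) {         // cheap advance, no parsing
            pos++;
            if (pos % skipInterval == 0) {
                out.add(parser.parseCurrent()); // parse only sampled rows
            }
        }
        return out;
    }
}
```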

        It looks like ReduceContext has a getCounter() method. Am I missing a subtlety?

        Arun C Murthy (MapReduce committer) has agreed to elaborate on his recommendation on this in the JIRA.

        Thejas M Nair added a comment -

        Patch for order-by sampling and skew-join sampling changes.

        Thejas M Nair made changes -
        Attachment PIG-1062.patch [ 12424797 ]
        Thejas M Nair added a comment -

        For the load-store-redesign branch, Hudson might not be able to apply the patch to trunk.

        Thejas M Nair made changes -
        Status Open [ 1 ] Patch Available [ 10002 ]
        Hadoop QA added a comment -

        -1 overall. Here are the results of testing the latest attachment
        http://issues.apache.org/jira/secure/attachment/12424797/PIG-1062.patch
        against trunk revision 835499.

        +1 @author. The patch does not contain any @author tags.

        -1 tests included. The patch doesn't appear to include any new or modified tests.
        Please justify why no tests are needed for this patch.

        -1 patch. The patch command could not apply the patch.

        Console output: http://hudson.zones.apache.org/hudson/job/Pig-Patch-h8.grid.sp2.yahoo.net/48/console

        This message is automatically generated.

        Thejas M Nair made changes -
        Status Patch Available [ 10002 ] Open [ 1 ]
        Thejas M Nair added a comment -

        New patch after merging with the latest changes to the load-store-redesign branch. Incompatible with trunk.
        Pasting output of test-patch (test cases have not been updated):

        [exec] -1 overall.
        [exec]
        [exec] +1 @author. The patch does not contain any @author tags.
        [exec]
        [exec] -1 tests included. The patch doesn't appear to include any new or modified tests.
        [exec] Please justify why no tests are needed for this patch.
        [exec]
        [exec] +1 javadoc. The javadoc tool did not generate any warning messages.
        [exec]
        [exec] +1 javac. The applied patch does not increase the total number of javac compiler warnings.
        [exec]
        [exec] +1 findbugs. The patch does not introduce any new Findbugs warnings.
        [exec]
        [exec] +1 release audit. The applied patch does not increase the total number of release audit warnings.

        Thejas M Nair made changes -
        Attachment PIG-1062.patch.3 [ 12424927 ]
        Thejas M Nair made changes -
        Status Open [ 1 ] Patch Available [ 10002 ]
        Hadoop QA added a comment -

        -1 overall. Here are the results of testing the latest attachment
        http://issues.apache.org/jira/secure/attachment/12424927/PIG-1062.patch.3
        against trunk revision 835499.

        +1 @author. The patch does not contain any @author tags.

        -1 tests included. The patch doesn't appear to include any new or modified tests.
        Please justify why no tests are needed for this patch.

        -1 patch. The patch command could not apply the patch.

        Console output: http://hudson.zones.apache.org/hudson/job/Pig-Patch-h7.grid.sp2.yahoo.net/156/console

        This message is automatically generated.

        Pradeep Kamath added a comment -

        Review comments:
        In SampleLoader.java
        ====================
        Isn't the idea of SampleLoader only to carry common code for RandomSampleLoader and PoissonSampleLoader,
        and to add a computeSamples() method? It looks like it now has the getNext() implementation
        needed by RandomSampleLoader in it. Should we move that to RandomSampleLoader instead?

        134             System.err.println("Sample " + samples[nextSampleIdx]);
        

        Debug statement above should be removed.

        Why is skipNext() needed? Can't loader.getNext() == null be used instead? If so, is recordReader
        needed?

        In RandomSampleLoader.java
        ==========================
        The XXX FIXME comment (put in by me) should be removed

        I think we should move the actual getNext() implementation code from SampleLoader to here

        In PoissonSampleLoader.java
        ============================

         40         // this will be value of first column in the special row   
        

        I think this is no longer the case - should be removed.

            58     // memory per sample. divide this by avgTupleMemSize to get skipInterval 
             59     private long memPerSample=0;
             60 
        

        Should the above be called memToSkipPerSample?

         104         if(skipInterval == -1){
        

        It doesn't look like skipInterval is initialized to -1.

        Instead of keeping track of the maximum number of columns in the different rows and then appending the
        special marker string and the number of rows at the end, would it be better to just have these as the
        first two fields of the last tuple emitted, and then introduce a split-union combination to
        ensure that the foreach pipeline gets only the regular tuples (excluding the special tuple)?

        Arun C Murthy added a comment -

        It looks like ReduceContext has a getCounter() method. Am I missing a subtlety?

        The counters you get from a {Map|Reduce}Context are specific only to that task. One would
        have to jump through a whole set of hoops (i.e. create a new JobClient, or its equivalent
        in the new context-object APIs) and query the JobTracker for rolled-up counters, and even
        then they aren't guaranteed to be completely accurate until job completion, so I wouldn't
        recommend that we rely on them.

        Thejas M Nair added a comment -

        In SampleLoader.java
        ====================
        Isn't the idea of SampleLoader only to carry common code for RandomSampleLoader and PoissonLoader
        and add a computeSamples() method? - Looks like now it has the getNext() implementation
        needed by RandomSampleLoader in it now. Should we move that to RandomSampleLoader instead?

        RandomSampleLoader.getNext() is fairly generic; it can be used by any new sample loader class where the number of samples to be taken in each map is known in advance. So having this getNext() implementation in SampleLoader can be useful in the future.
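        As an aside, the "number of samples known in advance" pattern described above is essentially reservoir sampling. A minimal standalone sketch of that idea (hypothetical class and method names, not Pig's actual Tuple or LoadFunc API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Classic reservoir sampling: keep a uniform random sample of fixed size
// numSamples from a stream whose length is not known up front.
public class ReservoirSampler<T> {
    private final T[] samples;   // the reservoir, fixed size numSamples
    private final Random random;
    private long seen = 0;       // tuples observed so far

    @SuppressWarnings("unchecked")
    public ReservoirSampler(int numSamples, long seed) {
        this.samples = (T[]) new Object[numSamples];
        this.random = new Random(seed);
    }

    // Offer one tuple from the underlying loader to the reservoir.
    public void offer(T tuple) {
        seen++;
        if (seen <= samples.length) {
            samples[(int) (seen - 1)] = tuple;            // fill phase
        } else {
            long j = (long) (random.nextDouble() * seen); // uniform in [0, seen)
            if (j < samples.length) {
                samples[(int) j] = tuple;                 // keep with prob n/seen
            }
        }
    }

    public List<T> samples() {
        List<T> out = new ArrayList<>();
        int filled = (int) Math.min(seen, samples.length);
        for (int i = 0; i < filled; i++) {
            out.add(samples[i]);
        }
        return out;
    }
}
```

        Each incoming tuple replaces a reservoir slot with probability n/seen, which yields a uniform sample regardless of the input size.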

        Why is skipNext() needed? Can't loader.getNext() == null be used instead? If so, is recordReader
        needed?

        skipNext() calls recordReader.getNext(), which, unlike loader.getNext(), does not parse the record into a tuple. This way records can be skipped more efficiently.
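        To illustrate the skip-vs-parse distinction in isolation: advancing the raw reader is cheap, while building a tuple (here modeled as splitting a line into fields) is the expensive step that a skipNext()-style method avoids. This is a hypothetical sketch with a BufferedReader standing in for the record reader, not Pig's real classes:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;

public class SkippingLoader {
    private final BufferedReader reader; // stand-in for the RecordReader

    public SkippingLoader(BufferedReader reader) {
        this.reader = reader;
    }

    // Analogue of skipNext(): consume one raw record, no field parsing.
    // Returns false once the input is exhausted.
    public boolean skipNext() {
        try {
            return reader.readLine() != null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Analogue of loader.getNext(): consume one record and parse it
    // into fields (the costly part that skipping avoids).
    public List<String> getNext() {
        try {
            String line = reader.readLine();
            return line == null ? null : Arrays.asList(line.split("\t"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

        A sampler would call skipNext() for the records between samples and getNext() only for the records it actually keeps.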

        I will create a new patch addressing other comments.

        Pradeep Kamath added a comment -

        Isn't the current implementation in SampleLoader.getNext() a random sampling implementation? Given that it is random, wouldn't it be more appropriate in RandomSampleLoader than in the base class SampleLoader? Since right now the only two subclasses of SampleLoader are PoissonSampleLoader and RandomSampleLoader, should we defer moving this implementation into SampleLoader until there is a use case for it being in the base class?

        Thejas M Nair added a comment -

        Yes, I think SampleLoader.getNext() can be moved to RandomSampleLoader. Any new class that needs the random sampling implementation can subclass RandomSampleLoader instead of SampleLoader.

        Thejas M Nair added a comment -

        Instead of adding the num-rows information as a final special tuple, I am making a change to carry it in the last tuple itself, appended at the end (a special marker column and a num-rows column).

        Instead of keeping track of max. num of columns in the different rows and then appending the
        special marker string and num of rows at the end, would it be better to just have these as the
        first two fields of the last tuple emitted and then introduce a split-union combination to
        ensure that the foreach pipeline gets the regular tuples (excluding the special tuple)?

        In the implementation in my upcoming patch, the foreach pipeline that evaluates the join expression (in the map of the sampling MR job) gets regular tuples, except for the last one. This is safer than the existing implementation in trunk, where every tuple had a disk-size column appended to it. The split-union approach proposed above helps the special tuple bypass the foreach, but getting it around the sort in the reduce stage (of the sampling MR job) would involve a lot more changes (if the special tuple has the marker and num-rows as its first two columns).
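        The "append to the last tuple" scheme can be sketched as follows. Tuples are modeled as plain lists here (the real code uses Pig's Tuple), and the marker constant is a made-up stand-in for whatever value PoissonSampleLoader actually uses:

```java
import java.util.ArrayList;
import java.util.List;

public class LastTupleMarker {
    // Hypothetical marker value; the real loader's constant may differ.
    static final String NUM_ROWS_MARKER = "\u0001num.rows";

    // Append (marker, numRows) to the final sampled tuple so downstream
    // code can recover the row count without a separate special tuple.
    static List<Object> markLast(List<Object> lastTuple, long numRows) {
        List<Object> marked = new ArrayList<>(lastTuple);
        marked.add(NUM_ROWS_MARKER);
        marked.add(numRows);
        return marked;
    }

    // Downstream check: if this is the marked tuple, return its row
    // count; otherwise return null and treat it as a regular tuple.
    static Long numRowsIfMarked(List<Object> tuple) {
        int n = tuple.size();
        if (n >= 2 && NUM_ROWS_MARKER.equals(tuple.get(n - 2))) {
            return (Long) tuple.get(n - 1);
        }
        return null;
    }
}
```

        Because the extra columns sit at the end of only one tuple, the rest of the pipeline sees unmodified tuples, matching the safety argument made above.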

        Thejas M Nair added a comment -

        Latest patch addressing Pradeep's comments.
        I will fix the orderby, skew-join test cases in a separate patch.
        testpatch output -
        [exec] -1 overall.
        [exec]
        [exec] +1 @author. The patch does not contain any @author tags.
        [exec]
        [exec] -1 tests included. The patch doesn't appear to include any new or modified tests.
        [exec] Please justify why no tests are needed for this patch.
        [exec]
        [exec] +1 javadoc. The javadoc tool did not generate any warning messages.
        [exec]
        [exec] +1 javac. The applied patch does not increase the total number of javac compiler warnings.
        [exec]
        [exec] +1 findbugs. The patch does not introduce any new Findbugs warnings.
        [exec]
        [exec] +1 release audit. The applied patch does not increase the total number of release audit warnings.

        Thejas M Nair made changes -
        Attachment PIG-1062.5.patch [ 12425358 ]
        Pradeep Kamath added a comment -

        Marking resolved since all changes have been committed.

        Pradeep Kamath made changes -
        Status: Patch Available [ 10002 ] → Resolved [ 5 ]
        Hadoop Flags: [Incompatible change, Reviewed]
        Fix Version/s: 0.7.0 [ 12314397 ]
        Resolution: Fixed [ 1 ]
        Daniel Dai made changes -
        Status: Resolved [ 5 ] → Closed [ 6 ]
        Transition history (time in source status, execution times, last executer, last execution date):
        Patch Available → Open: 23h 7m, 1 execution, Thejas M Nair, 14/Nov/09 01:12
        Open → Patch Available: 14d 6h 58m, 2 executions, Thejas M Nair, 14/Nov/09 01:14
        Patch Available → Resolved: 73d 22h 59m, 1 execution, Pradeep Kamath, 27/Jan/10 00:14
        Resolved → Closed: 107d 6h 31m, 1 execution, Daniel Dai, 14/May/10 07:45

          People

          • Assignee: Thejas M Nair
          • Reporter: Thejas M Nair
          • Votes: 0
          • Watchers: 3
