I already have ResourceStats hooked up to LogicalOperators; I just need to port the code to the new branch. This will let us take statistics, when they are available, and pass them into the PoissonSampleLoader at initialization time, so it can get the number of tuples and the avg tuple size directly from Stats.
That being said, statistics may not always be available...
Before I go into the more fanciful suggestion below – perhaps a simple hack will do. We have counters in Hadoop. Any reason we can't just read "bytes read in map", "records read in map", "bytes written in map", "records written in map" counters directly?
In case I am overlooking something obvious that rules counters out, here's the "ignore counters" suggestion:
If my understanding is correct, in PoissonSampleLoader we are interested in the average size of a tuple more than the # of tuples; the # of tuples is just used as a way of crudely estimating the avg size of a tuple on disk, which is in turn used to crudely estimate the size of a tuple in memory. Incidentally, that estimate is likely to be far off if we are loading not from BinStorage but from arbitrary loadFuncs, since the underlying data, even if it is a file, might be compressed.
Perhaps we can get the average tuple size directly, instead? We could get that in the mappers of the sampling job by recording memory usage at the first getNext() call, forcing garbage collection, buffering up K tuples, and getting memory usage again.
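A rough sketch of that measurement, in case it helps the discussion. TupleSizeEstimator and its methods are hypothetical names, not existing Pig code, and a plain Supplier stands in for the loader's getNext(). System.gc() is only a hint to the JVM, so the result should be treated as a crude estimate; the sketch clamps it to at least 1 byte so a noisy reading cannot produce zero or a negative size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical helper (not part of Pig): estimates the in-memory size of a tuple. */
final class TupleSizeEstimator {

    /** Heap in use right now, after asking the JVM to collect garbage (only a hint). */
    static long usedHeap() {
        System.gc();
        Runtime rt = Runtime.getRuntime();
        return rt.totalMemory() - rt.freeMemory();
    }

    /**
     * Buffers k objects produced by `next` and returns the heap growth per object.
     * `next` is an assumed stand-in for the loader's getNext().
     */
    static long estimateAvgSize(Supplier<Object> next, int k) {
        if (k <= 0) {
            throw new IllegalArgumentException("k must be positive");
        }
        long before = usedHeap();
        List<Object> buffer = new ArrayList<>(k);
        for (int i = 0; i < k; i++) {
            buffer.add(next.get());
        }
        long after = usedHeap();
        // Touch the buffer after measuring so it stays reachable during the measurement.
        if (buffer.size() != k) {
            throw new IllegalStateException("buffer was not filled");
        }
        // Clamp: GC timing can make the delta tiny or even negative.
        return Math.max(1L, (after - before) / k);
    }
}
```

The buffered tuples do not have to be thrown away afterwards; they could seed the sample buffer.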
We now have the following variables available to each sampling mapper in the SkewedPartitioner:
- sample rate S (for the appropriate Poisson distribution)
- total # of mappers, M
- available heap size on the reducer, H
- estimated avg size of tuple, s
The number of tuples we want to sample is then simply T = max(10, S*H/(s*M)).
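For concreteness, the formula could be computed as below. SampleSize and tuplesToSample are illustrative names only, and truncating the fractional part toward zero is my assumption; the formula itself does not say how to round.

```java
/** Hypothetical helper (not part of Pig): computes T = max(10, S*H/(s*M)). */
final class SampleSize {

    /** Floor on the sample size, taken from the formula above. */
    static final long MIN_SAMPLES = 10;

    static long tuplesToSample(double sampleRate,      // S
                               long reducerHeapBytes,  // H
                               long avgTupleBytes,     // s
                               int numMappers) {       // M
        if (avgTupleBytes <= 0 || numMappers <= 0) {
            throw new IllegalArgumentException("s and M must be positive");
        }
        double t = sampleRate * reducerHeapBytes / ((double) avgTupleBytes * numMappers);
        // Assumption: truncate the fractional part, then apply the floor of 10.
        return Math.max(MIN_SAMPLES, (long) t);
    }
}
```

With S = 0.5, H = 1,000,000 bytes, s = 100 bytes and M = 10, this gives T = 500; with a 1,000-byte heap the raw value is 0.5, so the floor of 10 applies.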
In getNext(), we can now allocate a buffer for T elements, populate it with the first T tuples, and continue scanning the partition. For the ith getNext() call with i > T, we generate a random number r s.t. 0<=r<i, and if r<T we replace the tuple at position r of our buffer with the new one. This gives us a uniform random sample of the tuples in the partition.
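This is essentially reservoir sampling. A minimal sketch over a generic iterator follows; ReservoirSampler is a made-up name, and the Iterator stands in for repeated getNext() calls on the partition.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

/** Hypothetical helper (not part of Pig): keeps a uniform random sample of up to t items. */
final class ReservoirSampler {

    static <E> List<E> sample(Iterator<E> input, int t, Random rng) {
        List<E> buffer = new ArrayList<>(t);
        long seen = 0; // number of items read so far (the "i" in the description)
        while (input.hasNext()) {
            E item = input.next();
            seen++;
            if (buffer.size() < t) {
                // The first t items fill the buffer directly.
                buffer.add(item);
            } else {
                // Pick r with 0 <= r < seen; a long keeps this valid past 2^31 items.
                long r = (long) (rng.nextDouble() * seen);
                if (r < t) {
                    buffer.set((int) r, item); // overwrite the slot at position r
                }
            }
        }
        return buffer;
    }
}
```

If the partition holds fewer than T tuples, the buffer simply ends up containing all of them.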
So this gets around the need for file size info on that side.
Now, PartitionSkewedKey uses file size / avg_tuple_disk_size to estimate the total number of tuples. It then combines this estimate with the ratio of instances of a given key in the sample to the total sample size to predict the total number of records with a given key in the input. But given the number of sampled tuples and the sample rate, couldn't we calculate the total number of records in the original file by simply reversing the formula for determining the number of tuples to sample? If we do this, there is no need to append any metadata.
Lastly, if we do want to move around metadata such as the number of records in the input, etc., and we don't want to use Hadoop counters, we should extend BinStorage with ResourceStats serialization, and use ResourceStatistics for this. Even if the original data might not have stats, there is no reason we can't generate these basic counts at runtime for the data we write ourselves.
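To make the current estimate concrete, here is how I read the arithmetic. This is a sketch of my understanding, not the actual PartitionSkewedKey code; SkewEstimate and its method names are invented for illustration.

```java
/** Hypothetical helper (not the real PartitionSkewedKey logic): per-key record estimate. */
final class SkewEstimate {

    /** Total tuples in the input, estimated as file size / average on-disk tuple size. */
    static long estimateTotalTuples(long inputBytes, long avgTupleDiskBytes) {
        if (avgTupleDiskBytes <= 0) {
            throw new IllegalArgumentException("average tuple size must be positive");
        }
        return inputBytes / avgTupleDiskBytes;
    }

    /** Scales the key's share of the sample up to the estimated input size. */
    static long estimateKeyRecords(long keyCountInSample, long sampleSize, long totalTuples) {
        if (sampleSize <= 0) {
            throw new IllegalArgumentException("sample size must be positive");
        }
        return Math.round((double) keyCountInSample / sampleSize * totalTuples);
    }
}
```

For example, a 1,000,000-byte input with 100-byte tuples is estimated at 10,000 tuples, and a key seen 25 times in a 100-tuple sample is then predicted to have about 2,500 records. Whatever replaces the file-size-based count only needs to supply the totalTuples argument.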