Affects Version/s: 0.1.0
Fix Version/s: 0.1.0
Patch Info: Patch Available
If a spill occurs while elements are being inserted into a DistinctDataBag, it's possible that non-unique items will be added to the in-memory data structure, and the mSize counter will be incremented. If the same elements also exist on disk, the count will be higher than it should be.
The following is copied from an email exchange I had with Alan Gates:
Thanks for your help. I've done a bit more experimentation and have discovered a couple more things. I first looked at how COUNT was implemented. It looks like COUNT calls size() on the bag, which will return mSize. I thought that mSize might be calculated improperly so I added "SUM(unique_ids) AS crazy_userid_sum" to my GENERATE line and re-ran the pigfile:
GENERATE FLATTEN(group), SUM(nice_data.duration) AS total_duration, COUNT(nice_data) AS channel_switches, COUNT(unique_ids) AS unique_users, SUM(unique_ids) AS crazy_userid_sum;
It turns out that the SUM generates the correct result in all cases, while there are still occasional errors in the COUNT. Since SUM requires an iteration over all the elements in the DistinctDataBag, this led me to believe that the uniqueness constraint is indeed operating correctly, but there is some error in the logic that calculates mSize.
Then I started poking around in DistinctDataBag, looking for anything that changes mSize incorrectly. I noticed that on line 87 in addAll(), the size of the DataBag passed into the method is added to the mSize instance variable, and then during the iteration a few lines later mSize is incremented again each time an element is successfully added to mContents. I thought this might be the problem, since it seems like elements would be double counted whenever addAll() is called. I commented out line 87, recompiled Pig, and ran it again, but there are still errors (though I do think line 87 might be incorrect anyway).
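To illustrate the double counting suspected in addAll(), here is a minimal sketch. It is not the actual Pig source: AddAllSketch, addAllSuspected() and addAllCountOnce() are hypothetical names, and a HashSet of strings stands in for mContents. It only shows what happens when the incoming size is added up front and the counter is also incremented per inserted element.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified stand-in for a distinct bag; NOT the actual Pig class.
class AddAllSketch {
    private final Set<String> mContents = new HashSet<>();
    private long mSize = 0;

    // Mirrors the suspected pattern: add the incoming size up front, then
    // also increment once per element that is actually inserted.
    void addAllSuspected(List<String> other) {
        mSize += other.size();
        for (String s : other) {
            if (mContents.add(s)) {
                mSize++;
            }
        }
    }

    // Counts each element at most once, only when it is newly inserted.
    void addAllCountOnce(List<String> other) {
        for (String s : other) {
            if (mContents.add(s)) {
                mSize++;
            }
        }
    }

    long size() {
        return mSize;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("u1", "u2", "u3");

        AddAllSketch suspected = new AddAllSketch();
        suspected.addAllSuspected(ids);

        AddAllSketch once = new AddAllSketch();
        once.addAllCountOnce(ids);

        // prints "suspected=6 countOnce=3"
        System.out.println("suspected=" + suspected.size() + " countOnce=" + once.size());
    }
}
```

With three distinct elements, the suspected pattern reports 6 while the count-once variant reports 3. As noted above, removing the up-front addition did not fix the COUNT errors on its own, so this would be a separate issue from the spill problem.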
Thanks to my coworker Marshall, I think we may have discovered what the actual problem is. The scenario is as follows: We're adding a bunch of elements to the bag, and before we're finished a spill occurs. mContents is cleared during the spill (line 157). All add() does is check uniqueness against mContents, so after the spill it can no longer see the elements that were written to disk. We then get elements in mContents that duplicate ones already on disk, and an inflated mSize. The reason SUM still works is that the iterator is smart and enforces uniqueness as it reads the records back in. We think this occurs at the beginning of addToQueue, around lines 363-369. mMergeTree is a TreeSet, so it enforces uniqueness, and the call to addToQueue is aborted if there's already a matching record in mMergeTree.
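The scenario above can be reproduced in a small sketch. This is a simplified model under stated assumptions, not the Pig implementation: SpillSketch, spillFiles and countByIteration() are hypothetical names, in-memory lists stand in for spill files on disk, and a single TreeSet stands in for the merge that the real iterator performs.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Simplified model of a distinct bag that can spill; NOT the actual Pig class.
class SpillSketch {
    private final Set<String> mContents = new HashSet<>();
    // Hypothetical stand-in for spill files on disk.
    private final List<List<String>> spillFiles = new ArrayList<>();
    private long mSize = 0;

    // Uniqueness is checked against the in-memory contents only.
    void add(String s) {
        if (mContents.add(s)) {
            mSize++;
        }
    }

    // Writes the in-memory contents out and clears them, as a spill does.
    void spill() {
        spillFiles.add(new ArrayList<>(mContents));
        mContents.clear();
    }

    // What COUNT sees: the running counter.
    long size() {
        return mSize;
    }

    // What an iterating aggregate such as SUM sees: a merge through a
    // TreeSet, which drops records that are already present.
    long countByIteration() {
        TreeSet<String> merge = new TreeSet<>(mContents);
        for (List<String> file : spillFiles) {
            merge.addAll(file);
        }
        return merge.size();
    }

    public static void main(String[] args) {
        SpillSketch bag = new SpillSketch();
        bag.add("u1");
        bag.add("u2");
        bag.spill();     // mContents is now empty
        bag.add("u1");   // already on disk, but accepted and counted again
        bag.add("u3");

        // prints "size()=4 iterated=3"
        System.out.println("size()=" + bag.size() + " iterated=" + bag.countByIteration());
    }
}
```

The counter reports 4 while iteration finds 3 distinct elements, which matches the observed behavior: COUNT is occasionally too high while SUM stays correct.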
Do you think our assessment is correct? If so, it seems that the calculation of mSize needs to be significantly more complex than it is now. It looks to me like the entire bag will need to be iterated in order to reliably calculate the size. Do you have any ideas about how to implement this in a less expensive way? I'd be happy to take a stab at it, but I don't want to do anything particularly silly if you have a better idea.
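One possible way to limit the cost, sketched here purely as an assumption and not as the agreed fix: trust the running counter while nothing has been spilled, and fall back to a full iteration only when an element has been added after a spill, caching the result until the next such add. LazySizeSketch and all of its members are hypothetical names; the TreeSet merge loads everything into memory, which a real implementation would avoid by streaming over the spill files.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of a lazily recomputed size; NOT the actual Pig class.
class LazySizeSketch {
    private final Set<String> mContents = new HashSet<>();
    private final List<List<String>> spillFiles = new ArrayList<>(); // stand-in for disk
    private long mSize = 0;
    private boolean sizeIsExact = true;
    private int recounts = 0; // only here to show how often we iterate

    void add(String s) {
        if (!mContents.add(s)) {
            return; // duplicate of an in-memory element, nothing changes
        }
        if (spillFiles.isEmpty()) {
            mSize++; // no spill yet, so the in-memory check is sufficient
        } else {
            sizeIsExact = false; // element may already exist on disk
        }
    }

    void spill() {
        spillFiles.add(new ArrayList<>(mContents));
        mContents.clear();
    }

    long size() {
        if (!sizeIsExact) {
            // Full pass over memory and spilled data, deduplicated by the TreeSet.
            TreeSet<String> merge = new TreeSet<>(mContents);
            for (List<String> file : spillFiles) {
                merge.addAll(file);
            }
            mSize = merge.size();
            sizeIsExact = true;
            recounts++;
        }
        return mSize;
    }

    int recounts() {
        return recounts;
    }

    public static void main(String[] args) {
        LazySizeSketch bag = new LazySizeSketch();
        bag.add("u1");
        bag.add("u2");
        long before = bag.size(); // no iteration needed
        bag.spill();
        bag.add("u1");
        bag.add("u3");
        long after = bag.size();  // one iteration
        bag.size();               // cached, no further iteration

        // prints "before=2 after=3 recounts=1"
        System.out.println("before=" + before + " after=" + after + " recounts=" + bag.recounts());
    }
}
```

Bags that never spill pay nothing extra under this approach, and bags that do spill pay for one iteration per batch of adds rather than one per call to size().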