@Yiping
I see what you mean. Maybe we should have FOREACH and FORALL as in B = FORALL A GENERATE SUM(m);
Another version of this might be B = OVER A GENERATE SUM(m); or B = OVERALL A GENERATE SUM(m);
There was a hallway conversation about the following case:
B = GROUP A BY key;
C = FOREACH B {
    SORTED = ORDER A BY value;
    GENERATE
        COUNT(SORTED) as count,
        QUANTILES(SORTED.value, 0.0, 0.5, 0.75, 0.9, 1.0) as quantiles: (p00, p50, p75, p90, p100);
};
I was told that a ReadOnce bag would not solve this problem because SORTED is consumed by two UDFs, so we would need to pass through it twice.
I disagree. It is possible to pass over this data once and only once if we create a class of Accumulating or Running functions that differs from the current DataBag Eval and AlgebraicEval functions.
First, functions like SUM, COUNT, AVG, VAR, MIN, MAX, STDEV, ReservoirSampling, and statistics.SUMMARY can all be computed on a ReadOnce / Streaming DataBag of unknown length or size. For each of these functions, we simply "add" or "accumulate" the values one row at a time, invoke a combiner to merge intermediate results across partitions, and produce a final result, all without materializing a DataBag as implemented today.
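VAR and STDEV look like they need two passes (mean first, then deviations), but they fit the accumulate / combine / final shape too. The sketch below assumes a hypothetical RunningStats class (not an existing Pig or CPAN API). It uses Welford's one-pass update for accumulate and the pairwise merge formula of Chan et al. for the combiner step, and it reports population variance.

```perl
use strict;
use warnings;

# Hypothetical RunningStats accumulator (not a Pig or CPAN class). It keeps
# count, mean and M2 (sum of squared deviations from the mean), so COUNT,
# AVG, VAR, STDEV, MIN and MAX all come out of a single pass.
package RunningStats;

sub new {
    my ($class) = @_;
    return bless { n => 0, mean => 0, m2 => 0, min => undef, max => undef }, $class;
}

# Welford's update: fold in one row without keeping earlier rows.
sub accumulate {
    my ($self, $x) = @_;
    $self->{n}++;
    my $delta = $x - $self->{mean};
    $self->{mean} += $delta / $self->{n};
    $self->{m2}   += $delta * ($x - $self->{mean});
    $self->{min} = $x if !defined $self->{min} || $x < $self->{min};
    $self->{max} = $x if !defined $self->{max} || $x > $self->{max};
    return $self;
}

# Combiner step: merge another partition's partial result into this one
# (pairwise formula of Chan et al.).
sub combine {
    my ($self, $other) = @_;
    return $self if $other->{n} == 0;
    if ($self->{n} == 0) { %$self = %$other; return $self }
    my $n     = $self->{n} + $other->{n};
    my $delta = $other->{mean} - $self->{mean};
    $self->{m2}   += $other->{m2} + $delta * $delta * $self->{n} * $other->{n} / $n;
    $self->{mean} += $delta * $other->{n} / $n;
    $self->{n}     = $n;
    $self->{min} = $other->{min} if $other->{min} < $self->{min};
    $self->{max} = $other->{max} if $other->{max} > $self->{max};
    return $self;
}

sub count { $_[0]{n} }
sub avg   { $_[0]{n} ? $_[0]{mean} : undef }
sub var   { $_[0]{n} ? $_[0]{m2} / $_[0]{n} : undef }    # population variance
sub stdev { my $v = $_[0]->var; defined $v ? sqrt($v) : undef }
sub min   { $_[0]{min} }
sub max   { $_[0]{max} }

package main;

# Two partitions processed independently, then merged by the combiner.
my ($p1, $p2) = (RunningStats->new, RunningStats->new);
$p1->accumulate($_) for 1, 2;
$p2->accumulate($_) for 3, 4;
my $all = $p1->combine($p2);
printf "count=%d avg=%g var=%g stdev=%g min=%g max=%g\n",
    $all->count, $all->avg, $all->var, $all->stdev, $all->min, $all->max;
# prints: count=4 avg=2.5 var=1.25 stdev=1.11803 min=1 max=4
```

Each partition only ever holds five numbers, regardless of how many rows it sees, which is what lets the bag stay unmaterialized.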
QUANTILES is a different beast. To compute quantiles, the data must be sorted, which I prefer to do outside the UDF at this time. Also, the COUNT of the data must be known a priori. Fortunately, sorting COULD produce a ReadOnce / Streaming DataBag of KNOWN (as opposed to unknown) length or size, so only two scans through the data (sort, then quantiles) are needed rather than three (sort, count, quantiles).
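Knowing the size up front is what makes the quantile scan single-pass: every target row position can be computed before the first row is read. The sketch below is a hypothetical helper, not Pig's QUANTILES UDF. It assumes the nearest-rank rule (rank = ceil(p * n), with p = 0 mapped to the minimum); the real UDF may use a different interpolation rule.

```perl
use strict;
use warnings;
use POSIX qw(ceil);

# Hypothetical helper (not Pig's QUANTILES UDF): computes quantiles in one
# pass over an already-sorted, read-once stream whose size $n is known in
# advance. Uses the nearest-rank rule: rank = ceil(p * n), with p = 0
# mapped to rank 1 (the minimum).
sub streaming_quantiles {
    my ($next, $n, @probs) = @_;    # $next returns the next value, or undef at the end
    return (undef) x @probs if $n == 0;

    # Because $n is known, each target row index is fixed before reading.
    my @targets = map { my $r = ceil($_ * $n); ($r < 1 ? 1 : $r) - 1 } @probs;

    my @result = (undef) x @probs;
    my $i = 0;
    while (defined(my $v = $next->())) {
        for my $k (0 .. $#targets) {
            $result[$k] = $v if $targets[$k] == $i;
        }
        $i++;
    }
    return @result;
}

# Usage: a sorted stream of 1..10, read exactly once.
my @sorted = (1 .. 10);
my $pos = 0;
my $next = sub { $pos < @sorted ? $sorted[$pos++] : undef };
my @q = streaming_quantiles($next, scalar(@sorted), 0.0, 0.5, 0.75, 0.9, 1.0);
print join(" ", @q), "\n";    # 1 5 8 9 10
```

Without $n, the helper would have to either buffer the whole bag or make a separate counting pass, which is exactly the third scan the paragraph above avoids.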
So, if Pig could understand two additional data types:
ReadOnceSizeUnknown – COUNT() counts all individual rows
ReadOnceSizeKnown – COUNT() just returns size attribute of ReadOnce data reference
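The difference between the two types can be sketched as a single hypothetical ReadOnceBag class (not an existing Pig class) whose size attribute is either undef (size unknown) or a number (size known, e.g. produced as a by-product of sorting). COUNT reads the attribute when it can and otherwise has to scan, which consumes the bag.

```perl
use strict;
use warnings;

# Hypothetical ReadOnceBag: wraps an iterator and an optional size.
# size => undef models "ReadOnceSizeUnknown"; a number models "ReadOnceSizeKnown".
package ReadOnceBag;

sub new {
    my ($class, %args) = @_;
    return bless { next => $args{next}, size => $args{size}, consumed => 0 }, $class;
}

# Rows can be read only once; reading past the end is an error.
sub next_row {
    my ($self) = @_;
    die "ReadOnceBag already consumed\n" if $self->{consumed};
    my $row = $self->{next}->();
    $self->{consumed} = 1 unless defined $row;
    return $row;
}

sub size { $_[0]{size} }

package main;

# COUNT: O(1) when the size is known, otherwise one full (consuming) scan.
sub COUNT {
    my ($bag) = @_;
    return $bag->size if defined $bag->size;
    my $n = 0;
    $n++ while defined $bag->next_row;
    return $n;
}

# Builds an iterator over a fixed list of rows, for the example only.
sub iter { my @rows = @_; return sub { @rows ? shift @rows : undef } }

my $unknown = ReadOnceBag->new(next => iter(3, 1, 2));             # size unknown
my $known   = ReadOnceBag->new(next => iter(1, 2, 3), size => 3);  # size known
my $c_unknown = COUNT($unknown);    # scans and consumes $unknown
my $c_known   = COUNT($known);      # reads the size attribute; rows untouched
print "$c_unknown $c_known\n";      # 3 3
```

After COUNT, the size-known bag can still be handed to a quantiles pass, while the size-unknown bag is gone.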
And if Pig had RunningEval and RunningAlgebraicEval classes of functions that accumulate values one row at a time, many computations in Pig could be made much more efficient.
In case anyone doesn't "get" what I mean by having running functions, here's some Perl code that implements what I'm suggesting. I'll leave it as an exercise for the Pig development team to figure out the RunningAlgebraicEval versions of these functions/classes. :^)
runningsums.pl
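#! /usr/bin/perl
use strict;
use warnings;
use FindBin;
use lib $FindBin::Bin;    # load RunningSum.pm and RunningCount.pm from the script's directory
use RunningSum;
use RunningCount;
my $a_count = RunningCount->new();
my $a_sum   = RunningSum->new();
my $b_sum   = RunningSum->new();
my $c_sum   = RunningSum->new();
# Read tab-separated rows one at a time; nothing is buffered.
while (my $line = <>)
{
    $line =~ s/\r?\n\z//;    # strip the trailing LF or CRLF
    my ($a_val, $b_val, $c_val) = split(/\t/, $line);
    $a_count->accumulate($a_val);
    $a_sum->accumulate($a_val);
    $b_sum->accumulate($b_val);
    $c_sum->accumulate($c_val);
}
print join("\t",
    $a_count->final(),
    $a_sum->final(),
    $b_sum->final(),
    $c_sum->final()
), "\n";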
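RunningCount.pm
package RunningCount;
use strict;
use warnings;
# Counts rows one at a time; the value itself is ignored.
sub new
{
    my $class = shift;
    my $self = { count => 0 };    # start at 0 so final() returns 0, not undef, on empty input
    bless $self, $class;
    return $self;
}
sub accumulate
{
    my $self = shift;
    my $value = shift;    # unused: COUNT only needs to know a row arrived
    $self->{'count'}++;
}
sub final
{
    my $self = shift;
    return $self->{'count'};
}
1;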
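RunningSum.pm
package RunningSum;
use strict;
use warnings;
# Sums values one row at a time.
sub new
{
    my $class = shift;
    my $self = { sum => 0 };    # start at 0 so final() returns 0, not undef, on empty input
    bless $self, $class;
    return $self;
}
sub accumulate
{
    my $self = shift;
    my $value = shift;
    $self->{'sum'} += $value;
}
sub final
{
    my $self = shift;
    return $self->{'sum'};
}
1;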
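One possible shape for the RunningAlgebraicEval versions, roughly mirroring how Pig's existing Algebraic interface splits a function into initial, intermediate and final stages: each partition accumulates rows and emits a partial result, and a combiner merges the partials. The class names and the partial/merge methods below are hypothetical. Note that merging partial COUNTs is a SUM of counts, not a count of partials.

```perl
use strict;
use warnings;

# Hypothetical "RunningAlgebraic" variants of RunningCount and RunningSum.
package RunningAlgebraicCount;
sub new        { my ($class) = @_; return bless { count => 0 }, $class }
sub accumulate { $_[0]{count}++ }                              # one row at a time
sub partial    { $_[0]{count} }                                # emitted per partition
sub merge      { my ($self, $p) = @_; $self->{count} += $p }   # combiner: add partial counts
sub final      { $_[0]{count} }

package RunningAlgebraicSum;
sub new        { my ($class) = @_; return bless { sum => 0 }, $class }
sub accumulate { my ($self, $v) = @_; $self->{sum} += $v }
sub partial    { $_[0]{sum} }
sub merge      { my ($self, $p) = @_; $self->{sum} += $p }     # combiner: add partial sums
sub final      { $_[0]{sum} }

package main;

# Simulate two map-side partitions and one combiner.
my @partitions = ([1, 2, 3], [10, 20]);
my (@count_partials, @sum_partials);
for my $rows (@partitions) {
    my ($c, $s) = (RunningAlgebraicCount->new, RunningAlgebraicSum->new);
    for my $v (@$rows) { $c->accumulate($v); $s->accumulate($v) }
    push @count_partials, $c->partial;
    push @sum_partials,   $s->partial;
}
my ($count, $sum) = (RunningAlgebraicCount->new, RunningAlgebraicSum->new);
$count->merge($_) for @count_partials;
$sum->merge($_)   for @sum_partials;
print join("\t", $count->final, $sum->final), "\n";    # 5	36
```

Functions like AVG would need a two-field partial (sum, count) rather than a single number, but the accumulate / partial / merge / final pattern stays the same.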
I would say that instead of annotating the UDF to indicate "read once" bags, it would be easier to do that in the co-group command. We would skip bag materialization only if the bag is accessed exclusively by UDFs that read it in the "read once" manner. That way we only need to specify it once.
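The rule in the reply above amounts to an "all consumers" check at the co-group. The sketch below assumes each consuming UDF carries a hypothetical read_once flag (not an existing Pig API); a single random-access consumer forces materialization, and a bag with no consumers trivially needs none.

```perl
use strict;
use warnings;

# Hypothetical planner check (not an existing Pig API): a co-group's bag may
# skip materialization only if every UDF consuming it reads it "read once".
sub can_skip_materialization {
    my @consumers = @_;    # each: { name => ..., read_once => 0 or 1 }
    return !grep { !$_->{read_once} } @consumers;
}

my @plan_a = ({ name => 'SUM',   read_once => 1 },
              { name => 'COUNT', read_once => 1 });
my @plan_b = (@plan_a, { name => 'MyRandomAccessUdf', read_once => 0 });    # hypothetical UDF

for my $plan (\@plan_a, \@plan_b) {
    my @names = map { $_->{name} } @$plan;
    printf "%-30s %s\n", join(",", @names),
        can_skip_materialization(@$plan) ? "stream" : "materialize";
}
```

Deciding this once per co-group, rather than per UDF call, means the planner can stream the bag to every consumer in a single pass.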