Details
-
Bug
-
Status: Open
-
Major
-
Resolution: Unresolved
-
None
-
None
-
None
Description
I'm getting a "Scalar has more than one row in the output" error with the following script:
a = LOAD 't' as (x:chararray);
b = GROUP a BY x PARALLEL 2;
c = GROUP b by group;
d = FOREACH (GROUP a ALL) GENERATE COUNT(a) as count;
e = FOREACH c GENERATE group, d.count;
DUMP e;
If I add a PARALLEL clause to c, the error goes away, so the issue seems to be related to auto parallelism.
I'm not very familiar with Tez, so I'm not sure how things are supposed to work, the issue seems to be related to the following (I know almost nothing about Tez so take this with a grain of salt):
- PigGraceShuffleVertexManager calls VertexImpl.reconfigureVertex(), which configures the parallelism of the vertex (VertexImpl.numTasks)
- The InputSpec for the scalar input is created (via Edge.getDestinationSpec()) with physicalInputCount equal to the parallelism set above
- The input is created (in LogicalIOProcessorRuntimeTask.createInput()) based on this InputSpec.
- The resulting UnorderedKVInput creates a ShuffleManager with numInputs = numPhysicalInputs.
This creates a reader that reads the scalar input numPhysicalInputs times, which results in the "Scalar has more than one row in the output" error in ReadScalarsTez.
When parallelism is specified explicitly, VertexImpl.reconfigureVertex() is never called, and numPhysicalInputs remains as 1 for the scalar input.