Details
Type: Sub-task
Status: Closed
Priority: Major
Resolution: Fixed
tez-branch
None
Description
Let's say you have a query like this:
set default_parallel 200;
x = cogroup foo by a, bar by b parallel 10;
y = join x by c, z by d;
I would expect the cogroup to have a parallelism of 10 and the join to have a parallelism of 200. However, the parallelism of the cogroup is also set to 200.
Here is where the default parallelism overwrites the user-specified parallelism.
TezCompiler.java#L390
if (op.getRequestedParallelism() > curTezOp.getRequestedParallelism()) {
    curTezOp.setRequestedParallelism(op.getRequestedParallelism());
}
In the above example, "op" is the POLocalRearrange of the join, and "curTezOp" is the TezOperator that contains both the POPackage of the cogroup and the POLocalRearrange of the join.
Here is what the TezOperator looks like:
| join_allocs_mop: Local Rearrange[tuple]{long}(false) - scope-134 -> null
| |   |
| |   Project[long][10] - scope-135
| |
| |---join_allocs_subscrn: New For Each(true)[bag] - scope-75
|     |   |
|     |   POUserFunc(org.apache.pig.scripting.jython.JythonFunction)[bag] - scope-70
|     |   |
|     |   |---POUserFunc(org.apache.pig.builtin.TOTUPLE)[tuple] - scope-69
|     |       |
|     |       |---Project[bag][0] - scope-67
|     |       |
|     |       |---RelationToExpressionProject[bag][*] - scope-68
|     |           |
|     |           |---ab_exp_63_day_subscrn_d_ordered: POSort[bag]() - scope-74
|     |               |   |
|     |               |   Project[chararray][9] - scope-73
|     |               |
|     |               |---Project[bag][1] - scope-72
|     |
|     |---New For Each(false,false)[bag] - scope-66
|         |   |
|         |   Project[bag][1] - scope-62
|         |   |
|         |   Project[bag][2] - scope-64
|         |
|         |---abNonmemberByCustomer: Package(Packager)[tuple]{long} - scope-57
The problem is that the parallelism of the root (POPackage) is overwritten by that of the leaf (POLocalRearrange), because the latter (200) is greater than the former (10).
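To make the overwrite concrete, here is a minimal, self-contained sketch of the comparison quoted from TezCompiler.java. It does not use Pig's classes: `ParallelismSketch`, `mergeAsReported`, `mergeKeepingExplicit`, and the `UNSET` sentinel of -1 are all hypothetical names and assumptions introduced for illustration. The second method is only one possible guard under those assumptions, not necessarily the fix that was committed.

```java
// Hypothetical stand-in for the parallelism merge; not Pig's actual classes.
final class ParallelismSketch {

    /** Assumed sentinel meaning "no parallelism was requested". */
    static final int UNSET = -1;

    /**
     * Mirrors the quoted check: the larger requested parallelism always wins,
     * regardless of which operator the user set it on.
     */
    static int mergeAsReported(int curTezOpParallelism, int opParallelism) {
        if (opParallelism > curTezOpParallelism) {
            return opParallelism;
        }
        return curTezOpParallelism;
    }

    /**
     * Hypothetical guard: adopt the incoming operator's parallelism only when
     * the current TezOperator has no requested parallelism of its own.
     */
    static int mergeKeepingExplicit(int curTezOpParallelism, int opParallelism) {
        if (curTezOpParallelism == UNSET) {
            return opParallelism;
        }
        return curTezOpParallelism;
    }

    public static void main(String[] args) {
        int cogroupParallel = 10;  // from "parallel 10" on the cogroup (POPackage)
        int joinParallel = 200;    // from "set default_parallel 200" (POLocalRearrange)

        System.out.println("as reported: " + mergeAsReported(cogroupParallel, joinParallel));
        System.out.println("with guard:  " + mergeKeepingExplicit(cogroupParallel, joinParallel));
    }
}
```

With the values from the example query, `mergeAsReported(10, 200)` yields 200, which is the behavior described above, while the guarded variant keeps the user-specified 10.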