Description
If a vertex need to increase parallelism dynamically, it cannot has additional broadcast input. In the test case, join vertex requires a broadcast edge to acquire sample file (used in IsFirstReduceOfKeyTez), and the join vertex use PartitionerDefinedVertexManager which can increase parallelism dynamically. If that happens, we will see exception:
Fetch failed:java.lang.IndexOutOfBoundsException at java.nio.Buffer.checkIndex(Buffer.java:532) at java.nio.ByteBufferAsLongBufferB.get(ByteBufferAsLongBufferB.java:115) at org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord.getIndex(TezSpillRecord.java:101) at org.apache.tez.runtime.library.common.shuffle.Fetcher.getTezIndexRecord(Fetcher.java:596) at org.apache.tez.runtime.library.common.shuffle.Fetcher.doLocalDiskFetch(Fetcher.java:537) at org.apache.tez.runtime.library.common.shuffle.Fetcher.setupLocalDiskFetch(Fetcher.java:518) at org.apache.tez.runtime.library.common.shuffle.Fetcher.callInternal(Fetcher.java:191) at org.apache.tez.runtime.library.common.shuffle.Fetcher.callInternal(Fetcher.java:70) at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)
The fix disable dynamic parallelism if there is a broadcast edge (including sample file in skewed outer join, scalar followed by skewed join/order by). Also in inner skewed join, the sample file is not needed in join vertex, so it is not necessarily to broadcast it thus disable dynamic parallelism.
Attachments
Attachments
Issue Links
- is broken by
-
PIG-4377 Skewed outer join produce wrong result if a key is oversampled
- Closed