We've seen significant execution skew (a big gap between average and max execution time for a scan node) with multithreading enabled on TPC-DS queries. We balance input bytes well across instances, but the bytes of input files are often not correlated with the amount of work done in the scan, or above the scan. Some causes are:
- Dynamic partition pruning leading to different instances having variable numbers of input splits.
- Different numbers of rows being filtered out by predicates and row filters, leading to skew in the rows returned from the scan into the rest of the plan.
- Different degrees of compressibility.
- Files being written in different ways, e.g. different schemas or different writers.
More dynamic load balancing can address all of this: each scan instance picks up its next range only when its pipeline has finished processing the rows from the previous range. I.e. with the threading model we can deal with time skew anywhere in the pipeline by balancing in the scan.
I think we can solve this for HDFS scans by lifting the ReaderContext up to the FragmentState (one per plan node) and making corresponding changes to the scan implementation. We would need to add a bit more machinery to support Kudu and HBase scans, but I think a similar approach would work conceptually.
A more invasive (and probably more expensive) solution is a local exchange above the scan node, e.g. a multi-producer multi-consumer queue.