This task aims for a deeper integration of Spark Datasets into SystemML. Consider the following example scenario, invoked through MLContext with X being a Dataset<Row>:
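The concrete example script is not reproduced here; a hypothetical scenario of this kind (pseudocode, all names and the script body illustrative) would pass a Spark Dataset<Row> into a DML script whose parfor loop processes X row-wise:

```
// illustrative pseudocode, not the original example
Dataset<Row> X = spark.read.parquet(...);   // externally created dataset

String s = "parfor( i in 1:nrow(X) ) {"
         + "  Xi = X[i, ];"                 // per-iteration row partition
         + "  ..."                          // parfor body over row i
         + "}";
Script script = dml(s).in("X", X);
ml.execute(script);
```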
Currently, we would convert the input dataset into binary-block format (1st shuffle) at the API level and subsequently pass it into SystemML. For large data, we would then compile a single parfor data-partition-execute job that slices row fragments, collects these row fragments into partitions (2nd shuffle), and finally executes the parfor body per partition.
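To make the two data movements concrete, here is a pure-Python sketch (no Spark; blocksize and all names are illustrative) of the current path: rows are first grouped into binary blocks, and the blocks are then re-sliced and regrouped into one partition per row.

```python
# Illustrative simulation of the current two-shuffle path.
BLOCKSIZE = 2  # rows/cols per block; chosen small for readability

def to_binary_blocks(rows):
    """Step 1 ('1st shuffle'): group row data into binary blocks,
    keyed by (row-block index, column-block index)."""
    blocks = {}
    for i, row in enumerate(rows):
        for j0 in range(0, len(row), BLOCKSIZE):
            key = (i // BLOCKSIZE, j0 // BLOCKSIZE)
            blocks.setdefault(key, []).append((i % BLOCKSIZE, row[j0:j0 + BLOCKSIZE]))
    return blocks

def row_partitions_from_blocks(blocks, nrows):
    """Step 2 ('2nd shuffle'): slice row fragments out of the blocks and
    regroup them into one partition per row, as datapartition-execute does."""
    parts = {i: [] for i in range(nrows)}
    for (rb, cb), frags in blocks.items():
        for local_i, frag in frags:
            parts[rb * BLOCKSIZE + local_i].append((cb, frag))
    # stitch the fragments of each row back together in column order
    return {i: [v for _, frag in sorted(p) for v in frag]
            for i, p in parts.items()}

rows = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
blocks = to_binary_blocks(rows)
parts = row_partitions_from_blocks(blocks, len(rows))
print(parts[1])  # the second row, reassembled from block fragments: [4, 5, 6]
```

With native dataset support, each dataset row could serve directly as a row partition, making both regrouping steps unnecessary.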
Native dataset support would allow us to avoid these two shuffles and compute the entire parfor in a data-local manner. In detail, this involves the following extensions:
- API level: Keep the lineage of the input dataset, leveraging our existing lineage mechanism in MatrixObject.
- Parfor datapartition-execute:
SYSTEMML-1367 already introduced data-local processing for special cases (if ncol <= blocksize). Given the lineage, we can simply probe the input of datapartition-execute and, for row partitioning, directly use the dataset instead of the reblocked matrix RDD in a data-local manner. This avoids not only the 2nd shuffle but, due to lazy evaluation, also the 1st shuffle if no operation other than the parfor accesses X (except for a zipWithIndex if no IDs are passed in, as this transformation triggers computation).
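The probe described above can be sketched as follows; this is a minimal pure-Python illustration under assumed names (MatrixObject here is a stand-in, not SystemML's actual class), showing the decision between the tracked dataset and the reblocked RDD.

```python
# Hypothetical sketch of probing the datapartition-execute input:
# if the matrix still carries lineage to the original dataset and the
# requested partitioning is row-wise, consume the dataset directly.

class MatrixObject:
    def __init__(self, reblocked_rdd, dataset_lineage=None):
        self.reblocked_rdd = reblocked_rdd      # binary-block representation
        self.dataset_lineage = dataset_lineage  # original dataset, if tracked

def choose_partition_input(mo, partition_format):
    # Row partitioning can be served straight from the dataset, because
    # each dataset row already corresponds to one row partition.
    if mo.dataset_lineage is not None and partition_format == "ROW_WISE":
        return ("dataset", mo.dataset_lineage)
    # Otherwise fall back to the reblocked binary-block RDD (current behavior).
    return ("binary_block", mo.reblocked_rdd)

mo = MatrixObject(reblocked_rdd="X_blocks", dataset_lineage="X_dataset")
print(choose_partition_input(mo, "ROW_WISE"))     # ('dataset', 'X_dataset')
print(choose_partition_input(mo, "COLUMN_WISE"))  # ('binary_block', 'X_blocks')
```

Because the dataset branch never touches the reblocked RDD, lazy evaluation means the reblock (1st shuffle) is never triggered when the parfor is the only consumer of X.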
- Cleanup: Prevent the cleanup (unpersist) of lineage objects of type dataset, as they are passed in from the outside.
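The cleanup guard could look like the following minimal sketch (pure Python, all names hypothetical): intermediates created by SystemML are unpersisted, while dataset-typed lineage objects owned by the caller are skipped.

```python
# Illustrative sketch of the proposed cleanup guard: lineage objects that
# wrap an externally provided dataset must survive SystemML's cleanup.

class LineageObject:
    def __init__(self, name, kind, persisted=True):
        self.name = name
        self.kind = kind          # e.g., "rdd", "broadcast", or "dataset"
        self.persisted = persisted

def cleanup(lineage_objects):
    """Unpersist SystemML-created intermediates, but leave datasets that
    were passed in from the outside untouched."""
    for lo in lineage_objects:
        if lo.kind == "dataset":
            continue  # owned by the caller, must not be unpersisted
        lo.persisted = False

objs = [LineageObject("tmp1", "rdd"), LineageObject("X", "dataset")]
cleanup(objs)
print([(o.name, o.persisted) for o in objs])  # [('tmp1', False), ('X', True)]
```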