Description
After taking a close look at the `BlockMatrix.multiply` implementation, I found that the current implementation may cause problems in some special cases.
Let me use an extreme case to illustrate:
Suppose we have two BlockMatrix instances, A and B.
A has 10000 blocks, numRowBlocks = 1, numColBlocks = 10000
B also has 10000 blocks, numRowBlocks = 10000, numColBlocks = 1
Now if we call A.multiply(B), no matter how A and B are partitioned,
the resultPartitioner will always contain only one partition.
This multiplication implementation will shuffle 10000 * 10000 blocks into one reducer, which reduces the parallelism to 1.
What's worse, because `RDD.cogroup` loads all elements of a group into memory, 10000 * 10000 blocks will be loaded into memory on the reducer side, since they are all shuffled into the same group. This can easily cause an executor OOM.
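To make the single-partition claim concrete, here is a minimal sketch in plain Python (no Spark needed). The function `grid_partition_count` is a hypothetical helper, and its arithmetic is only my reading of how a grid partitioner over the result blocks behaves, so treat the formula as an assumption. The point that does not depend on the formula is that the result of A * B has numRowBlocks(A) * numColBlocks(B) blocks, so it can never have more partitions than that.

```python
import math

def grid_partition_count(result_row_blocks, result_col_blocks, suggested_partitions):
    # Assumed model: tile the result block grid into roughly square tiles,
    # one tile per partition, aiming for `suggested_partitions` tiles.
    scale = 1.0 / math.sqrt(suggested_partitions)
    # Round half up, and never use less than one block per tile side.
    rows_per_part = math.floor(max(scale * result_row_blocks, 1.0) + 0.5)
    cols_per_part = math.floor(max(scale * result_col_blocks, 1.0) + 0.5)
    row_parts = math.ceil(result_row_blocks / rows_per_part)
    col_parts = math.ceil(result_col_blocks / cols_per_part)
    return row_parts * col_parts

# Extreme case from above: A is 1 x 10000 blocks, B is 10000 x 1 blocks,
# so the result grid is 1 x 1 blocks, however many partitions we ask for.
extreme = grid_partition_count(1, 1, 10000)

# For comparison, a 100 x 100 result grid with 100 suggested partitions.
square = grid_partition_count(100, 100, 100)

print(extreme, square)
```

In this model the extreme case yields 1 partition even though 10000 were suggested, because the middle dimension (10000 blocks) does not appear in the result grid at all.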
The above case is a little extreme, but other cases hit a similar problem, for example multiplying an M*N matrix A by an N*P matrix B when N is much larger than M and P.
The multiplication implementation does not handle task partitioning properly,
which causes the following problems:
1. When the middle dimension N is too large, it causes reducer OOM.
2. Even if OOM does not occur, the parallelism is still too low.
3. When N is much larger than M and P, and matrices A and B have many partitions, the result is split into too many partitions along the M and P dimensions, which makes the shuffled data size much larger.
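The trade-off between points 1 and 3 can be sketched with a rough cost model in plain Python. The helpers `shuffled_block_copies` and `avg_blocks_per_reducer` are hypothetical, and the model assumes dense matrices where each block of A is sent to every result partition covering its block row and each block of B to every result partition covering its block column. Sizes are counted in blocks, not bytes.

```python
def shuffled_block_copies(m_blocks, n_blocks, p_blocks, row_parts, col_parts):
    # Assumption: a block A(i, j) is copied once per column partition of the
    # result grid, and a block B(j, k) once per row partition.
    a_copies = m_blocks * n_blocks * col_parts
    b_copies = n_blocks * p_blocks * row_parts
    return a_copies + b_copies

def avg_blocks_per_reducer(m_blocks, n_blocks, p_blocks, row_parts, col_parts):
    # Average number of blocks one reducer has to hold for its cogroup.
    total = shuffled_block_copies(m_blocks, n_blocks, p_blocks, row_parts, col_parts)
    return total // (row_parts * col_parts)

# M = P = 4 blocks, N = 10000 blocks (N much larger than M and P).
one_partition = shuffled_block_copies(4, 10000, 4, 1, 1)     # 80000 copies
many_partitions = shuffled_block_copies(4, 10000, 4, 4, 4)   # 320000 copies

load_one = avg_blocks_per_reducer(4, 10000, 4, 1, 1)         # 80000 blocks
load_many = avg_blocks_per_reducer(4, 10000, 4, 4, 4)        # 20000 blocks

print(one_partition, many_partitions, load_one, load_many)
```

Under these assumptions, splitting only along M and P lowers the per-reducer load but multiplies the shuffled data, and the per-reducer load still grows linearly with N. That is why splitting along the middle dimension N as well looks necessary.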