Description
In SPARK-12869, this function was optimized for the case of dense matrices using Breeze. However, I have a case with very very sparse matrices which suffers a great deal from this optimization. A process we have that took about 20 mins now takes about 6.5 hours.
Here is a sample code to see the difference:
val n = 40000
val density = 0.0002
val rnd = new Random(123)
val rndEntryList = (for (i <- 0 until (n*n*density).toInt) yield (rnd.nextInt(n), rnd.nextInt(n), rnd.nextDouble()))
.groupBy(t => (t._1,t._2)).map(t => t._2.last).map{ case (i,j,d) => (i,(j,d)) }.toSeq
val entries: RDD[(Int, (Int, Double))] = sc.parallelize(rndEntryList, 10)
val indexedRows = entries.groupByKey().map(e => IndexedRow(e._1, Vectors.sparse(n, e._2.toSeq)))
val mat = new IndexedRowMatrix(indexedRows, nRows = n, nCols = n)val t1 = System.nanoTime()
println(mat.toBlockMatrix(10000,10000).toCoordinateMatrix().toIndexedRowMatrix().rows.map(_.vector.numActives).sum())
val t2 = System.nanoTime()
println("took: " + (t2 - t1) / 1000 / 1000 + " ms")
println("============================================================")
println(mat.toBlockMatrix(10000,10000).toIndexedRowMatrix().rows.map(_.vector.numActives).sum())
val t3 = System.nanoTime()
println("took: " + (t3 - t2) / 1000 / 1000 + " ms")
println("============================================================")
I get:
took: 9404 ms
============================================================
took: 57350 ms
============================================================
Looking at it a little with a profiler, I see that the problem is with the SliceVector.update() and SparseVector.apply.
I currently work-around this by doing:
blockMatrix.toCoordinateMatrix().toIndexedRowMatrix()
like it was in version 1.6.
Attachments
Attachments
Issue Links
- is related to
-
SPARK-12869 Optimize conversion from BlockMatrix to IndexedRowMatrix
- Resolved
- links to