Details
Type: Bug
Status: Resolved
Priority: Critical
Resolution: Cannot Reproduce
Affects Version/s: 0.9.1
Fix Version/s: None
Description
I have created a random matrix of 1M rows with 10K semicolon-separated items on each row. While reading it back with Spark 0.9.1 and doing a count, I consistently get fewer than 1M rows, and a different number every time at that (!!). Example below:
head -n 1 tool-generate-random-matrix*log
==> tool-generate-random-matrix-999158.log <==
Row item counts: 999158
==> tool-generate-random-matrix.log <==
Row item counts: 997163
The data is split into 1000 partitions. When I download it using s3cmd sync and run the following AWK script on it, I get the correct number of rows in each partition (1000 partitions × 1000 rows = 1M). What is up?
for k in part-0*; do echo "$k"; awk -F ";" 'NF != 10000 { print "Wrong number of items:", NF } END { if (NR != 1000) { print "Wrong number of rows:", NR } }' "$k"; done
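For comparison, a similar check can be run from inside Spark itself. This is only a rough sketch, not part of the original run: it assumes the same savePath and SparkContext as in the code below, and the textFile partition boundaries need not line up exactly with the saved part files, but the grand total should still come to 1M.

// Sketch: count lines per input partition directly in Spark, to compare
// against the AWK check above (assumes sc and savePath as in the code below).
val counts = sc.textFile(savePath, 1000)
  .mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size)))
  .collect()
println("Total rows: " + counts.map(_._2.toLong).sum)
counts.filter(_._2 != 1000).foreach { case (idx, n) => println("Partition " + idx + ": " + n + " rows") }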
The matrix generation and counting code is below:
package fi.helsinki.cs.nodes.matrix

import java.util.Random

import scala.collection.mutable.ListBuffer

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel._

object GenerateRandomMatrix {

  def NewGeMatrix(rSeed: Int, rdd: RDD[Int], features: Int) = {
    rdd.mapPartitions(part => part.map(xarr => {
      val rdm = new Random(rSeed + xarr)
      val arr = new Array[Double](features)
      for (i <- 0 until features)
        arr(i) = rdm.nextDouble()
      new Row(xarr, arr)
    }))
  }

  case class Row(id: Int, elements: Array[Double])

  def rowFromText(line: String) = {
    val idarr = line.split(" ")
    val arr = idarr(1).split(";")
    // -1 to fix saved matrix indexing error
    new Row(idarr(0).toInt - 1, arr.map(_.toDouble))
  }

  def main(args: Array[String]) {
    val master = args(0)
    val tasks = args(1).toInt
    val savePath = args(2)
    val read = args.contains("read")

    val datapoints = 1000000
    val features = 10000

    val sc = new SparkContext(master, "RandomMatrix")

    if (read) {
      val randomMatrix: RDD[Row] =
        sc.textFile(savePath, tasks).map(rowFromText).persist(MEMORY_AND_DISK)
      println("Row item counts: " + randomMatrix.count)
    } else {
      val rdd = sc.parallelize(0 until datapoints, tasks)
      val bcSeed = sc.broadcast(128)
      /* Generating a matrix of random Doubles */
      val randomMatrix = NewGeMatrix(bcSeed.value, rdd, features).persist(MEMORY_AND_DISK)
      randomMatrix.map(row => row.id + " " + row.elements.mkString(";")).saveAsTextFile(savePath)
    }
    sc.stop
  }
}
I run this with:
appassembler/bin/tool-generate-random-matrix master 1000 s3n://keys@path/to/data 1>matrix.log 2>matrix.err
Reading from HDFS gives the right count and the right number of items on each row. However, I had to use the full path including the server name; just /matrix does not work (Spark treats it as file://):
p="hdfs://ec2-54-188-6-77.us-west-2.compute.amazonaws.com:9000/matrix"
appassembler/bin/tool-generate-random-matrix $( cat /root/spark-ec2/cluster-url ) 1000 "$p" read 1>readmatrix.log 2>readmatrix.err
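Presumably the bare /matrix path resolves against Hadoop's default filesystem, which is the local file system unless configured otherwise. A sketch of what I believe would make the short path work (assuming fs.default.name is the relevant property on this Hadoop version, and reusing the namenode address above):

// Sketch, inside main() above: point the default filesystem at the HDFS
// namenode so that bare paths such as "/matrix" resolve to hdfs:// rather
// than file://
sc.hadoopConfiguration.set("fs.default.name",
  "hdfs://ec2-54-188-6-77.us-west-2.compute.amazonaws.com:9000")
val rows = sc.textFile("/matrix", tasks) // now reads hdfs://.../matrix
println("Row item counts: " + rows.count)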
Issue Links
- is duplicated by: SPARK-5917 "Distinct is broken" (Resolved)