Description
In the KMeans algorithm, there is a zip operation before taking samples (https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala#L210), which can be reduced to the following code:
val rdd = ...
val rdd2 = rdd.map(x => x)
rdd.zip(rdd2).count()
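To make the lineage behind this snippet concrete, here is a toy, Spark-free model in Scala. `ToyRDD`, `source` and `sourceComputations` are hypothetical names, not Spark APIs; the sketch only assumes that, when nothing is cached, each side of a zip recomputes its own lineage, so the shared parent partition is computed once per side.

```scala
// Toy, Spark-free model of lineage (hypothetical names, not Spark APIs):
// a "ToyRDD" is just a function that recomputes one partition on demand.
object ZipLineageSketch {
  // How many times the source partition has been computed.
  var sourceComputations = 0

  final case class ToyRDD[A](compute: () => Iterator[A]) {
    // Like rdd.map: a child whose compute() re-runs the parent's compute().
    def map[B](f: A => B): ToyRDD[B] = ToyRDD(() => compute().map(f))

    // Like rdd.zip on a single partition: each side is computed on its own,
    // then elements are paired; differing lengths are rejected.
    def zip[B](other: ToyRDD[B]): List[(A, B)] = {
      val left = compute().toList
      val right = other.compute().toList
      require(left.length == right.length,
        "partitions must have the same number of elements")
      left.zip(right)
    }
  }

  // A source partition that records every (re)computation.
  def source(data: Seq[Double]): ToyRDD[Double] = ToyRDD { () =>
    sourceComputations += 1
    data.iterator
  }

  def main(args: Array[String]): Unit = {
    val rdd = source(Seq(5.1, 4.9, 4.7))
    val rdd2 = rdd.map(x => x)
    println(rdd.zip(rdd2))      // List((5.1,5.1), (4.9,4.9), (4.7,4.7))
    println(sourceComputations) // 2: once for each side of the zip
  }
}
```

For an ordinary, re-readable source this double computation is harmless, because both computations see the same data.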
However, RRDD fails on this operation with either a "can only zip rdd with same number of elements" error or a "stream closed" error, similar to the JIRA issue SPARK-2251: https://issues.apache.org/jira/browse/SPARK-2251
Inside RRDD, a data stream is used to ingest data from the R side. Zipping an RDD with itself (or with an RDD derived from it, as above) computes each partition of the common parent twice. For example, zipping a HadoopRDD (the iris dataset) with itself produces the following output:
we get a pair (6.8, 6.8)
we get a pair (5.1, 5.1)
we get a pair (6.7, 6.7)
we get a pair (4.9, 4.9)
we get a pair (6.0, 6.0)
we get a pair (4.7, 4.7)
we get a pair (5.7, 5.7)
we get a pair (4.6, 4.6)
we get a pair (5.5, 5.5)
we get a pair (5.0, 5.0)
we get a pair (5.5, 5.5)
we get a pair (5.4, 5.4)
we get a pair (5.8, 5.8)
we get a pair (4.6, 4.6)
we get a pair (6.0, 6.0)
we get a pair (5.0, 5.0)
we get a pair (5.4, 5.4)
we get a pair (4.4, 4.4)
we get a pair (6.0, 6.0)
we get a pair (4.9, 4.9)
we get a pair (6.7, 6.7)
we get a pair (5.4, 5.4)
we get a pair (6.3, 6.3)
we get a pair (4.8, 4.8)
we get a pair (5.6, 5.6)
we get a pair (4.8, 4.8)
we get a pair (5.5, 5.5)
we get a pair (4.3, 4.3)
we get a pair (5.5, 5.5)
we get a pair (5.8, 5.8)
we get a pair (6.1, 6.1)
we get a pair (5.7, 5.7)
we get a pair (5.8, 5.8)
we get a pair (5.4, 5.4)
we get a pair (5.0, 5.0)
we get a pair (5.1, 5.1)
we get a pair (5.6, 5.6)
we get a pair (5.7, 5.7)
we get a pair (5.7, 5.7)
we get a pair (5.1, 5.1)
we get a pair (5.7, 5.7)
we get a pair (5.4, 5.4)
we get a pair (6.2, 6.2)
we get a pair (5.1, 5.1)
we get a pair (5.1, 5.1)
we get a pair (4.6, 4.6)
we get a pair (5.7, 5.7)
we get a pair (5.1, 5.1)
we get a pair (6.3, 6.3)
we get a pair (4.8, 4.8)
we get a pair (5.8, 5.8)
we get a pair (5.0, 5.0)
we get a pair (7.1, 7.1)
we get a pair (5.0, 5.0)
we get a pair (6.3, 6.3)
we get a pair (5.2, 5.2)
we get a pair (6.5, 6.5)
we get a pair (5.2, 5.2)
we get a pair (7.6, 7.6)
we get a pair (4.7, 4.7)
we get a pair (4.9, 4.9)
we get a pair (4.8, 4.8)
we get a pair (7.3, 7.3)
we get a pair (5.4, 5.4)
we get a pair (6.7, 6.7)
we get a pair (5.2, 5.2)
we get a pair (7.2, 7.2)
we get a pair (5.5, 5.5)
we get a pair (6.5, 6.5)
we get a pair (4.9, 4.9)
we get a pair (6.4, 6.4)
we get a pair (5.0, 5.0)
we get a pair (6.8, 6.8)
we get a pair (5.5, 5.5)
we get a pair (5.7, 5.7)
we get a pair (4.9, 4.9)
we get a pair (5.8, 5.8)
we get a pair (4.4, 4.4)
we get a pair (6.4, 6.4)
we get a pair (5.1, 5.1)
we get a pair (6.5, 6.5)
we get a pair (5.0, 5.0)
we get a pair (7.7, 7.7)
we get a pair (4.5, 4.5)
we get a pair (7.7, 7.7)
we get a pair (4.4, 4.4)
we get a pair (6.0, 6.0)
we get a pair (5.0, 5.0)
we get a pair (6.9, 6.9)
we get a pair (5.1, 5.1)
we get a pair (5.6, 5.6)
we get a pair (4.8, 4.8)
we get a pair (7.7, 7.7)
we get a pair (6.3, 6.3)
we get a pair (5.1, 5.1)
we get a pair (6.7, 6.7)
we get a pair (4.6, 4.6)
we get a pair (7.2, 7.2)
we get a pair (5.3, 5.3)
we get a pair (6.2, 6.2)
we get a pair (5.0, 5.0)
we get a pair (6.1, 6.1)
we get a pair (7.0, 7.0)
we get a pair (6.4, 6.4)
we get a pair (6.4, 6.4)
we get a pair (7.2, 7.2)
we get a pair (6.9, 6.9)
we get a pair (7.4, 7.4)
we get a pair (5.5, 5.5)
we get a pair (7.9, 7.9)
we get a pair (6.5, 6.5)
we get a pair (6.4, 6.4)
we get a pair (5.7, 5.7)
we get a pair (6.3, 6.3)
we get a pair (6.3, 6.3)
we get a pair (6.1, 6.1)
we get a pair (4.9, 4.9)
we get a pair (7.7, 7.7)
we get a pair (6.6, 6.6)
we get a pair (6.3, 6.3)
we get a pair (5.2, 5.2)
we get a pair (6.4, 6.4)
we get a pair (5.0, 5.0)
we get a pair (6.0, 6.0)
we get a pair (5.9, 5.9)
we get a pair (6.9, 6.9)
we get a pair (6.0, 6.0)
we get a pair (6.7, 6.7)
we get a pair (6.1, 6.1)
we get a pair (6.9, 6.9)
we get a pair (5.6, 5.6)
we get a pair (5.8, 5.8)
we get a pair (6.7, 6.7)
we get a pair (6.8, 6.8)
we get a pair (5.6, 5.6)
we get a pair (6.7, 6.7)
we get a pair (5.8, 5.8)
we get a pair (6.7, 6.7)
we get a pair (6.2, 6.2)
we get a pair (6.3, 6.3)
we get a pair (5.6, 5.6)
we get a pair (6.5, 6.5)
we get a pair (5.9, 5.9)
we get a pair (6.2, 6.2)
we get a pair (6.1, 6.1)
we get a pair (5.9, 5.9)
we get a pair (6.3, 6.3)
we get a pair (6.1, 6.1)
we get a pair (6.4, 6.4)
we get a pair (6.6, 6.6)
However, RRDD with the same setup produces mismatched pairs:
we get a pair (5.1, 5.1)
we get a pair (4.9, 4.7)
we get a pair (4.6, 5.0)
we get a pair (5.4, 4.6)
we get a pair (5.0, 4.4)
we get a pair (4.9, 5.4)
we get a pair (4.8, 4.8)
we get a pair (4.3, 5.8)
we get a pair (5.7, 5.4)
we get a pair (5.1, 5.7)
we get a pair (5.1, 5.4)
we get a pair (5.1, 4.6)
we get a pair (5.1, 4.8)
we get a pair (5.0, 5.0)
we get a pair (5.2, 5.2)
we get a pair (4.7, 4.8)
we get a pair (5.4, 5.2)
we get a pair (5.5, 4.9)
we get a pair (5.0, 5.5)
we get a pair (4.9, 4.4)
we get a pair (5.1, 5.0)
we get a pair (4.5, 4.4)
we get a pair (5.0, 5.1)
we get a pair (4.8, 5.1)
we get a pair (4.6, 5.3)
we get a pair (5.0, 7.0)
we get a pair (6.4, 6.9)
we get a pair (5.5, 6.5)
we get a pair (5.7, 6.3)
we get a pair (4.9, 6.6)
we get a pair (5.2, 5.0)
we get a pair (5.9, 6.0)
we get a pair (6.1, 5.6)
we get a pair (6.7, 5.6)
we get a pair (5.8, 6.2)
we get a pair (5.6, 5.9)
we get a pair (6.1, 6.3)
we get a pair (6.1, 6.4)
we get a pair (6.6, 6.8)
we get a pair (6.7, 6.0)
we get a pair (5.7, 5.5)
we get a pair (5.5, 5.8)
we get a pair (6.0, 5.4)
we get a pair (6.0, 6.7)
we get a pair (6.3, 5.6)
we get a pair (5.5, 5.5)
we get a pair (6.1, 5.8)
we get a pair (5.0, 5.6)
we get a pair (5.7, 5.7)
we get a pair (6.2, 5.1)
we get a pair (5.7, 6.3)
we get a pair (5.8, 7.1)
we get a pair (6.3, 6.5)
we get a pair (7.6, 4.9)
we get a pair (7.3, 6.7)
we get a pair (7.2, 6.5)
we get a pair (6.4, 6.8)
we get a pair (5.7, 5.8)
we get a pair (6.4, 6.5)
we get a pair (7.7, 7.7)
we get a pair (6.0, 6.9)
we get a pair (5.6, 7.7)
we get a pair (6.3, 6.7)
we get a pair (7.2, 6.2)
we get a pair (6.1, 6.4)
we get a pair (7.2, 7.4)
we get a pair (7.9, 6.4)
we get a pair (6.3, 6.1)
we get a pair (7.7, 6.3)
we get a pair (6.4, 6.0)
we get a pair (6.9, 6.7)
we get a pair (6.9, 5.8)
we get a pair (6.8, 6.7)
we get a pair (6.7, 6.3)
we get a pair (6.5, 6.2)
we need to close stream java.io.DataInputStream@507affd3 in thread 127
we need to close stream java.io.DataInputStream@507affd3 in thread 127
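The end of the log shows that the same data stream (java.io.DataInputStream@507affd3) is closed twice. The RRDD run also prints only 75 pairs, versus 150 for the HadoopRDD, and most pairs look like consecutive rows, as if both sides of the zip were reading alternately from one shared stream.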
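The interleaving can be reproduced with a small hypothetical model in which both consumers of a partition share one iterator, standing in for the single DataInputStream. `SharedStreamSketch` and its helpers are illustrative names; this is a sketch of the symptom under that assumption, not RRDD's actual code.

```scala
// Hypothetical model of the RRDD symptom: both sides of a zip pull from ONE
// shared iterator (standing in for one DataInputStream) instead of each side
// getting its own. Not RRDD's actual implementation.
object SharedStreamSketch {
  // Stand-in for one partition's rows arriving from the R worker.
  val data: Vector[Double] = Vector(5.1, 4.9, 4.7, 4.6, 5.0, 5.4)

  // Buggy variant: every call returns the same underlying iterator.
  def sharedCompute(): () => Iterator[Double] = {
    val shared = data.iterator
    () => shared
  }

  // Correct variant: every call opens a fresh iterator over the data.
  def freshCompute(): () => Iterator[Double] = () => data.iterator

  // Pull one element from each side in turn, as a zipped partition does.
  def zipLazily(left: Iterator[Double], right: Iterator[Double]): List[(Double, Double)] = {
    val out = List.newBuilder[(Double, Double)]
    while (left.hasNext && right.hasNext) out += ((left.next(), right.next()))
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val shared = sharedCompute()
    println(zipLazily(shared(), shared()))
    // List((5.1,4.9), (4.7,4.6), (5.0,5.4))

    val fresh = freshCompute()
    println(zipLazily(fresh(), fresh()))
    // List((5.1,5.1), (4.9,4.9), (4.7,4.7), (4.6,4.6), (5.0,5.0), (5.4,5.4))
  }
}
```

With the shared iterator, each pair consumes two consecutive elements, so six inputs yield only three mismatched pairs; the fresh-iterator variant pairs every element with itself.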
The simplest way to avoid the error is to call cache() to cut off the lineage, as shown below. However, we do not always want to cache the data.
val rdd = ...
rdd.cache()
val rdd2 = rdd.map(x => x)
rdd.zip(rdd2).count()
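Why caching helps can be sketched with the same kind of toy model. Here `cached` is a hypothetical stand-in for `rdd.cache()` (it is not Spark's block manager) that materializes the partition on first use, so a single-use, stream-like source is read exactly once and every later computation replays the in-memory copy.

```scala
// Hypothetical model of the cache() workaround; `singleUseSource` and
// `cached` are illustrative names, not Spark APIs.
object CacheWorkaroundSketch {
  // A source that, like a stream, can only be read once: every call
  // returns the same iterator.
  def singleUseSource(data: Seq[Double]): () => Iterator[Double] = {
    val it = data.iterator
    () => it
  }

  // Toy analogue of rdd.cache(): read the partition once on first use,
  // then serve every later computation from the in-memory copy.
  def cached(compute: () => Iterator[Double]): () => Iterator[Double] = {
    lazy val materialized: Vector[Double] = compute().toVector
    () => materialized.iterator
  }

  def main(args: Array[String]): Unit = {
    val rdd = cached(singleUseSource(Seq(5.1, 4.9, 4.7)))
    // Analogue of rdd2 = rdd.map(x => x): a child that recomputes its parent.
    val rdd2: () => Iterator[Double] = () => rdd().map(x => x)
    // Analogue of rdd.zip(rdd2): both sides now replay the cached copy.
    println(rdd().zip(rdd2()).toList) // List((5.1,5.1), (4.9,4.9), (4.7,4.7))
  }
}
```

Without the `cached` wrapper, both sides would drain the same iterator, which is the interleaving pattern visible in the RRDD log. The trade-off is the one noted above: the whole partition has to be held in memory.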
Issue Links
- depends upon
  - SPARK-12792 Refactor RRDD to support R UDF (Resolved)
- links to
  - Issue resolved by pull request 12606: https://github.com/apache/spark/pull/12606