Spark / SPARK-13178

RRDD hits a concurrency issue in rdd.zip(rdd).count()

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.0.0
    • Component/s: SparkR
    • Labels: None

    Description

      In the KMeans algorithm, there is a zip operation before taking samples (https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala#L210), which can be reduced to the following code:

      zip.scala
      val rdd = ...  // any non-cached RDD backed by an RRDD
      val rdd2 = rdd.map(x => x)
      rdd.zip(rdd2).count()
      

      However, RRDD fails on this operation with a "can only zip RDDs with same number of elements" or "stream closed" error, similar to SPARK-2251: https://issues.apache.org/jira/browse/SPARK-2251

      Inside RRDD, a data stream is used to ingest data from the R side. In the zip operation, zipping an RDD with itself computes each partition twice. So if we zip a HadoopRDD (the iris dataset) with itself, we get:

      log-from-zip-HadoopRDD
      we get a pair (6.8, 6.8)
      we get a pair (5.1, 5.1)
      we get a pair (6.7, 6.7)
      we get a pair (4.9, 4.9)
      we get a pair (6.0, 6.0)
      we get a pair (4.7, 4.7)
      we get a pair (5.7, 5.7)
      we get a pair (4.6, 4.6)
      we get a pair (5.5, 5.5)
      we get a pair (5.0, 5.0)
      we get a pair (5.5, 5.5)
      we get a pair (5.4, 5.4)
      we get a pair (5.8, 5.8)
      we get a pair (4.6, 4.6)
      we get a pair (6.0, 6.0)
      we get a pair (5.0, 5.0)
      we get a pair (5.4, 5.4)
      we get a pair (4.4, 4.4)
      we get a pair (6.0, 6.0)
      we get a pair (4.9, 4.9)
      we get a pair (6.7, 6.7)
      we get a pair (5.4, 5.4)
      we get a pair (6.3, 6.3)
      we get a pair (4.8, 4.8)
      we get a pair (5.6, 5.6)
      we get a pair (4.8, 4.8)
      we get a pair (5.5, 5.5)
      we get a pair (4.3, 4.3)
      we get a pair (5.5, 5.5)
      we get a pair (5.8, 5.8)
      we get a pair (6.1, 6.1)
      we get a pair (5.7, 5.7)
      we get a pair (5.8, 5.8)
      we get a pair (5.4, 5.4)
      we get a pair (5.0, 5.0)
      we get a pair (5.1, 5.1)
      we get a pair (5.6, 5.6)
      we get a pair (5.7, 5.7)
      we get a pair (5.7, 5.7)
      we get a pair (5.1, 5.1)
      we get a pair (5.7, 5.7)
      we get a pair (5.4, 5.4)
      we get a pair (6.2, 6.2)
      we get a pair (5.1, 5.1)
      we get a pair (5.1, 5.1)
      we get a pair (4.6, 4.6)
      we get a pair (5.7, 5.7)
      we get a pair (5.1, 5.1)
      we get a pair (6.3, 6.3)
      we get a pair (4.8, 4.8)
      we get a pair (5.8, 5.8)
      we get a pair (5.0, 5.0)
      we get a pair (7.1, 7.1)
      we get a pair (5.0, 5.0)
      we get a pair (6.3, 6.3)
      we get a pair (5.2, 5.2)
      we get a pair (6.5, 6.5)
      we get a pair (5.2, 5.2)
      we get a pair (7.6, 7.6)
      we get a pair (4.7, 4.7)
      we get a pair (4.9, 4.9)
      we get a pair (4.8, 4.8)
      we get a pair (7.3, 7.3)
      we get a pair (5.4, 5.4)
      we get a pair (6.7, 6.7)
      we get a pair (5.2, 5.2)
      we get a pair (7.2, 7.2)
      we get a pair (5.5, 5.5)
      we get a pair (6.5, 6.5)
      we get a pair (4.9, 4.9)
      we get a pair (6.4, 6.4)
      we get a pair (5.0, 5.0)
      we get a pair (6.8, 6.8)
      we get a pair (5.5, 5.5)
      we get a pair (5.7, 5.7)
      we get a pair (4.9, 4.9)
      we get a pair (5.8, 5.8)
      we get a pair (4.4, 4.4)
      we get a pair (6.4, 6.4)
      we get a pair (5.1, 5.1)
      we get a pair (6.5, 6.5)
      we get a pair (5.0, 5.0)
      we get a pair (7.7, 7.7)
      we get a pair (4.5, 4.5)
      we get a pair (7.7, 7.7)
      we get a pair (4.4, 4.4)
      we get a pair (6.0, 6.0)
      we get a pair (5.0, 5.0)
      we get a pair (6.9, 6.9)
      we get a pair (5.1, 5.1)
      we get a pair (5.6, 5.6)
      we get a pair (4.8, 4.8)
      we get a pair (7.7, 7.7)
      we get a pair (6.3, 6.3)
      we get a pair (5.1, 5.1)
      we get a pair (6.7, 6.7)
      we get a pair (4.6, 4.6)
      we get a pair (7.2, 7.2)
      we get a pair (5.3, 5.3)
      we get a pair (6.2, 6.2)
      we get a pair (5.0, 5.0)
      we get a pair (6.1, 6.1)
      we get a pair (7.0, 7.0)
      we get a pair (6.4, 6.4)
      we get a pair (6.4, 6.4)
      we get a pair (7.2, 7.2)
      we get a pair (6.9, 6.9)
      we get a pair (7.4, 7.4)
      we get a pair (5.5, 5.5)
      we get a pair (7.9, 7.9)
      we get a pair (6.5, 6.5)
      we get a pair (6.4, 6.4)
      we get a pair (5.7, 5.7)
      we get a pair (6.3, 6.3)
      we get a pair (6.3, 6.3)
      we get a pair (6.1, 6.1)
      we get a pair (4.9, 4.9)
      we get a pair (7.7, 7.7)
      we get a pair (6.6, 6.6)
      we get a pair (6.3, 6.3)
      we get a pair (5.2, 5.2)
      we get a pair (6.4, 6.4)
      we get a pair (5.0, 5.0)
      we get a pair (6.0, 6.0)
      we get a pair (5.9, 5.9)
      we get a pair (6.9, 6.9)
      we get a pair (6.0, 6.0)
      we get a pair (6.7, 6.7)
      we get a pair (6.1, 6.1)
      we get a pair (6.9, 6.9)
      we get a pair (5.6, 5.6)
      we get a pair (5.8, 5.8)
      we get a pair (6.7, 6.7)
      we get a pair (6.8, 6.8)
      we get a pair (5.6, 5.6)
      we get a pair (6.7, 6.7)
      we get a pair (5.8, 5.8)
      we get a pair (6.7, 6.7)
      we get a pair (6.2, 6.2)
      we get a pair (6.3, 6.3)
      we get a pair (5.6, 5.6)
      we get a pair (6.5, 6.5)
      we get a pair (5.9, 5.9)
      we get a pair (6.2, 6.2)
      we get a pair (6.1, 6.1)
      we get a pair (5.9, 5.9)
      we get a pair (6.3, 6.3)
      we get a pair (6.1, 6.1)
      we get a pair (6.4, 6.4)
      we get a pair (6.6, 6.6)
      

      However, with an RRDD in the same setting we get:

      log-from-zip-RRDD
      we get a pair (5.1, 5.1)
      we get a pair (4.9, 4.7)
      we get a pair (4.6, 5.0)
      we get a pair (5.4, 4.6)
      we get a pair (5.0, 4.4)
      we get a pair (4.9, 5.4)
      we get a pair (4.8, 4.8)
      we get a pair (4.3, 5.8)
      we get a pair (5.7, 5.4)
      we get a pair (5.1, 5.7)
      we get a pair (5.1, 5.4)
      we get a pair (5.1, 4.6)
      we get a pair (5.1, 4.8)
      we get a pair (5.0, 5.0)
      we get a pair (5.2, 5.2)
      we get a pair (4.7, 4.8)
      we get a pair (5.4, 5.2)
      we get a pair (5.5, 4.9)
      we get a pair (5.0, 5.5)
      we get a pair (4.9, 4.4)
      we get a pair (5.1, 5.0)
      we get a pair (4.5, 4.4)
      we get a pair (5.0, 5.1)
      we get a pair (4.8, 5.1)
      we get a pair (4.6, 5.3)
      we get a pair (5.0, 7.0)
      we get a pair (6.4, 6.9)
      we get a pair (5.5, 6.5)
      we get a pair (5.7, 6.3)
      we get a pair (4.9, 6.6)
      we get a pair (5.2, 5.0)
      we get a pair (5.9, 6.0)
      we get a pair (6.1, 5.6)
      we get a pair (6.7, 5.6)
      we get a pair (5.8, 6.2)
      we get a pair (5.6, 5.9)
      we get a pair (6.1, 6.3)
      we get a pair (6.1, 6.4)
      we get a pair (6.6, 6.8)
      we get a pair (6.7, 6.0)
      we get a pair (5.7, 5.5)
      we get a pair (5.5, 5.8)
      we get a pair (6.0, 5.4)
      we get a pair (6.0, 6.7)
      we get a pair (6.3, 5.6)
      we get a pair (5.5, 5.5)
      we get a pair (6.1, 5.8)
      we get a pair (5.0, 5.6)
      we get a pair (5.7, 5.7)
      we get a pair (6.2, 5.1)
      we get a pair (5.7, 6.3)
      we get a pair (5.8, 7.1)
      we get a pair (6.3, 6.5)
      we get a pair (7.6, 4.9)
      we get a pair (7.3, 6.7)
      we get a pair (7.2, 6.5)
      we get a pair (6.4, 6.8)
      we get a pair (5.7, 5.8)
      we get a pair (6.4, 6.5)
      we get a pair (7.7, 7.7)
      we get a pair (6.0, 6.9)
      we get a pair (5.6, 7.7)
      we get a pair (6.3, 6.7)
      we get a pair (7.2, 6.2)
      we get a pair (6.1, 6.4)
      we get a pair (7.2, 7.4)
      we get a pair (7.9, 6.4)
      we get a pair (6.3, 6.1)
      we get a pair (7.7, 6.3)
      we get a pair (6.4, 6.0)
      we get a pair (6.9, 6.7)
      we get a pair (6.9, 5.8)
      we get a pair (6.8, 6.7)
      we get a pair (6.7, 6.3)
      we get a pair (6.5, 6.2)
      we need to close stream java.io.DataInputStream@507affd3 in thread 127
      we need to close stream java.io.DataInputStream@507affd3 in thread 127
      

      We can see from the end of the log that the data stream is closed twice.
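
      To make the failure mode concrete, below is a minimal sketch (hypothetical code, not Spark's actual RRDD implementation) of an RDD whose compute() keeps its data source in an instance variable. Both sides of the zip call compute() on the same deserialized RDD object within one task, so their iterators drain the shared source together, producing interleaved pairs like those in the log above.

      shared-state-sketch.scala
      import org.apache.spark.{Partition, SparkContext, TaskContext}
      import org.apache.spark.rdd.RDD

      // Hypothetical stand-in for RRDD: the data source lives in an instance
      // variable, mimicking RRDD's data stream from the R side.
      class SharedStateRDD(sc: SparkContext, data: Seq[Double])
        extends RDD[Double](sc, Nil) {

        @transient private var source: Iterator[Double] = _

        override protected def getPartitions: Array[Partition] =
          Array(new Partition { override def index: Int = 0 })

        override def compute(split: Partition, context: TaskContext): Iterator[Double] = {
          if (source == null) source = data.iterator  // opened once, then shared
          // Every caller gets a view over the SAME underlying iterator, so the
          // two sides of a zip steal elements from each other.
          new Iterator[Double] {
            override def hasNext: Boolean = source.hasNext
            override def next(): Double = source.next()
          }
        }
      }

      Zipping such an RDD with a map of itself, e.g. val rdd = new SharedStateRDD(sc, data); rdd.zip(rdd.map(x => x)).collect(), returns roughly half as many pairs as expected, with consecutive elements split across the left and right sides of each pair.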

      The simplest way to avoid the error is to call cache() to cut off the lineage, as shown below. However, sometimes we do not want to cache the data.

      work-around-zip.scala
      val rdd = ...
      rdd.cache()  // cut the lineage so both sides of the zip read cached data
      val rdd2 = rdd.map(x => x)
      rdd.zip(rdd2).count()
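
      Another possible workaround (an editorial sketch, not proposed in this issue): pair the elements by index through a shuffle instead of zip, so each side of the pairing is scanned in its own stage and no single task computes the same partition twice. This is heavier than zip because it shuffles, and it assumes both RDDs have deterministic ordering and equal length.

      work-around-join.scala
      val byIndex  = rdd.zipWithIndex().map { case (v, i) => (i, v) }   // (position, value)
      val byIndex2 = rdd2.zipWithIndex().map { case (v, i) => (i, v) }
      byIndex.join(byIndex2).values.count()  // pairs matched by position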
      

          Activity

            shivaram Shivaram Venkataraman added a comment -

            Issue resolved by pull request 12606
            https://github.com/apache/spark/pull/12606

            sunrui Sun Rui added a comment -

            This is fixed: the SparkR unit tests pass after removing the workaround for this issue.

            apachespark Apache Spark added a comment -

            User 'sun-rui' has created a pull request for this issue:
            https://github.com/apache/spark/pull/12606


            shivaram Shivaram Venkataraman added a comment -

            sunrui yinxusen Now that https://github.com/apache/spark/pull/10947 has been merged, is this issue resolved?
            sunrui Sun Rui added a comment -

            Remember to clean up the code at https://github.com/apache/spark/pull/11124/files#diff-51c07c6af7649f6c021e5a5438e31a4fR122 when closing this JIRA.
            yinxusen Xusen Yin added a comment -

            Cheers for the good news!

            sunrui Sun Rui added a comment -

            The root cause is that RRDD.compute() uses some instance variables. If compute() is called concurrently, those variables are shared between the calls, which breaks the concurrent computations. The good news is that this happens to be fixed by https://github.com/apache/spark/pull/10947 for SPARK-12972, where each call to compute() creates a new instance of RRunner, so there is no shared state among multiple calls to compute().

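            For contrast with the shared-state sketch in the description, here is a minimal sketch of the pattern the fix adopts (hypothetical simplified code, not the actual RRunner change): all per-computation state is created inside compute(), so concurrent calls on the same RDD object share nothing.

            per-call-state-sketch.scala
            import org.apache.spark.{Partition, SparkContext, TaskContext}
            import org.apache.spark.rdd.RDD

            class PerCallStateRDD(sc: SparkContext, data: Seq[Double])
              extends RDD[Double](sc, Nil) {

              override protected def getPartitions: Array[Partition] =
                Array(new Partition { override def index: Int = 0 })

              override def compute(split: Partition, context: TaskContext): Iterator[Double] = {
                // A fresh iterator per call, analogous to creating a new RRunner
                // inside each compute() instead of storing it in an instance variable.
                data.iterator
              }
            }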
            yinxusen Xusen Yin added a comment -

            Yes, it works: we can use read.json to load a DataFrame, which avoids RRDD. So maybe I should add a note to the K-means algorithm to avoid using RRDD for now.

            yinxusen Xusen Yin added a comment -

            Thanks! I'll try it.

            sunrui Sun Rui added a comment -

            xusen Could you first use a DataFrame created from something like read.json(), within which no RRDD is involved? I will spend some time investigating why this issue happens with RRDD.


            shivaram Shivaram Venkataraman added a comment -

            Ah I see - so the problem is that createDataFrame returns this RRDD, which on zipping leads to the problem. Is the problem just about closing the stream twice? If that is the case we should probably fix that.

            cc sunrui
            yinxusen Xusen Yin added a comment -

            We can work around this by just adding a cache() on the "df", but it is not elegant. Do you have other suggestions for a workaround?

            yinxusen Xusen Yin added a comment - edited

            I don't zip an RRDD with itself. Actually, the bug shows up when I call KMeans from the R side. I wrote KMeans for SparkR in https://issues.apache.org/jira/browse/SPARK-13011 with the code below:

            r-side.R
            model <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", "fitKMeans", algorithm,
                                 x@sdf, iter.max, centers, "Sepal_Length Sepal_Width Petal_Length Petal_Width")
            

            On the Spark side, I wrote a fitKMeans method in org.apache.spark.ml.api.r.SparkRWrappers:

            spark-side.scala
            import org.apache.spark.ml.clustering.{KMeans, KMeansModel}
            import org.apache.spark.ml.feature.VectorAssembler
            import org.apache.spark.sql.DataFrame

            def fitKMeans(
                initMode: String,
                df: DataFrame,
                maxIter: Double,
                k: Double,
                columns: String): KMeansModel = {
              // Assemble the space-separated column names into a single "features" vector.
              val assembler = new VectorAssembler().setInputCols(columns.split(" ")).setOutputCol("features")
              val features = assembler.transform(df).select("features")
              val kMeans = new KMeans()
                .setInitMode(initMode)
                .setMaxIter(maxIter.toInt)
                .setK(k.toInt)
              kMeans.fit(features)
            }
            

            The KMeans call contains rdd.zip(rdd.map(...)), and the RDD there is derived from an RRDD, so I cannot move on without fixing this.


            shivaram Shivaram Venkataraman added a comment -

            Hmm, this is tricky to debug. A higher-level question: why do we need to implement this using RRDD and zip on it? The RRDD class is deprecated and going away soon. I thought the KMeans effort would only require wrapping the Scala algorithm?
            yinxusen Xusen Yin added a comment -

            Pinging mengxr and shivaram about the concurrency issue. I am working on finding a solution, but it would help to hear from you since I am not an expert on this kind of bug.


            People

              Assignee: sunrui Sun Rui
              Reporter: yinxusen Xusen Yin
              Votes: 0
              Watchers: 6