Spark / SPARK-18565

subtractByKey modifies values in the source RDD

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Invalid
    • Affects Version/s: 2.0.2
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels: None
    • Environment: Amazon Elastic MapReduce (emr-5.2.0)

    Description

      I'm experiencing a problem with subtractByKey using Spark 2.0.2 with Scala 2.11.x:

      Relevant code:

      import org.apache.spark.rdd.RDD

      object Types {
         type ContentId = Int
         type ContentKey = (Int, ContentId)
         type InternalContentId = Int
      }
      import Types._
      
      val inverseItemIDMap: RDD[(InternalContentId, ContentKey)] = itemIDMap.map(_.swap).cache()
      logger.info(s"Built an inverse map of ${inverseItemIDMap.count()} item IDs")
      logger.info(inverseItemIDMap.collect().mkString("I->E ", "\nI->E ", ""))
      
      val superfluousItems: RDD[(InternalContentId, Int)] = .. .cache()
      logger.info(superfluousItems.collect().mkString("SI ", "\nSI ", ""))
      
      val filteredInverseItemIDMap: RDD[(InternalContentId, ContentKey)] = 
        inverseItemIDMap.subtractByKey(superfluousItems).cache() // <<===!!!
      logger.info(s"${filteredInverseItemIDMap.count()} items in the filtered inverse ID mapping")
      logger.info(filteredInverseItemIDMap.collect().mkString("F I->E ", "\nF I->E ", ""))
      

      The operation in question is .subtractByKey. Both RDDs involved are cached and forced via count() prior to calling subtractByKey, so I would expect the result to be unaffected by how exactly superfluousItems is built.
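
To make the expectation concrete, here is a minimal sketch of the semantics I expect from subtractByKey, written against plain Scala collections rather than RDDs. `subtractByKeyLocal` and `SubtractByKeySketch` are hypothetical names used only for illustration and are not part of Spark; the sample pairs are the two entries shown in the log output below.

```scala
object SubtractByKeySketch {
  type ContentId = Int
  type ContentKey = (Int, ContentId)
  type InternalContentId = Int

  // Hypothetical local stand-in for the expected behaviour:
  // keep every (key, value) pair of `left` whose key does not occur in `other`.
  // Values of surviving pairs are passed through untouched.
  def subtractByKeyLocal[K, V, W](left: Seq[(K, V)], other: Seq[(K, W)]): Seq[(K, V)] = {
    val otherKeys = other.map(_._1).toSet
    left.filterNot { case (k, _) => otherKeys.contains(k) }
  }

  // The two inverse-map entries and the single superfluous item from the logs.
  val inverse: Seq[(InternalContentId, ContentKey)] =
    Seq((829911, (2, 1135081)), (830071, (1, 2295102)))
  val superfluous: Seq[(InternalContentId, Int)] =
    Seq((829911, 3))

  // Expected: only key 830071 survives, with its original value.
  val expected: Seq[(InternalContentId, ContentKey)] =
    subtractByKeyLocal(inverse, superfluous)
}
```

Under these semantics the entry for key 830071 should come out as (830071,(1,2295102)), not with the value that belonged to key 829911.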

      I added debugging output and filtered the resulting logs by relevant InternalContentId values (829911, 830071). Output:

      Built an inverse map of 827354 item IDs
      .
      .
      I->E (829911,(2,1135081))
      I->E (830071,(1,2295102))
      .
      .
      748190 items in the training set had less than 28 ratings
      SI (829911,3)
      .
      .
      79164 items in the filtered inverse ID mapping
      F I->E (830071,(2,1135081))
      

      There's no element with key 830071 in superfluousItems (SI), so it is correctly kept in the result. However, its value is for some reason replaced with the value that belonged to key 829911 in the source RDD. How could this be? I cannot reproduce it locally - only when running on a multi-machine cluster. Is this a bug, or am I missing something?
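
One hypothesis worth ruling out, which nothing in this report confirms: cache() is a best-effort hint, so partitions evicted on a cluster are recomputed from lineage. If itemIDMap assigns internal IDs by position over data whose order is not deterministic, a recomputation could attach a different value to the same key without subtractByKey being at fault. The sketch below illustrates that effect with plain Scala collections; `assignIds` and `UnstableIdSketch` are hypothetical names, and the position-based ID scheme is an assumption, since the report does not show how itemIDMap is built.

```scala
object UnstableIdSketch {
  type ContentKey = (Int, Int)

  // Hypothetical ID assignment: the internal ID is simply the position in the input.
  def assignIds(items: Seq[ContentKey]): Map[Int, ContentKey] =
    items.zipWithIndex.map { case (key, i) => (i, key) }.toMap

  // The same two items, seen in a different order on a second evaluation.
  val firstPass: Map[Int, ContentKey] = assignIds(Seq((2, 1135081), (1, 2295102)))
  val secondPass: Map[Int, ContentKey] = assignIds(Seq((1, 2295102), (2, 1135081)))

  // Internal IDs whose associated value differs between the two evaluations.
  val changed: Set[Int] = firstPass.keySet.filter(k => firstPass(k) != secondPass(k))
}
```

If something like this is happening, comparing two separate evaluations of inverseItemIDMap on the cluster should reveal keys whose values differ between runs.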

          People

            Assignee: Unassigned
            Reporter: Dmitry Dzhus