Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Invalid
Affects Version/s: 2.0.2
None
None
Environment: Amazon Elastic MapReduce (emr-5.2.0)
Description
I'm experiencing a problem with subtractByKey using Spark 2.0.2 with Scala 2.11.x:
Relevant code:
object Types {
  type ContentId = Int
  type ContentKey = Tuple2[Int, ContentId]
  type InternalContentId = Int
}

val inverseItemIDMap: RDD[(InternalContentId, ContentKey)] = itemIDMap.map(_.swap).cache()
logger.info(s"Built an inverse map of ${inverseItemIDMap.count()} item IDs")
logger.info(inverseItemIDMap.collect().mkString("I->E ", "\nI->E ", ""))

val superfluousItems: RDD[(InternalContentId, Int)] = .. .cache()
logger.info(superfluousItems.collect().mkString("SI ", "\nSI ", ""))

val filteredInverseItemIDMap: RDD[(InternalContentId, ContentKey)] =
  inverseItemIDMap.subtractByKey(superfluousItems).cache() // <<===!!!
logger.info(s"${filteredInverseItemIDMap.count()} items in the filtered inverse ID mapping")
logger.info(filteredInverseItemIDMap.collect().mkString("F I->E ", "\nF I->E ", ""))
The operation in question is .subtractByKey. Both RDDs involved are cached and forced via count() prior to calling subtractByKey, so I would expect the result to be unaffected by how exactly superfluousItems is built.
I added debugging output and filtered the resulting logs by relevant InternalContentId values (829911, 830071). Output:
Built an inverse map of 827354 item IDs
.
.
I->E (829911,(2,1135081))
I->E (830071,(1,2295102))
.
.
748190 items in the training set had less than 28 ratings
SI (829911,3)
.
.
79164 items in the filtered inverse ID mapping
F I->E (830071,(2,1135081))
There's no element with key 830071 in superfluousItems (SI), so it is correctly kept in the result. However, its value is for some reason replaced with the value that belonged to key 829911 in the source RDD. How could this be? I cannot reproduce it locally; it only happens when running on a multi-machine cluster. Is this a bug, or am I missing something?
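To make the expectation concrete, here is a minimal sketch of the semantics I assume subtractByKey should have, modelled on plain Scala collections rather than on RDDs. The helper subtractByKeyLocal and the object name are hypothetical and are not part of Spark; the sample pairs are the ones from the log output above.

```scala
object SubtractByKeyExpectation {
  type ContentId = Int
  type ContentKey = (Int, ContentId)
  type InternalContentId = Int

  // Hypothetical local stand-in for RDD.subtractByKey (an assumption, not Spark code):
  // keep every pair of `left` whose key does not occur in `right`,
  // and leave the values of the surviving pairs untouched.
  def subtractByKeyLocal[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, V)] = {
    val keysToDrop = right.map(_._1).toSet
    left.filterNot { case (k, _) => keysToDrop.contains(k) }
  }

  // The two source entries and the single superfluous entry seen in the logs.
  val inverseItemIDMap: Seq[(InternalContentId, ContentKey)] =
    Seq(829911 -> (2, 1135081), 830071 -> (1, 2295102))
  val superfluousItems: Seq[(InternalContentId, Int)] =
    Seq(829911 -> 3)

  val filtered: Seq[(InternalContentId, ContentKey)] =
    subtractByKeyLocal(inverseItemIDMap, superfluousItems)

  def main(args: Array[String]): Unit =
    println(filtered.mkString("F I->E ", "\nF I->E ", ""))
}
```

Under this model the only surviving entry is (830071,(1,2295102)), i.e. key 830071 keeps its own value, whereas the cluster run logged (830071,(2,1135081)).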