Details
Improvement
Status: Resolved
Minor
Resolution: Not A Problem
1.0.0
None
All
Description
This issue was noticed while reading the MapOutputTracker source code. The synchronization is on the containing "fetching" collection, which makes ALL fetches wait whenever any fetch is in progress.
The fix is to synchronize on the shuffleId instead (interned as a string, so that every thread in the JVM locks on the same object for a given id).
def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
  val statuses = mapStatuses.get(shuffleId).orNull
  if (statuses == null) {
    logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
    var fetchedStatuses: Array[MapStatus] = null
    fetching.synchronized { // This is existing code
    // shuffleId.toString.intern.synchronized { // New Code
      if (fetching.contains(shuffleId)) {
        // Someone else is fetching it; wait for them to be done
        while (fetching.contains(shuffleId)) {
          try {
            fetching.wait()
          } catch {
            case e: InterruptedException =>
          }
        }
      }
      // ...
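The proposed locking scheme can be illustrated in isolation. The following is a minimal sketch, not the MapOutputTracker code: the object `PerShuffleLockSketch` and the helpers `lockFor` and `canEnterWhileHeld` are hypothetical names introduced here. It relies only on the fact that `String.intern` returns one canonical instance per distinct string value, so two threads that intern the same shuffleId contend for the same monitor, while threads working on different ids do not.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

object PerShuffleLockSketch {
  // Hypothetical helper: the canonical (interned) string used as the lock for one shuffle id.
  def lockFor(shuffleId: Int): String = shuffleId.toString.intern

  // Returns true if a second thread can enter the lock for `other`
  // while the calling thread holds the lock for `held`.
  def canEnterWhileHeld(held: Int, other: Int): Boolean = {
    val heldLock = lockFor(held)
    val otherLock = lockFor(other)
    val entered = new CountDownLatch(1)
    heldLock.synchronized {
      val t = new Thread(new Runnable {
        def run(): Unit = otherLock.synchronized { entered.countDown() }
      })
      t.setDaemon(true)
      t.start()
      // Wait briefly; a timeout means the second thread was blocked on the monitor.
      entered.await(500, TimeUnit.MILLISECONDS)
    }
  }
}
```

Calling `PerShuffleLockSketch.canEnterWhileHeld(1, 2)` shows that a different shuffle id is not blocked, whereas `canEnterWhileHeld(1, 1)` times out because both threads share one monitor. Two caveats apply to this approach: any `wait()`/`notifyAll()` calls would have to move to the same per-id lock object, and interned strings are shared JVM-wide, so unrelated code that synchronizes on an equal string would contend for the same lock.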
This is only a small code change, but the test cases needed to prove (a) correct functionality and (b) an actual performance improvement are not trivial.
For (b) it is not worthwhile to add a test case to the codebase. Instead I have created a GitHub project that demonstrates the concurrency/performance improvement of the fine-grained approach. The project is at https://github.com/javadba/scalatesting.git . Simply run "sbt test".
Note: it is unclear how or where to include this ancillary testing/verification information, which will not be part of the git PR. I am open to any suggestions, even simply removing the references to it.
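As a rough idea of what such a demonstration might measure, here is a minimal sketch. It is not the code from the linked project; `LockGranularityDemo` and its methods are hypothetical names. It assumes a simulated fetch that holds its lock for the whole duration of a `Thread.sleep`, which may not match how MapOutputTracker behaves, since a thread blocked in `wait()` releases the monitor.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}

object LockGranularityDemo {
  // Runs one simulated fetch per shuffle id in parallel and returns the elapsed milliseconds.
  // `lockOf` chooses the monitor that guards each simulated fetch.
  def elapsedMillis(shuffleIds: Seq[Int], fetchMillis: Long)(lockOf: Int => AnyRef): Long = {
    val pool = Executors.newFixedThreadPool(shuffleIds.size)
    val start = System.nanoTime()
    val futures = shuffleIds.map { id =>
      val lock = lockOf(id)
      pool.submit(new Callable[Unit] {
        // Assumption of this sketch: the lock is held for the entire simulated fetch.
        def call(): Unit = lock.synchronized { Thread.sleep(fetchMillis) }
      })
    }
    futures.foreach(_.get())
    pool.shutdown()
    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
  }

  private val coarseLock = new Object

  // One shared lock for all ids, analogous to synchronizing on the whole collection.
  def coarse(ids: Seq[Int], ms: Long): Long = elapsedMillis(ids, ms)(_ => coarseLock)

  // One lock per id, using the interned string form of the id.
  def fine(ids: Seq[Int], ms: Long): Long = elapsedMillis(ids, ms)(id => id.toString.intern)
}
```

With four distinct ids and a 100 ms simulated fetch, the coarse variant serializes the fetches while the fine-grained variant lets them overlap, so `fine(...)` should return a noticeably smaller value than `coarse(...)` under these assumptions.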