I am facing an OOM within a Spark Streaming application that uses GraphX.
While trying to reproduce the issue in a simple application, I was able to identify what appear to be two kinds of memory leaks.
It can be reproduced with this simple Scala application (which more or less simulates what I'm doing in my Spark Streaming application, each iteration of the loop standing in for one micro-batch).
In this simple application, I simply apply a mapVertices transformation to the graph and then checkpoint it. I repeat this operation 10 times.
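Schematically, the mechanism looks like this in plain Scala (no Spark involved; all names here are illustrative, not Spark's):

```scala
// Plain-Scala model of the leak: each iteration derives a new "graph" through
// a transformation whose closure captures the previous graph, mirroring how
// mapVertices yields a MapPartitionsRDD whose f closes over the old GraphImpl.
final class ModelGraph(val id: Int, val f: Option[() => ModelGraph]) {
  // Number of graphs still reachable by following the captured references.
  def reachableGraphs: Int = 1 + f.map(_.apply().reachableGraphs).getOrElse(0)
}

object Leak1Model {
  def run(iterations: Int): ModelGraph = {
    var graph = new ModelGraph(0, None)
    for (i <- 1 to iterations) {
      val previous = graph                     // captured by the closure below
      graph = new ModelGraph(i, Some(() => previous))
      // A checkpoint is supposed to cut the lineage here, but since f is
      // never cleared, every earlier graph remains strongly reachable.
    }
    graph
  }
}
```

After `Leak1Model.run(10)`, `reachableGraphs` returns 11, which matches the 11 live GraphImpl instances observed in the heap dump.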
After the application finishes the loop, I take a heap dump.
In this heap dump, I can see 11 "live" GraphImpl instances in memory.
I expect to have only 1 (the one referenced by the global variable impactingGraph).
The "leak" is coming from the f function in a MapPartitionsRDD (which is referenced by the partitionsRDD variable of my VertexRDD).
This f function contains an outer reference to the graph created in the previous iteration.
I can see that in the clearDependencies function of MapPartitionsRDD, the f function is not reset to null.
While looking at similar issues, I found this pull request:
In this pull request, the f variable is reset to null in the clearDependencies method of ZippedPartitionsRDD.
I do not understand why the same is not done in MapPartitionsRDD.
I tried patching spark-core to set f to null in clearDependencies of MapPartitionsRDD, and it fixed the leak in this simple use case.
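The effect of that patch can be sketched in plain Scala (no Spark here; in the real MapPartitionsRDD, f is a val, so the actual patch also has to make it mutable — all names below are illustrative):

```scala
// Spark-free sketch of the fix: a derived "RDD" keeps its transformation
// closure in a var so that clearDependencies can drop it, making everything
// the closure captured (here, the whole chain of parents) unreachable.
final class ModelRdd(val id: Int, private var f: () => ModelRdd) {
  // The patched behaviour: release the closure and what it captured.
  def clearDependencies(): Unit = { f = null }

  // Number of "RDDs" still reachable through the captured closures.
  def chainLength: Int = if (f == null) 1 else 1 + f().chainLength
}

object Leak1Patch {
  def build(n: Int): ModelRdd = {
    var rdd = new ModelRdd(0, null)
    for (i <- 1 to n) {
      val prev = rdd                    // captured by the closure below
      rdd = new ModelRdd(i, () => prev)
    }
    rdd
  }
}
```

Before `clearDependencies()`, `Leak1Patch.build(10).chainLength` is 11; after the call it is 1, which is the behaviour I observed on the heap once the patch was applied.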
Don't you think the f variable should also be reset to null in MapPartitionsRDD?
Now I do the same thing, except that in the propageEvent method, in addition to the mapVertices, I also do a joinVertices on the graph.
It can be found in the following application:
When running this application and taking a heap dump, I can still see 11 "live" GraphImpl instances in memory (where I expect only 1), even with the patch described in the previous section.
When analyzing this dump, I can see that the "leak" comes from a reference to an array of partitions held by the "partitions_" variable within the EdgeRDD (this array of partitions references the MapPartitionsRDD that references the graph created by the previous iteration, similarly to what is described in the Leak 1 section).
This array of partitions is referenced twice:
- once by the "partitions_" variable of the partitionsRDD embedded within the EdgeRDD
- once by the "partitions_" variable of the EdgeRDD itself
This comes from the getPartitions method of the EdgeRDD.
After checkpoint and count are called on the graph's edges, the reference to this array is cleared within the partitionsRDD of the EdgeRDD.
It is done through this call:
But this is not done for the "partitions_" variable of the EdgeRDD itself.
Indeed, the markCheckpointed() method is not called on the EdgeRDD itself, only on the partitionsRDD embedded within it.
Because of that, we still keep a reference to this array of partitions (which references a MapPartitionsRDD that references the graph of the previous iteration).
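Schematically, the double caching looks like this in plain Scala (no Spark; the class and field names are only illustrative): the outer RDD's partitions lookup delegates to the inner one, both cache the result in their own "partitions_" field, but the checkpoint cleanup clears only the inner cache:

```scala
// Plain-Scala sketch: an inner RDD computes an array of partitions and an
// outer RDD (think EdgeRDD) delegates to it. Both cache the same array in
// their own partitions_ field, but markCheckpointed() only runs on the inner
// RDD, so the outer cache keeps the stale array (and its lineage) alive.
class InnerRdd {
  var partitions_ : Array[String] = _
  def partitions: Array[String] = {
    if (partitions_ == null) partitions_ = Array("p0", "p1")
    partitions_
  }
  // Checkpoint cleanup: drop the cached array.
  def markCheckpointed(): Unit = { partitions_ = null }
}

class OuterRdd(val inner: InnerRdd) {
  var partitions_ : Array[String] = _
  // Mirrors the outer RDD's getPartitions delegating to the inner one.
  def partitions: Array[String] = {
    if (partitions_ == null) partitions_ = inner.partitions
    partitions_
  }
}
```

After one call to `outer.partitions`, both fields point at the same array; calling `inner.markCheckpointed()` clears only the inner reference, and the outer one keeps the array reachable — which is exactly the retention path I see in the dump.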
I am able to fix this leak by calling checkpoint and count on the edges right after the mapVertices (and before the joinVertices), provided the MapPartitionsRDD patch described in the previous section is also applied.
But that does not seem clean to me.
In my mind:
- either the "partitions_" variable of the EdgeRDD should be reset to null after a checkpoint is performed on the Graph,
- or the "partitions_" variable of the EdgeRDD should not reference the same array of partitions as the one referenced by the "partitions_" variable of its partitionsRDD (I don't know whether this "partitions_" is really useful on the EdgeRDD).
What do you think?