Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Resolved
Affects Version/s: 2.3.2
Fix Version/s: None
Component/s: None
Description
I use PySpark to load a large CSV file of about 1.4 million lines; each line contains two fields: imageId and kws (image keywords separated by ','). A sample is shown below.
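For illustration only (this sample is hypothetical, not taken from the original report), the tab-delimited input would look like:

imageId	kws
img_0001	cat,animal,pet
img_0002	beach,sunset,ocean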
When I run the following code, an OutOfMemoryError occurs:
import numpy as np
from pyspark import StorageLevel

df_imageIdsKws = spark.read.format('com.databricks.spark.csv') \
    .options(delimiter="\t", header='true') \
    .schema(schema=schema) \
    .load(imagesKwsFilePath)

numClass = 1868

def mapRow(row):
    # Build a dense one-hot vector of length numClass from this image's keywords.
    imageId = row.imageId
    hotVector = np.zeros((numClass,), dtype=float)
    for kw in row.kws.split(','):
        kwIndex = kwsIndexMap_broadcast.value.get(kw)
        hotVector[int(kwIndex)] = 1.0
    return (imageId, hotVector.tolist())

df_imageIdsKws = df_imageIdsKws.rdd.persist(storageLevel=StorageLevel.DISK_ONLY)
imageIdsKws_rdd_ = df_imageIdsKws.map(lambda row: mapRow(row)).persist(storageLevel=StorageLevel.DISK_ONLY)
Even though I persist all RDDs with DISK_ONLY, it still runs out of memory,
but when I change numClass to 1 for testing, everything works fine.
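A rough size estimate (my own, not from the original report) makes the numClass dependence plausible: a dense one-hot list of 1868 floats per row, over 1.4 million rows, is at least 1.4M × 1868 × 8 bytes ≈ 21 GB of vector payload before pickling and shuffle buffering, while numClass=1 shrinks that by three orders of magnitude. As a minimal sketch of a workaround, assuming the same kwsIndexMap_broadcast mapping from the code above and that only a few keywords are set per image, each row can carry a sparse representation whose size is proportional to its keyword count instead of numClass:

from pyspark.ml.linalg import SparseVector

numClass = 1868

def mapRowSparse(row):
    # Keep only the indices that are actually set; a SparseVector stores
    # (size, indices, values) rather than numClass floats per row.
    indices = sorted(
        {int(kwsIndexMap_broadcast.value[kw])
         for kw in row.kws.split(',')
         if kw in kwsIndexMap_broadcast.value})
    return (row.imageId, SparseVector(numClass, indices, [1.0] * len(indices)))

imageIdsKws_rdd_ = df_imageIdsKws.map(mapRowSparse).persist(storageLevel=StorageLevel.DISK_ONLY)

Whether a SparseVector fits downstream depends on what consumes the RDD; if a plain Python structure is needed, a (numClass, indices) tuple carries the same information.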
The following error messages are from the executor log:
java.lang.OutOfMemoryError: Java heap space
	at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
	at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:431)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:431)
	at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
	at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
	at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:351)
	at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:336)
	at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:336)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1381)
	at org.apache.spark.util.Utils$.copyStream(Utils.scala:357)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:436)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:62)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:223)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:439)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:247)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1992)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)
2018-11-08 10:53:06 ERROR SparkUncaughtExceptionHandler:91 - Uncaught exception in thread Thread[stdout writer for /data/anaconda3/bin/python3.5,5,main]