Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Cannot Reproduce
Affects Version/s: 1.6.0
Fix Version/s: None
Component/s: None
Description
Hi,
I found that a Spark job hangs when kryo.serializers.FieldSerializer is invoked under multi-executor-cores settings.
Concretely, when I try to load facts (e.g., <Obama> <wasBornIn> <America>) from a gzipped .nt text file, convert them to an RDD[Triple] [0], and evaluate it, the Spark job hangs under specific conditions.
A reproducible procedure is as follows:
1) Create .nt file
# BSBM data generator ( https://sourceforge.net/projects/bsbmtools/files/bsbmtools/bsbmtools-0.2/bsbmtools-v0.2.zip/download )
$ ./generate -fc -s nt -fn dataset_10M -pc 28480
2) Compress .nt file
$ spark-shell
scala> import org.apache.hadoop.io.compress.GzipCodec
scala> sc.textFile("dataset_10M.nt").repartition(100).saveAsTextFile("dataset_10M_gzip_100", classOf[GzipCodec])
3) Load the .nt file, repartition it, and evaluate it (i.e., RDD.count)
Code:
package jp.hang.spark

import java.io.{StringReader, ByteArrayInputStream}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.jena.graph.Triple
import org.apache.jena.rdf.model.{Model, ModelFactory}

object MyTest {

  private def argParser(args: Array[String]): ArgConfig = {
    val optionParser = new scopt.OptionParser[ArgConfig]("spark-submit <spark commands>") {
      head("AAAA")
      opt[String]('i', "nt-path") required() valueName "<Path>" action { (x, c) =>
        c.copy(ntPath = x)
      } text "Path to the ntriple"
    }
    optionParser.parse(args, ArgConfig()) match {
      case Some(config) => config
      case None => sys.exit(-1) // arguments are bad, error message will have been displayed
    }
  }

  private case class ArgConfig(ntPath: String = "")

  def main(args: Array[String]) {
    val arguments: ArgConfig = argParser(args)
    val conf = new SparkConf().setAppName("MyTest")
    val sc = new SparkContext(conf)
    val rawTriples: RDD[String] = sc.textFile(arguments.ntPath)
    convertRawTriple(rawTriples).repartition(5000).count
  }

  // Parse each N-Triples line with Jena and convert it to a Triple.
  private def convertRawTriple(rawTriples: RDD[String]): RDD[Triple] = {
    rawTriples.mapPartitions { iter =>
      iter.map { tripleText =>
        val model: Model = ModelFactory.createDefaultModel
        val r: StringReader = new StringReader(tripleText)
        model.read(r, null, "N-TRIPLE") // scalastyle:ignore null
        val stmt = model.listStatements.next()
        val triple = Triple.create(stmt.getSubject.asNode, stmt.getPredicate.asNode, stmt.getObject.asNode)
        r.close()
        model.close()
        triple
      }
    }
  }
}
The following commands show that when executor-cores is 1, Spark finishes the job, but when executor-cores is 5, Spark hangs regardless of how the input file is stored (e.g., compressed by gzip, or uncompressed "as-is"):
$ spark-submit --executor-cores 1 --num-executors 50 --driver-memory 12G --executor-memory 3G \
    spark_hangtest-assembly-0.1-SNAPSHOT.jar --nt-path dataset_10M.nt        # ok
$ spark-submit --executor-cores 5 --num-executors 50 --driver-memory 12G --executor-memory 3G \
    spark_hangtest-assembly-0.1-SNAPSHOT.jar --nt-path dataset_10M.nt        # hang
$ spark-submit --executor-cores 1 --num-executors 50 --driver-memory 12G --executor-memory 3G \
    spark_hangtest-assembly-0.1-SNAPSHOT.jar --nt-path dataset_10M_gzip_100  # ok
$ spark-submit --executor-cores 5 --num-executors 50 --driver-memory 12G --executor-memory 3G \
    spark_hangtest-assembly-0.1-SNAPSHOT.jar --nt-path dataset_10M_gzip_100  # hang
When Spark hangs, jstack shows that kryo.serializers.FieldSerializer cannot finish its task:
"Executor task launch worker-9" #164 daemon prio=5 os_prio=0 tid=0x00007fcb2cd5a000 nid=0x3416 in Object.wait() [0x00007fcaf0f48000] java.lang.Thread.State: RUNNABLE at sun.reflect.GeneratedSerializationConstructorAccessor325.newInstance(Unknown Source) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:56) at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1065) at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)e at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:228) at org.apache.spark.serializer.DeserializationStream.readValue(Serializer.scala:171) at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:201) at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:198) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:152) at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1196) at org.apache.spark.storage.DiskStore$$anonfun$putIterator$1.apply$mcV$sp(DiskStore.scala:81) at org.apache.spark.storage.DiskStore$$anonfun$putIterator$1.apply(DiskStore.scala:81) at org.apache.spark.storage.DiskStore$$anonfun$putIterator$1.apply(DiskStore.scala:81) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1251) at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:82) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:798) - locked <0x00000007b736da08> (a org.apache.spark.storage.BlockInfo) at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:645) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:153) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) at org.apache.spark.rdd.RDD.iterator(RDD.scala:268) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69) at org.apache.spark.rdd.RDD.iterator(RDD.scala:268) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Locked ownable synchronizers: - <0x00000007b83e8768> (a java.util.concurrent.ThreadPoolExecutor$Worker)
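In case it helps anyone reproducing this, the trace above was captured by running jstack against a hung executor JVM. One way to do that (jps and jstack ship with the JDK, and CoarseGrainedExecutorBackend is the executor's main class; the exact PID will of course differ):

$ jps | grep CoarseGrainedExecutorBackend   # find the PID of a hung executor
$ jstack <pid>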
–
I also found that Spark does not hang when I use a serializer other than kryo.serializers.FieldSerializer.
For example, this serializer is based on STriple [1]:
class TripleSerializer extends KryoSerializer[Triple] {
  override def write(kryo: Kryo, output: Output, triple: Triple) {
    val protocol: TProtocol = TRDF.protocol(output)
    val tterm: RDF_Term = new RDF_Term()
    SerializerRDF.write(protocol, tterm, triple.getSubject)
    SerializerRDF.write(protocol, tterm, triple.getPredicate)
    SerializerRDF.write(protocol, tterm, triple.getObject)
    TRDF.flush(protocol)
  }

  override def read(kryo: Kryo, input: Input, objClass: Class[Triple]): Triple = {
    val protocol: TProtocol = TRDF.protocol(input)
    val tterm: RDF_Term = new RDF_Term()
    val s: Node = SerializerRDF.read(protocol, tterm)
    val p: Node = SerializerRDF.read(protocol, tterm)
    val o: Node = SerializerRDF.read(protocol, tterm)
    Triple.create(s, p, o)
  }
}
and SNode [2]:
class NodeSerializer extends KryoSerializer[Node] {
  override def write(kryo: Kryo, output: Output, obj: Node) {
    output.writeString(FmtUtils.stringForNode(obj))
  }

  override def read(kryo: Kryo, input: Input, objClass: Class[Node]): Node = {
    val s = input.readString
    RiotLib.parse(s)
  }
}
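For completeness, here is a minimal sketch of how such serializers could be wired into Spark so that Kryo uses them instead of FieldSerializer. I assume KryoSerializer above is an alias for com.esotericsoftware.kryo.Serializer, and the class name JenaKryoRegistrator is my own; spark.kryo.registrator itself is a standard Spark setting:

import com.esotericsoftware.kryo.Kryo
import org.apache.jena.graph.{Node, Triple}
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical registrator: tells Kryo to use the custom serializers above
// for Jena's Triple and Node instead of the default FieldSerializer.
class JenaKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[Triple], new TripleSerializer)
    kryo.register(classOf[Node], new NodeSerializer)
  }
}

It would then be enabled on spark-submit via --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.kryo.registrator=jp.hang.spark.JenaKryoRegistrator. Note that Kryo matches serializers by concrete class, so the concrete Node subclasses may also need to be registered individually.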
Cheers,
[0] Jena Triple (I used version 3.6.0): https://jena.apache.org/documentation/javadoc/jena/org/apache/jena/graph/Triple.html
[1] https://github.com/apache/jena/blob/jena-3.6.0/jena-arq/src/main/java/org/apache/jena/riot/system/STriple.java
[2] https://github.com/apache/jena/blob/jena-3.6.0/jena-arq/src/main/java/org/apache/jena/riot/system/SNode.java