Description
Start PySpark with `spark.python.profile` enabled:

```bash
./bin/pyspark --master local --conf spark.python.profile=true
```
Run some RDD operations. When the PySpark shell exits, PySpark prints the profile report to stdout:
```
>>> rdd = sc.parallelize(range(100)).map(str)
>>> rdd.count()
[Stage 0:>                                                          (0 + 1) / 1]
100
>>> ============================================================
Profile of RDD<id=1>
============================================================
         244 function calls (241 primitive calls) in 0.001 seconds

   Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      101    0.000    0.000    0.000    0.000 rdd.py:1237(<genexpr>)
      101    0.000    0.000    0.000    0.000 util.py:72(wrapper)
        1    0.000    0.000    0.000    0.000 serializers.py:255(dump_stream)
        1    0.000    0.000    0.000    0.000 serializers.py:213(load_stream)
        2    0.000    0.000    0.000    0.000 {built-in method builtins.sum}
        1    0.000    0.000    0.001    0.001 worker.py:607(process)
        1    0.000    0.000    0.000    0.000 context.py:549(f)
        1    0.000    0.000    0.000    0.000 {built-in method _pickle.dumps}
        1    0.000    0.000    0.000    0.000 serializers.py:561(read_int)
        1    0.000    0.000    0.000    0.000 serializers.py:568(write_int)
      4/1    0.000    0.000    0.000    0.000 rdd.py:2917(pipeline_func)
        1    0.000    0.000    0.000    0.000 serializers.py:426(dumps)
        1    0.000    0.000    0.000    0.000 rdd.py:1237(<lambda>)
        1    0.000    0.000    0.000    0.000 serializers.py:135(load_stream)
        2    0.000    0.000    0.000    0.000 rdd.py:1072(func)
        1    0.000    0.000    0.000    0.000 rdd.py:384(func)
        1    0.000    0.000    0.000    0.000 util.py:67(fail_on_stopiteration)
        1    0.000    0.000    0.000    0.000 serializers.py:151(_read_with_length)
        2    0.000    0.000    0.000    0.000 context.py:546(getStart)
        3    0.000    0.000    0.000    0.000 rdd.py:416(func)
        1    0.000    0.000    0.000    0.000 serializers.py:216(_load_stream_without_unbatching)
        2    0.000    0.000    0.000    0.000 {method 'write' of '_io.BufferedWriter' objects}
        1    0.000    0.000    0.000    0.000 {method 'read' of '_io.BufferedReader' objects}
        1    0.000    0.000    0.000    0.000 {built-in method _operator.add}
        1    0.000    0.000    0.000    0.000 {built-in method builtins.hasattr}
        3    0.000    0.000    0.000    0.000 {built-in method builtins.len}
        1    0.000    0.000    0.000    0.000 {built-in method _struct.unpack}
        1    0.000    0.000    0.000    0.000 rdd.py:1226(<lambda>)
        1    0.000    0.000    0.000    0.000 {method 'close' of 'generator' objects}
        1    0.000    0.000    0.000    0.000 {built-in method from_iterable}
        1    0.000    0.000    0.000    0.000 {built-in method _struct.pack}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
        1    0.000    0.000    0.000    0.000 {built-in method builtins.iter}
```
This happens because, when profiling is enabled, Spark registers `show_profiles` as an atexit hook in profile.py, so the report is printed when the Python process exits:
```python
def add_profiler(self, id, profiler):
    """Add a profiler for RDD/UDF `id`"""
    if not self.profilers:
        if self.profile_dump_path:
            atexit.register(self.dump_profiles, self.profile_dump_path)
        else:
            atexit.register(self.show_profiles)
    self.profilers.append([id, profiler, False])
```
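One way to keep this report off stdout, hinted at by the `profile_dump_path` branch above, is to set `spark.python.profile.dump` so the atexit hook writes profile files to a directory instead of printing. A minimal sketch, assuming a writable local path (`/tmp/pyspark_profiles` is only an example):

```python
from pyspark import SparkConf, SparkContext

# Sketch: with a dump path configured, add_profiler registers dump_profiles
# instead of show_profiles, so nothing is printed to stdout at exit.
conf = (SparkConf()
        .setMaster("local")
        .set("spark.python.profile", "true")
        .set("spark.python.profile.dump", "/tmp/pyspark_profiles"))  # example path
sc = SparkContext(conf=conf)

sc.parallelize(range(100)).map(str).count()

sc.show_profiles()  # optionally print the report explicitly while still running
sc.stop()           # at exit, profiles are written under the dump path instead
```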
For a Livy session, this atexit profile report is not JSON, so Livy fails to parse it and throws the exception below:
```
com.fasterxml.jackson.core.JsonParseException: Unexpected character ('=' (code 61)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (String)"============================================================"; line: 1, column: 2]
	at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)
	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:710)
	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:635)
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1952)
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:781)
	at com.fasterxml.jackson.databind.ObjectReader._initForReading(ObjectReader.java:355)
	at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:2023)
	at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1491)
	at org.livy.toolkit.shaded.org.json4s.jackson.JsonMethods.parse(JsonMethods.scala:33)
	at org.livy.toolkit.shaded.org.json4s.jackson.JsonMethods.parse$(JsonMethods.scala:20)
	at org.livy.toolkit.shaded.org.json4s.jackson.JsonMethods$.parse(JsonMethods.scala:71)
	at org.apache.livy.repl.PythonInterpreter.$anonfun$sendRequest$1(PythonInterpreter.scala:288)
	at scala.Option.map(Option.scala:230)
	at org.apache.livy.repl.PythonInterpreter.sendRequest(PythonInterpreter.scala:287)
	at org.apache.livy.repl.PythonInterpreter.sendShutdownRequest(PythonInterpreter.scala:277)
	at org.apache.livy.repl.ProcessInterpreter.close(ProcessInterpreter.scala:62)
	at org.apache.livy.repl.PythonInterpreter.close(PythonInterpreter.scala:234)
	at org.apache.livy.repl.Session.$anonfun$close$1(Session.scala:232)
	at org.apache.livy.repl.Session.$anonfun$close$1$adapted(Session.scala:232)
	at scala.collection.mutable.HashMap$$anon$2.$anonfun$foreach$3(HashMap.scala:158)
	at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
	at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
	at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:158)
	at org.apache.livy.repl.Session.close(Session.scala:232)
	at org.apache.livy.toolkit.IpynbBootstrap.close(IpynbBootstrap.scala:246)
	at org.apache.livy.toolkit.IpynbBootstrap$.main(IpynbBootstrap.scala:72)
	at org.apache.livy.toolkit.IpynbBootstrap.main(IpynbBootstrap.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:764)
```
Livy sends the shutdown request from PythonInterpreter.scala:
```scala
override protected def sendShutdownRequest(): Unit = {
  sendRequest(Map(
    "msg_type" -> "shutdown_request",
    "content" -> ()
  )).foreach { case rep =>
    warn(f"process failed to shut down while returning $rep")
  }
}

private def sendRequest(request: Map[String, Any]): Option[JValue] = {
  stdin.println(write(request))
  stdin.flush()
  Option(stdout.readLine()).map { case line => parse(line) }
}
```
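For illustration, the exchange `sendRequest` expects can be sketched in Python (the `send_request` helper below is hypothetical, not Livy code): write one JSON request line to the child's stdin, then read exactly one line back from its stdout and parse it as JSON. With `spark.python.profile=true`, the first line that comes back after a shutdown request is the `=` banner from the profiler report, and the parse fails exactly as in the stack trace above.

```python
import json

def send_request(stdin, stdout, request):
    # Hypothetical sketch of the line-oriented protocol sendRequest assumes.
    stdin.write(json.dumps(request) + "\n")
    stdin.flush()
    line = stdout.readline()
    if not line:
        return None
    # When the profiler banner "============..." arrives here instead of a JSON
    # response, json.loads raises, mirroring Livy's JsonParseException.
    return json.loads(line)
```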
On shutdown, Livy's fake_shell.py simply exits and never writes a JSON response to stdout, so the profiler report printed by the atexit hook is what Livy reads back instead:
```python
def shutdown_request(_content):
    sys.exit()


msg_type_router = {
    'execute_request': execute_request,
    'shutdown_request': shutdown_request,
}

# ... inside fake_shell's main message loop:
try:
    handler = msg_type_router[msg_type]
except KeyError:
    LOG.error('unknown message type: %s', msg_type)
    continue

response = handler(content)
```
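The interaction can be reproduced without Spark or Livy: an atexit hook that prints (standing in for `show_profiles`) still runs after `sys.exit()`, so its output is the next thing the parent process reads from this process's stdout. A minimal, self-contained sketch:

```python
import atexit
import sys

def report():
    # Stands in for PySpark's show_profiles atexit hook.
    print("=" * 60)
    print("Profile of RDD<id=1>")

atexit.register(report)

def shutdown_request(_content):
    sys.exit()  # SystemExit still lets atexit handlers run and write to stdout

shutdown_request(None)
# A parent reading this process's stdout sees the "=" banner first,
# which is why Livy's JSON parse of the shutdown response fails.
```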