Details
Description
Hi
I have a little program of spark 1.5, it receive data from a publisher in spark streaming. It will process these received data with spark sql. But when the time goes by I found the memory leak in driver, so i update to spark 1.6.2. But, there is no change in the situation.
here is the code:
val lines = ssc.receiverStream(new RReceiver("10.0.200.15", 6380, "subresult"))
val jsonf = lines.map(JSON.parseFull()).map(.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
val logs = jsonf.map(data => LogStashV1(data("message").toString, data("path").toString, data("host").toString, data("lineno").toString.toDouble, data("timestamp").toString))
logs.foreachRDD( rdd => {
import sqc.implicits._
rdd.toDF.registerTempTable("logstash")
val sqlreport0 = sqc.sql("SELECT message, COUNT(message) AS host_c, SUM(lineno) AS line_a FROM logstash WHERE path = '/var/log/system.log' AND lineno > 70 GROUP BY message ORDER BY host_c DESC LIMIT 100")
sqlreport0.map(t => AlertMsg(t(0).toString, t(1).toString.toInt, t(2).toString.toDouble)).collect().foreach(println)
sqlreport0.map(t => AlertMsg(t(0).toString, t(1).toString.toInt, t(2).toString.toDouble)).collect().foreach(println)
jmap information:
num #instances #bytes class name
----------------------------------------------
1: 34819 72711952 [B
2: 2297557 66010656 [C
3: 2296294 55111056 java.lang.String
4: 1063491 42539640 org.apache.spark.scheduler.AccumulableInfo
5: 1251001 40032032 scala.collection.immutable.HashMap$HashMap1
6: 1394364 33464736 java.lang.Long
7: 1102516 26460384 scala.collection.immutable.$colon$colon
8: 1058202 25396848 org.apache.spark.sql.execution.metric.LongSQLMetricValue
9: 1266499 20263984 scala.Some
10: 124052 15889104 <methodKlass>
11: 124052 15269568 <constMethodKlass>
12: 11350 12082432 <constantPoolKlass>
13: 11350 11692880 <instanceKlassKlass>
14: 96682 10828384 org.apache.spark.executor.TaskMetrics
15: 233481 9505896 [Lscala.collection.immutable.HashMap;
16: 96682 6961104 org.apache.spark.scheduler.TaskInfo
17: 9589 6433312 <constantPoolCacheKlass>
18: 233000 5592000 scala.collection.immutable.HashMap$HashTrieMap
19: 96200 5387200 org.apache.spark.executor.ShuffleReadMetrics
20: 113381 3628192 scala.collection.mutable.ListBuffer
21: 7252 2891792 <methodDataKlass>
22: 117073 2809752 scala.collection.mutable.DefaultEntry