Details
- Type: Improvement
- Status: Resolved
- Priority: Major
- Resolution: Incomplete
- Affects Version/s: 2.3.1
- Fix Version/s: None
Description
My realtime platform, built on Spark, supports SQL and Scala coding from a JSP page. To enable Scala coding, my solution is as follows. First, the driver jar and the REPL class output directory are configured:
    export SPARK_DIST_CLASSPATH=/data/xx/my-driver-jar-with-dependencies.jar
    --conf spark.repl.class.outputDir=/data/xx/myclasss/
Then an embedded interpreter is created in the driver, writing compiled classes to the same directory:

    import scala.tools.nsc.GenericRunnerSettings
    import scala.tools.nsc.interpreter.IMain

    val flusher = new java.io.PrintWriter(System.err)
    val interpreter = {
      val interpArguments = List(
        "-Yrepl-class-based",
        "-Yrepl-outdir", "/data/xx/myclasss/"
      )
      val settings = new GenericRunnerSettings(println _)
      settings.embeddedDefaults(this.getClass.getClassLoader)
      settings.usejavacp.value = true
      settings.processArguments(interpArguments, true)
      new IMain(settings, flusher)
    }
    interpreter.setContextClassLoader()
    ExecutorContext.interpreter = interpreter
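For illustration, a quick sanity check that the interpreter really writes class files into the configured output directory (a hypothetical probe; the package name simply mirrors the generated-code package used below):

    // Compile a trivial class and confirm a .class file lands in the REPL
    // output dir, which executors read via spark.repl.class.outputDir.
    ExecutorContext.interpreter.compileString(
      "package com.duowan.meteor.server.executor.apicode\nclass Probe")
    val outDir = new java.io.File(
      "/data/xx/myclasss/com/duowan/meteor/server/executor/apicode")
    // Expect something like "Probe.class" in the listing.
    println(Option(outDir.listFiles()).toList.flatten.map(_.getName).mkString(", "))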
User code is wrapped behind a common interface:

    trait IApiCode extends Serializable {
      def sql(spark: org.apache.spark.sql.SparkSession, fromTable: String, cacheTable: String): Unit
    }
A factory generates the wrapper source, compiles it with the interpreter, and caches the resulting instance:

    import java.util.concurrent.TimeUnit
    import com.google.common.cache.{Cache, CacheBuilder}
    import org.apache.commons.lang3.time.DateFormatUtils

    object InterpretCodeFactory extends Logging {

      // Compiled code is cached for 30 minutes after last access,
      // keyed by taskId plus the task's last update time.
      val sqlActMap: Cache[String, IApiCode] =
        CacheBuilder.newBuilder().expireAfterAccess(30, TimeUnit.MINUTES).build()

      def interpret(taskId: Integer, updateTime: java.util.Date, code: String): IApiCode = {
        val key = taskId + DateFormatUtils.format(updateTime, "yyyyMMddHHmmss")
        var result = sqlActMap.getIfPresent(key)
        if (result == null) {
          result = interpret(key, code)
        }
        result
      }

      def interpret(key: String, code: String): IApiCode = synchronized {
        var result = sqlActMap.getIfPresent(key)
        if (result == null) {
          val genCodeResult = doGenCode(key, code)
          ExecutorContext.interpreter.compileString(genCodeResult)
          result = Class
            .forName(s"com.duowan.meteor.server.executor.apicode.ApiCode$key",
              true, ExecutorContext.interpreter.classLoader)
            .newInstance()
            .asInstanceOf[IApiCode]
          sqlActMap.put(key, result)
        }
        result
      }

      def doGenCode(key: String, code: String): String = {
        val result = s"""
          |package com.duowan.meteor.server.executor.apicode
          |
          |class ApiCode$key extends com.duowan.meteor.server.executor.IApiCode {
          |
          |  override def sql(spark: org.apache.spark.sql.SparkSession, fromTable: String, cacheTable: String): Unit = {
          |    $code
          |  }
          |
          |}
        """.stripMargin
        logInfo(result)
        result
      }
    }
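For illustration, given a hypothetical key 20190101120000 and the user snippet spark.sql(s"SELECT * FROM $fromTable").createOrReplaceTempView(cacheTable), doGenCode would emit a source string along these lines:

    package com.duowan.meteor.server.executor.apicode

    class ApiCode20190101120000 extends com.duowan.meteor.server.executor.IApiCode {

      override def sql(spark: org.apache.spark.sql.SparkSession, fromTable: String, cacheTable: String): Unit = {
        spark.sql(s"SELECT * FROM $fromTable").createOrReplaceTempView(cacheTable)
      }

    }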
Then I can execute the Scala code submitted from the JSP page:
    val apiCode = InterpretCodeFactory.interpret(taskId, updateTime, codeFromJsp)
    apiCode.sql(ExecutorContext.spark, fromTable, cacheTable)
In this way, I think I have solved the problem described in SPARK-9219, but the executors always load the generated classes from the driver's disk, which is unnecessary once a class has been loaded.

It would be better if org.apache.spark.repl.ExecutorClassLoader supported a Guava cache in findClass, and if Spark also supported a config switch for it, disabled by default:
    override def findClass(name: String): Class[_] = {
      if (userClassPathFirst) {
        findClassLocally(name).getOrElse(parentLoader.loadClass(name))
      } else {
        try {
          parentLoader.loadClass(name)
        } catch {
          case e: ClassNotFoundException =>
            // Here a Guava cache could be consulted before going back to the driver.
            val classOption = findClassLocally(name)
            classOption match {
              case None => throw new ClassNotFoundException(name, e)
              case Some(a) => a
            }
        }
      }
    }
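A minimal sketch of what that caching could look like inside ExecutorClassLoader (the cache parameters and the config name spark.repl.classLoaderCache.enabled are assumptions for illustration, not an existing Spark setting):

    // Hypothetical: cache classes resolved from the driver so that repeated
    // findClass calls do not hit the driver's disk again.
    private val findClassCache: com.google.common.cache.Cache[String, Class[_]] =
      com.google.common.cache.CacheBuilder.newBuilder()
        .maximumSize(10000)
        .expireAfterAccess(30, java.util.concurrent.TimeUnit.MINUTES)
        .build()

    // Assumed switch, default off; ExecutorClassLoader already receives a SparkConf.
    private val cacheEnabled =
      conf.getBoolean("spark.repl.classLoaderCache.enabled", false)

    private def findClassLocallyCached(name: String): Option[Class[_]] = {
      if (!cacheEnabled) {
        findClassLocally(name)
      } else {
        Option(findClassCache.getIfPresent(name)).orElse {
          val resolved = findClassLocally(name)
          resolved.foreach(findClassCache.put(name, _))
          resolved
        }
      }
    }

In the ClassNotFoundException branch above, findClassLocally(name) would then be replaced by findClassLocallyCached(name).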
Issue Links
- Blocked
  - SPARK-9219 ClassCastException in instance of org.apache.spark.rdd.MapPartitionsRDD (Resolved)