Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-24728

org.apache.spark.repl.ExecutorClassLoader with cache

    XMLWordPrintableJSON

Details

    • Improvement
    • Status: Resolved
    • Major
    • Resolution: Incomplete
    • 2.3.1
    • None
    • Spark Core

    Description

      My realtime platform, built on Spark, supports SQL coding and Scala coding in a JSP page.

      In order to be able to do Scala coding, my solution is:

      export SPARK_DIST_CLASSPATH=/data/xx/my-driver-jar-with-dependencies.jar
      
      --conf spark.repl.class.outputDir=/data/xx/myclasss/ 
      // Route REPL diagnostics to stderr rather than stdout.
      val flusher = new java.io.PrintWriter(System.err)
      // Build a class-based IMain whose compiled classes land in the shared output dir.
      val interpreter = {
        val settings = new GenericRunnerSettings(println _)
        settings.embeddedDefaults(this.getClass.getClassLoader)
        settings.usejavacp.value = true
        settings.processArguments(
          List("-Yrepl-class-based", "-Yrepl-outdir", "/data/xx/myclasss/"),
          true)
        new IMain(settings, flusher)
      }
      interpreter.setContextClassLoader()
      ExecutorContext.interpreter = interpreter
      /** Contract for dynamically compiled user code; Serializable so instances
       *  can be shipped to executors. Implementations are generated at runtime
       *  by InterpretCodeFactory.doGenCode. */
      trait IApiCode extends Serializable {
       // Runs the user-supplied body against `fromTable`, producing `cacheTable`.
       def sql(spark: org.apache.spark.sql.SparkSession, fromTable: String, cacheTable: String): Unit
      }
      object InterpretCodeFactory extends Logging {

       // Cache of compiled user code, keyed by taskId + update timestamp so a
       // task edit produces a fresh key; entries expire after 30 min idle.
       val sqlActMap: Cache[String, IApiCode] = CacheBuilder.newBuilder().expireAfterAccess(30, TimeUnit.MINUTES).build()

       /** Returns the compiled code for (taskId, updateTime), compiling on a cache miss.
        *  The key embeds the update time, so stale versions are never reused. */
       def interpret(taskId: Integer, updateTime: java.util.Date, code: String): IApiCode = {
        val key = taskId + DateFormatUtils.format(updateTime, "yyyyMMddHHmmss")
        // Guava returns null on a miss; wrap in Option instead of null-checking a var.
        Option(sqlActMap.getIfPresent(key)).getOrElse(interpret(key, code))
       }

       /** Compiles `code` under `key` and caches the instance. Synchronized so the
        *  single shared IMain is never driven by two threads at once; the second
        *  re-check inside the lock avoids duplicate compilation (double-checked). */
       def interpret(key: String, code: String): IApiCode = synchronized {
        Option(sqlActMap.getIfPresent(key)).getOrElse {
         val genCodeResult = doGenCode(key, code)
         ExecutorContext.interpreter.compileString(genCodeResult)
         // getDeclaredConstructor().newInstance() instead of the deprecated
         // Class.newInstance(): it does not swallow/re-wrap constructor exceptions
         // the same surprising way and is the recommended replacement.
         val instance = Class
          .forName(s"com.duowan.meteor.server.executor.apicode.ApiCode$key", true, ExecutorContext.interpreter.classLoader)
          .getDeclaredConstructor()
          .newInstance()
          .asInstanceOf[IApiCode]
         sqlActMap.put(key, instance)
         instance
        }
       }

       /** Wraps the user snippet in a uniquely named IApiCode implementation;
        *  the class name carries `key` so repeated compiles never collide. */
       def doGenCode(key: String, code: String): String = {
        val result = s"""
        |package com.duowan.meteor.server.executor.apicode
        |
        |class ApiCode$key extends com.duowan.meteor.server.executor.IApiCode {
        |
        | override def sql(spark: org.apache.spark.sql.SparkSession, fromTable: String, cacheTable: String): Unit = {
        | $code
        | }
        |
        |}
        """.stripMargin
        logInfo(result)
        result
       }
      }

       And then I can execute Scala code from the JSP:

      val apiCode = InterpretCodeFactory.interpret(taskId, updateTime, codeFromJsp)
      apiCode.sql(ExecutorContext.spark, fromTable, cacheTable)

      In this way, I think I have solved the problem described in SPARK-9219, but the executor would still load classes from the driver's disk every time, which is unnecessary after the first load.

      It would be better if 

      org.apache.spark.repl.ExecutorClassLoader

      could support a Guava cache in its findClass method. Spark could also provide a configuration switch for this cache, disabled by default.

      /** Resolves `name`, preferring the user classpath when configured; otherwise
       *  tries the parent loader first and falls back to the local lookup. */
      override def findClass(name: String): Class[_] = {
       if (userClassPathFirst) {
        findClassLocally(name).getOrElse(parentLoader.loadClass(name))
       } else {
        try parentLoader.loadClass(name)
        catch {
         case e: ClassNotFoundException =>
          // A Guava cache could be consulted here before fetching from the driver again.
          findClassLocally(name) match {
           case Some(cls) => cls
           case None => throw new ClassNotFoundException(name, e)
          }
        }
       }
      }

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              ant_nebula ant_nebula
              Votes:
              1 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: