diff --git a/repl/src/main/scala/org/apache/livy/repl/SparkRInterpreter.scala b/repl/src/main/scala/org/apache/livy/repl/SparkRInterpreter.scala index 5c721d8..b7e614c 100644 --- a/repl/src/main/scala/org/apache/livy/repl/SparkRInterpreter.scala +++ b/repl/src/main/scala/org/apache/livy/repl/SparkRInterpreter.scala @@ -76,12 +76,24 @@ object SparkRInterpreter { val backendInstance = sparkRBackendClass.getDeclaredConstructor().newInstance() var sparkRBackendPort = 0 + var sparkRBackendSecret = "" val initialized = new Semaphore(0) // Launch a SparkR backend server for the R process to connect to val backendThread = new Thread("SparkR backend") { override def run(): Unit = { - sparkRBackendPort = sparkRBackendClass.getMethod("init").invoke(backendInstance) - .asInstanceOf[Int] + var initMethod = sparkRBackendClass.getMethod("init") + if (initMethod.getReturnType() == classOf[Int]) { + sparkRBackendPort = initMethod.invoke(backendInstance) + .asInstanceOf[Int] + } + else { + val authHelperClass = mirror.classLoader.loadClass("org.apache.spark.api.r.RAuthHelper") + var (port, authHelper) = initMethod.invoke(backendInstance) + .asInstanceOf[Tuple2[Int, Any]] + sparkRBackendPort = port + sparkRBackendSecret = authHelperClass.getDeclaredField("secret").get(authHelper) + .asInstanceOf[String] + } initialized.release() sparkRBackendClass.getMethod("run").invoke(backendInstance) @@ -119,6 +131,7 @@ object SparkRInterpreter { env.put("SPARKR_PACKAGE_DIR", packageDir) env.put("R_PROFILE_USER", Seq(packageDir, "SparkR", "profile", "general.R").mkString(File.separator)) + env.put("SPARK_BACKEND_AUTH_SECRET", sparkRBackendSecret) builder.redirectErrorStream(true) val process = builder.start() diff --git a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java index b87e054..782e8bf 100644 --- a/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java +++ b/rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java @@ -100,7 +100,7 @@ class ContextLauncher { // In some scenarios the user may need to configure this endpoint setting explicitly. String address = conf.get(LAUNCHER_ADDRESS); // If not specified, use the RPC server address; otherwise use the specified address. - if (address == null) { + if (address == null || address.trim().isEmpty()) { address = factory.getServer().getAddress(); } conf.set(LAUNCHER_ADDRESS, address);