diff --git a/bin/livy-server b/bin/livy-server index a2636ba..f9b9bd1 100755 --- a/bin/livy-server +++ b/bin/livy-server @@ -92,7 +92,7 @@ start_livy_server() { if [ -n "$YARN_CONF_DIR" ]; then LIVY_CLASSPATH="$LIVY_CLASSPATH:$YARN_CONF_DIR" fi - + command="$RUNNER $LIVY_SERVER_JAVA_OPTS -cp $LIVY_CLASSPATH:$CLASSPATH com.cloudera.livy.server.LivyServer" if [ $1 = "old" ]; then diff --git a/client-http/src/main/java/com/cloudera/livy/client/http/HttpConf.java b/client-http/src/main/java/com/cloudera/livy/client/http/HttpConf.java index d8f6617..33a3521 100644 --- a/client-http/src/main/java/com/cloudera/livy/client/http/HttpConf.java +++ b/client-http/src/main/java/com/cloudera/livy/client/http/HttpConf.java @@ -25,6 +25,8 @@ import com.cloudera.livy.client.common.ClientConf; class HttpConf extends ClientConf { static enum Entry implements ConfEntry { + CONNECTION_USERNAME("connection.username", "anonymous"), + CONNECTION_PASSWORD("connection.password", "anonymous"), CONNETION_TIMEOUT("connection.timeout", "10s"), CONNECTION_IDLE_TIMEOUT("connection.idle.timeout", "10m"), SOCKET_TIMEOUT("connection.socket.timeout", "5m"), diff --git a/client-http/src/main/java/com/cloudera/livy/client/http/LivyConnection.java b/client-http/src/main/java/com/cloudera/livy/client/http/LivyConnection.java index 12decc0..f3510af 100644 --- a/client-http/src/main/java/com/cloudera/livy/client/http/LivyConnection.java +++ b/client-http/src/main/java/com/cloudera/livy/client/http/LivyConnection.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.net.URI; import java.security.Principal; import java.util.concurrent.TimeUnit; -import static java.nio.charset.StandardCharsets.UTF_8; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.http.HttpEntity; @@ -31,6 +30,7 @@ import org.apache.http.HttpHeaders; import org.apache.http.HttpStatus; import org.apache.http.auth.AuthScope; import org.apache.http.auth.Credentials; +import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.*; @@ -53,149 +53,158 @@ import static com.cloudera.livy.client.http.HttpConf.Entry.*; */ class LivyConnection { - static final String SESSIONS_URI = "/sessions"; - private static final String APPLICATION_JSON = "application/json"; - - private final URI server; - private final String uriRoot; - private final CloseableHttpClient client; - private final ObjectMapper mapper; - - LivyConnection(URI uri, final HttpConf config) { - HttpClientContext ctx = HttpClientContext.create(); - int port = uri.getPort() > 0 ? uri.getPort() : 8998; - - String path = uri.getPath() != null ? uri.getPath() : ""; - this.uriRoot = path + SESSIONS_URI; - - RequestConfig reqConfig = new RequestConfig() { - @Override - public int getConnectTimeout() { - return (int) config.getTimeAsMs(CONNETION_TIMEOUT); - } - - @Override - public int getSocketTimeout() { - return (int) config.getTimeAsMs(SOCKET_TIMEOUT); - } - - @Override - public boolean isAuthenticationEnabled() { - return true; - } - }; - - Credentials dummyCredentials = new Credentials() { - @Override - public String getPassword() { - return null; - } - - @Override - public Principal getUserPrincipal() { - return null; - } - }; - - // This is needed to get Kerberos credentials from the environment, instead of - // requiring the application to manually obtain the credentials. - System.setProperty("javax.security.auth.useSubjectCredsOnly", "false"); - - CredentialsProvider credsProvider = new BasicCredentialsProvider(); - credsProvider.setCredentials(AuthScope.ANY, dummyCredentials); - - HttpClientBuilder builder = HttpClientBuilder.create() - .disableAutomaticRetries() - .evictExpiredConnections() - .evictIdleConnections(config.getTimeAsMs(CONNECTION_IDLE_TIMEOUT), TimeUnit.MILLISECONDS) - .setConnectionManager(new BasicHttpClientConnectionManager()) - .setConnectionReuseStrategy(new NoConnectionReuseStrategy()) - .setDefaultRequestConfig(reqConfig) - .setMaxConnTotal(1) - .setDefaultCredentialsProvider(credsProvider) - .setUserAgent("livy-client-http"); - - this.server = uri; - this.client = builder.build(); - this.mapper = new ObjectMapper(); - } - - synchronized void close() throws IOException { - client.close(); - } - - synchronized V delete(Class retType, String uri, Object... uriParams) throws Exception { - return sendJSONRequest(new HttpDelete(), retType, uri, uriParams); - } - - synchronized V get(Class retType, String uri, Object... uriParams) throws Exception { - return sendJSONRequest(new HttpGet(), retType, uri, uriParams); - } - - synchronized V post( - Object body, - Class retType, - String uri, - Object... uriParams) throws Exception { - HttpPost post = new HttpPost(); - if (body != null) { - byte[] bodyBytes = mapper.writeValueAsBytes(body); - post.setEntity(new ByteArrayEntity(bodyBytes)); + static final String SESSIONS_URI = "/sessions"; + private static final String APPLICATION_JSON = "application/json"; + + private final URI server; + private final String uriRoot; + private final CloseableHttpClient client; + private final ObjectMapper mapper; + + LivyConnection(URI uri, final HttpConf config) { + HttpClientContext ctx = HttpClientContext.create(); + int port = uri.getPort() > 0 ? uri.getPort() : 8998; + + String path = uri.getPath() != null ? uri.getPath() : ""; + this.uriRoot = path + SESSIONS_URI; + + RequestConfig reqConfig = new RequestConfig() { + @Override + public int getConnectTimeout() { + return (int) config.getTimeAsMs(CONNETION_TIMEOUT); + } + + @Override + public int getSocketTimeout() { + return (int) config.getTimeAsMs(SOCKET_TIMEOUT); + } + + @Override + public boolean isAuthenticationEnabled() { + return true; + } + }; + + Credentials dummyCredentials = new Credentials() { + @Override + public String getPassword() { + return null; + } + + @Override + public Principal getUserPrincipal() { + return null; + } + }; + + // This is needed to get Kerberos credentials from the environment, instead of + // requiring the application to manually obtain the credentials. + System.setProperty("javax.security.auth.useSubjectCredsOnly", "false"); + + CredentialsProvider credsProvider = new BasicCredentialsProvider(); + if (config.get(CONNECTION_USERNAME) == "anonymous") { + credsProvider.setCredentials(AuthScope.ANY, dummyCredentials); + } + else { + UsernamePasswordCredentials credentials = new UsernamePasswordCredentials( + config.get(CONNECTION_USERNAME), + config.get(CONNECTION_PASSWORD)); + credsProvider.setCredentials(AuthScope.ANY, credentials); + } + + HttpClientBuilder builder = HttpClientBuilder.create() + .disableAutomaticRetries() + .evictExpiredConnections() + .evictIdleConnections(config.getTimeAsMs(CONNECTION_IDLE_TIMEOUT), + TimeUnit.MILLISECONDS) + .setConnectionManager(new BasicHttpClientConnectionManager()) + .setConnectionReuseStrategy(new NoConnectionReuseStrategy()) + .setDefaultRequestConfig(reqConfig) + .setMaxConnTotal(1) + .setDefaultCredentialsProvider(credsProvider) + .setUserAgent("livy-client-http"); + + this.server = uri; + this.client = builder.build(); + this.mapper = new ObjectMapper(); } - return sendJSONRequest(post, retType, uri, uriParams); - } - - synchronized V post( - File f, - Class retType, - String paramName, - String uri, - Object... uriParams) throws Exception { - HttpPost post = new HttpPost(); - MultipartEntityBuilder builder = MultipartEntityBuilder.create(); - builder.addPart(paramName, new FileBody(f)); - post.setEntity(builder.build()); - return sendRequest(post, retType, uri, uriParams); - } - - private V sendJSONRequest( - HttpRequestBase req, - Class retType, - String uri, - Object... uriParams) throws Exception { - req.setHeader(HttpHeaders.ACCEPT, APPLICATION_JSON); - req.setHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON); - req.setHeader(HttpHeaders.CONTENT_ENCODING, "UTF-8"); - return sendRequest(req, retType, uri, uriParams); - } - - private V sendRequest( - HttpRequestBase req, - Class retType, - String uri, - Object... uriParams) throws Exception { - req.setURI(new URI(server.getScheme(), null, server.getHost(), server.getPort(), - uriRoot + String.format(uri, uriParams), null, null)); - // It is no harm to set X-Requested-By when csrf protection is disabled. - if (req instanceof HttpPost || req instanceof HttpDelete || req instanceof HttpPut - || req instanceof HttpPatch) { - req.addHeader("X-Requested-By", "livy"); + + synchronized void close() throws IOException { + client.close(); } - try (CloseableHttpResponse res = client.execute(req)) { - int status = (res.getStatusLine().getStatusCode() / 100) * 100; - HttpEntity entity = res.getEntity(); - if (status == HttpStatus.SC_OK) { - if (!Void.class.equals(retType)) { - return mapper.readValue(entity.getContent(), retType); - } else { - return null; + + synchronized V delete(Class retType, String uri, Object... uriParams) throws Exception { + return sendJSONRequest(new HttpDelete(), retType, uri, uriParams); + } + + synchronized V get(Class retType, String uri, Object... uriParams) throws Exception { + return sendJSONRequest(new HttpGet(), retType, uri, uriParams); + } + + synchronized V post( + Object body, + Class retType, + String uri, + Object... uriParams) throws Exception { + HttpPost post = new HttpPost(); + if (body != null) { + byte[] bodyBytes = mapper.writeValueAsBytes(body); + post.setEntity(new ByteArrayEntity(bodyBytes)); + } + return sendJSONRequest(post, retType, uri, uriParams); + } + + synchronized V post( + File f, + Class retType, + String paramName, + String uri, + Object... uriParams) throws Exception { + HttpPost post = new HttpPost(); + MultipartEntityBuilder builder = MultipartEntityBuilder.create(); + builder.addPart(paramName, new FileBody(f)); + post.setEntity(builder.build()); + return sendRequest(post, retType, uri, uriParams); + } + + private V sendJSONRequest( + HttpRequestBase req, + Class retType, + String uri, + Object... uriParams) throws Exception { + req.setHeader(HttpHeaders.ACCEPT, APPLICATION_JSON); + req.setHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON); + req.setHeader(HttpHeaders.CONTENT_ENCODING, "UTF-8"); + return sendRequest(req, retType, uri, uriParams); + } + + private V sendRequest( + HttpRequestBase req, + Class retType, + String uri, + Object... uriParams) throws Exception { + req.setURI(new URI(server.getScheme(), null, server.getHost(), server.getPort(), + uriRoot + String.format(uri, uriParams), null, null)); + // It is no harm to set X-Requested-By when csrf protection is disabled. + if (req instanceof HttpPost || req instanceof HttpDelete || req instanceof HttpPut + || req instanceof HttpPatch) { + req.addHeader("X-Requested-By", "livy"); + } + try (CloseableHttpResponse res = client.execute(req)) { + int status = (res.getStatusLine().getStatusCode() / 100) * 100; + HttpEntity entity = res.getEntity(); + if (status == HttpStatus.SC_OK) { + if (!Void.class.equals(retType)) { + return mapper.readValue(entity.getContent(), retType); + } else { + return null; + } + } else { + String error = EntityUtils.toString(entity); + throw new IOException(String.format("%s: %s", res.getStatusLine().getReasonPhrase(), + error)); + } } - } else { - String error = EntityUtils.toString(entity); - throw new IOException(String.format("%s: %s", res.getStatusLine().getReasonPhrase(), - error)); - } } - } } diff --git a/client-http/src/test/resources/shiro.ini b/client-http/src/test/resources/shiro.ini new file mode 100644 index 0000000..5c2bb1c --- /dev/null +++ b/client-http/src/test/resources/shiro.ini @@ -0,0 +1,4 @@ +[main] + +[users] +user = password \ No newline at end of file diff --git a/client-http/src/test/scala/com/cloudera/livy/client/http/HttpClientSpec.scala b/client-http/src/test/scala/com/cloudera/livy/client/http/HttpClientSpec.scala index c8403d2..c5116b8 100644 --- a/client-http/src/test/scala/com/cloudera/livy/client/http/HttpClientSpec.scala +++ b/client-http/src/test/scala/com/cloudera/livy/client/http/HttpClientSpec.scala @@ -21,7 +21,7 @@ package com.cloudera.livy.client.http import java.io.{File, InputStream} import java.net.{InetAddress, URI} import java.nio.file.{Files, Paths} -import java.util.concurrent.{Future => JFuture, _} +import java.util.concurrent.{CancellationException, ExecutionException, Future => JFuture, TimeoutException, TimeUnit} import java.util.concurrent.atomic.AtomicLong import javax.servlet.ServletContext import javax.servlet.http.HttpServletRequest @@ -45,10 +45,10 @@ import com.cloudera.livy.test.jobs.Echo import com.cloudera.livy.utils.AppInfo /** - * The test for the HTTP client is written in Scala so we can reuse the code in the livy-server - * module, which implements the client session backend. The client servlet has some functionality - * overridden to avoid creating sub-processes for each seession. - */ + * The test for the HTTP client is written in Scala so we can reuse the code in the livy-server + * module, which implements the client session backend. The client servlet has some functionality + * overridden to avoid creating sub-processes for each seession. + */ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUnitTestSuite { import HttpClientSpec._ @@ -62,8 +62,12 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni override def beforeAll(): Unit = { super.beforeAll() - server = new WebServer(new LivyConf(), "0.0.0.0", 0) - + System.setProperty(LivyConf.AUTH_TYPE.key, "basic") + System.setProperty(LivyConf.BASIC_AUTHENTICATION_REALM.key, + "../client-http/src/test/resources/shiro.ini") + System.setProperty(LivyConf.IMPERSONATION_ENABLED.key, "true") + System.setProperty(LivyConf.SUPERUSERS.key, "user") + server = new WebServer(new LivyConf(true), "0.0.0.0", 0) server.context.setResourceBase("src/main/com/cloudera/livy/server") server.context.setInitParameter(ScalatraListener.LifeCycleKey, classOf[HttpClientTestBootstrap].getCanonicalName) @@ -91,7 +95,10 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni // WebServer does this internally instad of respecting "0.0.0.0", so try to use the same // address. val uri = s"http://${InetAddress.getLocalHost.getHostAddress}:${server.port}/" - client = new LivyClientBuilder(false).setURI(new URI(uri)).build() + client = new LivyClientBuilder(false). + setURI(new URI(uri)). + setConf(HttpConf.Entry.CONNECTION_USERNAME.key(), "user"). + setConf(HttpConf.Entry.CONNECTION_PASSWORD.key(), "password").build() } withClient("should run and monitor asynchronous jobs") { @@ -100,9 +107,10 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni withClient("should propagate errors from jobs") { val errorMessage = "This job throws an error." - val (jobId, handle) = runJob(false, { id => Seq( + val (jobId, handle) = runJob(false, { id => + Seq( new JobStatus(id, JobHandle.State.FAILED, null, errorMessage)) - }) + }) val error = intercept[ExecutionException] { handle.get(TIMEOUT_S, TimeUnit.SECONDS) @@ -133,10 +141,11 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni } withClient("should cancel jobs") { - val (jobId, handle) = runJob(false, { id => Seq( + val (jobId, handle) = runJob(false, { id => + Seq( new JobStatus(id, JobHandle.State.STARTED, null, null), new JobStatus(id, JobHandle.State.CANCELLED, null, null)) - }) + }) handle.cancel(true) intercept[CancellationException] { @@ -147,10 +156,11 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni } withClient("should notify listeners of job completion") { - val (jobId, handle) = runJob(false, { id => Seq( + val (jobId, handle) = runJob(false, { id => + Seq( new JobStatus(id, JobHandle.State.STARTED, null, null), new JobStatus(id, JobHandle.State.SUCCEEDED, serialize(id), null)) - }) + }) val listener = mock(classOf[JobHandle.Listener[Long]]) handle.asInstanceOf[JobHandle[Long]].addListener(listener) @@ -163,12 +173,13 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni // JobHandleImpl does exponential backoff checking the result of a job. Given an initial // wait of 100ms, 4 iterations should result in a wait of 800ms, so the handle should at that // point timeout a wait of 100ms. - val (jobId, handle) = runJob(false, { id => Seq( + val (jobId, handle) = runJob(false, { id => + Seq( new JobStatus(id, JobHandle.State.STARTED, null, null), new JobStatus(id, JobHandle.State.STARTED, null, null), new JobStatus(id, JobHandle.State.STARTED, null, null), new JobStatus(id, JobHandle.State.SUCCEEDED, serialize(id), null)) - }) + }) intercept[TimeoutException] { handle.get(100, TimeUnit.MILLISECONDS) @@ -185,7 +196,10 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni var sid = client.asInstanceOf[HttpClient].getSessionId() val uri = s"http://${InetAddress.getLocalHost.getHostAddress}:${server.port}" + s"${LivyConnection.SESSIONS_URI}/$sid" - val newClient = new LivyClientBuilder(false).setURI(new URI(uri)).build() + val newClient = new LivyClientBuilder(false). + setURI(new URI(uri)). + setConf(HttpConf.Entry.CONNECTION_USERNAME.key(), "user"). + setConf(HttpConf.Entry.CONNECTION_PASSWORD.key(), "password").build() newClient.stop(false) verify(session, never()).stop() } @@ -230,10 +244,11 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni } private def testJob(sync: Boolean, response: Option[Any] = None): Unit = { - val (jobId, handle) = runJob(sync, { id => Seq( + val (jobId, handle) = runJob(sync, { id => + Seq( new JobStatus(id, JobHandle.State.STARTED, null, null), new JobStatus(id, JobHandle.State.SUCCEEDED, serialize(response.getOrElse(id)), null)) - }) + }) assert(handle.get(TIMEOUT_S, TimeUnit.SECONDS) === response.getOrElse(jobId)) verify(session, times(2)).jobStatus(meq(jobId)) } diff --git a/conf/livy.conf b/conf/livy.conf index c91b392..5cd5ab2 100644 --- a/conf/livy.conf +++ b/conf/livy.conf @@ -11,16 +11,16 @@ # livy.server.port = 8998 # What spark master Livy sessions should use. -# livy.spark.master = local +livy.spark.master = yarn # What spark deploy mode Livy sessions should use. -# livy.spark.deployMode = +livy.spark.deployMode = client # Time in milliseconds on how long Livy will wait before timing out an idle session. # livy.server.session.timeout = 1h # If livy should impersonate the requesting users when creating a new session. -# livy.impersonation.enabled = true +livy.impersonation.enabled = true # Comma-separated list of Livy RSC jars. By default Livy will upload jars from its installation # directory every time a session is started. By caching these files in HDFS, for example, startup @@ -77,3 +77,5 @@ # How often Livy polls YARN to refresh YARN app state. # livy.server.yarn.poll-interval = 1s +livy.server.auth.type = basic +livy.server.auth.basic_authentication.realm = /home/livy/realm.properties diff --git a/pom.xml b/pom.xml index 9b3f3a1..415061e 100644 --- a/pom.xml +++ b/pom.xml @@ -542,6 +542,12 @@ 1.7.0 + + org.apache.shiro + shiro-core + 1.3.2 + + diff --git a/server/pom.xml b/server/pom.xml index b2fc858..7875d07 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -218,6 +218,11 @@ test + + org.apache.shiro + shiro-core + + diff --git a/server/src/main/scala/com/cloudera/livy/LivyConf.scala b/server/src/main/scala/com/cloudera/livy/LivyConf.scala index f57ea50..6228c75 100644 --- a/server/src/main/scala/com/cloudera/livy/LivyConf.scala +++ b/server/src/main/scala/com/cloudera/livy/LivyConf.scala @@ -72,7 +72,8 @@ object LivyConf { LivyConf.Entry("livy.server.launch.kerberos.refresh_interval", "1h") val KINIT_FAIL_THRESHOLD = LivyConf.Entry("livy.server.launch.kerberos.kinit_fail_threshold", 5) - + val BASIC_AUTHENTICATION_REALM = + LivyConf.Entry("livy.server.auth.basic_authentication.realm", null) /** * Recovery mode of Livy. Possible values: * off: Default. Turn off recovery. Every time Livy shuts down, it stops and forgets all sessions. diff --git a/server/src/main/scala/com/cloudera/livy/server/LivyServer.scala b/server/src/main/scala/com/cloudera/livy/server/LivyServer.scala index 84864c0..fc8d863 100644 --- a/server/src/main/scala/com/cloudera/livy/server/LivyServer.scala +++ b/server/src/main/scala/com/cloudera/livy/server/LivyServer.scala @@ -155,6 +155,9 @@ class LivyServer extends Logging { } startKinitThread(launch_keytab, launch_principal) + case "basic" => + // Nothing to do. + case null => // Nothing to do. diff --git a/server/src/main/scala/com/cloudera/livy/server/ShiroLoginService.scala b/server/src/main/scala/com/cloudera/livy/server/ShiroLoginService.scala new file mode 100644 index 0000000..3d59e8e --- /dev/null +++ b/server/src/main/scala/com/cloudera/livy/server/ShiroLoginService.scala @@ -0,0 +1,81 @@ +/* + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.livy.server + +import javax.security.auth.Subject + +import org.apache.shiro.SecurityUtils +import org.apache.shiro.authc.UsernamePasswordToken +import org.apache.shiro.config.IniSecurityManagerFactory +import org.eclipse.jetty.security.{DefaultIdentityService, DefaultUserIdentity, IdentityService, LoginService} +import org.eclipse.jetty.security.MappedLoginService.UserPrincipal +import org.eclipse.jetty.server.UserIdentity +import org.eclipse.jetty.util.component.AbstractLifeCycle +import scala.util.{Failure, Success, Try} + +class ShiroLoginService(val name: String, shiroConf: String) + extends AbstractLifeCycle with LoginService { + private val _identityService: IdentityService = new DefaultIdentityService + + override def setIdentityService(service: IdentityService): Unit = {} + + override def getName: String = name + + override def logout(user: UserIdentity): Unit = {} + + override def getIdentityService: IdentityService = _identityService + + override def validate(user: UserIdentity): Boolean = { + true + } + + override def login(username: String, credentials: AnyRef): UserIdentity = { + val token = new UsernamePasswordToken(username, credentials.asInstanceOf[String]) + val currentUser = SecurityUtils.getSubject + val userPrincipal = Try { + currentUser.login(token) + } match { + case Success(_) => new UserPrincipal { + override def isAuthenticated: Boolean = true + + override def authenticate(o: scala.Any): Boolean = true + + override def getName: String = username + } + case Failure(_) => null + } + if (userPrincipal == null) { + null + } + else { + new DefaultUserIdentity(new Subject, userPrincipal, Array("livy")) + } + } + + override protected def doStart() { + super.doStart() + val factory = new IniSecurityManagerFactory(s"file:$shiroConf") + val securityManager = factory.getInstance() + SecurityUtils.setSecurityManager(securityManager) + } + + override protected def doStop() { + super.doStop() + } +} diff --git a/server/src/main/scala/com/cloudera/livy/server/WebServer.scala b/server/src/main/scala/com/cloudera/livy/server/WebServer.scala index 9812873..66ec9c4 100644 --- a/server/src/main/scala/com/cloudera/livy/server/WebServer.scala +++ b/server/src/main/scala/com/cloudera/livy/server/WebServer.scala @@ -18,17 +18,19 @@ package com.cloudera.livy.server -import java.net.{InetAddress, InetSocketAddress} +import java.net.InetAddress import javax.servlet.ServletContextListener -import scala.concurrent.ExecutionContext - -import org.eclipse.jetty.server._ +import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler} +import org.eclipse.jetty.security.authentication.BasicAuthenticator +import org.eclipse.jetty.server.{HttpConfiguration, HttpConnectionFactory, NCSARequestLog, NetworkConnector, SecureRequestCustomizer, Server, ServerConnector, SslConnectionFactory} import org.eclipse.jetty.server.handler.{HandlerCollection, RequestLogHandler} import org.eclipse.jetty.servlet.{DefaultServlet, ServletContextHandler} +import org.eclipse.jetty.util.security.Constraint import org.eclipse.jetty.util.ssl.SslContextFactory import com.cloudera.livy.{LivyConf, Logging} +import com.cloudera.livy.LivyConf.{AUTH_TYPE, BASIC_AUTHENTICATION_REALM} object WebServer { val KeystoreKey = "livy.keystore" @@ -72,9 +74,8 @@ class WebServer(livyConf: LivyConf, var host: String, var port: Int) extends Log context.addServlet(classOf[DefaultServlet], "/") val handlers = new HandlerCollection - handlers.addHandler(context) -// configure the access log + // configure the access log val requestLogHandler = new RequestLogHandler val requestLog = new NCSARequestLog("/jetty-yyyy_mm_dd.request.log") requestLog.setAppend(true) @@ -82,6 +83,37 @@ class WebServer(livyConf: LivyConf, var host: String, var port: Int) extends Log requestLog.setLogTimeZone("GMT") handlers.addHandler(requestLogHandler) + if (!(livyConf.get(AUTH_TYPE) == "basic")) { + handlers.addHandler(context) + } + else { + val loginService = new ShiroLoginService("MyRealm", livyConf.get(BASIC_AUTHENTICATION_REALM)) + server.addBean(loginService) + + val security = new ConstraintSecurityHandler() + val constraint = new Constraint() + constraint.setName("auth") + constraint.setAuthenticate(true) + constraint.setRoles(Array("livy")) + + val mapping1 = new ConstraintMapping() + mapping1.setPathSpec("/*") + mapping1.setConstraint(constraint) + + val mapping2 = new ConstraintMapping() + mapping2.setPathSpec("/sessions/*") + mapping2.setConstraint(constraint) + + import collection.convert.decorateAsJava._ + + security.setConstraintMappings(List(mapping1, mapping2).asJava) + security.setAuthenticator(new BasicAuthenticator()) + security.setLoginService(loginService) + + security.setHandler(context) + + handlers.addHandler(security) + } server.setHandler(handlers) def addEventListener(listener: ServletContextListener): Unit = { @@ -98,7 +130,7 @@ class WebServer(livyConf: LivyConf, var host: String, var port: Int) extends Log } port = connector.getLocalPort - info("Starting server on %s://%s:%d" format (protocol, host, port)) + info("Starting server on %s://%s:%d" format(protocol, host, port)) } def join(): Unit = { diff --git a/server/src/main/scala/com/cloudera/livy/server/batch/BatchSessionServlet.scala b/server/src/main/scala/com/cloudera/livy/server/batch/BatchSessionServlet.scala index 8f10fd5..f4f9b23 100644 --- a/server/src/main/scala/com/cloudera/livy/server/batch/BatchSessionServlet.scala +++ b/server/src/main/scala/com/cloudera/livy/server/batch/BatchSessionServlet.scala @@ -27,29 +27,36 @@ import com.cloudera.livy.sessions.SessionManager import com.cloudera.livy.utils.AppInfo case class BatchSessionView( - id: Long, - state: String, - appId: Option[String], - appInfo: AppInfo, - log: Seq[String]) + id: Long, + state: String, + appId: Option[String], + appInfo: AppInfo, + log: Seq[String]) class BatchSessionServlet( - sessionManager: SessionManager[BatchSession], - sessionStore: SessionStore, - livyConf: LivyConf) - extends SessionServlet[BatchSession](sessionManager, livyConf) -{ + sessionManager: SessionManager[BatchSession], + sessionStore: SessionStore, + livyConf: LivyConf) + extends SessionServlet[BatchSession](sessionManager, livyConf) { override protected def createSession(req: HttpServletRequest): BatchSession = { val createRequest = bodyAs[CreateBatchRequest](req) - val proxyUser = checkImpersonation(createRequest.proxyUser, req) + val proxyUser = { + val rUser = remoteUser(req) + if (rUser == null) { + checkImpersonation(createRequest.proxyUser, req) + } + else { + Some(rUser) + } + } BatchSession.create( sessionManager.nextId(), createRequest, livyConf, remoteUser(req), proxyUser, sessionStore) } override protected[batch] def clientSessionView( - session: BatchSession, - req: HttpServletRequest): Any = { + session: BatchSession, + req: HttpServletRequest): Any = { val logs = if (hasAccess(session.owner, req)) { val lines = session.logLines() diff --git a/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSessionServlet.scala b/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSessionServlet.scala index 6f58ae7..8b73d4c 100644 --- a/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSessionServlet.scala +++ b/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSessionServlet.scala @@ -50,7 +50,15 @@ class InteractiveSessionServlet(livyConf: LivyConf) override protected def createSession(req: HttpServletRequest): InteractiveSession = { val createRequest = bodyAs[CreateInteractiveRequest](req) - val proxyUser = checkImpersonation(createRequest.proxyUser, req) + val proxyUser = { + val rUser = remoteUser(req) + if(rUser == null) { + checkImpersonation(createRequest.proxyUser, req) + } + else { + Some(rUser) + } + } new InteractiveSession(sessionManager.nextId(), remoteUser(req), proxyUser, livyConf, createRequest) }