From 850e8f7b3dd1cbbba931e8a11fcaec79b0afae12 Mon Sep 17 00:00:00 2001 From: "hzfengyu@corp.netease.com" Date: Wed, 30 Dec 2015 12:09:22 +0800 Subject: [PATCH 1/2] getting mr job status with kerberos authentication Signed-off-by: fengyu --- build/conf/kylin.properties | 2 + core-common/pom.xml | 4 + .../org/apache/kylin/common/KylinConfigBase.java | 4 + .../engine/mr/common/HadoopStatusChecker.java | 6 +- .../kylin/engine/mr/common/HadoopStatusGetter.java | 104 ++++++++++++++++++++- .../engine/mr/common/MapReduceExecutable.java | 3 +- pom.xml | 6 ++ 7 files changed, 124 insertions(+), 5 deletions(-) diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties index f2170bd..d029b4c 100644 --- a/build/conf/kylin.properties +++ b/build/conf/kylin.properties @@ -82,6 +82,8 @@ kylin.hbase.region.cut.small=5 kylin.hbase.region.cut.medium=10 kylin.hbase.region.cut.large=50 +# whether get job status from resource manager with kerberos authentication +kylin.job.status.with.kerberos=false ## kylin security configurations diff --git a/core-common/pom.xml b/core-common/pom.xml index d02ddd3..403829d 100644 --- a/core-common/pom.xml +++ b/core-common/pom.xml @@ -78,6 +78,10 @@ commons-httpclient + org.apache.httpcomponents + httpclient + + com.google.guava guava diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 5968411..ee11158 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -466,6 +466,10 @@ public class KylinConfigBase implements Serializable { public String getHiveDatabaseForIntermediateTable() { return this.getOptional("kylin.job.hive.database.for.intermediatetable", "default"); } + + public boolean isGetJobStatusWithKerberos() { + return Boolean.valueOf(this.getOptional("kylin.job.status.with.kerberos", "false")); + } public String getKylinOwner() { return this.getOptional("kylin.owner", ""); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java index 1b71b92..ef45ed1 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusChecker.java @@ -39,11 +39,13 @@ public class HadoopStatusChecker { private final String yarnUrl; private final String mrJobID; private final StringBuilder output; + private final boolean useKerberosAuth; - public HadoopStatusChecker(String yarnUrl, String mrJobID, StringBuilder output) { + public HadoopStatusChecker(String yarnUrl, String mrJobID, StringBuilder output, boolean useKerberosAuth) { this.yarnUrl = yarnUrl; this.mrJobID = mrJobID; this.output = output; + this.useKerberosAuth = useKerberosAuth; } public JobStepStatusEnum checkStatus() { @@ -53,7 +55,7 @@ public class HadoopStatusChecker { } JobStepStatusEnum status = null; try { - final Pair result = new HadoopStatusGetter(yarnUrl, mrJobID).get(); + final Pair result = new HadoopStatusGetter(yarnUrl, mrJobID).get(useKerberosAuth); logger.debug("State of Hadoop job: " + mrJobID + ":" + result.getLeft() + "-" + result.getRight()); output.append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S").format(new Date()) + " - State of Hadoop job: " + mrJobID + ":" + result.getLeft() + " - " + result.getRight() + "\n"); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java index fd7afd3..081070e 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopStatusGetter.java @@ -24,10 +24,24 @@ import org.apache.commons.httpclient.HttpMethod; import org.apache.commons.httpclient.methods.GetMethod; import org.apache.commons.httpclient.protocol.Protocol; import org.apache.commons.httpclient.protocol.ProtocolSocketFactory; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.http.auth.AuthSchemeProvider; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.Credentials; +import org.apache.http.client.config.AuthSchemes; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.config.Lookup; +import org.apache.http.config.RegistryBuilder; +import org.apache.http.impl.auth.SPNegoSchemeFactory; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; @@ -35,6 +49,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.MalformedURLException; +import java.security.Principal; /** */ @@ -50,14 +65,99 @@ public class HadoopStatusGetter { this.mrJobId = mrJobId; } - public Pair get() throws IOException { + public Pair get(boolean useKerberosAuth) throws IOException { String applicationId = mrJobId.replace("job", "application"); String url = yarnUrl.replace("${job_id}", applicationId); - JsonNode root = new ObjectMapper().readTree(getHttpResponse(url)); + String response = useKerberosAuth ? getHttpResponseWithKerberosAuth(url) : getHttpResponse(url); + logger.debug("Hadoop job " + mrJobId + " status : " + response); + JsonNode root = new ObjectMapper().readTree(response); RMAppState state = RMAppState.valueOf(root.findValue("state").getTextValue()); FinalApplicationStatus finalStatus = FinalApplicationStatus.valueOf(root.findValue("finalStatus").getTextValue()); return Pair.of(state, finalStatus); } + + private static String DEFAULT_KRB5_CONFIG_LOCATION = "/etc/krb5.conf"; + private String getHttpResponseWithKerberosAuth(String url) throws IOException { + String krb5ConfigPath = System.getProperty("java.security.krb5.conf"); + if(krb5ConfigPath == null) { + krb5ConfigPath = DEFAULT_KRB5_CONFIG_LOCATION; + } + boolean skipPortAtKerberosDatabaseLookup = true; + System.setProperty("java.security.krb5.conf", krb5ConfigPath); + System.setProperty("sun.security.krb5.debug", "true"); + System.setProperty("javax.security.auth.useSubjectCredsOnly","false"); + Lookup authSchemeRegistry = RegistryBuilder.create() + .register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory(skipPortAtKerberosDatabaseLookup)) + .build(); + CloseableHttpClient client = HttpClients.custom().setDefaultAuthSchemeRegistry(authSchemeRegistry).build(); + HttpClientContext context = HttpClientContext.create(); + BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + Credentials useJaasCreds = new Credentials() { + public String getPassword() { + return null; + } + public Principal getUserPrincipal() { + return null; + } + }; + + credentialsProvider.setCredentials(new AuthScope(null, -1, null), useJaasCreds); + context.setCredentialsProvider(credentialsProvider); + String response = null; + while(response == null) { + if (url.startsWith("https://")) { + registerEasyHttps(); + } + if (url.contains("anonymous=true") == false) { + url = url.contains("?") ? "&" : "?"; + url = "anonymous=true"; + } + HttpGet httpget = new HttpGet(url); + httpget.addHeader("accept", "application/json"); + try { + CloseableHttpResponse httpResponse = client.execute(httpget,context); + String redirect = null; + org.apache.http.Header h = httpResponse.getFirstHeader("Location"); + if (h != null) { + redirect = h.getValue(); + if (isValidURL(redirect) == false) { + logger.info("Get invalid redirect url, skip it: " + redirect); + Thread.sleep(1000l); + continue; + } + } else { + h = httpResponse.getFirstHeader("Refresh"); + if (h != null) { + String s = h.getValue(); + int cut = s.indexOf("url="); + if (cut >= 0) { + redirect = s.substring(cut + 4); + + if (isValidURL(redirect) == false) { + logger.info("Get invalid redirect url, skip it: " + redirect); + Thread.sleep(1000l); + continue; + } + } + } + } + + if (redirect == null) { + response = IOUtils.toString(httpResponse.getEntity().getContent()); + logger.debug("Job " + mrJobId + " get status check result.\n"); + } else { + url = redirect; + logger.debug("Job " + mrJobId + " check redirect url " + url + ".\n"); + } + } catch (InterruptedException e) { + logger.error(e.getMessage()); + } finally { + httpget.releaseConnection(); + } + } + + return response; + } private String getHttpResponse(String url) throws IOException { HttpClient client = new HttpClient(); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java index 1449fa4..585e0e1 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java @@ -141,7 +141,8 @@ public class MapReduceExecutable extends AbstractExecutable { return new ExecuteResult(ExecuteResult.State.ERROR, "restStatusCheckUrl is null"); } String mrJobId = hadoopCmdOutput.getMrJobId(); - HadoopStatusChecker statusChecker = new HadoopStatusChecker(restStatusCheckUrl, mrJobId, output); + boolean useKerberosAuth = context.getConfig().isGetJobStatusWithKerberos(); + HadoopStatusChecker statusChecker = new HadoopStatusChecker(restStatusCheckUrl, mrJobId, output, useKerberosAuth); JobStepStatusEnum status = JobStepStatusEnum.NEW; while (!isDiscarded()) { JobStepStatusEnum newStatus = statusChecker.checkStatus(); diff --git a/pom.xml b/pom.xml index c3c43f4..ee529de 100644 --- a/pom.xml +++ b/pom.xml @@ -89,6 +89,7 @@ 1.3.4 9.2.7.v20150116 3.0.3 + 4.5 3.1.2.RELEASE @@ -462,6 +463,11 @@ curator-recipes ${curator.version} + + org.apache.httpcomponents + httpclient + ${apache-httpclient.version} + -- 1.9.4.msysgit.2