From e42a41e98e1b30037d258b42aca403854d1ac4fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=86=AF=E5=AE=87?= Date: Mon, 7 Sep 2015 20:32:23 +0800 Subject: [PATCH 1/8] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E4=BD=BF=E7=94=A8kerbero?= =?UTF-8?q?s=E8=AE=A4=E8=AF=81=E7=9A=84=E6=96=B9=E5=BC=8F=E8=8E=B7?= =?UTF-8?q?=E5=8F=96=E4=BD=9C=E4=B8=9A=E6=89=A7=E8=A1=8C=E7=8A=B6=E6=80=81?= =?UTF-8?q?=EF=BC=8C=E5=8F=AF=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: hzfengyu@corp.netease.com --- .../java/org/apache/kylin/common/KylinConfig.java | 6 ++ conf/kylin.properties | 3 + job/pom.xml | 4 + .../kylin/job/common/MapReduceExecutable.java | 2 +- .../kylin/job/tools/HadoopStatusChecker.java | 8 +- .../apache/kylin/job/tools/HadoopStatusGetter.java | 90 +++++++++++++++++++++- pom.xml | 6 ++ 7 files changed, 114 insertions(+), 5 deletions(-) diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java index 0e4cc07..764986b 100644 --- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java +++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java @@ -74,6 +74,8 @@ public class KylinConfig { public static final String KYLIN_JOB_REMOTE_CLI_WORKING_DIR = "kylin.job.remote.cli.working.dir"; public static final String KYLIN_JOB_CMD_EXTRA_ARGS = "kylin.job.cmd.extra.args"; + + public static final String KYLIN_GET_JOB_STATUS_WITH_KERBEROS = "kylin.job.status.with.kerberos"; /** * Toggle to indicate whether to use hive for table flattening. Default * true. @@ -372,6 +374,10 @@ public class KylinConfig { public String getMapReduceCmdExtraArgs() { return getOptional(KYLIN_JOB_CMD_EXTRA_ARGS); } + + public boolean getKylinUseKerberosAuth() { + return Boolean.valueOf(getOptional(KYLIN_GET_JOB_STATUS_WITH_KERBEROS, "false")); + } public String getOverrideHiveTableLocation(String table) { return getOptional(HIVE_TABLE_LOCATION_PREFIX + table.toUpperCase()); diff --git a/conf/kylin.properties b/conf/kylin.properties index cee23b2..d42c277 100644 --- a/conf/kylin.properties +++ b/conf/kylin.properties @@ -115,3 +115,6 @@ ext.log.base.dir = /tmp/kylin_log1,/tmp/kylin_log2 #will create external hive table to query result csv file #will set to kylin_query_log by default if not config here query.log.parse.result.table = kylin_query_log + +#if you should getting job status from RM with kerberos, set it true.. +kylin.job.status.with.kerberos=false \ No newline at end of file diff --git a/job/pom.xml b/job/pom.xml index e1e6d86..f7cecf2 100644 --- a/job/pom.xml +++ b/job/pom.xml @@ -85,6 +85,10 @@ commons-httpclient + org.apache.httpcomponents + httpclient + + com.google.guava guava diff --git a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java b/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java index f763794..a20110e 100644 --- a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java +++ b/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java @@ -131,7 +131,7 @@ public class MapReduceExecutable extends AbstractExecutable { return new ExecuteResult(ExecuteResult.State.ERROR, "restStatusCheckUrl is null"); } String mrJobId = hadoopCmdOutput.getMrJobId(); - HadoopStatusChecker statusChecker = new HadoopStatusChecker(restStatusCheckUrl, mrJobId, output); + HadoopStatusChecker statusChecker = new HadoopStatusChecker(restStatusCheckUrl, mrJobId, output, context.getConfig()); JobStepStatusEnum status = JobStepStatusEnum.NEW; while (!isDiscarded()) { JobStepStatusEnum newStatus = statusChecker.checkStatus(); diff --git a/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusChecker.java b/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusChecker.java index 3c2c283..c7e8ff4 100644 --- a/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusChecker.java +++ b/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusChecker.java @@ -24,6 +24,7 @@ import java.util.Date; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.job.constant.JobStepStatusEnum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,11 +40,13 @@ public class HadoopStatusChecker { private final String yarnUrl; private final String mrJobID; private final StringBuilder output; + private final KylinConfig config; - public HadoopStatusChecker(String yarnUrl, String mrJobID, StringBuilder output) { + public HadoopStatusChecker(String yarnUrl, String mrJobID, StringBuilder output, KylinConfig config) { this.yarnUrl = yarnUrl; this.mrJobID = mrJobID; this.output = output; + this.config = config; } public JobStepStatusEnum checkStatus() { @@ -53,7 +56,8 @@ public class HadoopStatusChecker { } JobStepStatusEnum status = null; try { - final Pair result = new HadoopStatusGetter(yarnUrl, mrJobID).get(); + boolean useKerberosAuth = config.getKylinUseKerberosAuth(); + final Pair result = new HadoopStatusGetter(yarnUrl, mrJobID).get(useKerberosAuth); logger.debug("State of Hadoop job: " + mrJobID + ":" + result.getLeft() + "-" + result.getRight()); output.append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S").format(new Date()) + " - State of Hadoop job: " + mrJobID + ":" + result.getLeft() + " - " + result.getRight() + "\n"); diff --git a/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusGetter.java b/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusGetter.java index 721a2b7..c9d11a1 100644 --- a/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusGetter.java +++ b/job/src/main/java/org/apache/kylin/job/tools/HadoopStatusGetter.java @@ -19,6 +19,7 @@ package org.apache.kylin.job.tools; import java.io.IOException; +import java.security.Principal; import org.apache.commons.httpclient.Header; import org.apache.commons.httpclient.HttpClient; @@ -26,9 +27,23 @@ import org.apache.commons.httpclient.HttpMethod; import org.apache.commons.httpclient.methods.GetMethod; import org.apache.commons.httpclient.protocol.Protocol; import org.apache.commons.httpclient.protocol.ProtocolSocketFactory; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.http.auth.AuthSchemeProvider; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.Credentials; +import org.apache.http.client.config.AuthSchemes; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.config.Lookup; +import org.apache.http.config.RegistryBuilder; +import org.apache.http.impl.auth.SPNegoSchemeFactory; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; import org.codehaus.jackson.JsonNode; import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; @@ -49,10 +64,11 @@ public class HadoopStatusGetter { this.mrJobId = mrJobId; } - public Pair get() throws IOException { + public Pair get(boolean useKerberos) throws IOException { String applicationId = mrJobId.replace("job", "application"); String url = yarnUrl.replace("${job_id}", applicationId); - JsonNode root = new ObjectMapper().readTree(getHttpResponse(url)); + String response = useKerberos ? getHttpResponseWithKerberosAuth(url) : getHttpResponse(url); + JsonNode root = new ObjectMapper().readTree(response); RMAppState state = RMAppState.valueOf(root.findValue("state").getTextValue()); FinalApplicationStatus finalStatus = FinalApplicationStatus.valueOf(root.findValue("finalStatus").getTextValue()); return Pair.of(state, finalStatus); @@ -101,6 +117,76 @@ public class HadoopStatusGetter { return response; } + + private static String DEFAULT_KRB5_CONFIG_LOCATION = "/etc/krb5.conf"; + private String getHttpResponseWithKerberosAuth(String url) throws IOException { + String krb5ConfigPath = System.getProperty("java.security.krb5.conf"); + if (krb5ConfigPath == null) { + krb5ConfigPath = DEFAULT_KRB5_CONFIG_LOCATION; + } + log.debug("krb5 config file is " + krb5ConfigPath); + + boolean skipPortAtKerberosDatabaseLookup = true; + System.setProperty("java.security.krb5.conf", krb5ConfigPath); + System.setProperty("sun.security.krb5.debug", "true"); + System.setProperty("javax.security.auth.useSubjectCredsOnly","false"); + Lookup authSchemeRegistry = RegistryBuilder.create() + .register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory(skipPortAtKerberosDatabaseLookup)) + .build(); + + CloseableHttpClient client = HttpClients.custom().setDefaultAuthSchemeRegistry(authSchemeRegistry).build(); + HttpClientContext context = HttpClientContext.create(); + BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + // This may seem odd, but specifying 'null' as principal tells java to use the logged in user's credentials + Credentials useJaasCreds = new Credentials() { + public String getPassword() { + return null; + } + public Principal getUserPrincipal() { + return null; + } + }; + credentialsProvider.setCredentials( new AuthScope(null, -1, null), useJaasCreds ); + context.setCredentialsProvider(credentialsProvider); + String responseString = null; + int count = 0; + int MAX_RETRY_TIME = 3; + while(responseString == null && count ++ < MAX_RETRY_TIME) { + if (url.startsWith("https://")) { + registerEasyHttps(); + } + if (url.contains("anonymous=true") == false) { + url += url.contains("?") ? "&" : "?"; + url += "anonymous=true"; + } + HttpGet httpget = new HttpGet(url); + try { + httpget.addHeader("accept", "application/json"); + CloseableHttpResponse response = client.execute(httpget,context); + String redirect = null; + org.apache.http.Header h = response.getFirstHeader("Refresh"); + if (h != null) { + String s = h.getValue(); + int cut = s.indexOf("url="); + if (cut >= 0) { + redirect = s.substring(cut + 4); + } + } + + if (redirect == null) { + responseString = IOUtils.toString(response.getEntity().getContent()); + log.debug("Job " + mrJobId + " get status check result.\n"); + } else { + url = redirect; + log.debug("Job " + mrJobId + " check redirect url " + url + ".\n"); + } + } finally { + httpget.releaseConnection(); + } + } + + return responseString; + } private static Protocol EASY_HTTPS = null; diff --git a/pom.xml b/pom.xml index ecc10b9..3cbfe64 100644 --- a/pom.xml +++ b/pom.xml @@ -67,6 +67,7 @@ 1.9 1.0.15 3.1 + 4.5 1.2.17 @@ -335,6 +336,11 @@ ${commons-httpclient.version} + org.apache.httpcomponents + httpclient + ${httpclient.version} + + com.google.guava guava ${guava.version} -- 1.9.4.msysgit.2