diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java index fda0a4f..e2e85e1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java @@ -18,16 +18,24 @@ package org.apache.hadoop.yarn.client.api; +import java.io.File; import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; +import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; +import org.codehaus.jackson.map.ObjectMapper; /** * A client library that can be used to post some information in terms of a @@ -67,4 +75,78 @@ protected TimelineClient(String name) { public abstract TimelinePutResponse putEntities( TimelineEntity... entities) throws IOException, YarnException; + /** + *

+ * Get a delegation token so as to be able to talk to the timeline server in a + * secure way. + *

+ * + * @param renewer + * Address of the renewer who can renew these tokens when needed by + * securely talking to the timeline server + * @return a delegation token ({@link Token}) that can be used to talk to the + * timeline server + * @throws IOException + * @throws YarnException + */ + @Public + public abstract Token getDelegationToken( + String renewer) throws IOException, YarnException; + + /** + * Submit timeline data in a JSON file via command line. + * + * @param argv path to the {@link TimelineEntities} JSON file + */ + public static void main(String[] argv) { + if (argv.length != 1) { + System.err.println("Usage: "); + System.exit(-1); + } + File jsonFile = new File(argv[0]); + if (!jsonFile.exists()) { + System.out.println("Error: File [" + jsonFile.getAbsolutePath() + + "] doesn't exist"); + return; + } + ObjectMapper mapper = new ObjectMapper(); + YarnJacksonJaxbJsonProvider.configObjectMapper(mapper); + TimelineEntities entities = null; + try { + entities = mapper.readValue(jsonFile, TimelineEntities.class); + } catch (Exception e) { + System.err.println("Error: " + e.getMessage()); + e.printStackTrace(System.err); + return; + } + TimelineClient client = TimelineClient.createTimelineClient(); + client.init(new YarnConfiguration()); + client.start(); + try { + if (UserGroupInformation.isSecurityEnabled()) { + Token token = + client.getDelegationToken( + UserGroupInformation.getCurrentUser().getUserName()); + UserGroupInformation.getCurrentUser().addToken(token); + } + TimelinePutResponse response = client.putEntities( + entities.getEntities().toArray( + new TimelineEntity[entities.getEntities().size()])); + if (response.getErrors().size() == 0) { + System.out.println("Timeline data is successfully put"); + } else { + for (TimelinePutResponse.TimelinePutError error : response.getErrors()) { + System.out.println("TimelineEntity [" + error.getEntityType() + ":" + + error.getEntityId() + "] is not successfully put. Error code: " + + error.getErrorCode()); + } + } + } catch (Exception e) { + System.err.println("Error: " + e.getMessage()); + e.printStackTrace(System.err); + } finally { + client.stop(); + } + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java index 64cc041..04d2bd9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -19,8 +19,13 @@ package org.apache.hadoop.yarn.client.api.impl; import java.io.IOException; +import java.net.HttpURLConnection; import java.net.URI; +import java.net.URL; import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import javax.ws.rs.core.MediaType; @@ -29,12 +34,20 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.client.AuthenticatedURL; +import org.apache.hadoop.security.authentication.client.AuthenticationException; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; +import org.apache.hadoop.yarn.security.client.TimelineKerberosAuthenticator; import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; import com.google.common.annotations.VisibleForTesting; @@ -44,6 +57,8 @@ import com.sun.jersey.api.client.WebResource; import com.sun.jersey.api.client.config.ClientConfig; import com.sun.jersey.api.client.config.DefaultClientConfig; +import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory; +import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; @Private @Unstable @@ -56,12 +71,18 @@ private Client client; private URI resURI; private boolean isEnabled; + private TimelineAuthenticatedURLConnectionFactory urlFactory; public TimelineClientImpl() { super(TimelineClientImpl.class.getName()); ClientConfig cc = new DefaultClientConfig(); cc.getClasses().add(YarnJacksonJaxbJsonProvider.class); - client = Client.create(cc); + if (UserGroupInformation.isSecurityEnabled()) { + urlFactory = new TimelineAuthenticatedURLConnectionFactory(); + client = new Client(new URLConnectionClientHandler(urlFactory), cc); + } else { + client = Client.create(cc); + } } protected void serviceInit(Configuration conf) throws Exception { @@ -124,6 +145,13 @@ public TimelinePutResponse putEntities( return resp.getEntity(TimelinePutResponse.class); } + @Override + public Token getDelegationToken( + String renewer) throws IOException, YarnException { + return TimelineKerberosAuthenticator.getDelegationToken(resURI.toURL(), + urlFactory.token, renewer); + } + @Private @VisibleForTesting public ClientResponse doPostingEntities(TimelineEntities entities) { @@ -133,4 +161,70 @@ public ClientResponse doPostingEntities(TimelineEntities entities) { .post(ClientResponse.class, entities); } + private static class TimelineAuthenticatedURLConnectionFactory + implements HttpURLConnectionFactory { + + private AuthenticatedURL.Token token; + private TimelineKerberosAuthenticator authenticator; + private Token dToken; + + public TimelineAuthenticatedURLConnectionFactory() { + token = new AuthenticatedURL.Token(); + authenticator = new TimelineKerberosAuthenticator(); + } + + @Override + public HttpURLConnection getHttpURLConnection(URL url) throws IOException { + try { + if (dToken == null) { + //TODO: need to take care of the renew case + dToken = selectToken(); + if (LOG.isDebugEnabled()) { + LOG.debug("Timeline delegation token: " + dToken.toString()); + } + } + if (dToken != null) { + Map params = new HashMap(); + TimelineKerberosAuthenticator.injectDelegationToken(params, dToken); + url = TimelineKerberosAuthenticator.appendParams(url, params); + if (LOG.isDebugEnabled()) { + LOG.debug("URL with delegation token: " + url); + } + } + return new AuthenticatedURL(authenticator).openConnection(url, token); + } catch (AuthenticationException e) { + LOG.error("Authentication failed when openning connection [" + url + + "] with token [" + token + "].", e); + throw new IOException(e); + } + } + + @SuppressWarnings("unchecked") + private Token selectToken() { + UserGroupInformation ugi; + try { + ugi = UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + String msg = "Error when getting the current user"; + LOG.error(msg, e); + throw new YarnRuntimeException(msg, e); + } + Collection> tokens = ugi.getTokens(); + for (Token token : tokens) { + TokenIdentifier identifier = null; + try { + identifier = token.decodeIdentifier(); + } catch (IOException e) { + String msg = "Error when getting the token identifier"; + LOG.error(msg, e); + throw new YarnRuntimeException(msg, e); + } + if (identifier instanceof TimelineDelegationTokenIdentifier) { + return (Token) token; + } + } + return null; + } + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index 8a0348b..700f79a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.client.api.impl; import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.EnumSet; import java.util.List; @@ -29,8 +31,14 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse; @@ -64,6 +72,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; @@ -74,6 +83,7 @@ import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.api.AHSClient; +import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -82,6 +92,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; @@ -97,8 +108,11 @@ protected long submitPollIntervalMillis; private long asyncApiPollIntervalMillis; private long asyncApiPollTimeoutMillis; - protected AHSClient historyClient; + private AHSClient historyClient; private boolean historyServiceEnabled; + private TimelineClient timelineClient; + private Text timelineService; + private boolean timelineServiceEnabled; private static final String ROOT = "root"; @@ -126,10 +140,30 @@ protected void serviceInit(Configuration conf) throws Exception { if (conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) { historyServiceEnabled = true; - historyClient = AHSClientImpl.createAHSClient(); - historyClient.init(getConfig()); + historyClient = AHSClient.createAHSClient(); + historyClient.init(conf); } + if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) { + timelineServiceEnabled = true; + timelineClient = TimelineClient.createTimelineClient(); + timelineClient.init(conf); + + InetSocketAddress timelineServiceAddr = null; + if (YarnConfiguration.useHttps(getConfig())) { + timelineServiceAddr = getConfig().getSocketAddr( + YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_PORT); + } else { + timelineServiceAddr = getConfig().getSocketAddr( + YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_PORT); + } + timelineService = SecurityUtil.buildTokenService(timelineServiceAddr); + } super.serviceInit(conf); } @@ -141,6 +175,9 @@ protected void serviceStart() throws Exception { if (historyServiceEnabled) { historyClient.start(); } + if (timelineServiceEnabled) { + timelineClient.start(); + } } catch (IOException e) { throw new YarnRuntimeException(e); } @@ -155,6 +192,9 @@ protected void serviceStop() throws Exception { if (historyServiceEnabled) { historyClient.stop(); } + if (timelineServiceEnabled) { + timelineClient.stop(); + } super.serviceStop(); } @@ -189,6 +229,12 @@ public YarnClientApplication createApplication() Records.newRecord(SubmitApplicationRequest.class); request.setApplicationSubmissionContext(appContext); + // Automatically add the timeline DT into the CLC + // Only when the security and the timeline service are both enabled + if (UserGroupInformation.isSecurityEnabled() && timelineServiceEnabled) { + addTimelineDelegationToken(appContext.getAMContainerSpec()); + } + //TODO: YARN-1763:Handle RM failovers during the submitApplication call. rmClient.submitApplication(request); @@ -238,6 +284,42 @@ public YarnClientApplication createApplication() return applicationId; } + private void addTimelineDelegationToken( + ContainerLaunchContext clc) throws YarnException, IOException { + org.apache.hadoop.security.token.Token timelineDelegationToken = + timelineClient.getDelegationToken( + UserGroupInformation.getCurrentUser().getUserName()); + if (timelineDelegationToken == null) { + return; + } + Credentials credentials = new Credentials(); + DataInputByteBuffer dibb = new DataInputByteBuffer(); + ByteBuffer tokens = clc.getTokens(); + if (tokens != null) { + dibb.reset(tokens); + credentials.readTokenStorageStream(dibb); + tokens.rewind(); + } + // If the timeline delegation token is already in the CLC, no need to add + // one more + for (org.apache.hadoop.security.token.Token token : credentials + .getAllTokens()) { + TokenIdentifier tokenIdentifier = token.decodeIdentifier(); + if (tokenIdentifier instanceof TimelineDelegationTokenIdentifier) { + return; + } + } + credentials.addToken(timelineService, timelineDelegationToken); + if (LOG.isDebugEnabled()) { + LOG.debug("Add timline delegation token into credentials: " + + timelineDelegationToken); + } + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + tokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + clc.setTokens(tokens); + } + @Override public void killApplication(ApplicationId applicationId) throws YarnException, IOException {