diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
index 56b9981..b21ea52 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
@@ -57,14 +57,6 @@
log4j
log4j
-
- org.mortbay.jetty
- jetty-util
-
-
- com.sun.jersey
- jersey-client
-
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
deleted file mode 100644
index de1d3e2..0000000
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.client.api;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
-import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
-
-/**
- * A client library that can be used to post some information in terms of a
- * number of conceptual entities.
- */
-@Public
-@Unstable
-public abstract class TimelineClient extends AbstractService {
-
- @Public
- public static TimelineClient createTimelineClient() {
- TimelineClient client = new TimelineClientImpl();
- return client;
- }
-
- @Private
- protected TimelineClient(String name) {
- super(name);
- }
-
- /**
- *
- * Send the information of a number of conceptual entities to the timeline
- * server. It is a blocking API. The method will not return until it gets the
- * response from the timeline server.
- *
- *
- * @param entities
- * the collection of {@link TimelineEntity}
- * @return the error information if the sent entities are not correctly stored
- * @throws IOException
- * @throws YarnException
- */
- @Public
- public abstract TimelinePutResponse putEntities(
- TimelineEntity... entities) throws IOException, YarnException;
-
- /**
- *
- * Get a delegation token so as to be able to talk to the timeline server in a
- * secure way.
- *
- *
- * @param renewer
- * Address of the renewer who can renew these tokens when needed by
- * securely talking to the timeline server
- * @return a delegation token ({@link Token}) that can be used to talk to the
- * timeline server
- * @throws IOException
- * @throws YarnException
- */
- @Public
- public abstract Token getDelegationToken(
- String renewer) throws IOException, YarnException;
-
-}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineAuthenticator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineAuthenticator.java
deleted file mode 100644
index 25333c7..0000000
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineAuthenticator.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.client.api.impl;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.net.URLEncoder;
-import java.text.MessageFormat;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
-import org.apache.hadoop.security.authentication.client.AuthenticationException;
-import org.apache.hadoop.security.authentication.client.Authenticator;
-import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineDelegationTokenResponse;
-import org.apache.hadoop.yarn.security.client.TimelineAuthenticationConsts;
-import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
-import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenOperation;
-import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.map.ObjectMapper;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * A KerberosAuthenticator subclass that fallback to
- * {@link TimelineAuthenticationConsts}.
- */
-@Private
-@Unstable
-public class TimelineAuthenticator extends KerberosAuthenticator {
-
- private static ObjectMapper mapper;
-
- static {
- mapper = new ObjectMapper();
- YarnJacksonJaxbJsonProvider.configObjectMapper(mapper);
- }
-
- /**
- * Returns the fallback authenticator if the server does not use Kerberos
- * SPNEGO HTTP authentication.
- *
- * @return a {@link TimelineAuthenticationConsts} instance.
- */
- @Override
- protected Authenticator getFallBackAuthenticator() {
- return new TimelineAuthenticator();
- }
-
- public static void injectDelegationToken(Map params,
- Token> dtToken)
- throws IOException {
- if (dtToken != null) {
- params.put(TimelineAuthenticationConsts.DELEGATION_PARAM,
- dtToken.encodeToUrlString());
- }
- }
-
- @Private
- @VisibleForTesting
- boolean hasDelegationToken(URL url) {
- if (url.getQuery() == null) {
- return false;
- } else {
- return url.getQuery().contains(
- TimelineAuthenticationConsts.DELEGATION_PARAM + "=");
- }
- }
-
- @Override
- public void authenticate(URL url, AuthenticatedURL.Token token)
- throws IOException, AuthenticationException {
- if (!hasDelegationToken(url)) {
- super.authenticate(url, token);
- }
- }
-
- public static Token getDelegationToken(
- URL url, AuthenticatedURL.Token token, String renewer) throws IOException {
- TimelineDelegationTokenOperation op =
- TimelineDelegationTokenOperation.GETDELEGATIONTOKEN;
- Map params = new HashMap();
- params.put(TimelineAuthenticationConsts.OP_PARAM, op.toString());
- params.put(TimelineAuthenticationConsts.RENEWER_PARAM, renewer);
- url = appendParams(url, params);
- AuthenticatedURL aUrl =
- new AuthenticatedURL(new TimelineAuthenticator());
- try {
- HttpURLConnection conn = aUrl.openConnection(url, token);
- conn.setRequestMethod(op.getHttpMethod());
- TimelineDelegationTokenResponse dtRes = validateAndParseResponse(conn);
- if (!dtRes.getType().equals(
- TimelineAuthenticationConsts.DELEGATION_TOKEN_URL)) {
- throw new IOException("The response content is not expected: "
- + dtRes.getContent());
- }
- String tokenStr = dtRes.getContent().toString();
- Token dToken =
- new Token();
- dToken.decodeFromUrlString(tokenStr);
- return dToken;
- } catch (AuthenticationException ex) {
- throw new IOException(ex.toString(), ex);
- }
- }
-
- public static long renewDelegationToken(URL url,
- AuthenticatedURL.Token token,
- Token dToken) throws IOException {
- Map params = new HashMap();
- params.put(TimelineAuthenticationConsts.OP_PARAM,
- TimelineDelegationTokenOperation.RENEWDELEGATIONTOKEN.toString());
- params.put(TimelineAuthenticationConsts.TOKEN_PARAM,
- dToken.encodeToUrlString());
- url = appendParams(url, params);
- AuthenticatedURL aUrl =
- new AuthenticatedURL(new TimelineAuthenticator());
- try {
- HttpURLConnection conn = aUrl.openConnection(url, token);
- conn.setRequestMethod(
- TimelineDelegationTokenOperation.RENEWDELEGATIONTOKEN.getHttpMethod());
- TimelineDelegationTokenResponse dtRes = validateAndParseResponse(conn);
- if (!dtRes.getType().equals(
- TimelineAuthenticationConsts.DELEGATION_TOKEN_EXPIRATION_TIME)) {
- throw new IOException("The response content is not expected: "
- + dtRes.getContent());
- }
- return Long.valueOf(dtRes.getContent().toString());
- } catch (AuthenticationException ex) {
- throw new IOException(ex.toString(), ex);
- }
- }
-
- public static void cancelDelegationToken(URL url,
- AuthenticatedURL.Token token,
- Token dToken) throws IOException {
- Map params = new HashMap();
- params.put(TimelineAuthenticationConsts.OP_PARAM,
- TimelineDelegationTokenOperation.CANCELDELEGATIONTOKEN.toString());
- params.put(TimelineAuthenticationConsts.TOKEN_PARAM,
- dToken.encodeToUrlString());
- url = appendParams(url, params);
- AuthenticatedURL aUrl =
- new AuthenticatedURL(new TimelineAuthenticator());
- try {
- HttpURLConnection conn = aUrl.openConnection(url, token);
- conn.setRequestMethod(TimelineDelegationTokenOperation.CANCELDELEGATIONTOKEN
- .getHttpMethod());
- validateAndParseResponse(conn);
- } catch (AuthenticationException ex) {
- throw new IOException(ex.toString(), ex);
- }
- }
-
- /**
- * Convenience method that appends parameters an HTTP URL.
- *
- * @param url
- * the url.
- * @param params
- * the query string parameters.
- *
- * @return a URL
- *
- * @throws IOException
- * thrown if an IO error occurs.
- */
- public static URL appendParams(URL url, Map params)
- throws IOException {
- StringBuilder sb = new StringBuilder();
- sb.append(url);
- String separator = url.toString().contains("?") ? "&" : "?";
- for (Map.Entry entry : params.entrySet()) {
- sb.append(separator).append(entry.getKey()).append("=").
- append(URLEncoder.encode(entry.getValue(), "UTF8"));
- separator = "&";
- }
- return new URL(sb.toString());
- }
-
- /**
- * Validates the response of an HttpURLConnection. If the current
- * status code is not 200, it will throw an exception with a detail message
- * using Server side error messages if available. Otherwise,
- * {@link TimelineDelegationTokenResponse} will be parsed and returned.
- *
- * @param conn
- * the HttpURLConnection.
- * @return
- * @throws IOException
- * thrown if the current status code is not 200 or the JSON response
- * cannot be parsed correctly
- */
- private static TimelineDelegationTokenResponse validateAndParseResponse(
- HttpURLConnection conn) throws IOException {
- int status = conn.getResponseCode();
- JsonNode json = mapper.readTree(conn.getInputStream());
- if (status == HttpURLConnection.HTTP_OK) {
- return mapper.readValue(json, TimelineDelegationTokenResponse.class);
- } else {
- // If the status code is not 200, some thing wrong should happen at the
- // server side, the JSON content is going to contain exception details.
- // We can use the JSON content to reconstruct the exception object.
- try {
- String message =
- json.get(TimelineAuthenticationConsts.ERROR_MESSAGE_JSON)
- .getTextValue();
- String exception =
- json.get(TimelineAuthenticationConsts.ERROR_EXCEPTION_JSON)
- .getTextValue();
- String className =
- json.get(TimelineAuthenticationConsts.ERROR_CLASSNAME_JSON)
- .getTextValue();
-
- try {
- ClassLoader cl = TimelineAuthenticator.class.getClassLoader();
- Class> klass = cl.loadClass(className);
- Constructor> constr = klass.getConstructor(String.class);
- throw (IOException) constr.newInstance(message);
- } catch (IOException ex) {
- throw ex;
- } catch (Exception ex) {
- throw new IOException(MessageFormat.format("{0} - {1}", exception,
- message));
- }
- } catch (IOException ex) {
- if (ex.getCause() instanceof IOException) {
- throw (IOException) ex.getCause();
- }
- throw new IOException(
- MessageFormat.format("HTTP status [{0}], {1}",
- status, conn.getResponseMessage()));
- }
- }
- }
-
-}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
deleted file mode 100644
index daf25ea..0000000
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.client.api.impl;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URI;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.ws.rs.core.MediaType;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
-import org.apache.hadoop.security.authentication.client.AuthenticationException;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
-import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenSelector;
-import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
-import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
-import org.codehaus.jackson.map.ObjectMapper;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.WebResource;
-import com.sun.jersey.api.client.config.ClientConfig;
-import com.sun.jersey.api.client.config.DefaultClientConfig;
-import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
-import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
-
-@Private
-@Unstable
-public class TimelineClientImpl extends TimelineClient {
-
- private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class);
- private static final String RESOURCE_URI_STR = "/ws/v1/timeline/";
- private static final String URL_PARAM_USER_NAME = "user.name";
- private static final Joiner JOINER = Joiner.on("");
- private static Options opts;
- static {
- opts = new Options();
- opts.addOption("put", true, "Put the TimelineEntities in a JSON file");
- opts.getOption("put").setArgName("Path to the JSON file");
- opts.addOption("help", false, "Print usage");
- }
-
- private Client client;
- private URI resURI;
- private boolean isEnabled;
- private KerberosAuthenticatedURLConnectionFactory urlFactory;
-
- public TimelineClientImpl() {
- super(TimelineClientImpl.class.getName());
- ClientConfig cc = new DefaultClientConfig();
- cc.getClasses().add(YarnJacksonJaxbJsonProvider.class);
- if (UserGroupInformation.isSecurityEnabled()) {
- urlFactory = new KerberosAuthenticatedURLConnectionFactory();
- client = new Client(new URLConnectionClientHandler(urlFactory), cc);
- } else {
- client = new Client(new URLConnectionClientHandler(
- new PseudoAuthenticatedURLConnectionFactory()), cc);
- }
- }
-
- protected void serviceInit(Configuration conf) throws Exception {
- isEnabled = conf.getBoolean(
- YarnConfiguration.TIMELINE_SERVICE_ENABLED,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED);
- if (!isEnabled) {
- LOG.info("Timeline service is not enabled");
- } else {
- if (YarnConfiguration.useHttps(conf)) {
- resURI = URI
- .create(JOINER.join("https://", conf.get(
- YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS),
- RESOURCE_URI_STR));
- } else {
- resURI = URI.create(JOINER.join("http://", conf.get(
- YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
- YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS),
- RESOURCE_URI_STR));
- }
- if (UserGroupInformation.isSecurityEnabled()) {
- urlFactory.setService(TimelineUtils.buildTimelineTokenService(conf));
- }
- LOG.info("Timeline service address: " + resURI);
- }
- super.serviceInit(conf);
- }
-
- @Override
- public TimelinePutResponse putEntities(
- TimelineEntity... entities) throws IOException, YarnException {
- if (!isEnabled) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Nothing will be put because timeline service is not enabled");
- }
- return new TimelinePutResponse();
- }
- TimelineEntities entitiesContainer = new TimelineEntities();
- entitiesContainer.addEntities(Arrays.asList(entities));
- ClientResponse resp;
- try {
- resp = doPostingEntities(entitiesContainer);
- } catch (RuntimeException re) {
- // runtime exception is expected if the client cannot connect the server
- String msg =
- "Failed to get the response from the timeline server.";
- LOG.error(msg, re);
- throw re;
- }
- if (resp == null ||
- resp.getClientResponseStatus() != ClientResponse.Status.OK) {
- String msg =
- "Failed to get the response from the timeline server.";
- LOG.error(msg);
- if (LOG.isDebugEnabled() && resp != null) {
- String output = resp.getEntity(String.class);
- LOG.debug("HTTP error code: " + resp.getStatus()
- + " Server response : \n" + output);
- }
- throw new YarnException(msg);
- }
- return resp.getEntity(TimelinePutResponse.class);
- }
-
- @Override
- public Token getDelegationToken(
- String renewer) throws IOException, YarnException {
- return TimelineAuthenticator.getDelegationToken(resURI.toURL(),
- urlFactory.token, renewer);
- }
-
- @Private
- @VisibleForTesting
- public ClientResponse doPostingEntities(TimelineEntities entities) {
- WebResource webResource = client.resource(resURI);
- return webResource.accept(MediaType.APPLICATION_JSON)
- .type(MediaType.APPLICATION_JSON)
- .post(ClientResponse.class, entities);
- }
-
- private static class PseudoAuthenticatedURLConnectionFactory
- implements HttpURLConnectionFactory {
-
- @Override
- public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
- Map params = new HashMap();
- params.put(URL_PARAM_USER_NAME,
- UserGroupInformation.getCurrentUser().getShortUserName());
- url = TimelineAuthenticator.appendParams(url, params);
- if (LOG.isDebugEnabled()) {
- LOG.debug("URL with delegation token: " + url);
- }
- return (HttpURLConnection) url.openConnection();
- }
-
- }
- private static class KerberosAuthenticatedURLConnectionFactory
- implements HttpURLConnectionFactory {
-
- private AuthenticatedURL.Token token;
- private TimelineAuthenticator authenticator;
- private Token dToken;
- private Text service;
-
- public KerberosAuthenticatedURLConnectionFactory() {
- token = new AuthenticatedURL.Token();
- authenticator = new TimelineAuthenticator();
- }
-
- @Override
- public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
- try {
- if (dToken == null) {
- //TODO: need to take care of the renew case
- dToken = selectToken();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Timeline delegation token: " + dToken.toString());
- }
- }
- if (dToken != null) {
- Map params = new HashMap();
- TimelineAuthenticator.injectDelegationToken(params, dToken);
- url = TimelineAuthenticator.appendParams(url, params);
- if (LOG.isDebugEnabled()) {
- LOG.debug("URL with delegation token: " + url);
- }
- }
- return new AuthenticatedURL(authenticator).openConnection(url, token);
- } catch (AuthenticationException e) {
- LOG.error("Authentication failed when openning connection [" + url
- + "] with token [" + token + "].", e);
- throw new IOException(e);
- }
- }
-
- private Token selectToken() {
- UserGroupInformation ugi;
- try {
- ugi = UserGroupInformation.getCurrentUser();
- } catch (IOException e) {
- String msg = "Error when getting the current user";
- LOG.error(msg, e);
- throw new YarnRuntimeException(msg, e);
- }
- TimelineDelegationTokenSelector tokenSelector =
- new TimelineDelegationTokenSelector();
- return tokenSelector.selectToken(
- service, ugi.getCredentials().getAllTokens());
- }
-
- public void setService(Text service) {
- this.service = service;
- }
-
- }
-
- public static void main(String[] argv) throws Exception {
- CommandLine cliParser = new GnuParser().parse(opts, argv);
- if (cliParser.hasOption("put")) {
- String path = cliParser.getOptionValue("put");
- if (path != null && path.length() > 0) {
- putTimelineEntitiesInJSONFile(path);
- return;
- }
- }
- printUsage();
- }
-
- /**
- * Put timeline data in a JSON file via command line.
- *
- * @param path
- * path to the {@link TimelineEntities} JSON file
- */
- private static void putTimelineEntitiesInJSONFile(String path) {
- File jsonFile = new File(path);
- if (!jsonFile.exists()) {
- System.out.println("Error: File [" + jsonFile.getAbsolutePath()
- + "] doesn't exist");
- return;
- }
- ObjectMapper mapper = new ObjectMapper();
- YarnJacksonJaxbJsonProvider.configObjectMapper(mapper);
- TimelineEntities entities = null;
- try {
- entities = mapper.readValue(jsonFile, TimelineEntities.class);
- } catch (Exception e) {
- System.err.println("Error: " + e.getMessage());
- e.printStackTrace(System.err);
- return;
- }
- Configuration conf = new YarnConfiguration();
- TimelineClient client = TimelineClient.createTimelineClient();
- client.init(conf);
- client.start();
- try {
- if (UserGroupInformation.isSecurityEnabled()
- && conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false)) {
- Token token =
- client.getDelegationToken(
- UserGroupInformation.getCurrentUser().getUserName());
- UserGroupInformation.getCurrentUser().addToken(token);
- }
- TimelinePutResponse response = client.putEntities(
- entities.getEntities().toArray(
- new TimelineEntity[entities.getEntities().size()]));
- if (response.getErrors().size() == 0) {
- System.out.println("Timeline data is successfully put");
- } else {
- for (TimelinePutResponse.TimelinePutError error : response.getErrors()) {
- System.out.println("TimelineEntity [" + error.getEntityType() + ":" +
- error.getEntityId() + "] is not successfully put. Error code: " +
- error.getErrorCode());
- }
- }
- } catch (Exception e) {
- System.err.println("Error: " + e.getMessage());
- e.printStackTrace(System.err);
- } finally {
- client.stop();
- }
- }
-
- /**
- * Helper function to print out usage
- */
- private static void printUsage() {
- new HelpFormatter().printHelp("TimelineClient", opts);
- }
-
-}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineAuthenticator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineAuthenticator.java
deleted file mode 100644
index 19aaa88..0000000
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineAuthenticator.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.client.api.impl;
-
-import java.net.URL;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestTimelineAuthenticator {
-
- @Test
- public void testHasDelegationTokens() throws Exception {
- TimelineAuthenticator authenticator = new TimelineAuthenticator();
- Assert.assertFalse(authenticator.hasDelegationToken(new URL(
- "http://localhost:8/resource")));
- Assert.assertFalse(authenticator.hasDelegationToken(new URL(
- "http://localhost:8/resource?other=xxxx")));
- Assert.assertTrue(authenticator.hasDelegationToken(new URL(
- "http://localhost:8/resource?delegation=yyyy")));
- Assert.assertTrue(authenticator.hasDelegationToken(new URL(
- "http://localhost:8/resource?other=xxxx&delegation=yyyy")));
- }
-}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
deleted file mode 100644
index 3c5272a..0000000
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.client.api.impl;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
-import java.net.ConnectException;
-
-import org.junit.Assert;
-
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.sun.jersey.api.client.ClientHandlerException;
-import com.sun.jersey.api.client.ClientResponse;
-
-public class TestTimelineClient {
-
- private TimelineClientImpl client;
-
- @Before
- public void setup() {
- YarnConfiguration conf = new YarnConfiguration();
- conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
- client = createTimelineClient(conf);
- }
-
- @After
- public void tearDown() {
- if (client != null) {
- client.stop();
- }
- }
-
- @Test
- public void testPostEntities() throws Exception {
- mockClientResponse(client, ClientResponse.Status.OK, false, false);
- try {
- TimelinePutResponse response = client.putEntities(generateEntity());
- Assert.assertEquals(0, response.getErrors().size());
- } catch (YarnException e) {
- Assert.fail("Exception is not expected");
- }
- }
-
- @Test
- public void testPostEntitiesWithError() throws Exception {
- mockClientResponse(client, ClientResponse.Status.OK, true, false);
- try {
- TimelinePutResponse response = client.putEntities(generateEntity());
- Assert.assertEquals(1, response.getErrors().size());
- Assert.assertEquals("test entity id", response.getErrors().get(0)
- .getEntityId());
- Assert.assertEquals("test entity type", response.getErrors().get(0)
- .getEntityType());
- Assert.assertEquals(TimelinePutResponse.TimelinePutError.IO_EXCEPTION,
- response.getErrors().get(0).getErrorCode());
- } catch (YarnException e) {
- Assert.fail("Exception is not expected");
- }
- }
-
- @Test
- public void testPostEntitiesNoResponse() throws Exception {
- mockClientResponse(
- client, ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false);
- try {
- client.putEntities(generateEntity());
- Assert.fail("Exception is expected");
- } catch (YarnException e) {
- Assert.assertTrue(e.getMessage().contains(
- "Failed to get the response from the timeline server."));
- }
- }
-
- @Test
- public void testPostEntitiesConnectionRefused() throws Exception {
- mockClientResponse(client, null, false, true);
- try {
- client.putEntities(generateEntity());
- Assert.fail("RuntimeException is expected");
- } catch (RuntimeException re) {
- Assert.assertTrue(re instanceof ClientHandlerException);
- }
- }
-
- @Test
- public void testPostEntitiesTimelineServiceNotEnabled() throws Exception {
- YarnConfiguration conf = new YarnConfiguration();
- conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false);
- TimelineClientImpl client = createTimelineClient(conf);
- mockClientResponse(
- client, ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false);
- try {
- TimelinePutResponse response = client.putEntities(generateEntity());
- Assert.assertEquals(0, response.getErrors().size());
- } catch (YarnException e) {
- Assert.fail(
- "putEntities should already return before throwing the exception");
- }
- }
-
- @Test
- public void testPostEntitiesTimelineServiceDefaultNotEnabled()
- throws Exception {
- YarnConfiguration conf = new YarnConfiguration();
- // Unset the timeline service's enabled properties.
- // Make sure default value is pickup up
- conf.unset(YarnConfiguration.TIMELINE_SERVICE_ENABLED);
- TimelineClientImpl client = createTimelineClient(conf);
- mockClientResponse(client, ClientResponse.Status.INTERNAL_SERVER_ERROR,
- false, false);
- try {
- TimelinePutResponse response = client.putEntities(generateEntity());
- Assert.assertEquals(0, response.getErrors().size());
- } catch (YarnException e) {
- Assert
- .fail("putEntities should already return before throwing the exception");
- }
- }
-
- private static ClientResponse mockClientResponse(TimelineClientImpl client,
- ClientResponse.Status status, boolean hasError, boolean hasRuntimeError) {
- ClientResponse response = mock(ClientResponse.class);
- if (hasRuntimeError) {
- doThrow(new ClientHandlerException(new ConnectException())).when(client)
- .doPostingEntities(any(TimelineEntities.class));
- return response;
- }
- doReturn(response).when(client)
- .doPostingEntities(any(TimelineEntities.class));
- when(response.getClientResponseStatus()).thenReturn(status);
- TimelinePutResponse.TimelinePutError error =
- new TimelinePutResponse.TimelinePutError();
- error.setEntityId("test entity id");
- error.setEntityType("test entity type");
- error.setErrorCode(TimelinePutResponse.TimelinePutError.IO_EXCEPTION);
- TimelinePutResponse putResponse = new TimelinePutResponse();
- if (hasError) {
- putResponse.addError(error);
- }
- when(response.getEntity(TimelinePutResponse.class)).thenReturn(putResponse);
- return response;
- }
-
- private static TimelineEntity generateEntity() {
- TimelineEntity entity = new TimelineEntity();
- entity.setEntityId("entity id");
- entity.setEntityType("entity type");
- entity.setStartTime(System.currentTimeMillis());
- for (int i = 0; i < 2; ++i) {
- TimelineEvent event = new TimelineEvent();
- event.setTimestamp(System.currentTimeMillis());
- event.setEventType("test event type " + i);
- event.addEventInfo("key1", "val1");
- event.addEventInfo("key2", "val2");
- entity.addEvent(event);
- }
- entity.addRelatedEntity("test ref type 1", "test ref id 1");
- entity.addRelatedEntity("test ref type 2", "test ref id 2");
- entity.addPrimaryFilter("pkey1", "pval1");
- entity.addPrimaryFilter("pkey2", "pval2");
- entity.addOtherInfo("okey1", "oval1");
- entity.addOtherInfo("okey2", "oval2");
- return entity;
- }
-
- private static TimelineClientImpl createTimelineClient(
- YarnConfiguration conf) {
- TimelineClientImpl client =
- spy((TimelineClientImpl) TimelineClient.createTimelineClient());
- client.init(conf);
- client.start();
- return client;
- }
-
-}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
index aa12b3f..04cf50d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
@@ -67,10 +67,18 @@
commons-codec
+ org.mortbay.jetty
+ jetty-util
+
+
com.sun.jersey
jersey-core
+ com.sun.jersey
+ jersey-client
+
+
org.codehaus.jackson
jackson-core-asl
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
new file mode 100644
index 0000000..de1d3e2
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
+
+/**
+ * A client library that can be used to post some information in terms of a
+ * number of conceptual entities.
+ */
+@Public
+@Unstable
+public abstract class TimelineClient extends AbstractService {
+
+ @Public
+ public static TimelineClient createTimelineClient() {
+ TimelineClient client = new TimelineClientImpl();
+ return client;
+ }
+
+ @Private
+ protected TimelineClient(String name) {
+ super(name);
+ }
+
+ /**
+ *
+ * Send the information of a number of conceptual entities to the timeline
+ * server. It is a blocking API. The method will not return until it gets the
+ * response from the timeline server.
+ *
+ *
+ * @param entities
+ * the collection of {@link TimelineEntity}
+ * @return the error information if the sent entities are not correctly stored
+ * @throws IOException
+ * @throws YarnException
+ */
+ @Public
+ public abstract TimelinePutResponse putEntities(
+ TimelineEntity... entities) throws IOException, YarnException;
+
+ /**
+ *
+ * Get a delegation token so as to be able to talk to the timeline server in a
+ * secure way.
+ *
+ *
+ * @param renewer
+ * Address of the renewer who can renew these tokens when needed by
+ * securely talking to the timeline server
+ * @return a delegation token ({@link Token}) that can be used to talk to the
+ * timeline server
+ * @throws IOException
+ * @throws YarnException
+ */
+ @Public
+ public abstract Token getDelegationToken(
+ String renewer) throws IOException, YarnException;
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineAuthenticator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineAuthenticator.java
new file mode 100644
index 0000000..25333c7
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineAuthenticator.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.text.MessageFormat;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.security.authentication.client.Authenticator;
+import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDelegationTokenResponse;
+import org.apache.hadoop.yarn.security.client.TimelineAuthenticationConsts;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenOperation;
+import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A KerberosAuthenticator subclass that fallback to
+ * {@link TimelineAuthenticationConsts}.
+ */
+@Private
+@Unstable
+public class TimelineAuthenticator extends KerberosAuthenticator {
+
+ private static ObjectMapper mapper;
+
+ static {
+ mapper = new ObjectMapper();
+ YarnJacksonJaxbJsonProvider.configObjectMapper(mapper);
+ }
+
+ /**
+ * Returns the fallback authenticator if the server does not use Kerberos
+ * SPNEGO HTTP authentication.
+ *
+ * @return a {@link TimelineAuthenticationConsts} instance.
+ */
+ @Override
+ protected Authenticator getFallBackAuthenticator() {
+ return new TimelineAuthenticator();
+ }
+
+ public static void injectDelegationToken(Map params,
+ Token> dtToken)
+ throws IOException {
+ if (dtToken != null) {
+ params.put(TimelineAuthenticationConsts.DELEGATION_PARAM,
+ dtToken.encodeToUrlString());
+ }
+ }
+
+ @Private
+ @VisibleForTesting
+ boolean hasDelegationToken(URL url) {
+ if (url.getQuery() == null) {
+ return false;
+ } else {
+ return url.getQuery().contains(
+ TimelineAuthenticationConsts.DELEGATION_PARAM + "=");
+ }
+ }
+
+ @Override
+ public void authenticate(URL url, AuthenticatedURL.Token token)
+ throws IOException, AuthenticationException {
+ if (!hasDelegationToken(url)) {
+ super.authenticate(url, token);
+ }
+ }
+
+ public static Token getDelegationToken(
+ URL url, AuthenticatedURL.Token token, String renewer) throws IOException {
+ TimelineDelegationTokenOperation op =
+ TimelineDelegationTokenOperation.GETDELEGATIONTOKEN;
+ Map params = new HashMap();
+ params.put(TimelineAuthenticationConsts.OP_PARAM, op.toString());
+ params.put(TimelineAuthenticationConsts.RENEWER_PARAM, renewer);
+ url = appendParams(url, params);
+ AuthenticatedURL aUrl =
+ new AuthenticatedURL(new TimelineAuthenticator());
+ try {
+ HttpURLConnection conn = aUrl.openConnection(url, token);
+ conn.setRequestMethod(op.getHttpMethod());
+ TimelineDelegationTokenResponse dtRes = validateAndParseResponse(conn);
+ if (!dtRes.getType().equals(
+ TimelineAuthenticationConsts.DELEGATION_TOKEN_URL)) {
+ throw new IOException("The response content is not expected: "
+ + dtRes.getContent());
+ }
+ String tokenStr = dtRes.getContent().toString();
+ Token dToken =
+ new Token();
+ dToken.decodeFromUrlString(tokenStr);
+ return dToken;
+ } catch (AuthenticationException ex) {
+ throw new IOException(ex.toString(), ex);
+ }
+ }
+
+ public static long renewDelegationToken(URL url,
+ AuthenticatedURL.Token token,
+ Token dToken) throws IOException {
+ Map params = new HashMap();
+ params.put(TimelineAuthenticationConsts.OP_PARAM,
+ TimelineDelegationTokenOperation.RENEWDELEGATIONTOKEN.toString());
+ params.put(TimelineAuthenticationConsts.TOKEN_PARAM,
+ dToken.encodeToUrlString());
+ url = appendParams(url, params);
+ AuthenticatedURL aUrl =
+ new AuthenticatedURL(new TimelineAuthenticator());
+ try {
+ HttpURLConnection conn = aUrl.openConnection(url, token);
+ conn.setRequestMethod(
+ TimelineDelegationTokenOperation.RENEWDELEGATIONTOKEN.getHttpMethod());
+ TimelineDelegationTokenResponse dtRes = validateAndParseResponse(conn);
+ if (!dtRes.getType().equals(
+ TimelineAuthenticationConsts.DELEGATION_TOKEN_EXPIRATION_TIME)) {
+ throw new IOException("The response content is not expected: "
+ + dtRes.getContent());
+ }
+ return Long.valueOf(dtRes.getContent().toString());
+ } catch (AuthenticationException ex) {
+ throw new IOException(ex.toString(), ex);
+ }
+ }
+
+ public static void cancelDelegationToken(URL url,
+ AuthenticatedURL.Token token,
+ Token dToken) throws IOException {
+ Map params = new HashMap();
+ params.put(TimelineAuthenticationConsts.OP_PARAM,
+ TimelineDelegationTokenOperation.CANCELDELEGATIONTOKEN.toString());
+ params.put(TimelineAuthenticationConsts.TOKEN_PARAM,
+ dToken.encodeToUrlString());
+ url = appendParams(url, params);
+ AuthenticatedURL aUrl =
+ new AuthenticatedURL(new TimelineAuthenticator());
+ try {
+ HttpURLConnection conn = aUrl.openConnection(url, token);
+ conn.setRequestMethod(TimelineDelegationTokenOperation.CANCELDELEGATIONTOKEN
+ .getHttpMethod());
+ validateAndParseResponse(conn);
+ } catch (AuthenticationException ex) {
+ throw new IOException(ex.toString(), ex);
+ }
+ }
+
+ /**
+ * Convenience method that appends parameters an HTTP URL.
+ *
+ * @param url
+ * the url.
+ * @param params
+ * the query string parameters.
+ *
+ * @return a URL
+ *
+ * @throws IOException
+ * thrown if an IO error occurs.
+ */
+ public static URL appendParams(URL url, Map params)
+ throws IOException {
+ StringBuilder sb = new StringBuilder();
+ sb.append(url);
+ String separator = url.toString().contains("?") ? "&" : "?";
+ for (Map.Entry entry : params.entrySet()) {
+ sb.append(separator).append(entry.getKey()).append("=").
+ append(URLEncoder.encode(entry.getValue(), "UTF8"));
+ separator = "&";
+ }
+ return new URL(sb.toString());
+ }
+
+ /**
+ * Validates the response of an HttpURLConnection. If the current
+ * status code is not 200, it will throw an exception with a detail message
+ * using Server side error messages if available. Otherwise,
+ * {@link TimelineDelegationTokenResponse} will be parsed and returned.
+ *
+ * @param conn
+ * the HttpURLConnection.
+ * @return
+ * @throws IOException
+ * thrown if the current status code is not 200 or the JSON response
+ * cannot be parsed correctly
+ */
+ private static TimelineDelegationTokenResponse validateAndParseResponse(
+ HttpURLConnection conn) throws IOException {
+ int status = conn.getResponseCode();
+ JsonNode json = mapper.readTree(conn.getInputStream());
+ if (status == HttpURLConnection.HTTP_OK) {
+ return mapper.readValue(json, TimelineDelegationTokenResponse.class);
+ } else {
+ // If the status code is not 200, some thing wrong should happen at the
+ // server side, the JSON content is going to contain exception details.
+ // We can use the JSON content to reconstruct the exception object.
+ try {
+ String message =
+ json.get(TimelineAuthenticationConsts.ERROR_MESSAGE_JSON)
+ .getTextValue();
+ String exception =
+ json.get(TimelineAuthenticationConsts.ERROR_EXCEPTION_JSON)
+ .getTextValue();
+ String className =
+ json.get(TimelineAuthenticationConsts.ERROR_CLASSNAME_JSON)
+ .getTextValue();
+
+ try {
+ ClassLoader cl = TimelineAuthenticator.class.getClassLoader();
+ Class> klass = cl.loadClass(className);
+ Constructor> constr = klass.getConstructor(String.class);
+ throw (IOException) constr.newInstance(message);
+ } catch (IOException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new IOException(MessageFormat.format("{0} - {1}", exception,
+ message));
+ }
+ } catch (IOException ex) {
+ if (ex.getCause() instanceof IOException) {
+ throw (IOException) ex.getCause();
+ }
+ throw new IOException(
+ MessageFormat.format("HTTP status [{0}], {1}",
+ status, conn.getResponseMessage()));
+ }
+ }
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
new file mode 100644
index 0000000..daf25ea
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.ws.rs.core.MediaType;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenSelector;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+
+@Private
+@Unstable
+public class TimelineClientImpl extends TimelineClient {
+
+ private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class);
+ private static final String RESOURCE_URI_STR = "/ws/v1/timeline/";
+ private static final String URL_PARAM_USER_NAME = "user.name";
+ private static final Joiner JOINER = Joiner.on("");
+ private static Options opts;
+ static {
+ opts = new Options();
+ opts.addOption("put", true, "Put the TimelineEntities in a JSON file");
+ opts.getOption("put").setArgName("Path to the JSON file");
+ opts.addOption("help", false, "Print usage");
+ }
+
+ private Client client;
+ private URI resURI;
+ private boolean isEnabled;
+ private KerberosAuthenticatedURLConnectionFactory urlFactory;
+
+ public TimelineClientImpl() {
+ super(TimelineClientImpl.class.getName());
+ ClientConfig cc = new DefaultClientConfig();
+ cc.getClasses().add(YarnJacksonJaxbJsonProvider.class);
+ if (UserGroupInformation.isSecurityEnabled()) {
+ urlFactory = new KerberosAuthenticatedURLConnectionFactory();
+ client = new Client(new URLConnectionClientHandler(urlFactory), cc);
+ } else {
+ client = new Client(new URLConnectionClientHandler(
+ new PseudoAuthenticatedURLConnectionFactory()), cc);
+ }
+ }
+
+ protected void serviceInit(Configuration conf) throws Exception {
+ isEnabled = conf.getBoolean(
+ YarnConfiguration.TIMELINE_SERVICE_ENABLED,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED);
+ if (!isEnabled) {
+ LOG.info("Timeline service is not enabled");
+ } else {
+ if (YarnConfiguration.useHttps(conf)) {
+ resURI = URI
+ .create(JOINER.join("https://", conf.get(
+ YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS),
+ RESOURCE_URI_STR));
+ } else {
+ resURI = URI.create(JOINER.join("http://", conf.get(
+ YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS),
+ RESOURCE_URI_STR));
+ }
+ if (UserGroupInformation.isSecurityEnabled()) {
+ urlFactory.setService(TimelineUtils.buildTimelineTokenService(conf));
+ }
+ LOG.info("Timeline service address: " + resURI);
+ }
+ super.serviceInit(conf);
+ }
+
+ @Override
+ public TimelinePutResponse putEntities(
+ TimelineEntity... entities) throws IOException, YarnException {
+ if (!isEnabled) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Nothing will be put because timeline service is not enabled");
+ }
+ return new TimelinePutResponse();
+ }
+ TimelineEntities entitiesContainer = new TimelineEntities();
+ entitiesContainer.addEntities(Arrays.asList(entities));
+ ClientResponse resp;
+ try {
+ resp = doPostingEntities(entitiesContainer);
+ } catch (RuntimeException re) {
+ // runtime exception is expected if the client cannot connect the server
+ String msg =
+ "Failed to get the response from the timeline server.";
+ LOG.error(msg, re);
+ throw re;
+ }
+ if (resp == null ||
+ resp.getClientResponseStatus() != ClientResponse.Status.OK) {
+ String msg =
+ "Failed to get the response from the timeline server.";
+ LOG.error(msg);
+ if (LOG.isDebugEnabled() && resp != null) {
+ String output = resp.getEntity(String.class);
+ LOG.debug("HTTP error code: " + resp.getStatus()
+ + " Server response : \n" + output);
+ }
+ throw new YarnException(msg);
+ }
+ return resp.getEntity(TimelinePutResponse.class);
+ }
+
+ @Override
+ public Token getDelegationToken(
+ String renewer) throws IOException, YarnException {
+ return TimelineAuthenticator.getDelegationToken(resURI.toURL(),
+ urlFactory.token, renewer);
+ }
+
+ @Private
+ @VisibleForTesting
+ public ClientResponse doPostingEntities(TimelineEntities entities) {
+ WebResource webResource = client.resource(resURI);
+ return webResource.accept(MediaType.APPLICATION_JSON)
+ .type(MediaType.APPLICATION_JSON)
+ .post(ClientResponse.class, entities);
+ }
+
+ private static class PseudoAuthenticatedURLConnectionFactory
+ implements HttpURLConnectionFactory {
+
+ @Override
+ public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
+ Map params = new HashMap();
+ params.put(URL_PARAM_USER_NAME,
+ UserGroupInformation.getCurrentUser().getShortUserName());
+ url = TimelineAuthenticator.appendParams(url, params);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("URL with delegation token: " + url);
+ }
+ return (HttpURLConnection) url.openConnection();
+ }
+
+ }
+ private static class KerberosAuthenticatedURLConnectionFactory
+ implements HttpURLConnectionFactory {
+
+ private AuthenticatedURL.Token token;
+ private TimelineAuthenticator authenticator;
+ private Token dToken;
+ private Text service;
+
+ public KerberosAuthenticatedURLConnectionFactory() {
+ token = new AuthenticatedURL.Token();
+ authenticator = new TimelineAuthenticator();
+ }
+
+ @Override
+ public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
+ try {
+ if (dToken == null) {
+ //TODO: need to take care of the renew case
+ dToken = selectToken();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Timeline delegation token: " + dToken.toString());
+ }
+ }
+ if (dToken != null) {
+ Map params = new HashMap();
+ TimelineAuthenticator.injectDelegationToken(params, dToken);
+ url = TimelineAuthenticator.appendParams(url, params);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("URL with delegation token: " + url);
+ }
+ }
+ return new AuthenticatedURL(authenticator).openConnection(url, token);
+ } catch (AuthenticationException e) {
+ LOG.error("Authentication failed when openning connection [" + url
+ + "] with token [" + token + "].", e);
+ throw new IOException(e);
+ }
+ }
+
+ private Token selectToken() {
+ UserGroupInformation ugi;
+ try {
+ ugi = UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ String msg = "Error when getting the current user";
+ LOG.error(msg, e);
+ throw new YarnRuntimeException(msg, e);
+ }
+ TimelineDelegationTokenSelector tokenSelector =
+ new TimelineDelegationTokenSelector();
+ return tokenSelector.selectToken(
+ service, ugi.getCredentials().getAllTokens());
+ }
+
+ public void setService(Text service) {
+ this.service = service;
+ }
+
+ }
+
+ public static void main(String[] argv) throws Exception {
+ CommandLine cliParser = new GnuParser().parse(opts, argv);
+ if (cliParser.hasOption("put")) {
+ String path = cliParser.getOptionValue("put");
+ if (path != null && path.length() > 0) {
+ putTimelineEntitiesInJSONFile(path);
+ return;
+ }
+ }
+ printUsage();
+ }
+
+ /**
+ * Put timeline data in a JSON file via command line.
+ *
+ * @param path
+ * path to the {@link TimelineEntities} JSON file
+ */
+ private static void putTimelineEntitiesInJSONFile(String path) {
+ File jsonFile = new File(path);
+ if (!jsonFile.exists()) {
+ System.out.println("Error: File [" + jsonFile.getAbsolutePath()
+ + "] doesn't exist");
+ return;
+ }
+ ObjectMapper mapper = new ObjectMapper();
+ YarnJacksonJaxbJsonProvider.configObjectMapper(mapper);
+ TimelineEntities entities = null;
+ try {
+ entities = mapper.readValue(jsonFile, TimelineEntities.class);
+ } catch (Exception e) {
+ System.err.println("Error: " + e.getMessage());
+ e.printStackTrace(System.err);
+ return;
+ }
+ Configuration conf = new YarnConfiguration();
+ TimelineClient client = TimelineClient.createTimelineClient();
+ client.init(conf);
+ client.start();
+ try {
+ if (UserGroupInformation.isSecurityEnabled()
+ && conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false)) {
+ Token token =
+ client.getDelegationToken(
+ UserGroupInformation.getCurrentUser().getUserName());
+ UserGroupInformation.getCurrentUser().addToken(token);
+ }
+ TimelinePutResponse response = client.putEntities(
+ entities.getEntities().toArray(
+ new TimelineEntity[entities.getEntities().size()]));
+ if (response.getErrors().size() == 0) {
+ System.out.println("Timeline data is successfully put");
+ } else {
+ for (TimelinePutResponse.TimelinePutError error : response.getErrors()) {
+ System.out.println("TimelineEntity [" + error.getEntityType() + ":" +
+ error.getEntityId() + "] is not successfully put. Error code: " +
+ error.getErrorCode());
+ }
+ }
+ } catch (Exception e) {
+ System.err.println("Error: " + e.getMessage());
+ e.printStackTrace(System.err);
+ } finally {
+ client.stop();
+ }
+ }
+
+ /**
+ * Helper function to print out usage
+ */
+ private static void printUsage() {
+ new HelpFormatter().printHelp("TimelineClient", opts);
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/package-info.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/package-info.java
new file mode 100644
index 0000000..89b5b6b
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.yarn.client.api.impl;
+import org.apache.hadoop.classification.InterfaceAudience;
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/package-info.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/package-info.java
new file mode 100644
index 0000000..efd7229
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.yarn.client.api;
+import org.apache.hadoop.classification.InterfaceAudience;
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineAuthenticator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineAuthenticator.java
new file mode 100644
index 0000000..19aaa88
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineAuthenticator.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.net.URL;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestTimelineAuthenticator {
+
+ @Test
+ public void testHasDelegationTokens() throws Exception {
+ TimelineAuthenticator authenticator = new TimelineAuthenticator();
+ Assert.assertFalse(authenticator.hasDelegationToken(new URL(
+ "http://localhost:8/resource")));
+ Assert.assertFalse(authenticator.hasDelegationToken(new URL(
+ "http://localhost:8/resource?other=xxxx")));
+ Assert.assertTrue(authenticator.hasDelegationToken(new URL(
+ "http://localhost:8/resource?delegation=yyyy")));
+ Assert.assertTrue(authenticator.hasDelegationToken(new URL(
+ "http://localhost:8/resource?other=xxxx&delegation=yyyy")));
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
new file mode 100644
index 0000000..3c5272a
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.net.ConnectException;
+
+import org.junit.Assert;
+
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientResponse;
+
+public class TestTimelineClient {
+
+ private TimelineClientImpl client;
+
+ @Before
+ public void setup() {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+ client = createTimelineClient(conf);
+ }
+
+ @After
+ public void tearDown() {
+ if (client != null) {
+ client.stop();
+ }
+ }
+
+ @Test
+ public void testPostEntities() throws Exception {
+ mockClientResponse(client, ClientResponse.Status.OK, false, false);
+ try {
+ TimelinePutResponse response = client.putEntities(generateEntity());
+ Assert.assertEquals(0, response.getErrors().size());
+ } catch (YarnException e) {
+ Assert.fail("Exception is not expected");
+ }
+ }
+
+ @Test
+ public void testPostEntitiesWithError() throws Exception {
+ mockClientResponse(client, ClientResponse.Status.OK, true, false);
+ try {
+ TimelinePutResponse response = client.putEntities(generateEntity());
+ Assert.assertEquals(1, response.getErrors().size());
+ Assert.assertEquals("test entity id", response.getErrors().get(0)
+ .getEntityId());
+ Assert.assertEquals("test entity type", response.getErrors().get(0)
+ .getEntityType());
+ Assert.assertEquals(TimelinePutResponse.TimelinePutError.IO_EXCEPTION,
+ response.getErrors().get(0).getErrorCode());
+ } catch (YarnException e) {
+ Assert.fail("Exception is not expected");
+ }
+ }
+
+ @Test
+ public void testPostEntitiesNoResponse() throws Exception {
+ mockClientResponse(
+ client, ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false);
+ try {
+ client.putEntities(generateEntity());
+ Assert.fail("Exception is expected");
+ } catch (YarnException e) {
+ Assert.assertTrue(e.getMessage().contains(
+ "Failed to get the response from the timeline server."));
+ }
+ }
+
+ @Test
+ public void testPostEntitiesConnectionRefused() throws Exception {
+ mockClientResponse(client, null, false, true);
+ try {
+ client.putEntities(generateEntity());
+ Assert.fail("RuntimeException is expected");
+ } catch (RuntimeException re) {
+ Assert.assertTrue(re instanceof ClientHandlerException);
+ }
+ }
+
+ @Test
+ public void testPostEntitiesTimelineServiceNotEnabled() throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false);
+ TimelineClientImpl client = createTimelineClient(conf);
+ mockClientResponse(
+ client, ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false);
+ try {
+ TimelinePutResponse response = client.putEntities(generateEntity());
+ Assert.assertEquals(0, response.getErrors().size());
+ } catch (YarnException e) {
+ Assert.fail(
+ "putEntities should already return before throwing the exception");
+ }
+ }
+
+ @Test
+ public void testPostEntitiesTimelineServiceDefaultNotEnabled()
+ throws Exception {
+ YarnConfiguration conf = new YarnConfiguration();
+ // Unset the timeline service's enabled properties.
+ // Make sure default value is pickup up
+ conf.unset(YarnConfiguration.TIMELINE_SERVICE_ENABLED);
+ TimelineClientImpl client = createTimelineClient(conf);
+ mockClientResponse(client, ClientResponse.Status.INTERNAL_SERVER_ERROR,
+ false, false);
+ try {
+ TimelinePutResponse response = client.putEntities(generateEntity());
+ Assert.assertEquals(0, response.getErrors().size());
+ } catch (YarnException e) {
+ Assert
+ .fail("putEntities should already return before throwing the exception");
+ }
+ }
+
+ private static ClientResponse mockClientResponse(TimelineClientImpl client,
+ ClientResponse.Status status, boolean hasError, boolean hasRuntimeError) {
+ ClientResponse response = mock(ClientResponse.class);
+ if (hasRuntimeError) {
+ doThrow(new ClientHandlerException(new ConnectException())).when(client)
+ .doPostingEntities(any(TimelineEntities.class));
+ return response;
+ }
+ doReturn(response).when(client)
+ .doPostingEntities(any(TimelineEntities.class));
+ when(response.getClientResponseStatus()).thenReturn(status);
+ TimelinePutResponse.TimelinePutError error =
+ new TimelinePutResponse.TimelinePutError();
+ error.setEntityId("test entity id");
+ error.setEntityType("test entity type");
+ error.setErrorCode(TimelinePutResponse.TimelinePutError.IO_EXCEPTION);
+ TimelinePutResponse putResponse = new TimelinePutResponse();
+ if (hasError) {
+ putResponse.addError(error);
+ }
+ when(response.getEntity(TimelinePutResponse.class)).thenReturn(putResponse);
+ return response;
+ }
+
+ private static TimelineEntity generateEntity() {
+ TimelineEntity entity = new TimelineEntity();
+ entity.setEntityId("entity id");
+ entity.setEntityType("entity type");
+ entity.setStartTime(System.currentTimeMillis());
+ for (int i = 0; i < 2; ++i) {
+ TimelineEvent event = new TimelineEvent();
+ event.setTimestamp(System.currentTimeMillis());
+ event.setEventType("test event type " + i);
+ event.addEventInfo("key1", "val1");
+ event.addEventInfo("key2", "val2");
+ entity.addEvent(event);
+ }
+ entity.addRelatedEntity("test ref type 1", "test ref id 1");
+ entity.addRelatedEntity("test ref type 2", "test ref id 2");
+ entity.addPrimaryFilter("pkey1", "pval1");
+ entity.addPrimaryFilter("pkey2", "pval2");
+ entity.addOtherInfo("okey1", "oval1");
+ entity.addOtherInfo("okey2", "oval2");
+ return entity;
+ }
+
+ private static TimelineClientImpl createTimelineClient(
+ YarnConfiguration conf) {
+ TimelineClientImpl client =
+ spy((TimelineClientImpl) TimelineClient.createTimelineClient());
+ client.init(conf);
+ client.start();
+ return client;
+ }
+
+}