diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml index 56b9981..b21ea52 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml @@ -57,14 +57,6 @@ log4j log4j - - org.mortbay.jetty - jetty-util - - - com.sun.jersey - jersey-client - diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java deleted file mode 100644 index de1d3e2..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.client.api; - -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceAudience.Public; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; -import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; - -/** - * A client library that can be used to post some information in terms of a - * number of conceptual entities. - */ -@Public -@Unstable -public abstract class TimelineClient extends AbstractService { - - @Public - public static TimelineClient createTimelineClient() { - TimelineClient client = new TimelineClientImpl(); - return client; - } - - @Private - protected TimelineClient(String name) { - super(name); - } - - /** - *

- * Send the information of a number of conceptual entities to the timeline - * server. It is a blocking API. The method will not return until it gets the - * response from the timeline server. - *

- * - * @param entities - * the collection of {@link TimelineEntity} - * @return the error information if the sent entities are not correctly stored - * @throws IOException - * @throws YarnException - */ - @Public - public abstract TimelinePutResponse putEntities( - TimelineEntity... entities) throws IOException, YarnException; - - /** - *

- * Get a delegation token so as to be able to talk to the timeline server in a - * secure way. - *

- * - * @param renewer - * Address of the renewer who can renew these tokens when needed by - * securely talking to the timeline server - * @return a delegation token ({@link Token}) that can be used to talk to the - * timeline server - * @throws IOException - * @throws YarnException - */ - @Public - public abstract Token getDelegationToken( - String renewer) throws IOException, YarnException; - -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineAuthenticator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineAuthenticator.java deleted file mode 100644 index 25333c7..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineAuthenticator.java +++ /dev/null @@ -1,260 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.client.api.impl; - -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.net.HttpURLConnection; -import java.net.URL; -import java.net.URLEncoder; -import java.text.MessageFormat; -import java.util.HashMap; -import java.util.Map; - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.security.authentication.client.AuthenticatedURL; -import org.apache.hadoop.security.authentication.client.AuthenticationException; -import org.apache.hadoop.security.authentication.client.Authenticator; -import org.apache.hadoop.security.authentication.client.KerberosAuthenticator; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.yarn.api.records.timeline.TimelineDelegationTokenResponse; -import org.apache.hadoop.yarn.security.client.TimelineAuthenticationConsts; -import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; -import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenOperation; -import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; -import org.codehaus.jackson.JsonNode; -import org.codehaus.jackson.map.ObjectMapper; - -import com.google.common.annotations.VisibleForTesting; - -/** - * A KerberosAuthenticator subclass that fallback to - * {@link TimelineAuthenticationConsts}. - */ -@Private -@Unstable -public class TimelineAuthenticator extends KerberosAuthenticator { - - private static ObjectMapper mapper; - - static { - mapper = new ObjectMapper(); - YarnJacksonJaxbJsonProvider.configObjectMapper(mapper); - } - - /** - * Returns the fallback authenticator if the server does not use Kerberos - * SPNEGO HTTP authentication. - * - * @return a {@link TimelineAuthenticationConsts} instance. - */ - @Override - protected Authenticator getFallBackAuthenticator() { - return new TimelineAuthenticator(); - } - - public static void injectDelegationToken(Map params, - Token dtToken) - throws IOException { - if (dtToken != null) { - params.put(TimelineAuthenticationConsts.DELEGATION_PARAM, - dtToken.encodeToUrlString()); - } - } - - @Private - @VisibleForTesting - boolean hasDelegationToken(URL url) { - if (url.getQuery() == null) { - return false; - } else { - return url.getQuery().contains( - TimelineAuthenticationConsts.DELEGATION_PARAM + "="); - } - } - - @Override - public void authenticate(URL url, AuthenticatedURL.Token token) - throws IOException, AuthenticationException { - if (!hasDelegationToken(url)) { - super.authenticate(url, token); - } - } - - public static Token getDelegationToken( - URL url, AuthenticatedURL.Token token, String renewer) throws IOException { - TimelineDelegationTokenOperation op = - TimelineDelegationTokenOperation.GETDELEGATIONTOKEN; - Map params = new HashMap(); - params.put(TimelineAuthenticationConsts.OP_PARAM, op.toString()); - params.put(TimelineAuthenticationConsts.RENEWER_PARAM, renewer); - url = appendParams(url, params); - AuthenticatedURL aUrl = - new AuthenticatedURL(new TimelineAuthenticator()); - try { - HttpURLConnection conn = aUrl.openConnection(url, token); - conn.setRequestMethod(op.getHttpMethod()); - TimelineDelegationTokenResponse dtRes = validateAndParseResponse(conn); - if (!dtRes.getType().equals( - TimelineAuthenticationConsts.DELEGATION_TOKEN_URL)) { - throw new IOException("The response content is not expected: " - + dtRes.getContent()); - } - String tokenStr = dtRes.getContent().toString(); - Token dToken = - new Token(); - dToken.decodeFromUrlString(tokenStr); - return dToken; - } catch (AuthenticationException ex) { - throw new IOException(ex.toString(), ex); - } - } - - public static long renewDelegationToken(URL url, - AuthenticatedURL.Token token, - Token dToken) throws IOException { - Map params = new HashMap(); - params.put(TimelineAuthenticationConsts.OP_PARAM, - TimelineDelegationTokenOperation.RENEWDELEGATIONTOKEN.toString()); - params.put(TimelineAuthenticationConsts.TOKEN_PARAM, - dToken.encodeToUrlString()); - url = appendParams(url, params); - AuthenticatedURL aUrl = - new AuthenticatedURL(new TimelineAuthenticator()); - try { - HttpURLConnection conn = aUrl.openConnection(url, token); - conn.setRequestMethod( - TimelineDelegationTokenOperation.RENEWDELEGATIONTOKEN.getHttpMethod()); - TimelineDelegationTokenResponse dtRes = validateAndParseResponse(conn); - if (!dtRes.getType().equals( - TimelineAuthenticationConsts.DELEGATION_TOKEN_EXPIRATION_TIME)) { - throw new IOException("The response content is not expected: " - + dtRes.getContent()); - } - return Long.valueOf(dtRes.getContent().toString()); - } catch (AuthenticationException ex) { - throw new IOException(ex.toString(), ex); - } - } - - public static void cancelDelegationToken(URL url, - AuthenticatedURL.Token token, - Token dToken) throws IOException { - Map params = new HashMap(); - params.put(TimelineAuthenticationConsts.OP_PARAM, - TimelineDelegationTokenOperation.CANCELDELEGATIONTOKEN.toString()); - params.put(TimelineAuthenticationConsts.TOKEN_PARAM, - dToken.encodeToUrlString()); - url = appendParams(url, params); - AuthenticatedURL aUrl = - new AuthenticatedURL(new TimelineAuthenticator()); - try { - HttpURLConnection conn = aUrl.openConnection(url, token); - conn.setRequestMethod(TimelineDelegationTokenOperation.CANCELDELEGATIONTOKEN - .getHttpMethod()); - validateAndParseResponse(conn); - } catch (AuthenticationException ex) { - throw new IOException(ex.toString(), ex); - } - } - - /** - * Convenience method that appends parameters an HTTP URL. - * - * @param url - * the url. - * @param params - * the query string parameters. - * - * @return a URL - * - * @throws IOException - * thrown if an IO error occurs. - */ - public static URL appendParams(URL url, Map params) - throws IOException { - StringBuilder sb = new StringBuilder(); - sb.append(url); - String separator = url.toString().contains("?") ? "&" : "?"; - for (Map.Entry entry : params.entrySet()) { - sb.append(separator).append(entry.getKey()).append("="). - append(URLEncoder.encode(entry.getValue(), "UTF8")); - separator = "&"; - } - return new URL(sb.toString()); - } - - /** - * Validates the response of an HttpURLConnection. If the current - * status code is not 200, it will throw an exception with a detail message - * using Server side error messages if available. Otherwise, - * {@link TimelineDelegationTokenResponse} will be parsed and returned. - * - * @param conn - * the HttpURLConnection. - * @return - * @throws IOException - * thrown if the current status code is not 200 or the JSON response - * cannot be parsed correctly - */ - private static TimelineDelegationTokenResponse validateAndParseResponse( - HttpURLConnection conn) throws IOException { - int status = conn.getResponseCode(); - JsonNode json = mapper.readTree(conn.getInputStream()); - if (status == HttpURLConnection.HTTP_OK) { - return mapper.readValue(json, TimelineDelegationTokenResponse.class); - } else { - // If the status code is not 200, some thing wrong should happen at the - // server side, the JSON content is going to contain exception details. - // We can use the JSON content to reconstruct the exception object. - try { - String message = - json.get(TimelineAuthenticationConsts.ERROR_MESSAGE_JSON) - .getTextValue(); - String exception = - json.get(TimelineAuthenticationConsts.ERROR_EXCEPTION_JSON) - .getTextValue(); - String className = - json.get(TimelineAuthenticationConsts.ERROR_CLASSNAME_JSON) - .getTextValue(); - - try { - ClassLoader cl = TimelineAuthenticator.class.getClassLoader(); - Class klass = cl.loadClass(className); - Constructor constr = klass.getConstructor(String.class); - throw (IOException) constr.newInstance(message); - } catch (IOException ex) { - throw ex; - } catch (Exception ex) { - throw new IOException(MessageFormat.format("{0} - {1}", exception, - message)); - } - } catch (IOException ex) { - if (ex.getCause() instanceof IOException) { - throw (IOException) ex.getCause(); - } - throw new IOException( - MessageFormat.format("HTTP status [{0}], {1}", - status, conn.getResponseMessage())); - } - } - } - -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java deleted file mode 100644 index daf25ea..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ /dev/null @@ -1,332 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.client.api.impl; - -import java.io.File; -import java.io.IOException; -import java.net.HttpURLConnection; -import java.net.URI; -import java.net.URL; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -import javax.ws.rs.core.MediaType; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.GnuParser; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Options; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.authentication.client.AuthenticatedURL; -import org.apache.hadoop.security.authentication.client.AuthenticationException; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; -import org.apache.hadoop.yarn.client.api.TimelineClient; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; -import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenSelector; -import org.apache.hadoop.yarn.util.timeline.TimelineUtils; -import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; -import org.codehaus.jackson.map.ObjectMapper; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; -import com.sun.jersey.api.client.Client; -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.WebResource; -import com.sun.jersey.api.client.config.ClientConfig; -import com.sun.jersey.api.client.config.DefaultClientConfig; -import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory; -import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; - -@Private -@Unstable -public class TimelineClientImpl extends TimelineClient { - - private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class); - private static final String RESOURCE_URI_STR = "/ws/v1/timeline/"; - private static final String URL_PARAM_USER_NAME = "user.name"; - private static final Joiner JOINER = Joiner.on(""); - private static Options opts; - static { - opts = new Options(); - opts.addOption("put", true, "Put the TimelineEntities in a JSON file"); - opts.getOption("put").setArgName("Path to the JSON file"); - opts.addOption("help", false, "Print usage"); - } - - private Client client; - private URI resURI; - private boolean isEnabled; - private KerberosAuthenticatedURLConnectionFactory urlFactory; - - public TimelineClientImpl() { - super(TimelineClientImpl.class.getName()); - ClientConfig cc = new DefaultClientConfig(); - cc.getClasses().add(YarnJacksonJaxbJsonProvider.class); - if (UserGroupInformation.isSecurityEnabled()) { - urlFactory = new KerberosAuthenticatedURLConnectionFactory(); - client = new Client(new URLConnectionClientHandler(urlFactory), cc); - } else { - client = new Client(new URLConnectionClientHandler( - new PseudoAuthenticatedURLConnectionFactory()), cc); - } - } - - protected void serviceInit(Configuration conf) throws Exception { - isEnabled = conf.getBoolean( - YarnConfiguration.TIMELINE_SERVICE_ENABLED, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED); - if (!isEnabled) { - LOG.info("Timeline service is not enabled"); - } else { - if (YarnConfiguration.useHttps(conf)) { - resURI = URI - .create(JOINER.join("https://", conf.get( - YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS), - RESOURCE_URI_STR)); - } else { - resURI = URI.create(JOINER.join("http://", conf.get( - YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS), - RESOURCE_URI_STR)); - } - if (UserGroupInformation.isSecurityEnabled()) { - urlFactory.setService(TimelineUtils.buildTimelineTokenService(conf)); - } - LOG.info("Timeline service address: " + resURI); - } - super.serviceInit(conf); - } - - @Override - public TimelinePutResponse putEntities( - TimelineEntity... entities) throws IOException, YarnException { - if (!isEnabled) { - if (LOG.isDebugEnabled()) { - LOG.debug("Nothing will be put because timeline service is not enabled"); - } - return new TimelinePutResponse(); - } - TimelineEntities entitiesContainer = new TimelineEntities(); - entitiesContainer.addEntities(Arrays.asList(entities)); - ClientResponse resp; - try { - resp = doPostingEntities(entitiesContainer); - } catch (RuntimeException re) { - // runtime exception is expected if the client cannot connect the server - String msg = - "Failed to get the response from the timeline server."; - LOG.error(msg, re); - throw re; - } - if (resp == null || - resp.getClientResponseStatus() != ClientResponse.Status.OK) { - String msg = - "Failed to get the response from the timeline server."; - LOG.error(msg); - if (LOG.isDebugEnabled() && resp != null) { - String output = resp.getEntity(String.class); - LOG.debug("HTTP error code: " + resp.getStatus() - + " Server response : \n" + output); - } - throw new YarnException(msg); - } - return resp.getEntity(TimelinePutResponse.class); - } - - @Override - public Token getDelegationToken( - String renewer) throws IOException, YarnException { - return TimelineAuthenticator.getDelegationToken(resURI.toURL(), - urlFactory.token, renewer); - } - - @Private - @VisibleForTesting - public ClientResponse doPostingEntities(TimelineEntities entities) { - WebResource webResource = client.resource(resURI); - return webResource.accept(MediaType.APPLICATION_JSON) - .type(MediaType.APPLICATION_JSON) - .post(ClientResponse.class, entities); - } - - private static class PseudoAuthenticatedURLConnectionFactory - implements HttpURLConnectionFactory { - - @Override - public HttpURLConnection getHttpURLConnection(URL url) throws IOException { - Map params = new HashMap(); - params.put(URL_PARAM_USER_NAME, - UserGroupInformation.getCurrentUser().getShortUserName()); - url = TimelineAuthenticator.appendParams(url, params); - if (LOG.isDebugEnabled()) { - LOG.debug("URL with delegation token: " + url); - } - return (HttpURLConnection) url.openConnection(); - } - - } - private static class KerberosAuthenticatedURLConnectionFactory - implements HttpURLConnectionFactory { - - private AuthenticatedURL.Token token; - private TimelineAuthenticator authenticator; - private Token dToken; - private Text service; - - public KerberosAuthenticatedURLConnectionFactory() { - token = new AuthenticatedURL.Token(); - authenticator = new TimelineAuthenticator(); - } - - @Override - public HttpURLConnection getHttpURLConnection(URL url) throws IOException { - try { - if (dToken == null) { - //TODO: need to take care of the renew case - dToken = selectToken(); - if (LOG.isDebugEnabled()) { - LOG.debug("Timeline delegation token: " + dToken.toString()); - } - } - if (dToken != null) { - Map params = new HashMap(); - TimelineAuthenticator.injectDelegationToken(params, dToken); - url = TimelineAuthenticator.appendParams(url, params); - if (LOG.isDebugEnabled()) { - LOG.debug("URL with delegation token: " + url); - } - } - return new AuthenticatedURL(authenticator).openConnection(url, token); - } catch (AuthenticationException e) { - LOG.error("Authentication failed when openning connection [" + url - + "] with token [" + token + "].", e); - throw new IOException(e); - } - } - - private Token selectToken() { - UserGroupInformation ugi; - try { - ugi = UserGroupInformation.getCurrentUser(); - } catch (IOException e) { - String msg = "Error when getting the current user"; - LOG.error(msg, e); - throw new YarnRuntimeException(msg, e); - } - TimelineDelegationTokenSelector tokenSelector = - new TimelineDelegationTokenSelector(); - return tokenSelector.selectToken( - service, ugi.getCredentials().getAllTokens()); - } - - public void setService(Text service) { - this.service = service; - } - - } - - public static void main(String[] argv) throws Exception { - CommandLine cliParser = new GnuParser().parse(opts, argv); - if (cliParser.hasOption("put")) { - String path = cliParser.getOptionValue("put"); - if (path != null && path.length() > 0) { - putTimelineEntitiesInJSONFile(path); - return; - } - } - printUsage(); - } - - /** - * Put timeline data in a JSON file via command line. - * - * @param path - * path to the {@link TimelineEntities} JSON file - */ - private static void putTimelineEntitiesInJSONFile(String path) { - File jsonFile = new File(path); - if (!jsonFile.exists()) { - System.out.println("Error: File [" + jsonFile.getAbsolutePath() - + "] doesn't exist"); - return; - } - ObjectMapper mapper = new ObjectMapper(); - YarnJacksonJaxbJsonProvider.configObjectMapper(mapper); - TimelineEntities entities = null; - try { - entities = mapper.readValue(jsonFile, TimelineEntities.class); - } catch (Exception e) { - System.err.println("Error: " + e.getMessage()); - e.printStackTrace(System.err); - return; - } - Configuration conf = new YarnConfiguration(); - TimelineClient client = TimelineClient.createTimelineClient(); - client.init(conf); - client.start(); - try { - if (UserGroupInformation.isSecurityEnabled() - && conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false)) { - Token token = - client.getDelegationToken( - UserGroupInformation.getCurrentUser().getUserName()); - UserGroupInformation.getCurrentUser().addToken(token); - } - TimelinePutResponse response = client.putEntities( - entities.getEntities().toArray( - new TimelineEntity[entities.getEntities().size()])); - if (response.getErrors().size() == 0) { - System.out.println("Timeline data is successfully put"); - } else { - for (TimelinePutResponse.TimelinePutError error : response.getErrors()) { - System.out.println("TimelineEntity [" + error.getEntityType() + ":" + - error.getEntityId() + "] is not successfully put. Error code: " + - error.getErrorCode()); - } - } - } catch (Exception e) { - System.err.println("Error: " + e.getMessage()); - e.printStackTrace(System.err); - } finally { - client.stop(); - } - } - - /** - * Helper function to print out usage - */ - private static void printUsage() { - new HelpFormatter().printHelp("TimelineClient", opts); - } - -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineAuthenticator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineAuthenticator.java deleted file mode 100644 index 19aaa88..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineAuthenticator.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.client.api.impl; - -import java.net.URL; - -import org.junit.Assert; -import org.junit.Test; - -public class TestTimelineAuthenticator { - - @Test - public void testHasDelegationTokens() throws Exception { - TimelineAuthenticator authenticator = new TimelineAuthenticator(); - Assert.assertFalse(authenticator.hasDelegationToken(new URL( - "http://localhost:8/resource"))); - Assert.assertFalse(authenticator.hasDelegationToken(new URL( - "http://localhost:8/resource?other=xxxx"))); - Assert.assertTrue(authenticator.hasDelegationToken(new URL( - "http://localhost:8/resource?delegation=yyyy"))); - Assert.assertTrue(authenticator.hasDelegationToken(new URL( - "http://localhost:8/resource?other=xxxx&delegation=yyyy"))); - } -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java deleted file mode 100644 index 3c5272a..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java +++ /dev/null @@ -1,206 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.client.api.impl; - -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; - -import java.net.ConnectException; - -import org.junit.Assert; - -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; -import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; -import org.apache.hadoop.yarn.client.api.TimelineClient; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import com.sun.jersey.api.client.ClientHandlerException; -import com.sun.jersey.api.client.ClientResponse; - -public class TestTimelineClient { - - private TimelineClientImpl client; - - @Before - public void setup() { - YarnConfiguration conf = new YarnConfiguration(); - conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); - client = createTimelineClient(conf); - } - - @After - public void tearDown() { - if (client != null) { - client.stop(); - } - } - - @Test - public void testPostEntities() throws Exception { - mockClientResponse(client, ClientResponse.Status.OK, false, false); - try { - TimelinePutResponse response = client.putEntities(generateEntity()); - Assert.assertEquals(0, response.getErrors().size()); - } catch (YarnException e) { - Assert.fail("Exception is not expected"); - } - } - - @Test - public void testPostEntitiesWithError() throws Exception { - mockClientResponse(client, ClientResponse.Status.OK, true, false); - try { - TimelinePutResponse response = client.putEntities(generateEntity()); - Assert.assertEquals(1, response.getErrors().size()); - Assert.assertEquals("test entity id", response.getErrors().get(0) - .getEntityId()); - Assert.assertEquals("test entity type", response.getErrors().get(0) - .getEntityType()); - Assert.assertEquals(TimelinePutResponse.TimelinePutError.IO_EXCEPTION, - response.getErrors().get(0).getErrorCode()); - } catch (YarnException e) { - Assert.fail("Exception is not expected"); - } - } - - @Test - public void testPostEntitiesNoResponse() throws Exception { - mockClientResponse( - client, ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false); - try { - client.putEntities(generateEntity()); - Assert.fail("Exception is expected"); - } catch (YarnException e) { - Assert.assertTrue(e.getMessage().contains( - "Failed to get the response from the timeline server.")); - } - } - - @Test - public void testPostEntitiesConnectionRefused() throws Exception { - mockClientResponse(client, null, false, true); - try { - client.putEntities(generateEntity()); - Assert.fail("RuntimeException is expected"); - } catch (RuntimeException re) { - Assert.assertTrue(re instanceof ClientHandlerException); - } - } - - @Test - public void testPostEntitiesTimelineServiceNotEnabled() throws Exception { - YarnConfiguration conf = new YarnConfiguration(); - conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false); - TimelineClientImpl client = createTimelineClient(conf); - mockClientResponse( - client, ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false); - try { - TimelinePutResponse response = client.putEntities(generateEntity()); - Assert.assertEquals(0, response.getErrors().size()); - } catch (YarnException e) { - Assert.fail( - "putEntities should already return before throwing the exception"); - } - } - - @Test - public void testPostEntitiesTimelineServiceDefaultNotEnabled() - throws Exception { - YarnConfiguration conf = new YarnConfiguration(); - // Unset the timeline service's enabled properties. - // Make sure default value is pickup up - conf.unset(YarnConfiguration.TIMELINE_SERVICE_ENABLED); - TimelineClientImpl client = createTimelineClient(conf); - mockClientResponse(client, ClientResponse.Status.INTERNAL_SERVER_ERROR, - false, false); - try { - TimelinePutResponse response = client.putEntities(generateEntity()); - Assert.assertEquals(0, response.getErrors().size()); - } catch (YarnException e) { - Assert - .fail("putEntities should already return before throwing the exception"); - } - } - - private static ClientResponse mockClientResponse(TimelineClientImpl client, - ClientResponse.Status status, boolean hasError, boolean hasRuntimeError) { - ClientResponse response = mock(ClientResponse.class); - if (hasRuntimeError) { - doThrow(new ClientHandlerException(new ConnectException())).when(client) - .doPostingEntities(any(TimelineEntities.class)); - return response; - } - doReturn(response).when(client) - .doPostingEntities(any(TimelineEntities.class)); - when(response.getClientResponseStatus()).thenReturn(status); - TimelinePutResponse.TimelinePutError error = - new TimelinePutResponse.TimelinePutError(); - error.setEntityId("test entity id"); - error.setEntityType("test entity type"); - error.setErrorCode(TimelinePutResponse.TimelinePutError.IO_EXCEPTION); - TimelinePutResponse putResponse = new TimelinePutResponse(); - if (hasError) { - putResponse.addError(error); - } - when(response.getEntity(TimelinePutResponse.class)).thenReturn(putResponse); - return response; - } - - private static TimelineEntity generateEntity() { - TimelineEntity entity = new TimelineEntity(); - entity.setEntityId("entity id"); - entity.setEntityType("entity type"); - entity.setStartTime(System.currentTimeMillis()); - for (int i = 0; i < 2; ++i) { - TimelineEvent event = new TimelineEvent(); - event.setTimestamp(System.currentTimeMillis()); - event.setEventType("test event type " + i); - event.addEventInfo("key1", "val1"); - event.addEventInfo("key2", "val2"); - entity.addEvent(event); - } - entity.addRelatedEntity("test ref type 1", "test ref id 1"); - entity.addRelatedEntity("test ref type 2", "test ref id 2"); - entity.addPrimaryFilter("pkey1", "pval1"); - entity.addPrimaryFilter("pkey2", "pval2"); - entity.addOtherInfo("okey1", "oval1"); - entity.addOtherInfo("okey2", "oval2"); - return entity; - } - - private static TimelineClientImpl createTimelineClient( - YarnConfiguration conf) { - TimelineClientImpl client = - spy((TimelineClientImpl) TimelineClient.createTimelineClient()); - client.init(conf); - client.start(); - return client; - } - -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml index aa12b3f..04cf50d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml @@ -67,10 +67,18 @@ commons-codec
+ org.mortbay.jetty + jetty-util + + com.sun.jersey jersey-core + com.sun.jersey + jersey-client + + org.codehaus.jackson jackson-core-asl diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java new file mode 100644 index 0000000..de1d3e2 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; + +/** + * A client library that can be used to post some information in terms of a + * number of conceptual entities. + */ +@Public +@Unstable +public abstract class TimelineClient extends AbstractService { + + @Public + public static TimelineClient createTimelineClient() { + TimelineClient client = new TimelineClientImpl(); + return client; + } + + @Private + protected TimelineClient(String name) { + super(name); + } + + /** + *

+ * Send the information of a number of conceptual entities to the timeline + * server. It is a blocking API. The method will not return until it gets the + * response from the timeline server. + *

+ * + * @param entities + * the collection of {@link TimelineEntity} + * @return the error information if the sent entities are not correctly stored + * @throws IOException + * @throws YarnException + */ + @Public + public abstract TimelinePutResponse putEntities( + TimelineEntity... entities) throws IOException, YarnException; + + /** + *

+ * Get a delegation token so as to be able to talk to the timeline server in a + * secure way. + *

+ * + * @param renewer + * Address of the renewer who can renew these tokens when needed by + * securely talking to the timeline server + * @return a delegation token ({@link Token}) that can be used to talk to the + * timeline server + * @throws IOException + * @throws YarnException + */ + @Public + public abstract Token getDelegationToken( + String renewer) throws IOException, YarnException; + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineAuthenticator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineAuthenticator.java new file mode 100644 index 0000000..25333c7 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineAuthenticator.java @@ -0,0 +1,260 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.net.HttpURLConnection; +import java.net.URL; +import java.net.URLEncoder; +import java.text.MessageFormat; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.security.authentication.client.AuthenticatedURL; +import org.apache.hadoop.security.authentication.client.AuthenticationException; +import org.apache.hadoop.security.authentication.client.Authenticator; +import org.apache.hadoop.security.authentication.client.KerberosAuthenticator; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDelegationTokenResponse; +import org.apache.hadoop.yarn.security.client.TimelineAuthenticationConsts; +import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; +import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenOperation; +import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.map.ObjectMapper; + +import com.google.common.annotations.VisibleForTesting; + +/** + * A KerberosAuthenticator subclass that fallback to + * {@link TimelineAuthenticationConsts}. + */ +@Private +@Unstable +public class TimelineAuthenticator extends KerberosAuthenticator { + + private static ObjectMapper mapper; + + static { + mapper = new ObjectMapper(); + YarnJacksonJaxbJsonProvider.configObjectMapper(mapper); + } + + /** + * Returns the fallback authenticator if the server does not use Kerberos + * SPNEGO HTTP authentication. + * + * @return a {@link TimelineAuthenticationConsts} instance. + */ + @Override + protected Authenticator getFallBackAuthenticator() { + return new TimelineAuthenticator(); + } + + public static void injectDelegationToken(Map params, + Token dtToken) + throws IOException { + if (dtToken != null) { + params.put(TimelineAuthenticationConsts.DELEGATION_PARAM, + dtToken.encodeToUrlString()); + } + } + + @Private + @VisibleForTesting + boolean hasDelegationToken(URL url) { + if (url.getQuery() == null) { + return false; + } else { + return url.getQuery().contains( + TimelineAuthenticationConsts.DELEGATION_PARAM + "="); + } + } + + @Override + public void authenticate(URL url, AuthenticatedURL.Token token) + throws IOException, AuthenticationException { + if (!hasDelegationToken(url)) { + super.authenticate(url, token); + } + } + + public static Token getDelegationToken( + URL url, AuthenticatedURL.Token token, String renewer) throws IOException { + TimelineDelegationTokenOperation op = + TimelineDelegationTokenOperation.GETDELEGATIONTOKEN; + Map params = new HashMap(); + params.put(TimelineAuthenticationConsts.OP_PARAM, op.toString()); + params.put(TimelineAuthenticationConsts.RENEWER_PARAM, renewer); + url = appendParams(url, params); + AuthenticatedURL aUrl = + new AuthenticatedURL(new TimelineAuthenticator()); + try { + HttpURLConnection conn = aUrl.openConnection(url, token); + conn.setRequestMethod(op.getHttpMethod()); + TimelineDelegationTokenResponse dtRes = validateAndParseResponse(conn); + if (!dtRes.getType().equals( + TimelineAuthenticationConsts.DELEGATION_TOKEN_URL)) { + throw new IOException("The response content is not expected: " + + dtRes.getContent()); + } + String tokenStr = dtRes.getContent().toString(); + Token dToken = + new Token(); + dToken.decodeFromUrlString(tokenStr); + return dToken; + } catch (AuthenticationException ex) { + throw new IOException(ex.toString(), ex); + } + } + + public static long renewDelegationToken(URL url, + AuthenticatedURL.Token token, + Token dToken) throws IOException { + Map params = new HashMap(); + params.put(TimelineAuthenticationConsts.OP_PARAM, + TimelineDelegationTokenOperation.RENEWDELEGATIONTOKEN.toString()); + params.put(TimelineAuthenticationConsts.TOKEN_PARAM, + dToken.encodeToUrlString()); + url = appendParams(url, params); + AuthenticatedURL aUrl = + new AuthenticatedURL(new TimelineAuthenticator()); + try { + HttpURLConnection conn = aUrl.openConnection(url, token); + conn.setRequestMethod( + TimelineDelegationTokenOperation.RENEWDELEGATIONTOKEN.getHttpMethod()); + TimelineDelegationTokenResponse dtRes = validateAndParseResponse(conn); + if (!dtRes.getType().equals( + TimelineAuthenticationConsts.DELEGATION_TOKEN_EXPIRATION_TIME)) { + throw new IOException("The response content is not expected: " + + dtRes.getContent()); + } + return Long.valueOf(dtRes.getContent().toString()); + } catch (AuthenticationException ex) { + throw new IOException(ex.toString(), ex); + } + } + + public static void cancelDelegationToken(URL url, + AuthenticatedURL.Token token, + Token dToken) throws IOException { + Map params = new HashMap(); + params.put(TimelineAuthenticationConsts.OP_PARAM, + TimelineDelegationTokenOperation.CANCELDELEGATIONTOKEN.toString()); + params.put(TimelineAuthenticationConsts.TOKEN_PARAM, + dToken.encodeToUrlString()); + url = appendParams(url, params); + AuthenticatedURL aUrl = + new AuthenticatedURL(new TimelineAuthenticator()); + try { + HttpURLConnection conn = aUrl.openConnection(url, token); + conn.setRequestMethod(TimelineDelegationTokenOperation.CANCELDELEGATIONTOKEN + .getHttpMethod()); + validateAndParseResponse(conn); + } catch (AuthenticationException ex) { + throw new IOException(ex.toString(), ex); + } + } + + /** + * Convenience method that appends parameters an HTTP URL. + * + * @param url + * the url. + * @param params + * the query string parameters. + * + * @return a URL + * + * @throws IOException + * thrown if an IO error occurs. + */ + public static URL appendParams(URL url, Map params) + throws IOException { + StringBuilder sb = new StringBuilder(); + sb.append(url); + String separator = url.toString().contains("?") ? "&" : "?"; + for (Map.Entry entry : params.entrySet()) { + sb.append(separator).append(entry.getKey()).append("="). + append(URLEncoder.encode(entry.getValue(), "UTF8")); + separator = "&"; + } + return new URL(sb.toString()); + } + + /** + * Validates the response of an HttpURLConnection. If the current + * status code is not 200, it will throw an exception with a detail message + * using Server side error messages if available. Otherwise, + * {@link TimelineDelegationTokenResponse} will be parsed and returned. + * + * @param conn + * the HttpURLConnection. + * @return + * @throws IOException + * thrown if the current status code is not 200 or the JSON response + * cannot be parsed correctly + */ + private static TimelineDelegationTokenResponse validateAndParseResponse( + HttpURLConnection conn) throws IOException { + int status = conn.getResponseCode(); + JsonNode json = mapper.readTree(conn.getInputStream()); + if (status == HttpURLConnection.HTTP_OK) { + return mapper.readValue(json, TimelineDelegationTokenResponse.class); + } else { + // If the status code is not 200, some thing wrong should happen at the + // server side, the JSON content is going to contain exception details. + // We can use the JSON content to reconstruct the exception object. + try { + String message = + json.get(TimelineAuthenticationConsts.ERROR_MESSAGE_JSON) + .getTextValue(); + String exception = + json.get(TimelineAuthenticationConsts.ERROR_EXCEPTION_JSON) + .getTextValue(); + String className = + json.get(TimelineAuthenticationConsts.ERROR_CLASSNAME_JSON) + .getTextValue(); + + try { + ClassLoader cl = TimelineAuthenticator.class.getClassLoader(); + Class klass = cl.loadClass(className); + Constructor constr = klass.getConstructor(String.class); + throw (IOException) constr.newInstance(message); + } catch (IOException ex) { + throw ex; + } catch (Exception ex) { + throw new IOException(MessageFormat.format("{0} - {1}", exception, + message)); + } + } catch (IOException ex) { + if (ex.getCause() instanceof IOException) { + throw (IOException) ex.getCause(); + } + throw new IOException( + MessageFormat.format("HTTP status [{0}], {1}", + status, conn.getResponseMessage())); + } + } + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java new file mode 100644 index 0000000..daf25ea --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -0,0 +1,332 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import java.io.File; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URI; +import java.net.URL; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import javax.ws.rs.core.MediaType; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.client.AuthenticatedURL; +import org.apache.hadoop.security.authentication.client.AuthenticationException; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; +import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenSelector; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; +import org.codehaus.jackson.map.ObjectMapper; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.api.client.config.ClientConfig; +import com.sun.jersey.api.client.config.DefaultClientConfig; +import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory; +import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; + +@Private +@Unstable +public class TimelineClientImpl extends TimelineClient { + + private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class); + private static final String RESOURCE_URI_STR = "/ws/v1/timeline/"; + private static final String URL_PARAM_USER_NAME = "user.name"; + private static final Joiner JOINER = Joiner.on(""); + private static Options opts; + static { + opts = new Options(); + opts.addOption("put", true, "Put the TimelineEntities in a JSON file"); + opts.getOption("put").setArgName("Path to the JSON file"); + opts.addOption("help", false, "Print usage"); + } + + private Client client; + private URI resURI; + private boolean isEnabled; + private KerberosAuthenticatedURLConnectionFactory urlFactory; + + public TimelineClientImpl() { + super(TimelineClientImpl.class.getName()); + ClientConfig cc = new DefaultClientConfig(); + cc.getClasses().add(YarnJacksonJaxbJsonProvider.class); + if (UserGroupInformation.isSecurityEnabled()) { + urlFactory = new KerberosAuthenticatedURLConnectionFactory(); + client = new Client(new URLConnectionClientHandler(urlFactory), cc); + } else { + client = new Client(new URLConnectionClientHandler( + new PseudoAuthenticatedURLConnectionFactory()), cc); + } + } + + protected void serviceInit(Configuration conf) throws Exception { + isEnabled = conf.getBoolean( + YarnConfiguration.TIMELINE_SERVICE_ENABLED, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED); + if (!isEnabled) { + LOG.info("Timeline service is not enabled"); + } else { + if (YarnConfiguration.useHttps(conf)) { + resURI = URI + .create(JOINER.join("https://", conf.get( + YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS), + RESOURCE_URI_STR)); + } else { + resURI = URI.create(JOINER.join("http://", conf.get( + YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS), + RESOURCE_URI_STR)); + } + if (UserGroupInformation.isSecurityEnabled()) { + urlFactory.setService(TimelineUtils.buildTimelineTokenService(conf)); + } + LOG.info("Timeline service address: " + resURI); + } + super.serviceInit(conf); + } + + @Override + public TimelinePutResponse putEntities( + TimelineEntity... entities) throws IOException, YarnException { + if (!isEnabled) { + if (LOG.isDebugEnabled()) { + LOG.debug("Nothing will be put because timeline service is not enabled"); + } + return new TimelinePutResponse(); + } + TimelineEntities entitiesContainer = new TimelineEntities(); + entitiesContainer.addEntities(Arrays.asList(entities)); + ClientResponse resp; + try { + resp = doPostingEntities(entitiesContainer); + } catch (RuntimeException re) { + // runtime exception is expected if the client cannot connect the server + String msg = + "Failed to get the response from the timeline server."; + LOG.error(msg, re); + throw re; + } + if (resp == null || + resp.getClientResponseStatus() != ClientResponse.Status.OK) { + String msg = + "Failed to get the response from the timeline server."; + LOG.error(msg); + if (LOG.isDebugEnabled() && resp != null) { + String output = resp.getEntity(String.class); + LOG.debug("HTTP error code: " + resp.getStatus() + + " Server response : \n" + output); + } + throw new YarnException(msg); + } + return resp.getEntity(TimelinePutResponse.class); + } + + @Override + public Token getDelegationToken( + String renewer) throws IOException, YarnException { + return TimelineAuthenticator.getDelegationToken(resURI.toURL(), + urlFactory.token, renewer); + } + + @Private + @VisibleForTesting + public ClientResponse doPostingEntities(TimelineEntities entities) { + WebResource webResource = client.resource(resURI); + return webResource.accept(MediaType.APPLICATION_JSON) + .type(MediaType.APPLICATION_JSON) + .post(ClientResponse.class, entities); + } + + private static class PseudoAuthenticatedURLConnectionFactory + implements HttpURLConnectionFactory { + + @Override + public HttpURLConnection getHttpURLConnection(URL url) throws IOException { + Map params = new HashMap(); + params.put(URL_PARAM_USER_NAME, + UserGroupInformation.getCurrentUser().getShortUserName()); + url = TimelineAuthenticator.appendParams(url, params); + if (LOG.isDebugEnabled()) { + LOG.debug("URL with delegation token: " + url); + } + return (HttpURLConnection) url.openConnection(); + } + + } + private static class KerberosAuthenticatedURLConnectionFactory + implements HttpURLConnectionFactory { + + private AuthenticatedURL.Token token; + private TimelineAuthenticator authenticator; + private Token dToken; + private Text service; + + public KerberosAuthenticatedURLConnectionFactory() { + token = new AuthenticatedURL.Token(); + authenticator = new TimelineAuthenticator(); + } + + @Override + public HttpURLConnection getHttpURLConnection(URL url) throws IOException { + try { + if (dToken == null) { + //TODO: need to take care of the renew case + dToken = selectToken(); + if (LOG.isDebugEnabled()) { + LOG.debug("Timeline delegation token: " + dToken.toString()); + } + } + if (dToken != null) { + Map params = new HashMap(); + TimelineAuthenticator.injectDelegationToken(params, dToken); + url = TimelineAuthenticator.appendParams(url, params); + if (LOG.isDebugEnabled()) { + LOG.debug("URL with delegation token: " + url); + } + } + return new AuthenticatedURL(authenticator).openConnection(url, token); + } catch (AuthenticationException e) { + LOG.error("Authentication failed when openning connection [" + url + + "] with token [" + token + "].", e); + throw new IOException(e); + } + } + + private Token selectToken() { + UserGroupInformation ugi; + try { + ugi = UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + String msg = "Error when getting the current user"; + LOG.error(msg, e); + throw new YarnRuntimeException(msg, e); + } + TimelineDelegationTokenSelector tokenSelector = + new TimelineDelegationTokenSelector(); + return tokenSelector.selectToken( + service, ugi.getCredentials().getAllTokens()); + } + + public void setService(Text service) { + this.service = service; + } + + } + + public static void main(String[] argv) throws Exception { + CommandLine cliParser = new GnuParser().parse(opts, argv); + if (cliParser.hasOption("put")) { + String path = cliParser.getOptionValue("put"); + if (path != null && path.length() > 0) { + putTimelineEntitiesInJSONFile(path); + return; + } + } + printUsage(); + } + + /** + * Put timeline data in a JSON file via command line. + * + * @param path + * path to the {@link TimelineEntities} JSON file + */ + private static void putTimelineEntitiesInJSONFile(String path) { + File jsonFile = new File(path); + if (!jsonFile.exists()) { + System.out.println("Error: File [" + jsonFile.getAbsolutePath() + + "] doesn't exist"); + return; + } + ObjectMapper mapper = new ObjectMapper(); + YarnJacksonJaxbJsonProvider.configObjectMapper(mapper); + TimelineEntities entities = null; + try { + entities = mapper.readValue(jsonFile, TimelineEntities.class); + } catch (Exception e) { + System.err.println("Error: " + e.getMessage()); + e.printStackTrace(System.err); + return; + } + Configuration conf = new YarnConfiguration(); + TimelineClient client = TimelineClient.createTimelineClient(); + client.init(conf); + client.start(); + try { + if (UserGroupInformation.isSecurityEnabled() + && conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false)) { + Token token = + client.getDelegationToken( + UserGroupInformation.getCurrentUser().getUserName()); + UserGroupInformation.getCurrentUser().addToken(token); + } + TimelinePutResponse response = client.putEntities( + entities.getEntities().toArray( + new TimelineEntity[entities.getEntities().size()])); + if (response.getErrors().size() == 0) { + System.out.println("Timeline data is successfully put"); + } else { + for (TimelinePutResponse.TimelinePutError error : response.getErrors()) { + System.out.println("TimelineEntity [" + error.getEntityType() + ":" + + error.getEntityId() + "] is not successfully put. Error code: " + + error.getErrorCode()); + } + } + } catch (Exception e) { + System.err.println("Error: " + e.getMessage()); + e.printStackTrace(System.err); + } finally { + client.stop(); + } + } + + /** + * Helper function to print out usage + */ + private static void printUsage() { + new HelpFormatter().printHelp("TimelineClient", opts); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/package-info.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/package-info.java new file mode 100644 index 0000000..89b5b6b --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +@InterfaceAudience.Public +package org.apache.hadoop.yarn.client.api.impl; +import org.apache.hadoop.classification.InterfaceAudience; + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/package-info.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/package-info.java new file mode 100644 index 0000000..efd7229 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +@InterfaceAudience.Public +package org.apache.hadoop.yarn.client.api; +import org.apache.hadoop.classification.InterfaceAudience; + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineAuthenticator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineAuthenticator.java new file mode 100644 index 0000000..19aaa88 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineAuthenticator.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import java.net.URL; + +import org.junit.Assert; +import org.junit.Test; + +public class TestTimelineAuthenticator { + + @Test + public void testHasDelegationTokens() throws Exception { + TimelineAuthenticator authenticator = new TimelineAuthenticator(); + Assert.assertFalse(authenticator.hasDelegationToken(new URL( + "http://localhost:8/resource"))); + Assert.assertFalse(authenticator.hasDelegationToken(new URL( + "http://localhost:8/resource?other=xxxx"))); + Assert.assertTrue(authenticator.hasDelegationToken(new URL( + "http://localhost:8/resource?delegation=yyyy"))); + Assert.assertTrue(authenticator.hasDelegationToken(new URL( + "http://localhost:8/resource?other=xxxx&delegation=yyyy"))); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java new file mode 100644 index 0000000..3c5272a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.net.ConnectException; + +import org.junit.Assert; + +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.sun.jersey.api.client.ClientHandlerException; +import com.sun.jersey.api.client.ClientResponse; + +public class TestTimelineClient { + + private TimelineClientImpl client; + + @Before + public void setup() { + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + client = createTimelineClient(conf); + } + + @After + public void tearDown() { + if (client != null) { + client.stop(); + } + } + + @Test + public void testPostEntities() throws Exception { + mockClientResponse(client, ClientResponse.Status.OK, false, false); + try { + TimelinePutResponse response = client.putEntities(generateEntity()); + Assert.assertEquals(0, response.getErrors().size()); + } catch (YarnException e) { + Assert.fail("Exception is not expected"); + } + } + + @Test + public void testPostEntitiesWithError() throws Exception { + mockClientResponse(client, ClientResponse.Status.OK, true, false); + try { + TimelinePutResponse response = client.putEntities(generateEntity()); + Assert.assertEquals(1, response.getErrors().size()); + Assert.assertEquals("test entity id", response.getErrors().get(0) + .getEntityId()); + Assert.assertEquals("test entity type", response.getErrors().get(0) + .getEntityType()); + Assert.assertEquals(TimelinePutResponse.TimelinePutError.IO_EXCEPTION, + response.getErrors().get(0).getErrorCode()); + } catch (YarnException e) { + Assert.fail("Exception is not expected"); + } + } + + @Test + public void testPostEntitiesNoResponse() throws Exception { + mockClientResponse( + client, ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false); + try { + client.putEntities(generateEntity()); + Assert.fail("Exception is expected"); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage().contains( + "Failed to get the response from the timeline server.")); + } + } + + @Test + public void testPostEntitiesConnectionRefused() throws Exception { + mockClientResponse(client, null, false, true); + try { + client.putEntities(generateEntity()); + Assert.fail("RuntimeException is expected"); + } catch (RuntimeException re) { + Assert.assertTrue(re instanceof ClientHandlerException); + } + } + + @Test + public void testPostEntitiesTimelineServiceNotEnabled() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false); + TimelineClientImpl client = createTimelineClient(conf); + mockClientResponse( + client, ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false); + try { + TimelinePutResponse response = client.putEntities(generateEntity()); + Assert.assertEquals(0, response.getErrors().size()); + } catch (YarnException e) { + Assert.fail( + "putEntities should already return before throwing the exception"); + } + } + + @Test + public void testPostEntitiesTimelineServiceDefaultNotEnabled() + throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + // Unset the timeline service's enabled properties. + // Make sure default value is pickup up + conf.unset(YarnConfiguration.TIMELINE_SERVICE_ENABLED); + TimelineClientImpl client = createTimelineClient(conf); + mockClientResponse(client, ClientResponse.Status.INTERNAL_SERVER_ERROR, + false, false); + try { + TimelinePutResponse response = client.putEntities(generateEntity()); + Assert.assertEquals(0, response.getErrors().size()); + } catch (YarnException e) { + Assert + .fail("putEntities should already return before throwing the exception"); + } + } + + private static ClientResponse mockClientResponse(TimelineClientImpl client, + ClientResponse.Status status, boolean hasError, boolean hasRuntimeError) { + ClientResponse response = mock(ClientResponse.class); + if (hasRuntimeError) { + doThrow(new ClientHandlerException(new ConnectException())).when(client) + .doPostingEntities(any(TimelineEntities.class)); + return response; + } + doReturn(response).when(client) + .doPostingEntities(any(TimelineEntities.class)); + when(response.getClientResponseStatus()).thenReturn(status); + TimelinePutResponse.TimelinePutError error = + new TimelinePutResponse.TimelinePutError(); + error.setEntityId("test entity id"); + error.setEntityType("test entity type"); + error.setErrorCode(TimelinePutResponse.TimelinePutError.IO_EXCEPTION); + TimelinePutResponse putResponse = new TimelinePutResponse(); + if (hasError) { + putResponse.addError(error); + } + when(response.getEntity(TimelinePutResponse.class)).thenReturn(putResponse); + return response; + } + + private static TimelineEntity generateEntity() { + TimelineEntity entity = new TimelineEntity(); + entity.setEntityId("entity id"); + entity.setEntityType("entity type"); + entity.setStartTime(System.currentTimeMillis()); + for (int i = 0; i < 2; ++i) { + TimelineEvent event = new TimelineEvent(); + event.setTimestamp(System.currentTimeMillis()); + event.setEventType("test event type " + i); + event.addEventInfo("key1", "val1"); + event.addEventInfo("key2", "val2"); + entity.addEvent(event); + } + entity.addRelatedEntity("test ref type 1", "test ref id 1"); + entity.addRelatedEntity("test ref type 2", "test ref id 2"); + entity.addPrimaryFilter("pkey1", "pval1"); + entity.addPrimaryFilter("pkey2", "pval2"); + entity.addOtherInfo("okey1", "oval1"); + entity.addOtherInfo("okey2", "oval2"); + return entity; + } + + private static TimelineClientImpl createTimelineClient( + YarnConfiguration conf) { + TimelineClientImpl client = + spy((TimelineClientImpl) TimelineClient.createTimelineClient()); + client.init(conf); + client.start(); + return client; + } + +}