diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java new file mode 100644 index 0000000..39504cc --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.records.timelineservice; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.HashSet; +import java.util.Set; + +@XmlRootElement(name = "entities") +@XmlAccessorType(XmlAccessType.NONE) +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class TimelineEntities { + + private Set entities = new HashSet<>(); + + public TimelineEntities() { + + } + + @XmlElement(name = "entities") + public Set getEntities() { + return entities; + } + + public void setEntities(Set entities) { + this.entities = entities; + } + + public void addEntities(Set entities) { + this.entities.addAll(entities); + } + + public void addEntity(TimelineEntity entity) { + entities.add(entity); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java index 0313f9e..1168d4b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java @@ -132,4 +132,38 @@ public abstract long renewDelegationToken( public abstract void cancelDelegationToken( Token timelineDT) throws IOException, YarnException; + + /** + *

+ * Send the information of a number of conceptual entities to the timeline + * aggregator. It is a blocking API. The method will not return until all the + * put entities have been persisted. + *

+ * + * @param entities + * the collection of {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity} + * @throws IOException + * @throws YarnException + */ + @Public + public abstract void putEntities( + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities) + throws IOException, YarnException; + + /** + *

+ * Send the information of a number of conceptual entities to the timeline + * aggregator. It is an asynchronous API. The method will return once all the + * entities are received. + *

+ * + * @param entities + * the collection of {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity} + * @throws IOException + * @throws YarnException + */ + @Public + public abstract void putEntitiesAsync( + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities) + throws IOException, YarnException; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java index af68492..680992a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -35,6 +35,7 @@ import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLSocketFactory; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; @@ -54,6 +55,7 @@ import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator; import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator; import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; @@ -80,6 +82,8 @@ import com.sun.jersey.api.client.filter.ClientFilter; import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory; import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; +import com.sun.jersey.core.util.MultivaluedMapImpl; + @Private @Unstable @@ -107,7 +111,10 @@ private ConnectionConfigurator connConfigurator; private DelegationTokenAuthenticator authenticator; private DelegationTokenAuthenticatedURL.Token token; + //TODO: It needs to be updated by the discovery service private URI resURI; + //TODO: If TimelineClient is used by the app, the context appId needs to be set + private ApplicationId contextAppId; @Private @VisibleForTesting @@ -294,6 +301,30 @@ public TimelinePutResponse putEntities( return resp.getEntity(TimelinePutResponse.class); } + @Override + public void putEntities( + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities) + throws IOException, YarnException { + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities + entitiesContainer = + new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities(); + MultivaluedMap params = new MultivaluedMapImpl(); + params.add("appid", contextAppId.toString()); + putObjects(resURI, "entities", params, entitiesContainer); + } + + @Override + public void putEntitiesAsync( + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities) + throws IOException, YarnException { + org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities + entitiesContainer = + new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities(); + MultivaluedMap params = new MultivaluedMapImpl(); + params.add("async", "true"); + params.add("appid", contextAppId.toString()); + putObjects(resURI, "entities", params, entitiesContainer); + } @Override public void putDomain(TimelineDomain domain) throws IOException, @@ -301,6 +332,36 @@ public void putDomain(TimelineDomain domain) throws IOException, doPosting(domain, "domain"); } + private void putObjects( + URI base, String path, MultivaluedMap params, Object obj) + throws IOException, YarnException { + ClientResponse resp; + try { + resp = client.resource(resURI).path(path).queryParams(params) + .accept(MediaType.APPLICATION_JSON) + .type(MediaType.APPLICATION_JSON) + .put(ClientResponse.class, obj); + } catch (RuntimeException re) { + // runtime exception is expected if the client cannot connect the server + String msg = + "Failed to get the response from the timeline server."; + LOG.error(msg, re); + throw new IOException(re); + } + if (resp == null || + resp.getClientResponseStatus() != ClientResponse.Status.OK) { + String msg = + "Failed to get the response from the timeline server."; + LOG.error(msg); + if (LOG.isDebugEnabled() && resp != null) { + String output = resp.getEntity(String.class); + LOG.debug("HTTP error code: " + resp.getStatus() + + " Server response : \n" + output); + } + throw new YarnException(msg); + } + } + private ClientResponse doPosting(Object obj, String path) throws IOException, YarnException { ClientResponse resp; try { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/webapps/timeline/.keep hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/webapps/timeline/.keep new file mode 100644 index 0000000..e69de29 diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java index 6bab239..4f8ab94 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java @@ -76,6 +76,13 @@ public void testTimelineEntities() throws Exception { entity.addIsRelatedToEntity("test type 4", "test id 4"); entity.addIsRelatedToEntity("test type 5", "test id 5"); LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(entity, true)); + + TimelineEntities entities = new TimelineEntities(); + TimelineEntity entity1 = new TimelineEntity(); + entities.addEntity(entity1); + TimelineEntity entity2 = new TimelineEntity(); + entities.addEntity(entity2); + LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(entities, true)); } @Test diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java index 994c66f..46e5574 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java @@ -25,8 +25,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; /** * Service that handles writes to the timeline service and writes them to the @@ -70,16 +69,14 @@ protected void serviceStop() throws Exception { * * @param entities entities to post * @param callerUgi the caller UGI - * @return the response that contains the result of the post. */ - public TimelinePutResponse postEntities(TimelineEntities entities, + public void postEntities(TimelineEntities entities, UserGroupInformation callerUgi) { // TODO implement if (LOG.isDebugEnabled()) { LOG.debug("postEntities(entities=" + entities + ", callerUgi=" + callerUgi + ")"); } - return null; } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java index 2d96699..28e6a52 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java @@ -20,12 +20,7 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import javax.ws.rs.Consumes; -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.WebApplicationException; +import javax.ws.rs.*; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; @@ -40,14 +35,17 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.webapp.ForbiddenException; import org.apache.hadoop.yarn.webapp.NotFoundException; import com.google.inject.Inject; import com.google.inject.Singleton; +import java.net.URI; + /** * The main per-node REST end point for timeline service writes. It is * essentially a container service that routes requests to the appropriate @@ -112,11 +110,14 @@ public AboutInfo about( * the request to the app level aggregator. It expects an application as a * context. */ - @POST + @PUT + @Path("/entities") @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) - public TimelinePutResponse postEntities( + public Response putEntities( @Context HttpServletRequest req, @Context HttpServletResponse res, + @QueryParam("async") String async, + @QueryParam("appid") String appId, TimelineEntities entities) { init(res); UserGroupInformation callerUgi = getUser(req); @@ -127,13 +128,20 @@ public TimelinePutResponse postEntities( } // TODO how to express async posts and handle them + boolean isAsync = async != null && async.trim().equalsIgnoreCase("true"); + try { - AppLevelAggregatorService service = getAggregatorService(req); + appId = parseApplicationId(appId); + if (appId == null) { + return Response.status(Response.Status.BAD_REQUEST).build(); + } + AppLevelAggregatorService service = serviceManager.getService(appId); if (service == null) { LOG.error("Application not found"); throw new NotFoundException(); // different exception? } - return service.postEntities(entities, callerUgi); + service.postEntities(entities, callerUgi); + return Response.ok().build(); } catch (Exception e) { LOG.error("Error putting entities", e); throw new WebApplicationException(e, @@ -141,16 +149,18 @@ public TimelinePutResponse postEntities( } } - private AppLevelAggregatorService - getAggregatorService(HttpServletRequest req) { - String appIdString = getApplicationId(req); - return serviceManager.getService(appIdString); - } - - private String getApplicationId(HttpServletRequest req) { - // TODO the application id from the request - // (most likely from the URI) - return null; + private String parseApplicationId(String appId) { + // Make sure the appId is not null and is valid + ApplicationId appID; + try { + if (appId != null) { + return ConverterUtils.toApplicationId(appId.trim()).toString(); + } else { + return null; + } + } catch (Exception e) { + return null; + } } private void init(HttpServletResponse response) {