timelineDT)
throws IOException, YarnException;
+
+ /**
+ *
+ * Send the information of a number of conceptual entities to the timeline
+ * aggregator. It is a blocking API. The method will not return until all the
+ * put entities have been persisted.
+ *
+ *
+ * @param entities
+ * the collection of {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
+ * @throws IOException
+ * @throws YarnException
+ */
+ @Public
+ public abstract void putEntities(
+ org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
+ throws IOException, YarnException;
+
+ /**
+ *
+ * Send the information of a number of conceptual entities to the timeline
+ * aggregator. It is an asynchronous API. The method will return once all the
+ * entities are received.
+ *
+ *
+ * @param entities
+ * the collection of {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
+ * @throws IOException
+ * @throws YarnException
+ */
+ @Public
+ public abstract void putEntitiesAsync(
+ org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
+ throws IOException, YarnException;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
index af68492..a6536e2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
@@ -35,6 +35,7 @@
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLSocketFactory;
import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
@@ -54,6 +55,7 @@
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@@ -80,13 +82,15 @@
import com.sun.jersey.api.client.filter.ClientFilter;
import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+
@Private
@Unstable
public class TimelineClientImpl extends TimelineClient {
private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class);
- private static final String RESOURCE_URI_STR = "/ws/v1/timeline/";
+ private static final String RESOURCE_URI_STR = "/ws/v2/timeline/";
private static final Joiner JOINER = Joiner.on("");
public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute
@@ -107,6 +111,7 @@
private ConnectionConfigurator connConfigurator;
private DelegationTokenAuthenticator authenticator;
private DelegationTokenAuthenticatedURL.Token token;
+ //TODO: It needs to be updated by the discovery service
private URI resURI;
@Private
@@ -248,7 +253,11 @@ public boolean shouldRetryOn(Exception e) {
}
public TimelineClientImpl() {
- super(TimelineClientImpl.class.getName());
+ super(TimelineClientImpl.class.getName(), null);
+ }
+
+ public TimelineClientImpl(ApplicationId applicationId) {
+ super(TimelineClientImpl.class.getName(), applicationId);
}
protected void serviceInit(Configuration conf) throws Exception {
@@ -294,6 +303,38 @@ public TimelinePutResponse putEntities(
return resp.getEntity(TimelinePutResponse.class);
}
+ @Override
+ public void putEntities(
+ org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
+ throws IOException, YarnException {
+ putEntities(false, entities);
+ }
+
+ @Override
+ public void putEntitiesAsync(
+ org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
+ throws IOException, YarnException {
+ putEntities(true, entities);
+ }
+
+ private void putEntities(boolean async,
+ org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
+ throws IOException, YarnException {
+ org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
+ entitiesContainer =
+ new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities();
+ for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity : entities) {
+ entitiesContainer.addEntity(entity);
+ }
+ MultivaluedMap params = new MultivaluedMapImpl();
+ if (contextAppId != null) {
+ params.add("appid", contextAppId.toString());
+ }
+ if (async) {
+ params.add("async", Boolean.TRUE.toString());
+ }
+ putObjects(resURI, "entities", params, entitiesContainer);
+ }
@Override
public void putDomain(TimelineDomain domain) throws IOException,
@@ -301,6 +342,36 @@ public void putDomain(TimelineDomain domain) throws IOException,
doPosting(domain, "domain");
}
+ private void putObjects(
+ URI base, String path, MultivaluedMap params, Object obj)
+ throws IOException, YarnException {
+ ClientResponse resp;
+ try {
+ resp = client.resource(resURI).path(path).queryParams(params)
+ .accept(MediaType.APPLICATION_JSON)
+ .type(MediaType.APPLICATION_JSON)
+ .put(ClientResponse.class, obj);
+ } catch (RuntimeException re) {
+ // runtime exception is expected if the client cannot connect the server
+ String msg =
+ "Failed to get the response from the timeline server.";
+ LOG.error(msg, re);
+ throw new IOException(re);
+ }
+ if (resp == null ||
+ resp.getClientResponseStatus() != ClientResponse.Status.OK) {
+ String msg =
+ "Failed to get the response from the timeline server.";
+ LOG.error(msg);
+ if (LOG.isDebugEnabled() && resp != null) {
+ String output = resp.getEntity(String.class);
+ LOG.debug("HTTP error code: " + resp.getStatus()
+ + " Server response : \n" + output);
+ }
+ throw new YarnException(msg);
+ }
+ }
+
private ClientResponse doPosting(Object obj, String path) throws IOException, YarnException {
ClientResponse resp;
try {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/webapps/timeline/.keep hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/webapps/timeline/.keep
new file mode 100644
index 0000000..e69de29
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
index 6bab239..4f8ab94 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
@@ -76,6 +76,13 @@ public void testTimelineEntities() throws Exception {
entity.addIsRelatedToEntity("test type 4", "test id 4");
entity.addIsRelatedToEntity("test type 5", "test id 5");
LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(entity, true));
+
+ TimelineEntities entities = new TimelineEntities();
+ TimelineEntity entity1 = new TimelineEntity();
+ entities.addEntity(entity1);
+ TimelineEntity entity2 = new TimelineEntity();
+ entities.addEntity(entity2);
+ LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(entities, true));
}
@Test
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml
index 2ac274d..ae5efa5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml
@@ -88,6 +88,18 @@
org.apache.hadoop
+ hadoop-yarn-server-timelineservice
+
+
+
+ org.apache.hadoop
+ hadoop-yarn-server-timelineservice
+ test-jar
+ test
+
+
+
+ org.apache.hadoop
hadoop-minikdc
test
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
new file mode 100644
index 0000000..920f337
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
@@ -0,0 +1,55 @@
+package org.apache.hadoop.yarn.server.timelineservice;
+
+
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeAggregatorServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class TestTimelineServiceClientIntegration {
+ private static PerNodeAggregatorServer server;
+
+ @BeforeClass
+ public static void setupClass() throws Exception {
+ try {
+ server = PerNodeAggregatorServer.launchServer(new String[0]);
+ server.addApplication(ApplicationId.newInstance(0, 1));
+ } catch (ExitUtil.ExitException e) {
+ fail();
+ }
+ }
+
+ @AfterClass
+ public static void tearDownClass() throws Exception {
+ if (server != null) {
+ server.stop();
+ }
+ }
+
+ @Test
+ public void testPutEntities() throws Exception {
+ TimelineClient client =
+ TimelineClient.createTimelineClient(ApplicationId.newInstance(0, 1));
+ try {
+ client.init(new YarnConfiguration());
+ client.start();
+ TimelineEntity entity = new TimelineEntity();
+ entity.setType("test entity type");
+ entity.setId("test entity id");
+ client.putEntities(entity);
+ client.putEntitiesAsync(entity);
+ } catch(Exception e) {
+ fail();
+ } finally {
+ client.stop();
+ }
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
index 3154ca3..26790f1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
@@ -114,6 +114,17 @@
+
+ maven-jar-plugin
+
+
+
+ test-jar
+
+ test-compile
+
+
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java
index 994c66f..46e5574 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java
@@ -25,8 +25,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
/**
* Service that handles writes to the timeline service and writes them to the
@@ -70,16 +69,14 @@ protected void serviceStop() throws Exception {
*
* @param entities entities to post
* @param callerUgi the caller UGI
- * @return the response that contains the result of the post.
*/
- public TimelinePutResponse postEntities(TimelineEntities entities,
+ public void postEntities(TimelineEntities entities,
UserGroupInformation callerUgi) {
// TODO implement
if (LOG.isDebugEnabled()) {
LOG.debug("postEntities(entities=" + entities + ", callerUgi=" +
callerUgi + ")");
}
- return null;
}
/**
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java
index 6371e82..ef30b22 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java
@@ -20,6 +20,7 @@
import java.nio.ByteBuffer;
+import com.google.inject.Inject;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -39,9 +40,7 @@
import org.apache.hadoop.yarn.server.api.ContainerContext;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
-import org.apache.hadoop.yarn.webapp.WebApp;
-import org.apache.hadoop.yarn.webapp.WebApps;
-import org.apache.hadoop.yarn.webapp.YarnWebParams;
+import org.apache.hadoop.yarn.webapp.*;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -120,6 +119,8 @@ private void startWebApp() {
extends WebApp implements YarnWebParams {
@Override
public void setup() {
+ bind(YarnJacksonJaxbJsonProvider.class);
+ bind(GenericExceptionHandler.class);
bind(PerNodeAggregatorWebService.class);
// bind to the global singleton
bind(AppLevelServiceManager.class).
@@ -214,7 +215,7 @@ public ByteBuffer getMetaData() {
}
@VisibleForTesting
- static PerNodeAggregatorServer launchServer(String[] args) {
+ public static PerNodeAggregatorServer launchServer(String[] args) {
Thread
.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
StringUtils.startupShutdownMessage(PerNodeAggregatorServer.class, args,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java
index 2d96699..28e6a52 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java
@@ -20,12 +20,7 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.*;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@@ -40,14 +35,17 @@
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.webapp.ForbiddenException;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import java.net.URI;
+
/**
* The main per-node REST end point for timeline service writes. It is
* essentially a container service that routes requests to the appropriate
@@ -112,11 +110,14 @@ public AboutInfo about(
* the request to the app level aggregator. It expects an application as a
* context.
*/
- @POST
+ @PUT
+ @Path("/entities")
@Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
- public TimelinePutResponse postEntities(
+ public Response putEntities(
@Context HttpServletRequest req,
@Context HttpServletResponse res,
+ @QueryParam("async") String async,
+ @QueryParam("appid") String appId,
TimelineEntities entities) {
init(res);
UserGroupInformation callerUgi = getUser(req);
@@ -127,13 +128,20 @@ public TimelinePutResponse postEntities(
}
// TODO how to express async posts and handle them
+ boolean isAsync = async != null && async.trim().equalsIgnoreCase("true");
+
try {
- AppLevelAggregatorService service = getAggregatorService(req);
+ appId = parseApplicationId(appId);
+ if (appId == null) {
+ return Response.status(Response.Status.BAD_REQUEST).build();
+ }
+ AppLevelAggregatorService service = serviceManager.getService(appId);
if (service == null) {
LOG.error("Application not found");
throw new NotFoundException(); // different exception?
}
- return service.postEntities(entities, callerUgi);
+ service.postEntities(entities, callerUgi);
+ return Response.ok().build();
} catch (Exception e) {
LOG.error("Error putting entities", e);
throw new WebApplicationException(e,
@@ -141,16 +149,18 @@ public TimelinePutResponse postEntities(
}
}
- private AppLevelAggregatorService
- getAggregatorService(HttpServletRequest req) {
- String appIdString = getApplicationId(req);
- return serviceManager.getService(appIdString);
- }
-
- private String getApplicationId(HttpServletRequest req) {
- // TODO the application id from the request
- // (most likely from the URI)
- return null;
+ private String parseApplicationId(String appId) {
+ // Make sure the appId is not null and is valid
+ ApplicationId appID;
+ try {
+ if (appId != null) {
+ return ConverterUtils.toApplicationId(appId.trim()).toString();
+ } else {
+ return null;
+ }
+ } catch (Exception e) {
+ return null;
+ }
}
private void init(HttpServletResponse response) {