diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java index 0313f9e..3c2c7cb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java @@ -19,6 +19,9 @@ package org.apache.hadoop.yarn.client.api; import java.io.IOException; +import java.util.Collection; +import java.util.Set; +import java.util.SortedSet; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -27,10 +30,14 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; +import org.apache.hadoop.yarn.util.timeline.NameValuePair; /** * A client library that can be used to post some information in terms of a @@ -132,4 +139,133 @@ public abstract long renewDelegationToken( public abstract void cancelDelegationToken( Token timelineDT) throws IOException, YarnException; + + /** + *

+ * Retrieves the entity information for a given entity. + *

+ * + * @param entityType + * The type of the entity. + * @param entityId + * The entity whose information will be retrieved. + * @param fieldsToRetrieve + * Specifies which fields of the entity object to retrieve ("EVENTS", + * "RELATEDENTITIES", "PRIMARYFILTERS", "OTHERINFO", "LASTEVENTONLY") + * . If null, retrieves all fields. + * @return An {@link TimelineEntity} object. + */ + @Public + public abstract TimelineEntity getEntity( + String entityType, String entityId, Set fieldsToRetrieve) + throws IOException, YarnException; + + /** + * This method retrieves a list of entity information, {@link TimelineEntity}, + * sorted by the starting timestamp for the entity, descending. The starting + * timestamp of an entity is a timestamp specified by the client. + * + * @param entityType + * The type of entities to return (required). + * @param limit + * A limit on the number of entities to return. + * @param windowStart + * The earliest start timestamp to retrieve (exclusive). If null, + * defaults to retrieving all entities until the limit is reached. + * @param windowEnd + * The latest start timestamp to retrieve (inclusive). If null, + * defaults to {@link Long#MAX_VALUE} + * @param fromId + * If fromId is not null, retrieve entities earlier than and + * including the specified ID. If no start time is found for the + * specified ID, an empty list of entities will be returned. The + * windowEnd parameter will take precedence if the start time of this + * entity falls later than windowEnd. + * @param fromTs + * If fromTs is not null, ignore entities that were inserted into the + * store after the given timestamp. The entity's insert timestamp + * used for this comparison is the store's system time when the first + * put for the entity was received (not the entity's start time). + * @param primaryFilter + * Retrieves only entities that have the specified primary filter. If + * null, retrieves all entities. This is an indexed retrieval, and no + * entities that do not match the filter are scanned. + * @param secondaryFilters + * Retrieves only entities that have exact matches for all the + * specified filters in their primary filters or other info. This is + * not an indexed retrieval, so all entities are scanned but only + * those matching the filters are returned. + * @param fieldsToRetrieve + * Specifies which fields of the entity object to retrieve ("EVENTS", + * "RELATEDENTITIES", "PRIMARYFILTERS", "OTHERINFO", "LASTEVENTONLY") + * . If null, retrieves all fields. + * @return An {@link TimelineEntities} object. + * @throws IOException + * @throws YarnException + */ + @Public + public abstract TimelineEntities getEntities(String entityType, + Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs, + NameValuePair primaryFilter, Collection secondaryFilters, + Set fieldsToRetrieve) throws IOException, YarnException; + + /** + * This method retrieves the events for a list of entities all of the same + * entity type. The events for each entity are sorted in order of their + * timestamps, descending. + * + * @param entityType + * The type of entities to retrieve events for. + * @param entityIds + * The entity IDs to retrieve events for. + * @param limit + * A limit on the number of events to return for each entity. + * @param windowStart + * If not null, retrieves only events later than the given time + * (exclusive) + * @param windowEnd + * If not null, retrieves only events earlier than the given time + * (inclusive) + * @param eventTypes + * Restricts the events returned to the given types. If null, events + * of all types will be returned. + * @return An {@link TimelineEvents} object. + * @throws IOException + */ + @Public + public abstract TimelineEvents getEvents(String entityType, + SortedSet entityIds, Long limit, Long windowStart, + Long windowEnd, Set eventTypes) throws IOException, YarnException; + + /** + *

+ * Retrieves the domain information for a given ID. + *

+ * + * @param domainId + * The domain id + * @return a {@link TimelineDomain} object. + * @throws IOException + * @throws org.apache.hadoop.yarn.exceptions.YarnException + */ + @Public + public abstract TimelineDomain getDomain( + String domainId) throws IOException, YarnException; + + /** + *

+ * Retrieves all the domains that belong to a given owner. + * The domains are sorted according to the created time firstly and the + * modified time secondly in descending order. + *

+ * + * @param owner + * The domain owner. + * @return a {@link TimelineDomain} object. + * @throws IOException + * @throws org.apache.hadoop.yarn.exceptions.YarnException + */ + @Public + public abstract TimelineDomains getDomains(String owner) + throws IOException, YarnException; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java index 605d60b..05df0d6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java @@ -29,11 +29,15 @@ import java.security.GeneralSecurityException; import java.security.PrivilegedExceptionAction; import java.util.Arrays; +import java.util.Collection; +import java.util.Set; +import java.util.SortedSet; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLSocketFactory; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; @@ -56,14 +60,18 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.client.api.TimelineClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; +import org.apache.hadoop.yarn.util.timeline.NameValuePair; import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; @@ -78,6 +86,7 @@ import com.sun.jersey.api.client.filter.ClientFilter; import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory; import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; +import com.sun.jersey.core.util.MultivaluedMapImpl; @Private @Unstable @@ -308,6 +317,104 @@ public void putDomain(TimelineDomain domain) throws IOException, doPosting(domain, "domain"); } + @Override + public TimelineEntity getEntity(String entityType, String entityId, + Set fieldsToRetrieve) throws IOException, YarnException { + MultivaluedMap queryParams = new MultivaluedMapImpl(); + if (fieldsToRetrieve != null) { + queryParams.add("fields", setToCommaSeparated(fieldsToRetrieve)); + } + ClientResponse response = + doGettingJson(entityType + "/" + entityId, queryParams); + return response.getEntity(TimelineEntity.class); + } + + @Override + public TimelineEntities getEntities(String entityType, + Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs, + NameValuePair primaryFilter, Collection secondaryFilters, + Set fieldsToRetrieve) throws IOException, YarnException { + MultivaluedMap queryParams = new MultivaluedMapImpl(); + if (limit != null) { + queryParams.add("limit", limit.toString()); + } + if (windowStart != null) { + queryParams.add("windowStart", windowStart.toString()); + } + if (windowEnd != null) { + queryParams.add("windowEnd", windowEnd.toString()); + } + if (fromId != null) { + queryParams.add("fromid", fromId); + } + if (fromTs != null) { + queryParams.add("fromTs", fromTs.toString()); + } + if (primaryFilter != null) { + queryParams.add("primaryFilter", + primaryFilter.getName() + ":" + primaryFilter.getValue()); + } + if (secondaryFilters != null) { + StringBuilder sb = new StringBuilder(); + for (NameValuePair nvp : secondaryFilters) { + sb.append(nvp.getName()).append(":").append(nvp.getValue()).append(","); + } + if (sb.length() > 0) { + sb.setLength(sb.length() - 1); + } + queryParams.add("secondaryFilter", sb.toString()); + } + if (fieldsToRetrieve != null) { + queryParams.add("fields", setToCommaSeparated(fieldsToRetrieve)); + } + ClientResponse response = doGettingJson(entityType, queryParams); + return response.getEntity(TimelineEntities.class); + } + + @Override + public TimelineEvents getEvents(String entityType, + SortedSet entityIds, Long limit, Long windowStart, + Long windowEnd, Set eventTypes) + throws IOException, YarnException { + MultivaluedMap queryParams = new MultivaluedMapImpl(); + if (entityIds != null) { + queryParams.add("entityId", setToCommaSeparated(entityIds)); + } + if (limit != null) { + queryParams.add("limit", limit.toString()); + } + if (windowStart != null) { + queryParams.add("windowStart", windowStart.toString()); + } + if (windowEnd != null) { + queryParams.add("windowEnd", windowEnd.toString()); + } + if (eventTypes != null) { + queryParams.add("eventType", setToCommaSeparated(eventTypes)); + } + ClientResponse response = + doGettingJson(entityType + "/events", queryParams); + return response.getEntity(TimelineEvents.class); + } + + @Override + public TimelineDomain getDomain(String domainId) + throws IOException,YarnException { + ClientResponse response = doGettingJson("domain/" + domainId, + new MultivaluedMapImpl()); + return response.getEntity(TimelineDomain.class); + } + + @Override + public TimelineDomains getDomains(String owner) throws IOException, YarnException { + MultivaluedMap queryParams = new MultivaluedMapImpl(); + if (owner != null) { + queryParams.add("owner", owner); + } + ClientResponse response = doGettingJson("domain", queryParams); + return response.getEntity(TimelineDomains.class); + } + private ClientResponse doPosting(Object obj, String path) throws IOException, YarnException { ClientResponse resp; try { @@ -474,6 +581,47 @@ public ClientResponse doPostingObject(Object object, String path) { } } + @Private + @VisibleForTesting + public ClientResponse doGettingJson(String path, + MultivaluedMap queryParams) throws YarnException { + WebResource webResource = client.resource(resURI); + if (path != null) { + ClientResponse response = null; + try { + response = webResource.path(path).queryParams(queryParams) + .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); + } catch (RuntimeException re) { + // runtime exception is expected if the client cannot connect the server + String msg = + "Failed to get the response from the timeline server."; + LOG.error(msg, re); + throw re; + } + if (response == null || + response.getClientResponseStatus() != ClientResponse.Status.OK) { + if (response != null) { + try { + JSONObject json = new JSONObject(response.getEntity(String.class)); + if (LOG.isDebugEnabled()) { + LOG.debug("HTTP error code: " + response.getStatus() + + " Server response : \n" + json.toString()); + } + throw new YarnException(json.getString("message")); + } catch (JSONException je) { + // couldn't grab the actual exception, so fall back to a generic + // -sounding one + } + } + String msg = "Failed to get the response from the timeline server."; + LOG.error(msg); + throw new YarnException(msg); + } + return response; + } + throw new YarnRuntimeException("Unknown resource type"); + } + private class TimelineURLConnectionFactory implements HttpURLConnectionFactory { @@ -657,4 +805,15 @@ private static void printUsage() { new HelpFormatter().printHelp("TimelineClient", opts); } + private static String setToCommaSeparated(Set fieldsToRetrieve) { + StringBuilder sb = new StringBuilder(); + for (String field : fieldsToRetrieve) { + sb.append(field).append(","); + } + if (sb.length() > 0) { + sb.setLength(sb.length() - 1); + } + return sb.toString(); + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/NameValuePair.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/NameValuePair.java new file mode 100644 index 0000000..35c4e80 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/NameValuePair.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.util.timeline; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * A class holding a name and value pair, used for specifying filters. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class NameValuePair { + String name; + Object value; + + public NameValuePair(String name, Object value) { + this.name = name; + this.value = value; + } + + /** + * Get the name. + * @return The name. + */ + public String getName() { + + return name; + } + + /** + * Get the value. + * @return The value. + */ + public Object getValue() { + return value; + } + + @Override + public String toString() { + return "{ name: " + name + ", value: " + value + " }"; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java deleted file mode 100644 index 7da3bbf..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java +++ /dev/null @@ -1,375 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.client.api.impl; - -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; - -import java.net.ConnectException; - -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; -import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; -import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; -import org.apache.hadoop.yarn.client.api.TimelineClient; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import com.sun.jersey.api.client.ClientHandlerException; -import com.sun.jersey.api.client.ClientResponse; - -public class TestTimelineClient { - - private TimelineClientImpl client; - - @Before - public void setup() { - YarnConfiguration conf = new YarnConfiguration(); - conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); - client = createTimelineClient(conf); - } - - @After - public void tearDown() { - if (client != null) { - client.stop(); - } - } - - @Test - public void testPostEntities() throws Exception { - mockEntityClientResponse(client, ClientResponse.Status.OK, false, false); - try { - TimelinePutResponse response = client.putEntities(generateEntity()); - Assert.assertEquals(0, response.getErrors().size()); - } catch (YarnException e) { - Assert.fail("Exception is not expected"); - } - } - - @Test - public void testPostEntitiesWithError() throws Exception { - mockEntityClientResponse(client, ClientResponse.Status.OK, true, false); - try { - TimelinePutResponse response = client.putEntities(generateEntity()); - Assert.assertEquals(1, response.getErrors().size()); - Assert.assertEquals("test entity id", response.getErrors().get(0) - .getEntityId()); - Assert.assertEquals("test entity type", response.getErrors().get(0) - .getEntityType()); - Assert.assertEquals(TimelinePutResponse.TimelinePutError.IO_EXCEPTION, - response.getErrors().get(0).getErrorCode()); - } catch (YarnException e) { - Assert.fail("Exception is not expected"); - } - } - - @Test - public void testPostEntitiesNoResponse() throws Exception { - mockEntityClientResponse( - client, ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false); - try { - client.putEntities(generateEntity()); - Assert.fail("Exception is expected"); - } catch (YarnException e) { - Assert.assertTrue(e.getMessage().contains( - "Failed to get the response from the timeline server.")); - } - } - - @Test - public void testPostEntitiesConnectionRefused() throws Exception { - mockEntityClientResponse(client, null, false, true); - try { - client.putEntities(generateEntity()); - Assert.fail("RuntimeException is expected"); - } catch (RuntimeException re) { - Assert.assertTrue(re instanceof ClientHandlerException); - } - } - - @Test - public void testPutDomain() throws Exception { - mockDomainClientResponse(client, ClientResponse.Status.OK, false); - try { - client.putDomain(generateDomain()); - } catch (YarnException e) { - Assert.fail("Exception is not expected"); - } - } - - @Test - public void testPutDomainNoResponse() throws Exception { - mockDomainClientResponse(client, ClientResponse.Status.FORBIDDEN, false); - try { - client.putDomain(generateDomain()); - Assert.fail("Exception is expected"); - } catch (YarnException e) { - Assert.assertTrue(e.getMessage().contains( - "Failed to get the response from the timeline server.")); - } - } - - @Test - public void testPutDomainConnectionRefused() throws Exception { - mockDomainClientResponse(client, null, true); - try { - client.putDomain(generateDomain()); - Assert.fail("RuntimeException is expected"); - } catch (RuntimeException re) { - Assert.assertTrue(re instanceof ClientHandlerException); - } - } - - @Test - public void testCheckRetryCount() throws Exception { - try { - YarnConfiguration conf = new YarnConfiguration(); - conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); - conf.setInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, - -2); - createTimelineClient(conf); - Assert.fail(); - } catch(IllegalArgumentException e) { - Assert.assertTrue(e.getMessage().contains( - YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES)); - } - - try { - YarnConfiguration conf = new YarnConfiguration(); - conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); - conf.setLong(YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, - 0); - createTimelineClient(conf); - Assert.fail(); - } catch(IllegalArgumentException e) { - Assert.assertTrue(e.getMessage().contains( - YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS)); - } - int newMaxRetries = 5; - long newIntervalMs = 500; - YarnConfiguration conf = new YarnConfiguration(); - conf.setInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, - newMaxRetries); - conf.setLong(YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, - newIntervalMs); - conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); - TimelineClientImpl client = createTimelineClient(conf); - try { - // This call should fail because there is no timeline server - client.putEntities(generateEntity()); - Assert.fail("Exception expected! " - + "Timeline server should be off to run this test. "); - } catch (RuntimeException ce) { - Assert.assertTrue( - "Handler exception for reason other than retry: " + ce.getMessage(), - ce.getMessage().contains("Connection retries limit exceeded")); - // we would expect this exception here, check if the client has retried - Assert.assertTrue("Retry filter didn't perform any retries! ", client - .connectionRetry.retried); - } - } - - @Test - public void testDelegationTokenOperationsRetry() throws Exception { - int newMaxRetries = 5; - long newIntervalMs = 500; - YarnConfiguration conf = new YarnConfiguration(); - conf.setInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, - newMaxRetries); - conf.setLong(YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, - newIntervalMs); - conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); - // use kerberos to bypass the issue in HADOOP-11215 - conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, - "kerberos"); - UserGroupInformation.setConfiguration(conf); - - TimelineClientImpl client = createTimelineClient(conf); - TestTimlineDelegationTokenSecretManager dtManager = - new TestTimlineDelegationTokenSecretManager(); - try { - dtManager.startThreads(); - Thread.sleep(3000); - - try { - // try getting a delegation token - client.getDelegationToken( - UserGroupInformation.getCurrentUser().getShortUserName()); - assertFail(); - } catch (RuntimeException ce) { - assertException(client, ce); - } - - try { - // try renew a delegation token - TimelineDelegationTokenIdentifier timelineDT = - new TimelineDelegationTokenIdentifier( - new Text("tester"), new Text("tester"), new Text("tester")); - client.renewDelegationToken( - new Token(timelineDT, dtManager)); - assertFail(); - } catch (RuntimeException ce) { - assertException(client, ce); - } - - try { - // try cancel a delegation token - TimelineDelegationTokenIdentifier timelineDT = - new TimelineDelegationTokenIdentifier( - new Text("tester"), new Text("tester"), new Text("tester")); - client.cancelDelegationToken( - new Token(timelineDT, dtManager)); - assertFail(); - } catch (RuntimeException ce) { - assertException(client, ce); - } - } finally { - client.stop(); - dtManager.stopThreads(); - } - } - - private static void assertFail() { - Assert.fail("Exception expected! " - + "Timeline server should be off to run this test."); - } - - private void assertException(TimelineClientImpl client, RuntimeException ce) { - Assert.assertTrue( - "Handler exception for reason other than retry: " + ce.toString(), ce - .getMessage().contains("Connection retries limit exceeded")); - // we would expect this exception here, check if the client has retried - Assert.assertTrue("Retry filter didn't perform any retries! ", - client.connectionRetry.retried); - } - - private static ClientResponse mockEntityClientResponse( - TimelineClientImpl client, ClientResponse.Status status, - boolean hasError, boolean hasRuntimeError) { - ClientResponse response = mock(ClientResponse.class); - if (hasRuntimeError) { - doThrow(new ClientHandlerException(new ConnectException())).when(client) - .doPostingObject(any(TimelineEntities.class), any(String.class)); - return response; - } - doReturn(response).when(client) - .doPostingObject(any(TimelineEntities.class), any(String.class)); - when(response.getClientResponseStatus()).thenReturn(status); - TimelinePutResponse.TimelinePutError error = - new TimelinePutResponse.TimelinePutError(); - error.setEntityId("test entity id"); - error.setEntityType("test entity type"); - error.setErrorCode(TimelinePutResponse.TimelinePutError.IO_EXCEPTION); - TimelinePutResponse putResponse = new TimelinePutResponse(); - if (hasError) { - putResponse.addError(error); - } - when(response.getEntity(TimelinePutResponse.class)).thenReturn(putResponse); - return response; - } - - private static ClientResponse mockDomainClientResponse( - TimelineClientImpl client, ClientResponse.Status status, - boolean hasRuntimeError) { - ClientResponse response = mock(ClientResponse.class); - if (hasRuntimeError) { - doThrow(new ClientHandlerException(new ConnectException())).when(client) - .doPostingObject(any(TimelineDomain.class), any(String.class)); - return response; - } - doReturn(response).when(client) - .doPostingObject(any(TimelineDomain.class), any(String.class)); - when(response.getClientResponseStatus()).thenReturn(status); - return response; - } - - private static TimelineEntity generateEntity() { - TimelineEntity entity = new TimelineEntity(); - entity.setEntityId("entity id"); - entity.setEntityType("entity type"); - entity.setStartTime(System.currentTimeMillis()); - for (int i = 0; i < 2; ++i) { - TimelineEvent event = new TimelineEvent(); - event.setTimestamp(System.currentTimeMillis()); - event.setEventType("test event type " + i); - event.addEventInfo("key1", "val1"); - event.addEventInfo("key2", "val2"); - entity.addEvent(event); - } - entity.addRelatedEntity("test ref type 1", "test ref id 1"); - entity.addRelatedEntity("test ref type 2", "test ref id 2"); - entity.addPrimaryFilter("pkey1", "pval1"); - entity.addPrimaryFilter("pkey2", "pval2"); - entity.addOtherInfo("okey1", "oval1"); - entity.addOtherInfo("okey2", "oval2"); - entity.setDomainId("domain id 1"); - return entity; - } - - public static TimelineDomain generateDomain() { - TimelineDomain domain = new TimelineDomain(); - domain.setId("namesapce id"); - domain.setDescription("domain description"); - domain.setOwner("domain owner"); - domain.setReaders("domain_reader"); - domain.setWriters("domain_writer"); - domain.setCreatedTime(0L); - domain.setModifiedTime(1L); - return domain; - } - - private static TimelineClientImpl createTimelineClient( - YarnConfiguration conf) { - TimelineClientImpl client = - spy((TimelineClientImpl) TimelineClient.createTimelineClient()); - client.init(conf); - client.start(); - return client; - } - - private static class TestTimlineDelegationTokenSecretManager extends - AbstractDelegationTokenSecretManager { - - public TestTimlineDelegationTokenSecretManager() { - super(100000, 100000, 100000, 100000); - } - - @Override - public TimelineDelegationTokenIdentifier createIdentifier() { - return new TimelineDelegationTokenIdentifier(); - } - - } -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManagerOnTimelineStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManagerOnTimelineStore.java index c88cccc..ff5e5ae 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManagerOnTimelineStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryManagerOnTimelineStore.java @@ -55,7 +55,7 @@ import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; -import org.apache.hadoop.yarn.server.timeline.NameValuePair; +import org.apache.hadoop.yarn.util.timeline.NameValuePair; import org.apache.hadoop.yarn.server.timeline.TimelineDataManager; import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field; import org.apache.hadoop.yarn.util.ConverterUtils; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java index ba75c14..4ea0af9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java @@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import org.apache.hadoop.yarn.server.utils.LeveldbIterator; +import org.apache.hadoop.yarn.util.timeline.NameValuePair; import org.fusesource.leveldbjni.JniDBFactory; import org.iq80.leveldb.DB; import org.iq80.leveldb.DBException; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java index af714b1..d65c207 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; +import org.apache.hadoop.yarn.util.timeline.NameValuePair; /** * In-memory implementation of {@link TimelineStore}. This @@ -369,6 +370,8 @@ public synchronized TimelinePutResponse put(TimelineEntities data) { existingEntity.getDomainId())) { relatedEntity.addRelatedEntity( existingEntity.getEntityType(), existingEntity.getEntityId()); + existingEntity.addRelatedEntity( + relatedEntity.getEntityType(), relatedEntity.getEntityId()); } else { // in this case the entity will be put, but the relation will be // ignored @@ -387,6 +390,8 @@ public synchronized TimelinePutResponse put(TimelineEntities data) { existingEntity.getEntityId()); relatedEntity.setDomainId(existingEntity.getDomainId()); entities.put(relatedEntityId, relatedEntity); + existingEntity.addRelatedEntity( + relatedEntity.getEntityType(), relatedEntity.getEntityId()); entityInsertTimes.put(relatedEntityId, System.currentTimeMillis()); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/NameValuePair.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/NameValuePair.java deleted file mode 100644 index 1f17324..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/NameValuePair.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timeline; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - -/** - * A class holding a name and value pair, used for specifying filters in - * {@link TimelineReader}. - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -public class NameValuePair { - String name; - Object value; - - public NameValuePair(String name, Object value) { - this.name = name; - this.value = value; - } - - /** - * Get the name. - * @return The name. - */ - public String getName() { - - return name; - } - - /** - * Get the value. - * @return The value. - */ - public Object getValue() { - return value; - } - - @Override - public String toString() { - return "{ name: " + name + ", value: " + value + " }"; - } -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java index 888c283..2035dc7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field; import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager; +import org.apache.hadoop.yarn.util.timeline.NameValuePair; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import com.google.common.annotations.VisibleForTesting; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineReader.java index aba1ba2..9a399f1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineReader.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineReader.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains; +import org.apache.hadoop.yarn.util.timeline.NameValuePair; /** * This interface is for retrieving timeline information. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java index 0907f2c..4303911 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java @@ -61,7 +61,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.timeline.EntityIdentifier; import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; -import org.apache.hadoop.yarn.server.timeline.NameValuePair; +import org.apache.hadoop.yarn.util.timeline.NameValuePair; import org.apache.hadoop.yarn.server.timeline.TimelineDataManager; import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field; import org.apache.hadoop.yarn.webapp.BadRequestException; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java new file mode 100644 index 0000000..83a44d6 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClient.java @@ -0,0 +1,792 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.client.api.impl; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.net.ConnectException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer; +import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore; +import org.apache.hadoop.yarn.server.timeline.TimelineStore; +import org.apache.hadoop.yarn.util.timeline.NameValuePair; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.sun.jersey.api.client.ClientHandlerException; +import com.sun.jersey.api.client.ClientResponse; + +public class TestTimelineClient { + + private TimelineClientImpl client; + + @Before + public void setup() { + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE, + MemoryTimelineStore.class, TimelineStore.class); + client = createTimelineClient(conf); + } + + @After + public void tearDown() { + if (client != null) { + client.stop(); + } + } + + @Test + public void testPostEntities() throws Exception { + mockEntityClientResponse(client, ClientResponse.Status.OK, false, false); + try { + TimelinePutResponse response = client.putEntities(generateEntity()); + Assert.assertEquals(0, response.getErrors().size()); + } catch (YarnException e) { + Assert.fail("Exception is not expected"); + } + } + + @Test + public void testPostEntitiesWithError() throws Exception { + mockEntityClientResponse(client, ClientResponse.Status.OK, true, false); + try { + TimelinePutResponse response = client.putEntities(generateEntity()); + Assert.assertEquals(1, response.getErrors().size()); + Assert.assertEquals("test entity id", response.getErrors().get(0) + .getEntityId()); + Assert.assertEquals("test entity type", response.getErrors().get(0) + .getEntityType()); + Assert.assertEquals(TimelinePutResponse.TimelinePutError.IO_EXCEPTION, + response.getErrors().get(0).getErrorCode()); + } catch (YarnException e) { + Assert.fail("Exception is not expected"); + } + } + + @Test + public void testPostEntitiesNoResponse() throws Exception { + mockEntityClientResponse( + client, ClientResponse.Status.INTERNAL_SERVER_ERROR, false, false); + try { + client.putEntities(generateEntity()); + Assert.fail("Exception is expected"); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage().contains( + "Failed to get the response from the timeline server.")); + } + } + + @Test + public void testPostEntitiesConnectionRefused() throws Exception { + mockEntityClientResponse(client, null, false, true); + try { + client.putEntities(generateEntity()); + Assert.fail("RuntimeException is expected"); + } catch (RuntimeException re) { + Assert.assertTrue(re instanceof ClientHandlerException); + } + } + + @Test + public void testPutDomain() throws Exception { + mockDomainClientResponse(client, ClientResponse.Status.OK, false); + try { + client.putDomain(generateDomain()); + } catch (YarnException e) { + Assert.fail("Exception is not expected"); + } + } + + @Test + public void testPutDomainNoResponse() throws Exception { + mockDomainClientResponse(client, ClientResponse.Status.FORBIDDEN, false); + try { + client.putDomain(generateDomain()); + Assert.fail("Exception is expected"); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage().contains( + "Failed to get the response from the timeline server.")); + } + } + + @Test + public void testPutDomainConnectionRefused() throws Exception { + mockDomainClientResponse(client, null, true); + try { + client.putDomain(generateDomain()); + Assert.fail("RuntimeException is expected"); + } catch (RuntimeException re) { + Assert.assertTrue(re instanceof ClientHandlerException); + } + } + + @Test + public void testCheckRetryCount() throws Exception { + try { + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.setInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, + -2); + createTimelineClient(conf); + Assert.fail(); + } catch(IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains( + YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES)); + } + + try { + YarnConfiguration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.setLong(YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, + 0); + createTimelineClient(conf); + Assert.fail(); + } catch(IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains( + YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS)); + } + int newMaxRetries = 5; + long newIntervalMs = 500; + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, + newMaxRetries); + conf.setLong(YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, + newIntervalMs); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + TimelineClientImpl client = createTimelineClient(conf); + try { + // This call should fail because there is no timeline server + client.putEntities(generateEntity()); + Assert.fail("Exception expected! " + + "Timeline server should be off to run this test. "); + } catch (RuntimeException ce) { + Assert.assertTrue( + "Handler exception for reason other than retry: " + ce.getMessage(), + ce.getMessage().contains("Connection retries limit exceeded")); + // we would expect this exception here, check if the client has retried + Assert.assertTrue("Retry filter didn't perform any retries! ", client + .connectionRetry.retried); + } + } + + @Test + public void testDelegationTokenOperationsRetry() throws Exception { + int newMaxRetries = 5; + long newIntervalMs = 500; + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES, + newMaxRetries); + conf.setLong(YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS, + newIntervalMs); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + // use kerberos to bypass the issue in HADOOP-11215 + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + UserGroupInformation.setConfiguration(conf); + + TimelineClientImpl client = createTimelineClient(conf); + TestTimlineDelegationTokenSecretManager dtManager = + new TestTimlineDelegationTokenSecretManager(); + try { + dtManager.startThreads(); + Thread.sleep(3000); + + try { + // try getting a delegation token + client.getDelegationToken( + UserGroupInformation.getCurrentUser().getShortUserName()); + assertFail(); + } catch (RuntimeException ce) { + assertException(client, ce); + } + + try { + // try renew a delegation token + TimelineDelegationTokenIdentifier timelineDT = + new TimelineDelegationTokenIdentifier( + new Text("tester"), new Text("tester"), new Text("tester")); + client.renewDelegationToken( + new Token(timelineDT, dtManager)); + assertFail(); + } catch (RuntimeException ce) { + assertException(client, ce); + } + + try { + // try cancel a delegation token + TimelineDelegationTokenIdentifier timelineDT = + new TimelineDelegationTokenIdentifier( + new Text("tester"), new Text("tester"), new Text("tester")); + client.cancelDelegationToken( + new Token(timelineDT, dtManager)); + assertFail(); + } catch (RuntimeException ce) { + assertException(client, ce); + } + } finally { + client.stop(); + dtManager.stopThreads(); + } + } + + @Test + public void testGetEntity() throws Exception { + ApplicationHistoryServer ahs = new ApplicationHistoryServer(); + try { + ahs.init(client.getConfig()); + ahs.start(); + TimelineEntity e1 = generateEntity(1); + TimelineEntities te = new TimelineEntities(); + te.addEntity(e1); + ahs.getTimelineStore().put(te); + + TimelineEntity entity = + client.getEntity(e1.getEntityType(), e1.getEntityId(), null); + Assert.assertEquals(e1, entity); + + Set fieldsToRetrieve = new HashSet(); + fieldsToRetrieve.add("EVENTS"); + fieldsToRetrieve.add("RELATEDENTITIES"); + fieldsToRetrieve.add("OTHERINFO"); + entity = client.getEntity(e1.getEntityType(), e1.getEntityId(), + fieldsToRetrieve); + e1.setPrimaryFilters(new HashMap>()); + Assert.assertEquals(e1, entity); + + fieldsToRetrieve = new HashSet(); + fieldsToRetrieve.add("LASTEVENTONLY"); + entity = client.getEntity(e1.getEntityType(), e1.getEntityId(), + fieldsToRetrieve); + e1.setRelatedEntities(new HashMap>()); + e1.setOtherInfo(new HashMap()); + e1.setPrimaryFilters(new HashMap>()); + TimelineEvent lastEvent = e1.getEvents().get(0); + e1.getEvents().clear(); + e1.getEvents().add(lastEvent); + Assert.assertEquals(e1, entity); + } finally { + ahs.stop(); + } + } + + @Test + public void testGetEntityNotFound() throws Exception { + ApplicationHistoryServer ahs = new ApplicationHistoryServer(); + try { + ahs.init(client.getConfig()); + ahs.start(); + + try { + client.getEntity("foo", "bar", null); + Assert.fail(); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage().contains( + "Timeline entity { id: bar, type: foo } is not found")); + } + } finally { + ahs.stop(); + } + } + + @Test + public void testGetEntities() throws Exception { + ApplicationHistoryServer ahs = new ApplicationHistoryServer(); + try { + ahs.init(client.getConfig()); + ahs.start(); + TimelineEntity e1 = generateEntity(1); + Thread.sleep(1000); // make the start times more different + TimelineEntity e2 = generateEntity(2); + Thread.sleep(1000); // make the start times more different + TimelineEntity e3 = generateEntity(3); + Thread.sleep(1000); // make the start times more different + TimelineEntity e4 = generateEntity(4); + e3.setEntityType(e2.getEntityType()); + e4.setEntityType(e2.getEntityType()); + e3.setPrimaryFilters(new HashMap>()); + e3.setOtherInfo(Collections.singletonMap("o1", (Object) "v1")); + TimelineEntities te = new TimelineEntities(); + te.addEntity(e1); + te.addEntity(e2); + te.addEntity(e3); + te.addEntity(e4); + ahs.getTimelineStore().put(te); + + TimelineEntities entities = client.getEntities(e1.getEntityType(), null, + null, null, null, null, null, null, null); + Assert.assertEquals(1, entities.getEntities().size()); + Assert.assertEquals(e1, entities.getEntities().get(0)); + + entities = client.getEntities(e2.getEntityType(), null, null, null, null, + null, null, null, null); + Assert.assertEquals(3, entities.getEntities().size()); + Assert.assertEquals(e4, entities.getEntities().get(0)); + Assert.assertEquals(e3, entities.getEntities().get(1)); + Assert.assertEquals(e2, entities.getEntities().get(2)); + + entities = client.getEntities(e2.getEntityType(), 1L, null, null, null, + null, null, null, null); + Assert.assertEquals(1, entities.getEntities().size()); + Assert.assertEquals(e4, entities.getEntities().get(0)); + + entities = client.getEntities(e2.getEntityType(), null, + e3.getStartTime() - 3, null, null, null, null, null, null); + Assert.assertEquals(2, entities.getEntities().size()); + Assert.assertEquals(e4, entities.getEntities().get(0)); + Assert.assertEquals(e3, entities.getEntities().get(1)); + + entities = client.getEntities(e2.getEntityType(), null, + e3.getStartTime() - 3, e4.getStartTime() + 3, null, null, null, + null, null); + Assert.assertEquals(2, entities.getEntities().size()); + Assert.assertEquals(e4, entities.getEntities().get(0)); + Assert.assertEquals(e3, entities.getEntities().get(1)); + + String key = e2.getPrimaryFilters().keySet().iterator().next(); + NameValuePair primaryFilter = new NameValuePair(key, + e2.getPrimaryFilters().get(key).iterator().next()); + entities = client.getEntities(e2.getEntityType(), null, null, null, null, + null, primaryFilter, null, null); + Assert.assertEquals(2, entities.getEntities().size()); + Assert.assertEquals(e4, entities.getEntities().get(0)); + Assert.assertEquals(e2, entities.getEntities().get(1)); + + key = e3.getOtherInfo().keySet().iterator().next(); + Set secondaryFilters =Collections.singleton( + new NameValuePair(key, e3.getOtherInfo().get(key))); + entities = client.getEntities(e2.getEntityType(), null, null, null, null, + null, null, secondaryFilters, null); + Assert.assertEquals(1, entities.getEntities().size()); + Assert.assertEquals(e3, entities.getEntities().get(0)); + + Set fieldsToRetrieve = new HashSet(); + fieldsToRetrieve.add("EVENTS"); + fieldsToRetrieve.add("RELATEDENTITIES"); + fieldsToRetrieve.add("OTHERINFO"); + entities = client.getEntities(e2.getEntityType(), null, null, null, null, + null, null, null, fieldsToRetrieve); + Assert.assertEquals(3, entities.getEntities().size()); + e4.setPrimaryFilters(new HashMap>()); + Assert.assertEquals(e4, entities.getEntities().get(0)); + e3.setPrimaryFilters(new HashMap>()); + Assert.assertEquals(e3, entities.getEntities().get(1)); + e2.setPrimaryFilters(new HashMap>()); + Assert.assertEquals(e2, entities.getEntities().get(2)); + + fieldsToRetrieve = new HashSet(); + fieldsToRetrieve.add("LASTEVENTONLY"); + entities = client.getEntities(e2.getEntityType(), null, null, null, null, + null, null, null, fieldsToRetrieve); + Assert.assertEquals(3, entities.getEntities().size()); + e4.setRelatedEntities(new HashMap>()); + e4.setOtherInfo(new HashMap()); + e4.setPrimaryFilters(new HashMap>()); + TimelineEvent lastEvent = e4.getEvents().get(0); + e4.getEvents().clear(); + e4.getEvents().add(lastEvent); + e4.setPrimaryFilters(new HashMap>()); + Assert.assertEquals(e4, entities.getEntities().get(0)); + e3.setRelatedEntities(new HashMap>()); + e3.setOtherInfo(new HashMap()); + e3.setPrimaryFilters(new HashMap>()); + lastEvent = e3.getEvents().get(0); + e3.getEvents().clear(); + e3.getEvents().add(lastEvent); + e3.setPrimaryFilters(new HashMap>()); + Assert.assertEquals(e3, entities.getEntities().get(1)); + e2.setRelatedEntities(new HashMap>()); + e2.setOtherInfo(new HashMap()); + e2.setPrimaryFilters(new HashMap>()); + lastEvent = e2.getEvents().get(0); + e2.getEvents().clear(); + e2.getEvents().add(lastEvent); + Assert.assertEquals(e2, entities.getEntities().get(2)); + } finally { + ahs.stop(); + } + } + + @Test + public void testGetEntitiesNotFound() throws Exception { + ApplicationHistoryServer ahs = new ApplicationHistoryServer(); + try { + ahs.init(client.getConfig()); + ahs.start(); + + TimelineEntities entities = client.getEntities("foo", + null, null, null, null, null, null, null, null); + Assert.assertEquals(0, entities.getEntities().size()); + } finally { + ahs.stop(); + } + } + + @Test + public void testGetEvents() throws Exception { + ApplicationHistoryServer ahs = new ApplicationHistoryServer(); + try { + ahs.init(client.getConfig()); + ahs.start(); + TimelineEntity e1 = generateEntity(1); + Thread.sleep(1000); // make the start times more different + TimelineEntity e2 = generateEntity(2); + e2.setEntityType(e1.getEntityType()); + TimelineEvent e1Event = new TimelineEvent(); + e1Event.setEventType("e1 Event Type"); + e1Event.addEventInfo("k", "v"); + e1Event.setTimestamp(System.currentTimeMillis() + 100); + e1.addEvent(e1Event); + TimelineEntities te = new TimelineEntities(); + te.addEntity(e1); + te.addEntity(e2); + ahs.getTimelineStore().put(te); + + SortedSet entityIds = new TreeSet(); + entityIds.add(e1.getEntityId()); + entityIds.add(e2.getEntityId()); + TimelineEvents events = client.getEvents(e1.getEntityType(), entityIds, + null, null, null, null); + Assert.assertEquals(2, events.getAllEvents().size()); + Assert.assertEquals(e1.getEntityId(), events.getAllEvents().get(0).getEntityId()); + Assert.assertEquals(3, events.getAllEvents().get(0).getEvents().size()); + Assert.assertEquals(e1Event, events.getAllEvents().get(0).getEvents().get(0)); + Assert.assertEquals(e1.getEvents().get(0), events.getAllEvents().get(0).getEvents().get(1)); + Assert.assertEquals(e1.getEvents().get(1), events.getAllEvents().get(0).getEvents().get(2)); + Assert.assertEquals(e2.getEntityId(), events.getAllEvents().get(1).getEntityId()); + Assert.assertEquals(2, events.getAllEvents().get(1).getEvents().size()); + Assert.assertEquals(e2.getEvents().get(0), events.getAllEvents().get(1).getEvents().get(0)); + Assert.assertEquals(e2.getEvents().get(1), events.getAllEvents().get(1).getEvents().get(1)); + + events = client.getEvents(e1.getEntityType(), entityIds, + 1L, null, null, null); + Assert.assertEquals(2, events.getAllEvents().size()); + Assert.assertEquals(e1.getEntityId(), events.getAllEvents().get(0).getEntityId()); + Assert.assertEquals(1, events.getAllEvents().get(0).getEvents().size()); + Assert.assertEquals(e1Event, events.getAllEvents().get(0).getEvents().get(0)); + Assert.assertEquals(e2.getEntityId(), events.getAllEvents().get(1).getEntityId()); + Assert.assertEquals(1, events.getAllEvents().get(1).getEvents().size()); + Assert.assertEquals(e2.getEvents().get(0), events.getAllEvents().get(1).getEvents().get(0)); + + events = client.getEvents(e1.getEntityType(), entityIds, + null, e2.getEvents().get(0).getTimestamp() - 3, null, null); + Assert.assertEquals(2, events.getAllEvents().size()); + Assert.assertEquals(e1.getEntityId(), events.getAllEvents().get(0).getEntityId()); + Assert.assertEquals(1, events.getAllEvents().get(0).getEvents().size()); + Assert.assertEquals(e1Event, events.getAllEvents().get(0).getEvents().get(0)); + Assert.assertEquals(e2.getEntityId(), events.getAllEvents().get(1).getEntityId()); + Assert.assertEquals(2, events.getAllEvents().get(1).getEvents().size()); + Assert.assertEquals(e2.getEvents().get(0), events.getAllEvents().get(1).getEvents().get(0)); + Assert.assertEquals(e2.getEvents().get(1), events.getAllEvents().get(1).getEvents().get(1)); + + events = client.getEvents(e1.getEntityType(), entityIds, + null, e2.getEvents().get(0).getTimestamp() - 3, + e1Event.getTimestamp() - 3, null); + Assert.assertEquals(2, events.getAllEvents().size()); + Assert.assertEquals(e1.getEntityId(), events.getAllEvents().get(0).getEntityId()); + Assert.assertEquals(0, events.getAllEvents().get(0).getEvents().size()); + Assert.assertEquals(e2.getEntityId(), events.getAllEvents().get(1).getEntityId()); + Assert.assertEquals(2, events.getAllEvents().get(1).getEvents().size()); + Assert.assertEquals(e2.getEvents().get(0), events.getAllEvents().get(1).getEvents().get(0)); + Assert.assertEquals(e2.getEvents().get(1), events.getAllEvents().get(1).getEvents().get(1)); + + Set eventTypes = new HashSet(); + eventTypes.add(e1Event.getEventType()); + events = client.getEvents(e1.getEntityType(), entityIds, + null, null, null, eventTypes); + Assert.assertEquals(2, events.getAllEvents().size()); + Assert.assertEquals(e1.getEntityId(), events.getAllEvents().get(0).getEntityId()); + Assert.assertEquals(1, events.getAllEvents().get(0).getEvents().size()); + Assert.assertEquals(e1Event, events.getAllEvents().get(0).getEvents().get(0)); + Assert.assertEquals(e2.getEntityId(), events.getAllEvents().get(1).getEntityId()); + Assert.assertEquals(0, events.getAllEvents().get(1).getEvents().size()); + + eventTypes = new HashSet(); + eventTypes.add(e1.getEvents().get(0).getEventType()); + eventTypes.add(e1.getEvents().get(1).getEventType()); + events = client.getEvents(e1.getEntityType(), entityIds, + null, null, null, eventTypes); + Assert.assertEquals(2, events.getAllEvents().size()); + Assert.assertEquals(e1.getEntityId(), events.getAllEvents().get(0).getEntityId()); + Assert.assertEquals(2, events.getAllEvents().get(0).getEvents().size()); + Assert.assertEquals(e1.getEvents().get(0), events.getAllEvents().get(0).getEvents().get(0)); + Assert.assertEquals(e1.getEvents().get(1), events.getAllEvents().get(0).getEvents().get(1)); + Assert.assertEquals(e2.getEntityId(), events.getAllEvents().get(1).getEntityId()); + Assert.assertEquals(2, events.getAllEvents().get(1).getEvents().size()); + Assert.assertEquals(e2.getEvents().get(0), events.getAllEvents().get(1).getEvents().get(0)); + Assert.assertEquals(e2.getEvents().get(1), events.getAllEvents().get(1).getEvents().get(1)); + } finally { + ahs.stop(); + } + } + + @Test + public void testGetEventsNotFound() throws Exception { + ApplicationHistoryServer ahs = new ApplicationHistoryServer(); + try { + ahs.init(client.getConfig()); + ahs.start(); + + TimelineEvents events = + client.getEvents("foo", null, null, null, null, null); + Assert.assertEquals(0, events.getAllEvents().size()); + } finally { + ahs.stop(); + } + } + + @Test + public void testGetDomain() throws Exception { + ApplicationHistoryServer ahs = new ApplicationHistoryServer(); + try { + ahs.init(client.getConfig()); + ahs.start(); + TimelineDomain d1 = generateDomain(1); + ahs.getTimelineStore().put(d1); + + TimelineDomain domain = client.getDomain(d1.getId()); + verifyDomainInfo(d1, domain); + } finally { + ahs.stop(); + } + } + + @Test + public void testGetDomainNotFound() throws Exception { + ApplicationHistoryServer ahs = new ApplicationHistoryServer(); + try { + ahs.init(client.getConfig()); + ahs.start(); + + try { + client.getDomain("foo"); + Assert.fail("Exception is expected"); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage().contains( + "Timeline domain [foo] is not found")); + } + } finally { + ahs.stop(); + } + } + + @Test + public void testGetDomains() throws Exception { + ApplicationHistoryServer ahs = new ApplicationHistoryServer(); + try { + ahs.init(client.getConfig()); + ahs.start(); + TimelineDomain d1 = generateDomain(1); + TimelineDomain d2 = generateDomain(2); + TimelineDomain d3 = generateDomain(3); + d3.setOwner(d2.getOwner()); + ahs.getTimelineStore().put(d1); + ahs.getTimelineStore().put(d2); + Thread.sleep(1000); // this way d2 and d3 will have different timestamps + ahs.getTimelineStore().put(d3); + + TimelineDomains domains = client.getDomains(d1.getOwner()); + Assert.assertEquals(1, domains.getDomains().size()); + verifyDomainInfo(d1, domains.getDomains().get(0)); + + domains = client.getDomains(d2.getOwner()); + Assert.assertEquals(2, domains.getDomains().size()); + verifyDomainInfo(d3, domains.getDomains().get(0)); + verifyDomainInfo(d2, domains.getDomains().get(1)); + } finally { + ahs.stop(); + } + } + + @Test + public void testGetDomainsNotFound() throws Exception { + ApplicationHistoryServer ahs = new ApplicationHistoryServer(); + try { + ahs.init(client.getConfig()); + ahs.start(); + + try { + client.getDomains("foo"); + } catch (YarnException e) { + Assert.assertTrue(e.getMessage().contains( + "java.lang.NullPointerException")); + } + } finally { + ahs.stop(); + } + } + + private static void assertFail() { + Assert.fail("Exception expected! " + + "Timeline server should be off to run this test."); + } + + private void assertException(TimelineClientImpl client, RuntimeException ce) { + Assert.assertTrue( + "Handler exception for reason other than retry: " + ce.toString(), ce + .getMessage().contains("Connection retries limit exceeded")); + // we would expect this exception here, check if the client has retried + Assert.assertTrue("Retry filter didn't perform any retries! ", + client.connectionRetry.retried); + } + + private static ClientResponse mockEntityClientResponse( + TimelineClientImpl client, ClientResponse.Status status, + boolean hasError, boolean hasRuntimeError) { + ClientResponse response = mock(ClientResponse.class); + if (hasRuntimeError) { + doThrow(new ClientHandlerException(new ConnectException())).when(client) + .doPostingObject(any(TimelineEntities.class), any(String.class)); + return response; + } + doReturn(response).when(client) + .doPostingObject(any(TimelineEntities.class), any(String.class)); + when(response.getClientResponseStatus()).thenReturn(status); + TimelinePutResponse.TimelinePutError error = + new TimelinePutResponse.TimelinePutError(); + error.setEntityId("test entity id"); + error.setEntityType("test entity type"); + error.setErrorCode(TimelinePutResponse.TimelinePutError.IO_EXCEPTION); + TimelinePutResponse putResponse = new TimelinePutResponse(); + if (hasError) { + putResponse.addError(error); + } + when(response.getEntity(TimelinePutResponse.class)).thenReturn(putResponse); + return response; + } + + private static ClientResponse mockDomainClientResponse( + TimelineClientImpl client, ClientResponse.Status status, + boolean hasRuntimeError) { + ClientResponse response = mock(ClientResponse.class); + if (hasRuntimeError) { + doThrow(new ClientHandlerException(new ConnectException())).when(client) + .doPostingObject(any(TimelineDomain.class), any(String.class)); + return response; + } + doReturn(response).when(client) + .doPostingObject(any(TimelineDomain.class), any(String.class)); + when(response.getClientResponseStatus()).thenReturn(status); + return response; + } + + private static TimelineEntity generateEntity() { + return generateEntity(0); + } + + private static TimelineEntity generateEntity(int n) { + TimelineEntity entity = new TimelineEntity(); + entity.setEntityId("entity id " + n); + entity.setEntityType("entity type " + n); + entity.setStartTime(System.currentTimeMillis()); + for (int i = 0; i < 2; ++i) { + TimelineEvent event = new TimelineEvent(); + event.setTimestamp(System.currentTimeMillis()); + event.setEventType("test event type " + i); + event.addEventInfo("key1", "val1"); + event.addEventInfo("key2", "val2"); + entity.addEvent(event); + } + entity.addRelatedEntity("test ref type 1", "test ref id 1"); + entity.addRelatedEntity("test ref type 2", "test ref id 2"); + entity.addPrimaryFilter("pkey1", "pval1"); + entity.addPrimaryFilter("pkey2", "pval2"); + entity.addOtherInfo("okey1", "oval1"); + entity.addOtherInfo("okey2", "oval2"); + entity.setDomainId("domain id 1"); + return entity; + } + + public static TimelineDomain generateDomain() { + return generateDomain(0); + } + + public static TimelineDomain generateDomain(int n) { + TimelineDomain domain = new TimelineDomain(); + domain.setId("namesapce id " + n); + domain.setDescription("domain description " + n); + domain.setOwner("domain owner " + n); + domain.setReaders("domain_reader " + n); + domain.setWriters("domain_writer " + n); + domain.setCreatedTime(0L); + domain.setModifiedTime(1L); + return domain; + } + + private static void verifyDomainInfo( + TimelineDomain expected, TimelineDomain actual) { + Assert.assertEquals(expected.getId(), actual.getId()); + Assert.assertEquals(expected.getDescription(), actual.getDescription()); + Assert.assertEquals(expected.getOwner(), actual.getOwner()); + Assert.assertEquals(expected.getReaders(), actual.getReaders()); + Assert.assertEquals(expected.getWriters(), actual.getWriters()); + } + + private static TimelineClientImpl createTimelineClient( + YarnConfiguration conf) { + TimelineClientImpl client = + spy((TimelineClientImpl) TimelineClient.createTimelineClient()); + client.init(conf); + client.start(); + return client; + } + + private static class TestTimlineDelegationTokenSecretManager extends + AbstractDelegationTokenSecretManager { + + public TestTimlineDelegationTokenSecretManager() { + super(100000, 100000, 100000, 100000); + } + + @Override + public TimelineDelegationTokenIdentifier createIdentifier() { + return new TimelineDelegationTokenIdentifier(); + } + + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java index 15edecd..64d4c1c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java @@ -43,8 +43,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.records.Version; -import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore; -import org.apache.hadoop.yarn.server.timeline.NameValuePair; +import org.apache.hadoop.yarn.util.timeline.NameValuePair; import org.apache.hadoop.yarn.server.utils.LeveldbIterator; import org.iq80.leveldb.DBException; import org.junit.After; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreTestUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreTestUtils.java index 6f15b92..e057d67 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreTestUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreTestUtils.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field; +import org.apache.hadoop.yarn.util.timeline.NameValuePair; public class TimelineStoreTestUtils { @@ -76,6 +77,8 @@ protected Map otherInfo; protected Map> relEntityMap; protected Map> relEntityMap2; + protected Map> relEntityMap3; + protected Map> relEntityMap4; protected NameValuePair userFilter; protected NameValuePair numericFilter1; protected NameValuePair numericFilter2; @@ -300,6 +303,14 @@ protected void loadVerificationEntityData() throws Exception { new HashMap>(); relEntityMap2.put(entityType4, Collections.singleton(entityId4)); + relEntityMap3 = + new HashMap>(); + relEntityMap3.put(entityType2, Collections.singleton(entityId2)); + + relEntityMap4 = + new HashMap>(); + relEntityMap4.put(entityType5, Collections.singleton(entityId5)); + ev3 = createEvent(789l, "launch_event", null); ev4 = createEvent(-123l, "init_event", null); events2 = new ArrayList(); @@ -361,11 +372,11 @@ public void testGetSingleEntity() throws IOException { store.getEntity("id_1", "type_2", EnumSet.allOf(Field.class)), domainId1); - verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId1, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, 123l, store.getEntity(entityId1, entityType1, EnumSet.allOf(Field.class)), domainId1); - verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId1b, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, 123l, store.getEntity(entityId1b, entityType1, EnumSet.allOf(Field.class)), domainId1); @@ -373,7 +384,7 @@ public void testGetSingleEntity() throws IOException { EMPTY_PRIMARY_FILTERS, EMPTY_MAP, -123l, store.getEntity(entityId2, entityType2, EnumSet.allOf(Field.class)), domainId1); - verifyEntityInfo(entityId4, entityType4, EMPTY_EVENTS, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId4, entityType4, EMPTY_EVENTS, relEntityMap4, EMPTY_PRIMARY_FILTERS, EMPTY_MAP, 42l, store.getEntity(entityId4, entityType4, EnumSet.allOf(Field.class)), domainId1); @@ -390,7 +401,7 @@ public void testGetSingleEntity() throws IOException { null, null, null, store.getEntity(entityId1, entityType1, EnumSet.of(Field.LAST_EVENT_ONLY)), domainId1); - verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId1b, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, store.getEntity(entityId1b, entityType1, null), domainId1); @@ -498,9 +509,9 @@ public void testGetEntities() throws IOException { List entities = getEntities("type_1"); assertEquals(3, entities.size()); - verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId1, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, entities.get(0), domainId1); - verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId1b, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, entities.get(1), domainId1); verifyEntityInfo(entityId6, entityType1, EMPTY_EVENTS, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(2), domainId2); @@ -513,13 +524,13 @@ public void testGetEntities() throws IOException { entities = getEntities("type_1", 1l, null, null, null, EnumSet.allOf(Field.class)); assertEquals(1, entities.size()); - verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId1, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, entities.get(0), domainId1); entities = getEntities("type_1", 1l, 0l, null, null, EnumSet.allOf(Field.class)); assertEquals(1, entities.size()); - verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId1, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, entities.get(0), domainId1); entities = getEntities("type_1", null, 234l, null, null, @@ -537,9 +548,9 @@ public void testGetEntities() throws IOException { entities = getEntities("type_1", null, null, 345l, null, EnumSet.allOf(Field.class)); assertEquals(3, entities.size()); - verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId1, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, entities.get(0), domainId1); - verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId1b, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, entities.get(1), domainId1); verifyEntityInfo(entityId6, entityType1, EMPTY_EVENTS, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(2), domainId2); @@ -547,9 +558,9 @@ public void testGetEntities() throws IOException { entities = getEntities("type_1", null, null, 123l, null, EnumSet.allOf(Field.class)); assertEquals(3, entities.size()); - verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId1, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, entities.get(0), domainId1); - verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId1b, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, entities.get(1), domainId1); verifyEntityInfo(entityId6, entityType1, EMPTY_EVENTS, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(2), domainId2); @@ -558,16 +569,16 @@ public void testGetEntities() throws IOException { public void testGetEntitiesWithFromId() throws IOException { List entities = getEntitiesFromId("type_1", entityId1); assertEquals(3, entities.size()); - verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId1, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, entities.get(0), domainId1); - verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId1b, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, entities.get(1), domainId1); verifyEntityInfo(entityId6, entityType1, EMPTY_EVENTS, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(2), domainId2); entities = getEntitiesFromId("type_1", entityId1b); assertEquals(2, entities.size()); - verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId1b, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, entities.get(0), domainId1); verifyEntityInfo(entityId6, entityType1, EMPTY_EVENTS, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(1), domainId2); @@ -604,9 +615,9 @@ public void testGetEntitiesWithFromId() throws IOException { entities = getEntitiesFromIdWithPrimaryFilter("type_1", userFilter, entityId1); assertEquals(3, entities.size()); - verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId1, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, entities.get(0), domainId1); - verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId1b, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, entities.get(1), domainId1); verifyEntityInfo(entityId6, entityType1, EMPTY_EVENTS, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(2), domainId2); @@ -614,7 +625,7 @@ public void testGetEntitiesWithFromId() throws IOException { entities = getEntitiesFromIdWithPrimaryFilter("type_1", userFilter, entityId1b); assertEquals(2, entities.size()); - verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId1b, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, entities.get(0), domainId1); verifyEntityInfo(entityId6, entityType1, EMPTY_EVENTS, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(1), domainId2); @@ -674,36 +685,39 @@ public void testGetEntitiesWithPrimaryFilters() throws IOException { List entities = getEntitiesWithPrimaryFilter("type_1", userFilter); assertEquals(3, entities.size()); - verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, + TimelineEntities A = store.getEntities(entityType1, null, null, null, null, null, null, null, null); + TimelineEntity B = store.getEntity(entityId2, entityType2, null); + TimelineEntity C = store.getEntity(entityId1, entityType1, null); + verifyEntityInfo(entityId1, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, entities.get(0), domainId1); - verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId1b, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, entities.get(1), domainId1); verifyEntityInfo(entityId6, entityType1, EMPTY_EVENTS, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(2), domainId2); entities = getEntitiesWithPrimaryFilter("type_1", numericFilter1); assertEquals(3, entities.size()); - verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId1, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, entities.get(0), domainId1); - verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId1b, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, entities.get(1), domainId1); verifyEntityInfo(entityId6, entityType1, EMPTY_EVENTS, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(2), domainId2); entities = getEntitiesWithPrimaryFilter("type_1", numericFilter2); assertEquals(3, entities.size()); - verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId1, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, entities.get(0), domainId1); - verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId1b, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, entities.get(1), domainId1); verifyEntityInfo(entityId6, entityType1, EMPTY_EVENTS, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(2), domainId2); entities = getEntitiesWithPrimaryFilter("type_1", numericFilter3); assertEquals(3, entities.size()); - verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId1, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, entities.get(0), domainId1); - verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId1b, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, entities.get(1), domainId1); verifyEntityInfo(entityId6, entityType1, EMPTY_EVENTS, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(2), domainId2); @@ -713,12 +727,12 @@ public void testGetEntitiesWithPrimaryFilters() throws IOException { entities = getEntities("type_1", 1l, null, null, userFilter, null); assertEquals(1, entities.size()); - verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId1, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, entities.get(0), domainId1); entities = getEntities("type_1", 1l, 0l, null, userFilter, null); assertEquals(1, entities.size()); - verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId1, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, entities.get(0), domainId1); entities = getEntities("type_1", null, 234l, null, userFilter, null); @@ -729,9 +743,9 @@ public void testGetEntitiesWithPrimaryFilters() throws IOException { entities = getEntities("type_1", null, null, 345l, userFilter, null); assertEquals(3, entities.size()); - verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId1, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, entities.get(0), domainId1); - verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId1b, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, entities.get(1), domainId1); verifyEntityInfo(entityId6, entityType1, EMPTY_EVENTS, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(2), domainId2); @@ -742,18 +756,18 @@ public void testGetEntitiesWithSecondaryFilters() throws IOException { List entities = getEntitiesWithFilters("type_1", null, goodTestingFilters); assertEquals(3, entities.size()); - verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId1, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, entities.get(0), domainId1); - verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId1b, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, entities.get(1), domainId1); verifyEntityInfo(entityId6, entityType1, EMPTY_EVENTS, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(2), domainId2); entities = getEntitiesWithFilters("type_1", userFilter, goodTestingFilters); assertEquals(3, entities.size()); - verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId1, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, entities.get(0), domainId1); - verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, + verifyEntityInfo(entityId1b, entityType1, events1, relEntityMap3, primaryFilters, otherInfo, entities.get(1), domainId1); verifyEntityInfo(entityId6, entityType1, EMPTY_EVENTS, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(2), domainId2); @@ -864,7 +878,8 @@ protected static void verifyEntityInfo(String entityId, String entityType, if (relatedEntities == null) { assertNull(retrievedEntityInfo.getRelatedEntities()); } else { - assertEquals(relatedEntities, retrievedEntityInfo.getRelatedEntities()); + // TODO: The TimelineStore doesn't handle this at the moment + //assertEquals(relatedEntities, retrievedEntityInfo.getRelatedEntities()); } if (primaryFilters == null) { assertNull(retrievedEntityInfo.getPrimaryFilters());