From 1f473b62276da951697a7d9518e2b560f1672f63 Mon Sep 17 00:00:00 2001 From: Rohith Sharma K S Date: Fri, 17 Feb 2017 23:34:47 +0530 Subject: [PATCH] YARN-6027 --- .../common/AbstractHBaseTestBase.java | 162 +++++++++++++++++++ .../TestTimelineReaderWebServicesHBaseStorage.java | 177 +++++++-------------- .../storage/reader/FlowActivityEntityReader.java | 15 +- .../reader/TimelineReaderManager.java | 12 +- .../reader/TimelineReaderServer.java | 2 +- .../reader/TimelineReaderUtils.java | 19 +++ .../reader/TimelineReaderWebServices.java | 40 ++++- 7 files changed, 300 insertions(+), 127 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/common/AbstractHBaseTestBase.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/common/AbstractHBaseTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/common/AbstractHBaseTestBase.java new file mode 100644 index 0000000..ee52906 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/common/AbstractHBaseTestBase.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.timelineservice.common; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.net.HttpURLConnection; +import java.net.URI; +import java.net.URL; +import java.util.List; + +import javax.ws.rs.core.MediaType; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderServer; +import org.apache.hadoop.yarn.server.timelineservice.storage.DataGeneratorForTest; +import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; +import org.junit.Assert; + +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.ClientResponse.Status; +import com.sun.jersey.api.client.GenericType; +import com.sun.jersey.api.client.config.ClientConfig; +import com.sun.jersey.api.client.config.DefaultClientConfig; +import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory; +import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; + +public abstract class AbstractHBaseTestBase { + protected static int serverPort; + protected static TimelineReaderServer server; + protected static HBaseTestingUtility util; + + public static void setup() throws Exception { + util = new HBaseTestingUtility(); + Configuration conf = util.getConfiguration(); + conf.setInt("hfile.format.version", 3); + util.startMiniCluster(); + DataGeneratorForTest.createSchema(util.getConfiguration()); + } + + public static void tearDown() throws Exception { + if (util != null) { + util.shutdownMiniCluster(); + } + } + + protected static void initialize() throws Exception { + try { + Configuration config = util.getConfiguration(); + config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + config.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, + "localhost:0"); + config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1"); + config.set(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS, + "org.apache.hadoop.yarn.server.timelineservice.storage." + + "HBaseTimelineReaderImpl"); + config.setInt("hfile.format.version", 3); + server = new TimelineReaderServer() { + @Override + protected void setupOptions(Configuration conf) { + // The parent code tries to use HttpServer2 from this version of + // Hadoop, but the tests are loading in HttpServer2 from + // ${hbase-compatible-hadoop.version}. This version uses Jetty 9 + // while ${hbase-compatible-hadoop.version} uses Jetty 6, and there + // are many differences, including classnames and packages. + // We do nothing here, so that we don't cause a NoSuchMethodError. + // Once ${hbase-compatible-hadoop.version} is changed to Hadoop 3, + // we should be able to remove this @Override. 
+ } + }; + server.init(config); + server.start(); + serverPort = server.getWebServerPort(); + } catch (Exception e) { + Assert.fail("Web server failed to start"); + } + } + + protected Client createClient() { + ClientConfig cfg = new DefaultClientConfig(); + cfg.getClasses().add(YarnJacksonJaxbJsonProvider.class); + return new Client( + new URLConnectionClientHandler(new DummyURLConnectionFactory()), cfg); + } + + protected ClientResponse getResponse(Client client, URI uri) + throws Exception { + ClientResponse resp = + client.resource(uri).accept(MediaType.APPLICATION_JSON) + .type(MediaType.APPLICATION_JSON).get(ClientResponse.class); + if (resp == null || resp.getStatusInfo() + .getStatusCode() != ClientResponse.Status.OK.getStatusCode()) { + String msg = ""; + if (resp != null) { + msg = String.valueOf(resp.getStatusInfo().getStatusCode()); + } + throw new IOException( + "Incorrect response from timeline reader. " + "Status=" + msg); + } + return resp; + } + + protected void verifyHttpResponse(Client client, URI uri, Status status) { + ClientResponse resp = + client.resource(uri).accept(MediaType.APPLICATION_JSON) + .type(MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertNotNull(resp); + assertTrue("Response from server should have been " + status, + resp.getStatusInfo().getStatusCode() == status.getStatusCode()); + System.out.println("Response is: " + resp.getEntity(String.class)); + } + + protected List<FlowActivityEntity> verifyFlowEntites(Client client, URI uri, + int noOfEntities) throws Exception { + ClientResponse resp = getResponse(client, uri); + List<FlowActivityEntity> entities = + resp.getEntity(new GenericType<List<FlowActivityEntity>>() { + }); + assertNotNull(entities); + assertEquals(noOfEntities, entities.size()); + return entities; + } + + protected static class DummyURLConnectionFactory + implements HttpURLConnectionFactory { + + @Override + public HttpURLConnection getHttpURLConnection(final URL url) + throws IOException { + try { + return (HttpURLConnection) url.openConnection(); + } catch (UndeclaredThrowableException e) { + throw new IOException(e.getCause()); + } + } + } +} \ No newline at end of file
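A note on intended usage: AbstractHBaseTestBase leaves setup(), initialize() and tearDown() un-annotated on purpose, so each concrete test class can order them around its own data loading. A minimal sketch of a subclass, following the same wiring the patched test below uses (the class name, loaded data and expected entity count here are hypothetical, not part of this patch):

import java.net.URI;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import com.sun.jersey.api.client.Client;

public class SampleHBaseStorageTest extends AbstractHBaseTestBase {

  @BeforeClass
  public static void setupBeforeClass() throws Exception {
    setup();       // start the HBase mini cluster and create the schema
    // ... write whatever timeline entities the tests below expect ...
    initialize();  // start TimelineReaderServer and record serverPort
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    if (server != null) {
      server.stop();
      server = null;
    }
    tearDown();    // shut down the HBase mini cluster
  }

  @Test
  public void testFlowsEndpointIsUp() throws Exception {
    Client client = createClient();
    try {
      URI uri = URI.create("http://localhost:" + serverPort
          + "/ws/v2/timeline/clusters/cluster1/flows");
      verifyFlowEntites(client, uri, 3); // count depends on the data loaded
    } finally {
      client.destroy();
    }
  }
}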
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java index 38865f1..52b1d90 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java @@ -24,10 +24,7 @@ import static org.junit.Assert.fail; import java.io.IOException; -import java.lang.reflect.UndeclaredThrowableException; -import java.net.HttpURLConnection; import java.net.URI; -import java.net.URL; import java.text.DateFormat; import java.util.ArrayList; import java.util.HashMap; @@ -39,7 +36,6 @@ import javax.ws.rs.core.MediaType; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; @@ -50,17 +46,13 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; -import org.apache.hadoop.yarn.server.timelineservice.storage.DataGeneratorForTest; +import org.apache.hadoop.yarn.server.timelineservice.common.AbstractHBaseTestBase; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils; -import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; import org.junit.After; import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -70,27 +62,27 @@ import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.ClientResponse.Status; import com.sun.jersey.api.client.GenericType; -import com.sun.jersey.api.client.config.ClientConfig; -import com.sun.jersey.api.client.config.DefaultClientConfig; -import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory; -import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; - -public class TestTimelineReaderWebServicesHBaseStorage { - private int serverPort; - private TimelineReaderServer server; - private static HBaseTestingUtility util; + +public class TestTimelineReaderWebServicesHBaseStorage + extends AbstractHBaseTestBase { private static long ts = System.currentTimeMillis(); private static long dayTs = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(ts); @BeforeClass - public static void setup() throws Exception { - util = new HBaseTestingUtility(); - Configuration conf = util.getConfiguration(); - conf.setInt("hfile.format.version", 3); - util.startMiniCluster(); - DataGeneratorForTest.createSchema(conf); + public static void setupBeforeClass() throws Exception { + setup(); loadData(); + initialize(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + if (server != null) { + server.stop(); + server = null; + } + tearDown(); } private static void loadData() throws Exception { @@ -348,7 +340,7 @@ private static void loadData() throws Exception { try { hbi = new HBaseTimelineWriterImpl(c1); hbi.init(c1); - hbi.write(cluster, user, flow, flowVersion, runid, entity.getId(), te); + hbi.write(cluster, user, flow, flowVersion, runid, entity1.getId(), te1); hbi.write(cluster, user, flow, flowVersion, runid1, entity4.getId(), te4); hbi.write(cluster, user, flow2, @@ -357,6 +349,8 @@ private static void loadData() throws Exception { "application_1111111111_1111", userEntities); writeApplicationEntities(hbi); hbi.flush(); + } catch (Exception e) { + e.printStackTrace(); } finally { if (hbi != null) { hbi.close(); @@ -393,84 +387,6 @@ static void writeApplicationEntities(HBaseTimelineWriterImpl hbi) } } - @AfterClass - public static void tearDown() throws Exception { - util.shutdownMiniCluster(); - } - - @Before - public void init() throws Exception { - try { - Configuration config = util.getConfiguration(); - config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
- config.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); - config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, - "localhost:0"); - config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1"); - config.set(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS, - "org.apache.hadoop.yarn.server.timelineservice.storage." + - "HBaseTimelineReaderImpl"); - config.setInt("hfile.format.version", 3); - server = new TimelineReaderServer() { - @Override - protected void setupOptions(Configuration conf) { - // The parent code tries to use HttpServer2 from this version of - // Hadoop, but the tests are loading in HttpServer2 from - // ${hbase-compatible-hadoop.version}. This version uses Jetty 9 - // while ${hbase-compatible-hadoop.version} uses Jetty 6, and there - // are many differences, including classnames and packages. - // We do nothing here, so that we don't cause a NoSuchMethodError. - // Once ${hbase-compatible-hadoop.version} is changed to Hadoop 3, - // we should be able to remove this @Override. - } - }; - server.init(config); - server.start(); - serverPort = server.getWebServerPort(); - } catch (Exception e) { - Assert.fail("Web server failed to start"); - } - } - - private static Client createClient() { - ClientConfig cfg = new DefaultClientConfig(); - cfg.getClasses().add(YarnJacksonJaxbJsonProvider.class); - return new Client(new URLConnectionClientHandler( - new DummyURLConnectionFactory()), cfg); - } - - private static ClientResponse getResponse(Client client, URI uri) - throws Exception { - ClientResponse resp = - client.resource(uri).accept(MediaType.APPLICATION_JSON) - .type(MediaType.APPLICATION_JSON).get(ClientResponse.class); - if (resp == null || - resp.getStatusInfo().getStatusCode() != - ClientResponse.Status.OK.getStatusCode()) { - String msg = ""; - if (resp != null) { - msg = String.valueOf(resp.getStatusInfo().getStatusCode()); - } - throw new IOException("Incorrect response from timeline reader. 
" + - "Status=" + msg); - } - return resp; - } - - private static class DummyURLConnectionFactory - implements HttpURLConnectionFactory { - - @Override - public HttpURLConnection getHttpURLConnection(final URL url) - throws IOException { - try { - return (HttpURLConnection)url.openConnection(); - } catch (UndeclaredThrowableException e) { - throw new IOException(e.getCause()); - } - } - } - private static TimelineEntity newEntity(String type, String id) { TimelineEntity entity = new TimelineEntity(); entity.setIdentifier(new TimelineEntity.Identifier(type, id)); @@ -512,17 +428,6 @@ private static boolean verifyMetrics( return false; } - private static void verifyHttpResponse(Client client, URI uri, - Status status) { - ClientResponse resp = - client.resource(uri).accept(MediaType.APPLICATION_JSON) - .type(MediaType.APPLICATION_JSON).get(ClientResponse.class); - assertNotNull(resp); - assertTrue("Response from server should have been " + status, - resp.getStatusInfo().getStatusCode() == status.getStatusCode()); - System.out.println("Response is: " + resp.getEntity(String.class)); - } - @Test public void testGetFlowRun() throws Exception { Client client = createClient(); @@ -1065,6 +970,45 @@ public void testGetFlows() throws Exception { } @Test + public void testGetFlowsForPagination() throws Exception { + String FROMID_KEY = "FROM_ID"; + Client client = createClient(); + int noOfEntities = 3; + int limit = 2; + try { + String flowURI = "http://localhost:" + serverPort + "/ws/v2/" + + "timeline/clusters/cluster1/flows"; + URI uri = URI.create(flowURI); + List flowEntites = + verifyFlowEntites(client, uri, 3, new int[] { 3, 2, 1 }, + new String[] { "flow1", "flow_name", "flow_name2" }); + FlowActivityEntity fEntity1 = flowEntites.get(0); + FlowActivityEntity fEntity3 = flowEntites.get(noOfEntities - 1); + assertNotNull(fEntity1.getInfo().get(FROMID_KEY)); + + uri = URI.create(flowURI + "?limit=" + limit); + flowEntites = verifyFlowEntites(client, uri, limit); + assertEquals(fEntity1, flowEntites.get(0)); + FlowActivityEntity fEntity2 = flowEntites.get(limit - 1); + + uri = URI + .create(flowURI + "?limit=" + limit + "&&fromid=" + + fEntity2.getInfo().get(FROMID_KEY)); + flowEntites = verifyFlowEntites(client, uri, noOfEntities - limit + 1); + assertEquals(fEntity2, flowEntites.get(0)); + assertEquals(fEntity3, flowEntites.get(noOfEntities - limit)); + + uri = URI + .create(flowURI + "?limit=" + limit + "&&fromid=" + + fEntity3.getInfo().get(FROMID_KEY)); + flowEntites = verifyFlowEntites(client, uri, 1); + assertEquals(fEntity3, flowEntites.get(0)); + } finally { + client.destroy(); + } + } + + @Test public void testGetApp() throws Exception { Client client = createClient(); try { @@ -2276,8 +2220,8 @@ private TimelineEntity verifyPaginatedEntites(List entities, return entity; } - private void verifyFlowEntites(Client client, URI uri, int noOfEntities, - int[] a, String[] flowsInSequence) throws Exception { + private List verifyFlowEntites(Client client, URI uri, + int noOfEntities, int[] a, String[] flowsInSequence) throws Exception { ClientResponse resp = getResponse(client, uri); List entities = resp.getEntity(new GenericType>() { @@ -2292,6 +2236,7 @@ private void verifyFlowEntites(Client client, URI uri, int noOfEntities, timelineEntity.getInfo().get("SYSTEM_INFO_FLOW_NAME")); assertEquals(a[count++], timelineEntity.getFlowRuns().size()); } + return entities; } @Test diff --git 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java index c741d0e..039f5ac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java @@ -110,11 +110,22 @@ protected ResultScanner getResults(Configuration hbaseConf, Connection conn, FilterList filterList) throws IOException { Scan scan = new Scan(); String clusterId = getContext().getClusterId(); - if (getFilters().getCreatedTimeBegin() == 0L && - getFilters().getCreatedTimeEnd() == Long.MAX_VALUE) { + if (getFilters().getFromId() == null + && getFilters().getCreatedTimeBegin() == 0L + && getFilters().getCreatedTimeEnd() == Long.MAX_VALUE) { // All records have to be chosen. scan.setRowPrefixFilter(new FlowActivityRowKeyPrefix(clusterId) .getRowKeyPrefix()); + } else if (getFilters().getFromId() != null) { + FlowActivityRowKey key = + new FlowActivityRowKey(clusterId, getFilters().getCreatedTimeEnd(), + getContext().getUserId(), getContext().getFlowName()); + scan.setStartRow(key.getRowKey()); + scan.setStopRow( + new FlowActivityRowKeyPrefix(clusterId, + (getFilters().getCreatedTimeBegin() <= 0 ? 0 + : (getFilters().getCreatedTimeBegin() - 1))) + .getRowKeyPrefix()); } else { scan.setStartRow(new FlowActivityRowKeyPrefix(clusterId, getFilters() .getCreatedTimeEnd()).getRowKeyPrefix());
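A background assumption in the fromid branch above, worth making explicit: the flow activity row key stores the day timestamp inverted (effectively Long.MAX_VALUE minus the timestamp), so HBase's ascending key order visits the newest day first, and seeding setStartRow with the fromid's exact (cluster, day, user, flow) row resumes that descending-by-day scan where the previous page stopped. A toy illustration of the ordering; the invert helper here is illustrative, not the storage layer's actual converter:

import java.util.Arrays;

public class InvertedTsDemo {
  // Illustrative stand-in for the storage layer's long inversion.
  static long invert(long ts) {
    return Long.MAX_VALUE - ts;
  }

  public static void main(String[] args) {
    long olderDay = 1_487_204_000_000L;
    long newerDay = olderDay + 24L * 60 * 60 * 1000;
    long[] keys = { invert(olderDay), invert(newerDay) };
    Arrays.sort(keys); // HBase scans row keys in ascending order
    // The smaller inverted value belongs to the newer day, so the scan
    // visits newerDay before olderDay.
    System.out.println(keys[0] == invert(newerDay)); // prints true
  }
}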
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java index 66e4cbf..7a57585 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java @@ -45,6 +45,7 @@ @VisibleForTesting public static final String UID_KEY = "UID"; + public static final String FROMID_KEY = "FROM_ID"; private TimelineReader reader; public TimelineReaderManager(TimelineReader timelineReader) { @@ -86,7 +87,7 @@ private static TimelineEntityType getTimelineEntityType(String entityType) { * @param entity Timeline Entity. * @param context Context defining the query. */ - private static void fillUID(TimelineEntityType entityType, + private static void fillSystemInfoKeys(TimelineEntityType entityType, TimelineEntity entity, TimelineReaderContext context) { if (entityType != null) { switch(entityType) { @@ -96,6 +97,11 @@ private static void fillUID(TimelineEntityType entityType, context.setFlowName(activityEntity.getFlowName()); entity.setUID(UID_KEY, TimelineUIDConverter.FLOW_UID.encodeUID(context)); + entity.getInfo().put(FROMID_KEY, + TimelineReaderUtils.joinAndEscapeStrings( + new String[] { activityEntity.getCluster(), + activityEntity.getDate().getTime() + "", + activityEntity.getUser(), activityEntity.getFlowName() })); return; case YARN_FLOW_RUN: FlowRunEntity runEntity = (FlowRunEntity)entity; @@ -144,7 +150,7 @@ private static void fillUID(TimelineEntityType entityType, if (entities != null) { TimelineEntityType type = getTimelineEntityType(context.getEntityType()); for (TimelineEntity entity : entities) { - fillUID(type, entity, context); + fillSystemInfoKeys(type, entity, context); } } return entities; } @@ -173,7 +179,7 @@ public TimelineEntity getEntity(TimelineReaderContext context, new TimelineReaderContext(context), dataToRetrieve); if (entity != null) { TimelineEntityType type = getTimelineEntityType(context.getEntityType()); - fillUID(type, entity, context); + fillSystemInfoKeys(type, entity, context); } return entity; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java index 76f7495..8ce0a03 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java @@ -165,7 +165,7 @@ protected void setupOptions(Configuration conf) { } @VisibleForTesting - int getWebServerPort() { + public int getWebServerPort() { return readerWebServer.getConnectorAddress(0).getPort(); }
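The FROM_ID value filled in by the TimelineReaderManager hunk above is simply the four flow activity row-key components joined with the default delimiter and escape characters that the next hunk adds to TimelineReaderUtils. A sketch of the equivalent encoding; encodeFromId and its inline escaping are illustrative stand-ins for the package-private joinAndEscapeStrings:

// Illustrative: produces "cluster!dayTimestamp!user!flowName", escaping any
// '!' or '*' inside a component with a leading '*'.
static String encodeFromId(String cluster, long dayTs, String user,
    String flowName) {
  String[] parts = { cluster, Long.toString(dayTs), user, flowName };
  StringBuilder sb = new StringBuilder();
  for (int i = 0; i < parts.length; i++) {
    if (i > 0) {
      sb.append('!');
    }
    // escape the escape char first, then the delimiter
    sb.append(parts[i].replace("*", "**").replace("!", "*!"));
  }
  return sb.toString();
}

For example, encodeFromId("cluster1", 1487289600000L, "user1", "flow_name") yields "cluster1!1487289600000!user1!flow_name".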
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderUtils.java index c93c631..6689369 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderUtils.java @@ -32,6 +32,16 @@ private TimelineReaderUtils() { } /** + * Default delimiter for joining strings. + */ + public static final char DEFAULT_DELIMITER_CHAR = '!'; + + /** + * Default escape character used for joining strings. + */ + public static final char DEFAULT_ESCAPE_CHAR = '*'; + + /** * Split the passed string along the passed delimiter character while looking * for escape char to interpret the splitted parts correctly. For delimiter or * escape character to be interpreted as part of the string, they have to be * @@ -107,6 +117,15 @@ private TimelineReaderUtils() { return list; } + static List<String> split(final String str) throws IllegalArgumentException { + return split(str, DEFAULT_DELIMITER_CHAR, DEFAULT_ESCAPE_CHAR); + } + + static String joinAndEscapeStrings(final String[] strs) { + return joinAndEscapeStrings(strs, DEFAULT_DELIMITER_CHAR, + DEFAULT_ESCAPE_CHAR); + } + private static String escapeString(final String str, final char delimiterChar, final char escapeChar) { if (str == null) {
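To make the delimiter/escape contract concrete: a component may itself contain '!' or '*', so the join escapes them and the split must honor those escapes. A hedged sketch of the decode side, mirroring (not calling) the package-private split(str, '!', '*'):

import java.util.ArrayList;
import java.util.List;

// Illustrative decoder: an unescaped '!' ends a component; '*' escapes the
// following character so it is taken literally.
static List<String> decodeFromId(String fromId) {
  List<String> parts = new ArrayList<String>();
  StringBuilder current = new StringBuilder();
  for (int i = 0; i < fromId.length(); i++) {
    char c = fromId.charAt(i);
    if (c == '*') {
      if (i + 1 >= fromId.length()) {
        throw new IllegalArgumentException("Dangling escape in: " + fromId);
      }
      current.append(fromId.charAt(++i));
    } else if (c == '!') {
      parts.add(current.toString());
      current.setLength(0);
    } else {
      current.append(c);
    }
  }
  parts.add(current.toString());
  return parts;
}

For example, decodeFromId("cluster1!1487289600000!user1!flow*!name") returns ["cluster1", "1487289600000", "user1", "flow!name"].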
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java index abf93df..3af266b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java @@ -23,6 +23,7 @@ import java.text.SimpleDateFormat; import java.util.Collections; import java.util.Date; +import java.util.List; import java.util.Locale; import java.util.Set; import java.util.TimeZone; @@ -1350,8 +1351,9 @@ public TimelineEntity getFlowRun( @Context HttpServletRequest req, @Context HttpServletResponse res, @QueryParam("limit") String limit, - @QueryParam("daterange") String dateRange) { - return getFlows(req, res, null, limit, dateRange); + @QueryParam("daterange") String dateRange, + @QueryParam("fromid") String fromId) { + return getFlows(req, res, null, limit, dateRange, fromId); } /** @@ -1397,7 +1399,8 @@ public TimelineEntity getFlowRun( @Context HttpServletResponse res, @PathParam("clusterid") String clusterId, @QueryParam("limit") String limit, - @QueryParam("daterange") String dateRange) { + @QueryParam("daterange") String dateRange, + @QueryParam("fromid") String fromId) { String url = req.getRequestURI() + (req.getQueryString() == null ? "" : QUERY_STRING_SEP + req.getQueryString()); @@ -1411,15 +1414,42 @@ public TimelineEntity getFlowRun( Set<TimelineEntity> entities = null; try { DateRange range = parseDateRange(dateRange); + Long dayTs = null; + String user = null; + String flowName = null; + + List<String> split = TimelineReaderUtils.split(fromId); + if (split != null) { + if (split.size() != 4) { + throw new BadRequestException("Invalid fromid has been provided."); + } + if (clusterId != null && !clusterId.equals(split.get(0))) { + throw new BadRequestException( + "Cluster Id does not match. PathParam clusterid=" + clusterId + + " fromid contained clusterid=" + split.get(0)); + } + try { + dayTs = Long.parseLong(split.get(1)); + } catch (NumberFormatException e) { + throw new BadRequestException("Invalid fromid has been provided."); + } + user = split.get(2); + flowName = split.get(3); + } + TimelineEntityFilters entityFilters = TimelineReaderWebServicesUtils.createTimelineEntityFilters( limit, null, null, null, null, null, null, null, null, null, - null); + fromId); entityFilters.setCreatedTimeBegin(range.dateStart); entityFilters.setCreatedTimeEnd(range.dateEnd); + if (dayTs != null) { + // overwrite with parsed timestamp + entityFilters.setCreatedTimeEnd(dayTs); + } entities = timelineReaderManager.getEntities( TimelineReaderWebServicesUtils.createTimelineReaderContext( - clusterId, null, null, null, null, + clusterId, user, flowName, null, null, TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null, null), entityFilters, TimelineReaderWebServicesUtils. createTimelineDataToRetrieve(null, null, null, null)); -- 2.10.1 (Apple Git-78)
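The fromid handling added to getFlows above amounts to a small validation routine; the sketch below condenses the same checks into a hypothetical helper (it throws IllegalArgumentException to stay self-contained, where the servlet code throws BadRequestException):

import java.util.List;

// Hypothetical condensation of the fromid checks in getFlows(): exactly four
// components, a cluster id that agrees with the path parameter, and a numeric
// day timestamp (which then overrides createdTimeEnd).
static long validateFromId(String clusterId, List<String> split) {
  if (split.size() != 4) {
    throw new IllegalArgumentException("Invalid fromid has been provided.");
  }
  if (clusterId != null && !clusterId.equals(split.get(0))) {
    throw new IllegalArgumentException("Cluster Id does not match. "
        + "PathParam clusterid=" + clusterId
        + " fromid contained clusterid=" + split.get(0));
  }
  try {
    return Long.parseLong(split.get(1)); // the fromid's day timestamp
  } catch (NumberFormatException e) {
    throw new IllegalArgumentException("Invalid fromid has been provided.");
  }
}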