From f90f787152945ec6d5c97dc942ffc38d91cc2369 Mon Sep 17 00:00:00 2001 From: Rohith Sharma K S Date: Thu, 23 Feb 2017 16:57:08 +0530 Subject: [PATCH] YARN-6027-YARN-5355.0004 --- .../common/AbstractHBaseTestBase.java | 162 +++++++++++++++++++ .../TestTimelineReaderWebServicesHBaseStorage.java | 180 +++++++-------------- .../storage/flow/FlowActivityRowKey.java | 29 ++++ .../storage/reader/FlowActivityEntityReader.java | 20 ++- .../storage/reader/TimelineEntityReader.java | 1 + .../reader/TimelineReaderServer.java | 2 +- .../reader/TimelineReaderUtils.java | 22 ++- .../reader/TimelineReaderWebServices.java | 10 +- 8 files changed, 294 insertions(+), 132 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/common/AbstractHBaseTestBase.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/common/AbstractHBaseTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/common/AbstractHBaseTestBase.java new file mode 100644 index 0000000..10c1ab6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/common/AbstractHBaseTestBase.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.common; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.net.HttpURLConnection; +import java.net.URI; +import java.net.URL; +import java.util.List; + +import javax.ws.rs.core.MediaType; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderServer; +import org.apache.hadoop.yarn.server.timelineservice.storage.DataGeneratorForTest; +import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; +import org.junit.Assert; + +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.ClientResponse.Status; +import com.sun.jersey.api.client.GenericType; +import com.sun.jersey.api.client.config.ClientConfig; +import com.sun.jersey.api.client.config.DefaultClientConfig; +import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory; +import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; + +public abstract class AbstractHBaseTestBase { + protected static int serverPort; + protected static TimelineReaderServer server; + protected static HBaseTestingUtility util; + + public static void setup() throws Exception { + util = new HBaseTestingUtility(); + Configuration conf = util.getConfiguration(); + conf.setInt("hfile.format.version", 3); + util.startMiniCluster(); + DataGeneratorForTest.createSchema(util.getConfiguration()); + } + + public static void tearDown() throws Exception { + if (util != null) { + util.shutdownMiniCluster(); + } + } + + protected static void initialize() throws Exception { + try { + Configuration config = util.getConfiguration(); + config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + config.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, + "localhost:0"); + config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1"); + config.set(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS, + "org.apache.hadoop.yarn.server.timelineservice.storage." + + "HBaseTimelineReaderImpl"); + config.setInt("hfile.format.version", 3); + server = new TimelineReaderServer() { + @Override + protected void setupOptions(Configuration conf) { + // The parent code tries to use HttpServer2 from this version of + // Hadoop, but the tests are loading in HttpServer2 from + // ${hbase-compatible-hadoop.version}. This version uses Jetty 9 + // while ${hbase-compatible-hadoop.version} uses Jetty 6, and there + // are many differences, including classnames and packages. + // We do nothing here, so that we don't cause a NoSuchMethodError. + // Once ${hbase-compatible-hadoop.version} is changed to Hadoop 3, + // we should be able to remove this @Override. + } + }; + server.init(config); + server.start(); + serverPort = server.getWebServerPort(); + } catch (Exception e) { + Assert.fail("Web server failed to start"); + } + } + + protected Client createClient() { + ClientConfig cfg = new DefaultClientConfig(); + cfg.getClasses().add(YarnJacksonJaxbJsonProvider.class); + return new Client( + new URLConnectionClientHandler(new DummyURLConnectionFactory()), cfg); + } + + protected ClientResponse getResponse(Client client, URI uri) + throws Exception { + ClientResponse resp = + client.resource(uri).accept(MediaType.APPLICATION_JSON) + .type(MediaType.APPLICATION_JSON).get(ClientResponse.class); + if (resp == null || resp.getStatusInfo() + .getStatusCode() != ClientResponse.Status.OK.getStatusCode()) { + String msg = ""; + if (resp != null) { + msg = String.valueOf(resp.getStatusInfo().getStatusCode()); + } + throw new IOException( + "Incorrect response from timeline reader. " + "Status=" + msg); + } + return resp; + } + + protected void verifyHttpResponse(Client client, URI uri, Status status) { + ClientResponse resp = + client.resource(uri).accept(MediaType.APPLICATION_JSON) + .type(MediaType.APPLICATION_JSON).get(ClientResponse.class); + assertNotNull(resp); + assertTrue("Response from server should have been " + status, + resp.getStatusInfo().getStatusCode() == status.getStatusCode()); + System.out.println("Response is: " + resp.getEntity(String.class)); + } + + protected List verifyFlowEntites(Client client, URI uri, + int noOfEntities) throws Exception { + ClientResponse resp = getResponse(client, uri); + List entities = + resp.getEntity(new GenericType>() { + }); + assertNotNull(entities); + assertEquals(noOfEntities, entities.size()); + return entities; + } + + protected static class DummyURLConnectionFactory + implements HttpURLConnectionFactory { + + @Override + public HttpURLConnection getHttpURLConnection(final URL url) + throws IOException { + try { + return (HttpURLConnection) url.openConnection(); + } catch (UndeclaredThrowableException e) { + throw new IOException(e.getCause()); + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java index 38865f1..943b911 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java @@ -24,10 +24,7 @@ import static org.junit.Assert.fail; import java.io.IOException; -import java.lang.reflect.UndeclaredThrowableException; -import java.net.HttpURLConnection; import java.net.URI; -import java.net.URL; import java.text.DateFormat; import java.util.ArrayList; import java.util.HashMap; @@ -39,7 +36,6 @@ import javax.ws.rs.core.MediaType; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; @@ -50,17 +46,12 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; -import org.apache.hadoop.yarn.server.timelineservice.storage.DataGeneratorForTest; +import org.apache.hadoop.yarn.server.timelineservice.common.AbstractHBaseTestBase; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils; -import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; -import org.junit.After; import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -70,27 +61,27 @@ import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.ClientResponse.Status; import com.sun.jersey.api.client.GenericType; -import com.sun.jersey.api.client.config.ClientConfig; -import com.sun.jersey.api.client.config.DefaultClientConfig; -import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory; -import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; - -public class TestTimelineReaderWebServicesHBaseStorage { - private int serverPort; - private TimelineReaderServer server; - private static HBaseTestingUtility util; + +public class TestTimelineReaderWebServicesHBaseStorage + extends AbstractHBaseTestBase { private static long ts = System.currentTimeMillis(); private static long dayTs = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(ts); @BeforeClass - public static void setup() throws Exception { - util = new HBaseTestingUtility(); - Configuration conf = util.getConfiguration(); - conf.setInt("hfile.format.version", 3); - util.startMiniCluster(); - DataGeneratorForTest.createSchema(conf); + public static void setupBeforeClass() throws Exception { + setup(); loadData(); + initialize(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + if (server != null) { + server.stop(); + server = null; + } + tearDown(); } private static void loadData() throws Exception { @@ -393,84 +384,6 @@ static void writeApplicationEntities(HBaseTimelineWriterImpl hbi) } } - @AfterClass - public static void tearDown() throws Exception { - util.shutdownMiniCluster(); - } - - @Before - public void init() throws Exception { - try { - Configuration config = util.getConfiguration(); - config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); - config.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); - config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, - "localhost:0"); - config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1"); - config.set(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS, - "org.apache.hadoop.yarn.server.timelineservice.storage." + - "HBaseTimelineReaderImpl"); - config.setInt("hfile.format.version", 3); - server = new TimelineReaderServer() { - @Override - protected void setupOptions(Configuration conf) { - // The parent code tries to use HttpServer2 from this version of - // Hadoop, but the tests are loading in HttpServer2 from - // ${hbase-compatible-hadoop.version}. This version uses Jetty 9 - // while ${hbase-compatible-hadoop.version} uses Jetty 6, and there - // are many differences, including classnames and packages. - // We do nothing here, so that we don't cause a NoSuchMethodError. - // Once ${hbase-compatible-hadoop.version} is changed to Hadoop 3, - // we should be able to remove this @Override. - } - }; - server.init(config); - server.start(); - serverPort = server.getWebServerPort(); - } catch (Exception e) { - Assert.fail("Web server failed to start"); - } - } - - private static Client createClient() { - ClientConfig cfg = new DefaultClientConfig(); - cfg.getClasses().add(YarnJacksonJaxbJsonProvider.class); - return new Client(new URLConnectionClientHandler( - new DummyURLConnectionFactory()), cfg); - } - - private static ClientResponse getResponse(Client client, URI uri) - throws Exception { - ClientResponse resp = - client.resource(uri).accept(MediaType.APPLICATION_JSON) - .type(MediaType.APPLICATION_JSON).get(ClientResponse.class); - if (resp == null || - resp.getStatusInfo().getStatusCode() != - ClientResponse.Status.OK.getStatusCode()) { - String msg = ""; - if (resp != null) { - msg = String.valueOf(resp.getStatusInfo().getStatusCode()); - } - throw new IOException("Incorrect response from timeline reader. " + - "Status=" + msg); - } - return resp; - } - - private static class DummyURLConnectionFactory - implements HttpURLConnectionFactory { - - @Override - public HttpURLConnection getHttpURLConnection(final URL url) - throws IOException { - try { - return (HttpURLConnection)url.openConnection(); - } catch (UndeclaredThrowableException e) { - throw new IOException(e.getCause()); - } - } - } - private static TimelineEntity newEntity(String type, String id) { TimelineEntity entity = new TimelineEntity(); entity.setIdentifier(new TimelineEntity.Identifier(type, id)); @@ -512,17 +425,6 @@ private static boolean verifyMetrics( return false; } - private static void verifyHttpResponse(Client client, URI uri, - Status status) { - ClientResponse resp = - client.resource(uri).accept(MediaType.APPLICATION_JSON) - .type(MediaType.APPLICATION_JSON).get(ClientResponse.class); - assertNotNull(resp); - assertTrue("Response from server should have been " + status, - resp.getStatusInfo().getStatusCode() == status.getStatusCode()); - System.out.println("Response is: " + resp.getEntity(String.class)); - } - @Test public void testGetFlowRun() throws Exception { Client client = createClient(); @@ -1065,6 +967,44 @@ public void testGetFlows() throws Exception { } @Test + public void testGetFlowsForPagination() throws Exception { + String FROMID_KEY = "FROM_ID"; + Client client = createClient(); + int noOfEntities = 3; + int limit = 2; + try { + String flowURI = "http://localhost:" + serverPort + "/ws/v2/" + + "timeline/clusters/cluster1/flows"; + URI uri = URI.create(flowURI); + List flowEntites = + verifyFlowEntites(client, uri, 3, new int[] {3, 2, 1}, + new String[] {"flow1", "flow_name", "flow_name2"}); + FlowActivityEntity fEntity1 = flowEntites.get(0); + FlowActivityEntity fEntity3 = flowEntites.get(noOfEntities - 1); + + uri = URI.create(flowURI + "?limit=" + limit); + flowEntites = verifyFlowEntites(client, uri, limit); + assertEquals(fEntity1, flowEntites.get(0)); + FlowActivityEntity fEntity2 = flowEntites.get(limit - 1); + + uri = URI + .create(flowURI + "?limit=" + limit + "&&fromid=" + + fEntity2.getInfo().get(FROMID_KEY)); + flowEntites = verifyFlowEntites(client, uri, noOfEntities - limit + 1); + assertEquals(fEntity2, flowEntites.get(0)); + assertEquals(fEntity3, flowEntites.get(noOfEntities - limit)); + + uri = URI + .create(flowURI + "?limit=" + limit + "&&fromid=" + + fEntity3.getInfo().get(FROMID_KEY)); + flowEntites = verifyFlowEntites(client, uri, 1); + assertEquals(fEntity3, flowEntites.get(0)); + } finally { + client.destroy(); + } + } + + @Test public void testGetApp() throws Exception { Client client = createClient(); try { @@ -2198,14 +2138,6 @@ public void testGetFlowAppsNotPresent() throws Exception { } } - @After - public void stop() throws Exception { - if (server != null) { - server.stop(); - server = null; - } - } - @Test public void testGenericEntitiesForPagination() throws Exception { Client client = createClient(); @@ -2276,7 +2208,8 @@ private TimelineEntity verifyPaginatedEntites(List entities, return entity; } - private void verifyFlowEntites(Client client, URI uri, int noOfEntities, + private List verifyFlowEntites(Client client, URI uri, + int noOfEntities, int[] a, String[] flowsInSequence) throws Exception { ClientResponse resp = getResponse(client, uri); List entities = @@ -2292,6 +2225,7 @@ private void verifyFlowEntites(Client client, URI uri, int noOfEntities, timelineEntity.getInfo().get("SYSTEM_INFO_FLOW_NAME")); assertEquals(a[count++], timelineEntity.getFlowRuns().size()); } + return entities; } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java index bb77e36..b1ae54d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java @@ -17,7 +17,11 @@ */ package org.apache.hadoop.yarn.server.timelineservice.storage.flow; +import java.util.Collections; +import java.util.List; + import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; @@ -103,6 +107,31 @@ public static FlowActivityRowKey parseRowKey(byte[] rowKey) { return new FlowActivityRowKeyConverter().decode(rowKey); } + public static FlowActivityRowKey decode(String fromId) { + List split = TimelineReaderUtils.split(fromId); + if (split == null || split.size() != 4) { + throw new IllegalArgumentException("Invalid fromid is provided."); + } + Long dayTs = null; + try { + dayTs = Long.valueOf(split.get(1)); + } catch (NumberFormatException e) { + new IllegalArgumentException( + "Invalid fromid is provided." + e.getMessage()); + } + return new FlowActivityRowKey(split.get(0), dayTs, split.get(2), + split.get(3)); + } + + public String getRowKeyAsString() { + if (this.clusterId == null || this.dayTs == null || this.userId == null + || this.flowName == null) { + return null; + } + return TimelineReaderUtils.joinAndEscapeStrings(new String[] { + this.clusterId, this.dayTs.toString(), this.userId, this.flowName }); + } + /** * Encodes and decodes row key for flow activity table. The row key is of the * form : clusterId!dayTimestamp!user!flowName. dayTimestamp(top of the day diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java index c741d0e..84e23a2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java @@ -110,11 +110,25 @@ protected ResultScanner getResults(Configuration hbaseConf, Connection conn, FilterList filterList) throws IOException { Scan scan = new Scan(); String clusterId = getContext().getClusterId(); - if (getFilters().getCreatedTimeBegin() == 0L && - getFilters().getCreatedTimeEnd() == Long.MAX_VALUE) { + if (getFilters().getFromId() == null + && getFilters().getCreatedTimeBegin() == 0L + && getFilters().getCreatedTimeEnd() == Long.MAX_VALUE) { // All records have to be chosen. scan.setRowPrefixFilter(new FlowActivityRowKeyPrefix(clusterId) .getRowKeyPrefix()); + } else if (getFilters().getFromId() != null) { + FlowActivityRowKey key = + FlowActivityRowKey.decode(getFilters().getFromId()); + if (!clusterId.equals(key.getClusterId())) { + throw new IllegalArgumentException( + "fromId doesn't belong to clusterId=" + clusterId); + } + scan.setStartRow(key.getRowKey()); + scan.setStopRow( + new FlowActivityRowKeyPrefix(clusterId, + (getFilters().getCreatedTimeBegin() <= 0 ? 0 + : (getFilters().getCreatedTimeBegin() - 1))) + .getRowKeyPrefix()); } else { scan.setStartRow(new FlowActivityRowKeyPrefix(clusterId, getFilters() .getCreatedTimeEnd()).getRowKeyPrefix()); @@ -157,7 +171,7 @@ protected TimelineEntity parseEntity(Result result) throws IOException { flowRun.setId(flowRun.getId()); flowActivity.addFlowRun(flowRun); } - + flowActivity.getInfo().put(FROMID_KEY, rowKey.getRowKeyAsString()); return flowActivity; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java index 4c88cd3..d247784 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java @@ -62,6 +62,7 @@ public abstract class TimelineEntityReader extends AbstractTimelineStorageReader { private static final Log LOG = LogFactory.getLog(TimelineEntityReader.class); + protected static final String FROMID_KEY = "FROM_ID"; private final boolean singleEntityRead; private TimelineDataToRetrieve dataToRetrieve; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java index 8e24ac9..4362c20 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java @@ -174,7 +174,7 @@ protected void setupOptions(Configuration conf) { } @VisibleForTesting - int getWebServerPort() { + public int getWebServerPort() { return readerWebServer.getConnectorAddress(0).getPort(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderUtils.java index c93c631..5fc36aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderUtils.java @@ -27,11 +27,21 @@ /** * Set of utility methods to be used across timeline reader. */ -final class TimelineReaderUtils { +public class TimelineReaderUtils { private TimelineReaderUtils() { } /** + * Default delimiter for joining strings. + */ + private static final char DEFAULT_DELIMITER_CHAR = '!'; + + /** + * Default escape character used for joining strings + */ + private static final char DEFAULT_ESCAPE_CHAR = '*'; + + /** * Split the passed string along the passed delimiter character while looking * for escape char to interpret the splitted parts correctly. For delimiter or * escape character to be interpreted as part of the string, they have to be @@ -168,4 +178,14 @@ static String joinAndEscapeStrings(final String[] strs, // Join the strings after they have been escaped. return StringUtils.join(strs, delimiterChar); } + + public static List split(final String str) + throws IllegalArgumentException { + return split(str, DEFAULT_DELIMITER_CHAR, DEFAULT_ESCAPE_CHAR); + } + + public static String joinAndEscapeStrings(final String[] strs) { + return joinAndEscapeStrings(strs, DEFAULT_DELIMITER_CHAR, + DEFAULT_ESCAPE_CHAR); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java index abf93df..d3e275b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java @@ -1350,8 +1350,9 @@ public TimelineEntity getFlowRun( @Context HttpServletRequest req, @Context HttpServletResponse res, @QueryParam("limit") String limit, - @QueryParam("daterange") String dateRange) { - return getFlows(req, res, null, limit, dateRange); + @QueryParam("daterange") String dateRange, + @QueryParam("fromid") String fromId) { + return getFlows(req, res, null, limit, dateRange, fromId); } /** @@ -1397,7 +1398,8 @@ public TimelineEntity getFlowRun( @Context HttpServletResponse res, @PathParam("clusterid") String clusterId, @QueryParam("limit") String limit, - @QueryParam("daterange") String dateRange) { + @QueryParam("daterange") String dateRange, + @QueryParam("fromid") String fromId) { String url = req.getRequestURI() + (req.getQueryString() == null ? "" : QUERY_STRING_SEP + req.getQueryString()); @@ -1414,7 +1416,7 @@ public TimelineEntity getFlowRun( TimelineEntityFilters entityFilters = TimelineReaderWebServicesUtils.createTimelineEntityFilters( limit, null, null, null, null, null, null, null, null, null, - null); + fromId); entityFilters.setCreatedTimeBegin(range.dateStart); entityFilters.setCreatedTimeEnd(range.dateEnd); entities = timelineReaderManager.getEntities( -- 2.10.1 (Apple Git-78)