From f7beb9dbf42211895676a224b6dd0c1deee0cccb Mon Sep 17 00:00:00 2001 From: Rohith Sharma K S Date: Tue, 7 Feb 2017 16:24:52 +0530 Subject: [PATCH] YARN-6027 --- .../common/AbstractHBaseTestBase.java | 160 ++++++++++++ .../TestHBaseStorageFlowActivityForFilters.java | 277 +++++++++++++++++++++ .../storage/reader/FlowActivityEntityReader.java | 152 ++++++++++- .../storage/reader/TimelineEntityReader.java | 7 +- .../reader/TimelineEntityFilters.java | 9 + .../reader/TimelineReaderServer.java | 2 +- .../reader/TimelineReaderWebServices.java | 31 ++- 7 files changed, 623 insertions(+), 15 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/common/AbstractHBaseTestBase.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivityForFilters.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/common/AbstractHBaseTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/common/AbstractHBaseTestBase.java new file mode 100644 index 0000000..5a0d0cd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/common/AbstractHBaseTestBase.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.timelineservice.common; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.net.HttpURLConnection; +import java.net.URI; +import java.net.URL; +import java.util.List; + +import javax.ws.rs.core.MediaType; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderServer; +import org.apache.hadoop.yarn.server.timelineservice.storage.DataGeneratorForTest; +import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; +import org.junit.Assert; + +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.ClientResponse.Status; +import com.sun.jersey.api.client.GenericType; +import com.sun.jersey.api.client.config.ClientConfig; +import com.sun.jersey.api.client.config.DefaultClientConfig; +import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory; +import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; + +public abstract class AbstractHBaseTestBase { + protected static int serverPort; + protected static TimelineReaderServer server; + protected static HBaseTestingUtility util; + + public static void setup() throws Exception { + util = new HBaseTestingUtility(); + Configuration conf = util.getConfiguration(); + conf.setInt("hfile.format.version", 3); + util.startMiniCluster(); + DataGeneratorForTest.createSchema(util.getConfiguration()); + } + + public static void tearDown() throws Exception { + util.shutdownMiniCluster(); + } + + protected static void initialize() throws Exception { + try { + Configuration config = util.getConfiguration(); + config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + config.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, + "localhost:0"); + config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1"); + config.set(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS, + "org.apache.hadoop.yarn.server.timelineservice.storage." + + "HBaseTimelineReaderImpl"); + config.setInt("hfile.format.version", 3); + server = new TimelineReaderServer() { + @Override + protected void setupOptions(Configuration conf) { + // The parent code tries to use HttpServer2 from this version of + // Hadoop, but the tests are loading in HttpServer2 from + // ${hbase-compatible-hadoop.version}. This version uses Jetty 9 + // while ${hbase-compatible-hadoop.version} uses Jetty 6, and there + // are many differences, including classnames and packages. + // We do nothing here, so that we don't cause a NoSuchMethodError. + // Once ${hbase-compatible-hadoop.version} is changed to Hadoop 3, + // we should be able to remove this @Override. 
+        }
+      };
+      server.init(config);
+      server.start();
+      serverPort = server.getWebServerPort();
+    } catch (Exception e) {
+      Assert.fail("Web server failed to start: " + e.getMessage());
+    }
+  }
+
+  protected Client createClient() {
+    ClientConfig cfg = new DefaultClientConfig();
+    cfg.getClasses().add(YarnJacksonJaxbJsonProvider.class);
+    return new Client(
+        new URLConnectionClientHandler(new DummyURLConnectionFactory()), cfg);
+  }
+
+  protected ClientResponse getResponse(Client client, URI uri)
+      throws Exception {
+    ClientResponse resp =
+        client.resource(uri).accept(MediaType.APPLICATION_JSON)
+            .type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+    if (resp == null || resp.getStatusInfo()
+        .getStatusCode() != ClientResponse.Status.OK.getStatusCode()) {
+      String msg = "";
+      if (resp != null) {
+        msg = String.valueOf(resp.getStatusInfo().getStatusCode());
+      }
+      throw new IOException(
+          "Incorrect response from timeline reader. Status=" + msg);
+    }
+    return resp;
+  }
+
+  protected void verifyHttpResponse(Client client, URI uri, Status status) {
+    ClientResponse resp =
+        client.resource(uri).accept(MediaType.APPLICATION_JSON)
+            .type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+    assertNotNull(resp);
+    assertTrue("Response from server should have been " + status,
+        resp.getStatusInfo().getStatusCode() == status.getStatusCode());
+    System.out.println("Response is: " + resp.getEntity(String.class));
+  }
+
+  protected List<FlowActivityEntity> verifyFlowEntites(Client client, URI uri,
+      int noOfEntities) throws Exception {
+    ClientResponse resp = getResponse(client, uri);
+    List<FlowActivityEntity> entities =
+        resp.getEntity(new GenericType<List<FlowActivityEntity>>() {
+        });
+    assertNotNull(entities);
+    assertEquals(noOfEntities, entities.size());
+    return entities;
+  }
+
+  protected static class DummyURLConnectionFactory
+      implements HttpURLConnectionFactory {
+
+    @Override
+    public HttpURLConnection getHttpURLConnection(final URL url)
+        throws IOException {
+      try {
+        return (HttpURLConnection) url.openConnection();
+      } catch (UndeclaredThrowableException e) {
+        throw new IOException(e.getCause());
+      }
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivityForFilters.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivityForFilters.java
new file mode 100644
index 0000000..c57614c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivityForFilters.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
+import org.apache.hadoop.yarn.server.timelineservice.common.AbstractHBaseTestBase;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.sun.jersey.api.client.Client;
+
+/**
+ * Tests pagination (limit and fromid) and collapse filters over the
+ * FlowActivity table, end to end through the timeline reader REST API.
+ */
+public class TestHBaseStorageFlowActivityForFilters
+    extends AbstractHBaseTestBase {
+
+  private static final KeyConverter<Long> longKeyConverter =
+      new LongKeyConverter();
+  private static Connection conn;
+  private static TypedBufferedMutator<FlowActivityTable> flowActivityTable;
+  private static final long NUM_OF_MILLISECONDS_IN_A_DAY = 60 * 60 * 24 * 1000;
+
+  private static String clusterId = "cluster1";
+  private static String[] flowNames =
+      new String[] { "flow1", "flow2", "flow3" };
+  private static String[] users =
+      new String[] { "user1", "user2", "user3", "user4", "user5" };
+  private static String appId = "application_11111111111_0001";
+  private static String flowVersion = "CF7022C10F1354";
+  private static long today = System.currentTimeMillis();
+  private static long yesterday = today - NUM_OF_MILLISECONDS_IN_A_DAY;
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    setup();
+    conn = ConnectionFactory.createConnection(util.getConfiguration());
+    flowActivityTable =
+        new FlowActivityTable().getTableMutator(util.getConfiguration(), conn);
+    loadDataIntoFlowTable();
+    initialize();
+  }
+
+  private static void loadDataIntoFlowTable() throws IOException {
+    writeIntoFlowActivity(yesterday);
+    writeIntoFlowActivity(today);
+    flowActivityTable.flush();
+  }
+
+  private static void writeIntoFlowActivity(long timestamp)
+      throws IOException {
+    for (int i = 0; i < users.length; i++) {
+      for (int j = 0; j < flowNames.length; j++) {
+        writeToFlowActivityTable(clusterId, users[i], flowNames[j],
+            flowVersion, appId, System.currentTimeMillis(), timestamp);
+      }
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    if (server != null) {
+      server.stop();
+      server = null;
+    }
+    if (flowActivityTable != null) {
+      flowActivityTable.close();
+    }
+    if (conn != null) {
+      conn.close();
+    }
+    tearDown();
+  }
+
+  @Test
+  public void testFlowsForPagination() throws Exception {
+    Client client = createClient();
+    int noOfEntities = users.length * flowNames.length * 2;
+    int limit = 20;
+    try {
+      String flowURI = "http://localhost:" + serverPort + "/ws/v2/"
+          + "timeline/clusters/" + clusterId + "/flows";
+      URI uri = URI.create(flowURI);
+      List<FlowActivityEntity> flowEntites =
+          verifyFlowEntites(client, uri, noOfEntities);
+      for (FlowActivityEntity flowActivityEntity : flowEntites) {
+        assertEquals("Number of flow runs does not match", 1,
+            flowActivityEntity.getFlowRuns().size());
+      }
+      FlowActivityEntity fEntity1 = flowEntites.get(0);
+      FlowActivityEntity fEntity30 =
+          flowEntites.get(noOfEntities - 1);
+
+      uri = URI.create(flowURI + "?limit=" + limit);
+      flowEntites = verifyFlowEntites(client, uri, limit);
+      assertEquals(fEntity1, flowEntites.get(0));
+      FlowActivityEntity fEntity20 = flowEntites.get(limit - 1);
+
+      uri = URI.create(
+          flowURI + "?limit=" + limit + "&&fromid=" + fEntity20.getId());
+      flowEntites = verifyFlowEntites(client, uri, noOfEntities - limit + 1);
+      assertEquals(fEntity20, flowEntites.get(0));
+      assertEquals(fEntity30, flowEntites.get(noOfEntities - limit));
+
+      uri = URI.create(
+          flowURI + "?limit=" + limit + "&&fromid=" + fEntity30.getId());
+      flowEntites = verifyFlowEntites(client, uri, 1);
+      assertEquals(fEntity30, flowEntites.get(0));
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testCollapsedFlowsForPagination() throws Exception {
+    Client client = createClient();
+    int limit = 10;
+    int noOfEntitiesPerDay = users.length * flowNames.length;
+    try {
+      String flowURI = "http://localhost:" + serverPort + "/ws/v2/"
+          + "timeline/clusters/" + clusterId + "/flows";
+      URI uri = URI.create(flowURI + "?collapse=true");
+      List<FlowActivityEntity> flowEntites =
+          verifyFlowEntites(client, uri, noOfEntitiesPerDay);
+      for (FlowActivityEntity flowActivityEntity : flowEntites) {
+        assertEquals("Flow runs should be collapsed across days", 2,
+            flowActivityEntity.getFlowRuns().size());
+      }
+      FlowActivityEntity fEntity1 = flowEntites.get(0);
+      FlowActivityEntity fEntity15 = flowEntites.get(noOfEntitiesPerDay - 1);
+
+      uri = URI.create(flowURI + "?collapse=true&&limit=" + limit);
+      flowEntites = verifyFlowEntites(client, uri, limit);
+      assertEquals(fEntity1, flowEntites.get(0));
+      FlowActivityEntity fEntity10 = flowEntites.get(limit - 1);
+
+      uri = URI.create(flowURI + "?collapse=true&&limit=" + limit + "&&fromid="
+          + fEntity10.getId());
+      flowEntites =
+          verifyFlowEntites(client, uri, noOfEntitiesPerDay - limit + 1);
+      assertEquals(fEntity10, flowEntites.get(0));
+      assertEquals(fEntity15, flowEntites.get(noOfEntitiesPerDay - limit));
+
+      uri = URI.create(flowURI + "?collapse=true&&limit=" + limit + "&&fromid="
+          + fEntity15.getId());
+      flowEntites = verifyFlowEntites(client, uri, 1);
+      assertEquals(fEntity15, flowEntites.get(0));
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testFlowsForPerUserPagination() throws Exception {
+    Client client = createClient();
+    int noOfEntitiesPerUser =
+        (users.length * flowNames.length * 2) / users.length;
+    int limit = 4;
+    try {
+      String flowURI = "http://localhost:" + serverPort + "/ws/v2/"
+          + "timeline/clusters/" + clusterId + "/flows";
+      URI uri = URI.create(flowURI + "?userid=user1");
+      List<FlowActivityEntity> flowEntites =
+          verifyFlowEntites(client, uri, noOfEntitiesPerUser);
+      for (FlowActivityEntity flowActivityEntity : flowEntites) {
+        assertEquals("Number of flow runs does not match", 1,
+            flowActivityEntity.getFlowRuns().size());
+        assertEquals("user1",
+            flowActivityEntity.getInfo().get("SYSTEM_INFO_USER"));
+      }
+      FlowActivityEntity fEntity1 = flowEntites.get(0);
+      FlowActivityEntity fEntity6 = flowEntites.get(noOfEntitiesPerUser - 1);
+
+      uri = URI.create(flowURI + "?userid=user1&&limit=" + limit);
+      flowEntites = verifyFlowEntites(client, uri, limit);
+      assertEquals(fEntity1, flowEntites.get(0));
+      FlowActivityEntity fEntity4 = flowEntites.get(limit - 1);
+
+      uri = URI.create(flowURI + "?userid=user1&&limit=" + limit + "&&fromid="
+          + fEntity4.getId());
+      flowEntites =
+          verifyFlowEntites(client, uri, noOfEntitiesPerUser - limit + 1);
+      assertEquals(fEntity4, flowEntites.get(0));
+      assertEquals(fEntity6, flowEntites.get(noOfEntitiesPerUser - limit));
+
+      uri = URI.create(flowURI + "?userid=user1&&limit=" + limit + "&&fromid="
+          + fEntity6.getId());
+      flowEntites = verifyFlowEntites(client, uri, 1);
+      assertEquals(fEntity6, flowEntites.get(0));
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testCollapsedFlowsForPerUser() throws Exception {
+    Client client = createClient();
+    int limit = 2;
+    int noOfEntitiesPerUser = (users.length * flowNames.length) / users.length;
+    try {
+      String flowURI = "http://localhost:" + serverPort + "/ws/v2/"
+          + "timeline/clusters/" + clusterId + "/flows";
+      URI uri = URI.create(flowURI + "?collapse=true&&userid=user1");
+      List<FlowActivityEntity> flowEntites =
+          verifyFlowEntites(client, uri, noOfEntitiesPerUser);
+      for (FlowActivityEntity flowActivityEntity : flowEntites) {
+        assertEquals("Flow runs should be collapsed across days", 2,
+            flowActivityEntity.getFlowRuns().size());
+        assertEquals("user1",
+            flowActivityEntity.getInfo().get("SYSTEM_INFO_USER"));
+      }
+      FlowActivityEntity fEntity1 = flowEntites.get(0);
+      FlowActivityEntity fEntity3 = flowEntites.get(noOfEntitiesPerUser - 1);
+
+      uri =
+          URI.create(flowURI + "?collapse=true&&userid=user1&&limit=" + limit);
+      flowEntites = verifyFlowEntites(client, uri, limit);
+      assertEquals(fEntity1, flowEntites.get(0));
+      FlowActivityEntity fEntity2 = flowEntites.get(limit - 1);
+
+      uri = URI.create(flowURI + "?collapse=true&&userid=user1&&limit=" + limit
+          + "&&fromid=" + fEntity2.getId());
+      flowEntites =
+          verifyFlowEntites(client, uri, noOfEntitiesPerUser - limit + 1);
+      assertEquals(fEntity2, flowEntites.get(0));
+      assertEquals(fEntity3, flowEntites.get(noOfEntitiesPerUser - limit));
+
+      uri = URI.create(flowURI + "?collapse=true&&userid=user1&&limit=" + limit
+          + "&&fromid=" + fEntity3.getId());
+      flowEntites = verifyFlowEntites(client, uri, 1);
+      assertEquals(fEntity3, flowEntites.get(0));
+    } finally {
+      client.destroy();
+    }
+  }
+
+  private static void writeToFlowActivityTable(String clusterId, String userId,
+      String flowName, String flowVersion, String appId, long flowRunId,
+      long timestamp) throws IOException {
+    FlowRunRowKey flowRunRowKey =
+        new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
+    // store in flow activity table
+    byte[] flowActivityRowKeyBytes =
+        new FlowActivityRowKey(flowRunRowKey.getClusterId(), timestamp,
+            flowRunRowKey.getUserId(), flowName).getRowKey();
+    byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId());
+    FlowActivityColumnPrefix.RUN_ID.store(flowActivityRowKeyBytes,
+        flowActivityTable, qualifier, null, flowVersion,
+        AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
+  }
+}
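Note: the four tests above share one paging protocol: fetch a page of at most "limit" entities, then pass the id of the last entity received as "fromid" on the next request. Because fromid is inclusive, adjacent pages overlap by exactly one entity, which is why the second page in each test is expected to contain total - limit + 1 entities rather than total - limit. Below is a minimal client-side sketch of that loop, using only the Jersey 1.x client these tests already depend on; the class and method names are illustrative, not part of the patch, and a production client would also URL-encode the id, which contains '/' and '@' characters.

    import java.util.List;
    import javax.ws.rs.core.MediaType;
    import com.sun.jersey.api.client.Client;
    import com.sun.jersey.api.client.GenericType;
    import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;

    public class FlowsPagerExample {
      /** Prints every flow id, one page at a time, via the fromid cursor. */
      static void printAllFlows(Client client, String flowsUri, int limit) {
        String fromId = null;
        while (true) {
          String uri = flowsUri + "?limit=" + limit
              + (fromId == null ? "" : "&fromid=" + fromId);
          List<FlowActivityEntity> page = client.resource(uri)
              .accept(MediaType.APPLICATION_JSON)
              .get(new GenericType<List<FlowActivityEntity>>() { });
          // fromid is inclusive: every page after the first starts with the
          // entity that ended the previous page, so skip that duplicate.
          for (int i = (fromId == null ? 0 : 1); i < page.size(); i++) {
            System.out.println(page.get(i).getId());
          }
          if (page.size() < limit) {
            break; // a short page means there are no more entities
          }
          fromId = page.get(page.size() - 1).getId();
        }
      }
    }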
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
index c741d0e..68d8b9a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
@@ -19,7 +19,10 @@
 package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
 
 import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Connection;
@@ -27,7 +30,6 @@
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
@@ -57,6 +59,9 @@
    */
   private final KeyConverter<Long> longKeyConverter = new LongKeyConverter();
 
+  private Map<UserToFlow, FlowActivityEntity> userToFlows =
+      new LinkedHashMap<>();
+
   public FlowActivityEntityReader(TimelineReaderContext ctxt,
       TimelineEntityFilters entityFilters, TimelineDataToRetrieve toRetrieve) {
@@ -110,11 +115,26 @@
   protected ResultScanner getResults(Configuration hbaseConf, Connection conn,
       FilterList filterList) throws IOException {
     Scan scan = new Scan();
     String clusterId = getContext().getClusterId();
-    if (getFilters().getCreatedTimeBegin() == 0L &&
-        getFilters().getCreatedTimeEnd() == Long.MAX_VALUE) {
+    if (getFilters().getFromId() == null
+        && getFilters().getCreatedTimeBegin() == 0L
+        && getFilters().getCreatedTimeEnd() == Long.MAX_VALUE) {
       // All records have to be chosen.
       scan.setRowPrefixFilter(new FlowActivityRowKeyPrefix(clusterId)
           .getRowKeyPrefix());
+    } else if (getFilters().getFromId() != null && !getFilters().isCollapse()) {
+      String[] split = getFilters().getFromId().split("/");
+      Long dayTs = Long.parseLong(split[1]);
+      String[] userToFlow = split[2].split("@");
+      String userId = userToFlow[0];
+      String flowName = userToFlow[1];
+      FlowActivityRowKey key =
+          new FlowActivityRowKey(clusterId, dayTs, userId, flowName);
+      scan.setStartRow(key.getRowKey());
+      scan.setStopRow(
+          new FlowActivityRowKeyPrefix(clusterId,
+              (getFilters().getCreatedTimeBegin() <= 0 ? 0
+                  : (getFilters().getCreatedTimeBegin() - 1)))
+              .getRowKeyPrefix());
     } else {
       scan.setStartRow(new FlowActivityRowKeyPrefix(clusterId, getFilters()
           .getCreatedTimeEnd()).getRowKeyPrefix());
@@ -122,10 +142,6 @@ protected ResultScanner getResults(Configuration hbaseConf,
       scan.setStopRow(new FlowActivityRowKeyPrefix(clusterId, (getFilters()
           .getCreatedTimeBegin() <= 0 ? 0
           : (getFilters().getCreatedTimeBegin() - 1))).getRowKeyPrefix());
     }
-    // use the page filter to limit the result to the page size
-    // the scanner may still return more than the limit; therefore we need to
-    // read the right number as we iterate
-    scan.setFilter(new PageFilter(getFilters().getLimit()));
     return getTable().getResultScanner(hbaseConf, conn, scan);
   }
@@ -137,10 +153,25 @@ protected TimelineEntity parseEntity(Result result) throws IOException {
     String user = rowKey.getUserId();
     String flowName = rowKey.getFlowName();
 
+    if (getContext().getUserId() != null
+        && !getContext().getUserId().equalsIgnoreCase(user)) {
+      return null;
+    }
+
     FlowActivityEntity flowActivity = new FlowActivityEntity(
         getContext().getClusterId(), time, user, flowName);
     // set the id
     flowActivity.setId(flowActivity.getId());
+
+    if (getFilters().isCollapse()) {
+      UserToFlow userToFlow = new UserToFlow(user, flowName);
+      FlowActivityEntity fActivity =
+          userToFlows.putIfAbsent(userToFlow, flowActivity);
+      if (fActivity != null) {
+        flowActivity = fActivity;
+      }
+    }
+
     // get the list of run ids along with the version that are associated with
     // this flow on this day
     Map<Long, Object> runIdsMap =
@@ -157,7 +188,112 @@ protected TimelineEntity parseEntity(Result result) throws IOException {
       flowRun.setId(flowRun.getId());
       flowActivity.addFlowRun(flowRun);
     }
-
     return flowActivity;
   }
+
+  static class UserToFlow {
+    private String user;
+    private String flowName;
+
+    UserToFlow(String user, String flowName) {
+      this.user = user;
+      this.flowName = flowName;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append(getUser());
+      sb.append('@');
+      sb.append(getFlowName());
+      return sb.toString();
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result =
+          prime * result + ((getUser() == null) ? 0 : getUser().hashCode());
+      result = prime * result
+          + ((getFlowName() == null) ? 0 : getFlowName().hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (!(obj instanceof UserToFlow)) {
+        return false;
+      }
+      UserToFlow other = (UserToFlow) obj;
+      if (getUser() == null) {
+        if (other.getUser() != null) {
+          return false;
+        }
+      } else if (!user.equals(other.getUser())) {
+        return false;
+      }
+      if (getFlowName() == null) {
+        if (other.getFlowName() != null) {
+          return false;
+        }
+      } else if (!flowName.equals(other.getFlowName())) {
+        return false;
+      }
+      return true;
+    }
+
+    public String getFlowName() {
+      return flowName;
+    }
+
+    public String getUser() {
+      return user;
+    }
+  }
+
+  @Override
+  protected Set<TimelineEntity> processResults(ResultScanner results)
+      throws IOException {
+    Set<TimelineEntity> entities = new LinkedHashSet<>();
+    try {
+      for (Result result : results) {
+        TimelineEntity entity = parseEntity(result);
+        if (entity == null || getFilters().isCollapse()) {
+          continue;
+        }
+        entities.add(entity);
+        if (entities.size() == getFilters().getLimit()) {
+          break;
+        }
+      }
+
+      // process collapsed entities
+      if (getFilters().isCollapse()) {
+        UserToFlow fromId = null;
+        if (getFilters().getFromId() != null) {
+          // Validation is done in the getResults method, so parsing of
+          // fromId never fails here.
+          String[] split = getFilters().getFromId().split("/")[2].split("@");
+          fromId = new UserToFlow(split[0], split[1]);
+        }
+        boolean found = false;
+        for (UserToFlow key : userToFlows.keySet()) {
+          if (!found && fromId != null && !(found = key.equals(fromId))) {
+            continue;
+          }
+          entities.add(userToFlows.get(key));
+          if (entities.size() == getFilters().getLimit()) {
+            break;
+          }
+        }
+      }
+      return entities;
+    } finally {
+      results.close();
+    }
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
index 4c88cd3..39f2a3a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
@@ -232,12 +232,17 @@ public TimelineEntity readEntity(Configuration hbaseConf, Connection conn)
     validateParams();
     augmentParams(hbaseConf, conn);
 
-    Set<TimelineEntity> entities = new LinkedHashSet<>();
     FilterList filterList = createFilterList();
     if (LOG.isDebugEnabled() && filterList != null) {
       LOG.debug("FilterList created for scan is - " + filterList);
     }
     ResultScanner results = getResults(hbaseConf, conn, filterList);
+    return processResults(results);
+  }
+
+  protected Set<TimelineEntity> processResults(ResultScanner results)
+      throws IOException {
+    Set<TimelineEntity> entities = new LinkedHashSet<>();
     try {
       for (Result result : results) {
         TimelineEntity entity = parseEntity(result);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntityFilters.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntityFilters.java
index 79a83c6..ea02b6b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntityFilters.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineEntityFilters.java
@@ -128,6 +128,7 @@
   private TimelineFilterList eventFilters;
   private Long fromIdPrefix;
   private String fromId;
+  private boolean collapse;
   private static final long DEFAULT_BEGIN_TIME = 0L;
   private static final long DEFAULT_END_TIME = Long.MAX_VALUE;
@@ -284,4 +285,12 @@ public Long getFromIdPrefix() {
   public void setFromIdPrefix(Long fromIdPrefix) {
     this.fromIdPrefix = fromIdPrefix;
   }
+
+  public boolean isCollapse() {
+    return collapse;
+  }
+
+  public void setCollapse(boolean collapse) {
+    this.collapse = collapse;
+  }
 }
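Note on the fromid wire format assumed by getResults() and processResults() above: the reader indexes split("/") and split("@") on the opaque id. The layout matches the id that FlowActivityEntity builds for itself, "cluster/dayTimestamp/user@flowName", so the fromid a client passes back is exactly an entity id taken from an earlier page. A small standalone sketch of that decomposition follows; the id value is made up for illustration.

    public class FromIdFormatExample {
      public static void main(String[] args) {
        // Illustrative value; real ids come from FlowActivityEntity#getId().
        String fromId = "cluster1/1486425600000/user1@flow1";
        String[] split = fromId.split("/");         // {cluster1, 1486425600000, user1@flow1}
        long dayTs = Long.parseLong(split[1]);      // top-of-day timestamp used in the row key
        String[] userToFlow = split[2].split("@");  // {user1, flow1}
        System.out.println("dayTs=" + dayTs + ", user=" + userToFlow[0]
            + ", flowName=" + userToFlow[1]);
      }
    }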
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java
index 76f7495..8ce0a03 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java
@@ -165,7 +165,7 @@ protected void setupOptions(Configuration conf) {
   }
 
   @VisibleForTesting
-  int getWebServerPort() {
+  public int getWebServerPort() {
     return readerWebServer.getConnectorAddress(0).getPort();
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
index abf93df..d12a46d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
@@ -1334,6 +1334,13 @@ public TimelineEntity getFlowRun(
    *     2 dates.
    *     "daterange=20150711-" returns flows active on and after 20150711.
    *     "daterange=-20150711" returns flows active on and before 20150711.
+   * @param userId If specified, retrieves only the flows submitted by this
+   *     user.
+   * @param fromId If specified, retrieves the next set of flow entities
+   *     starting from the given fromId, inclusive of the entity with that
+   *     id. The value should be the id of a previously returned flow entity.
+   * @param collapse If set to true, flows run on different days by the same
+   *     user are clubbed together into a single flow entity.
    *
    * @return If successful, a HTTP 200(OK) response having a JSON representing a
    *     set of FlowActivityEntity instances are returned.
@@ -1350,8 +1357,11 @@ public TimelineEntity getFlowRun(
       @Context HttpServletRequest req,
       @Context HttpServletResponse res,
       @QueryParam("limit") String limit,
-      @QueryParam("daterange") String dateRange) {
-    return getFlows(req, res, null, limit, dateRange);
+      @QueryParam("daterange") String dateRange,
+      @QueryParam("userid") String userId,
+      @QueryParam("fromid") String fromId,
+      @QueryParam("collapse") String collapse) {
+    return getFlows(req, res, null, limit, dateRange, userId, fromId, collapse);
   }
 
   /**
@@ -1380,6 +1390,13 @@ public TimelineEntity getFlowRun(
    *     2 dates.
    *     "daterange=20150711-" returns flows active on and after 20150711.
    *     "daterange=-20150711" returns flows active on and before 20150711.
+   * @param userId If specified, retrieves only the flows submitted by this
+   *     user.
+   * @param fromId If specified, retrieves the next set of flow entities
+   *     starting from the given fromId, inclusive of the entity with that
+   *     id. The value should be the id of a previously returned flow entity.
+   * @param collapse If set to true, flows run on different days by the same
+   *     user are clubbed together into a single flow entity.
    *
    * @return If successful, a HTTP 200(OK) response having a JSON representing a
    *     set of FlowActivityEntity instances are returned.
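Note: collapse changes the shape of the result set rather than its filtering. Rows for the same user and flow name that fall on different days are merged into a single FlowActivityEntity whose flow-run list accumulates across days; the tests in this patch expect 2 runs per collapsed entity, one per day of data. A toy illustration of that grouping, with plain strings standing in for FlowActivityEntity and made-up data:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class CollapseExample {
      public static void main(String[] args) {
        // Each row is {user, flowName, runId}; the same user/flow appears on
        // two different days, mirroring the test data in this patch.
        String[][] rows = {
            {"user1", "flow1", "run_day1"},
            {"user1", "flow1", "run_day2"},
            {"user2", "flow2", "run_day1"},
        };
        // Group by "user@flowName", as FlowActivityEntityReader does with its
        // LinkedHashMap<UserToFlow, FlowActivityEntity>.
        Map<String, StringBuilder> collapsed = new LinkedHashMap<>();
        for (String[] row : rows) {
          String key = row[0] + "@" + row[1];
          collapsed.computeIfAbsent(key, k -> new StringBuilder(k + " ->"))
              .append(' ').append(row[2]);
        }
        // Prints "user1@flow1 -> run_day1 run_day2" then "user2@flow2 -> run_day1".
        collapsed.values().forEach(System.out::println);
      }
    }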
@@ -1397,7 +1414,10 @@ public TimelineEntity getFlowRun( @Context HttpServletResponse res, @PathParam("clusterid") String clusterId, @QueryParam("limit") String limit, - @QueryParam("daterange") String dateRange) { + @QueryParam("daterange") String dateRange, + @QueryParam("userid") String userId, + @QueryParam("fromid") String fromId, + @QueryParam("collapse") String collapse) { String url = req.getRequestURI() + (req.getQueryString() == null ? "" : QUERY_STRING_SEP + req.getQueryString()); @@ -1414,12 +1434,13 @@ public TimelineEntity getFlowRun( TimelineEntityFilters entityFilters = TimelineReaderWebServicesUtils.createTimelineEntityFilters( limit, null, null, null, null, null, null, null, null, null, - null); + fromId); entityFilters.setCreatedTimeBegin(range.dateStart); entityFilters.setCreatedTimeEnd(range.dateEnd); + entityFilters.setCollapse(Boolean.parseBoolean(collapse)); entities = timelineReaderManager.getEntities( TimelineReaderWebServicesUtils.createTimelineReaderContext( - clusterId, null, null, null, null, + clusterId, userId, null, null, null, TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null, null), entityFilters, TimelineReaderWebServicesUtils. createTimelineDataToRetrieve(null, null, null, null)); -- 2.7.4 (Apple Git-66)