diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java index 1b66fcb..4cba4df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java @@ -40,6 +40,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; @@ -53,6 +55,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; import org.junit.After; import org.junit.AfterClass; @@ -352,6 +355,7 @@ private static void loadData() throws Exception { flowVersion2, runid2, entity3.getId(), te3); hbi.write(cluster, user, flow, 
flowVersion, runid, "application_1111111111_1111", userEntities); + writeApplicationEntities(hbi); hbi.flush(); } finally { if (hbi != null) { @@ -360,6 +364,25 @@ private static void loadData() throws Exception { } } + static void writeApplicationEntities(HBaseTimelineWriterImpl hbi) + throws IOException { + long currentTimeMillis = System.currentTimeMillis(); + int count = 1; + for (long i = 1; i <= 3; i++) { + for (int j = 1; j <= 5; j++) { + TimelineEntities te = new TimelineEntities(); + ApplicationId appId = + BuilderUtils.newApplicationId(currentTimeMillis, count++); + ApplicationEntity appEntity = new ApplicationEntity(); + appEntity.setId(appId.toString()); + appEntity.setCreatedTime(currentTimeMillis); + te.addEntity(appEntity); + hbi.write("cluster1", "user1", "flow1", "CF7022C10F1354", i, + appEntity.getId(), te); + } + } + } + @AfterClass public static void tearDown() throws Exception { util.shutdownMiniCluster(); @@ -2267,4 +2290,137 @@ private TimelineEntity verifyPaginatedEntites(List entities, } return entity; } + + @Test + public void testForFlowAppsPagination() throws Exception { + Client client = createClient(); + try { + // app entities stored is 15 during initialization. 
+ int totalAppEntities = 15; + String resourceUri = "http://localhost:" + serverPort + "/ws/v2/" + + "timeline/clusters/cluster1/users/user1/flows/flow1/apps"; + URI uri = URI.create(resourceUri); + ClientResponse resp = getResponse(client, uri); + List entities = + resp.getEntity(new GenericType>() { + }); + assertNotNull(entities); + assertEquals(totalAppEntities, entities.size()); + TimelineEntity entity1 = entities.get(0); + TimelineEntity entity15 = entities.get(totalAppEntities - 1); + + int limit = 10; + String queryParam = "?limit=" + limit; + uri = URI.create(resourceUri + queryParam); + resp = getResponse(client, uri); + entities = resp.getEntity(new GenericType>() { + }); + assertNotNull(entities); + assertEquals(limit, entities.size()); + assertEquals(entity1, entities.get(0)); + TimelineEntity entity10 = entities.get(limit - 1); + + uri = + URI.create(resourceUri + queryParam + "&fromid=" + entity10.getId()); + resp = getResponse(client, uri); + entities = resp.getEntity(new GenericType>() { + }); + assertNotNull(entities); + assertEquals(6, entities.size()); + assertEquals(entity10, entities.get(0)); + assertEquals(entity15, entities.get(5)); + + } finally { + client.destroy(); + } + } + + @Test + public void testForFlowRunAppsPagination() throws Exception { + Client client = createClient(); + try { + // 5 app entities are stored per flow run during initialization. 
+ int totalAppEntities = 5; + String resourceUri = "http://localhost:" + serverPort + "/ws/v2/" + + "timeline/clusters/cluster1/users/user1/flows/flow1/runs/1/apps"; + URI uri = URI.create(resourceUri); + ClientResponse resp = getResponse(client, uri); + List entities = + resp.getEntity(new GenericType>() { + }); + assertNotNull(entities); + assertEquals(totalAppEntities, entities.size()); + TimelineEntity entity1 = entities.get(0); + TimelineEntity entity5 = entities.get(totalAppEntities - 1); + + int limit = 3; + String queryParam = "?limit=" + limit; + uri = URI.create(resourceUri + queryParam); + resp = getResponse(client, uri); + entities = resp.getEntity(new GenericType>() { + }); + assertNotNull(entities); + assertEquals(limit, entities.size()); + assertEquals(entity1, entities.get(0)); + TimelineEntity entity3 = entities.get(limit - 1); + + uri = + URI.create(resourceUri + queryParam + "&fromid=" + entity3.getId()); + resp = getResponse(client, uri); + entities = resp.getEntity(new GenericType>() { + }); + assertNotNull(entities); + assertEquals(3, entities.size()); + assertEquals(entity3, entities.get(0)); + assertEquals(entity5, entities.get(2)); + + } finally { + client.destroy(); + } + } + + @Test + public void testForFlowRunsPagination() throws Exception { + Client client = createClient(); + try { + // 3 flow runs are stored during initialization. 
+ int totalRuns = 3; + String resourceUri = "http://localhost:" + serverPort + "/ws/v2/" + + "timeline/clusters/cluster1/users/user1/flows/flow1/runs"; + URI uri = URI.create(resourceUri); + ClientResponse resp = getResponse(client, uri); + List entities = + resp.getEntity(new GenericType>() { + }); + assertNotNull(entities); + assertEquals(totalRuns, entities.size()); + TimelineEntity entity1 = entities.get(0); + TimelineEntity entity3 = entities.get(totalRuns - 1); + + int limit = 2; + String queryParam = "?limit=" + limit; + uri = URI.create(resourceUri + queryParam); + resp = getResponse(client, uri); + entities = resp.getEntity(new GenericType>() { + }); + assertNotNull(entities); + assertEquals(limit, entities.size()); + assertEquals(entity1, entities.get(0)); + TimelineEntity entity2 = entities.get(limit - 1); + + + uri = URI.create(resourceUri + queryParam + "&fromid=" + + entity2.getInfo().get("SYSTEM_INFO_FLOW_RUN_ID")); + resp = getResponse(client, uri); + entities = resp.getEntity(new GenericType>() { + }); + assertNotNull(entities); + assertEquals(3, entities.size()); + assertEquals(entity2, entities.get(0)); + assertEquals(entity3, entities.get(1)); + + } finally { + client.destroy(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java index 7e4288c..9349d59 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java @@ -1098,6 +1098,9 @@ public TimelineEntity getFlowRun( * METRICS makes sense for flow runs hence only ALL or METRICS are * supported as fields for fetching flow runs. Other fields will lead to * HTTP 400 (Bad Request) response. (Optional query param). + * @param fromid Defines the flow run id. If specified, retrieve the next + * set of earlier entities from specified id. The set also includes + * specified fromId. * * @return If successful, a HTTP 200(OK) response having a JSON representing a * set of FlowRunEntity instances for the given flow are @@ -1119,7 +1122,8 @@ public TimelineEntity getFlowRun( @QueryParam("createdtimestart") String createdTimeStart, @QueryParam("createdtimeend") String createdTimeEnd, @QueryParam("metricstoretrieve") String metricsToRetrieve, - @QueryParam("fields") String fields) { + @QueryParam("fields") String fields, + @QueryParam("fromid") String fromId) { String url = req.getRequestURI() + (req.getQueryString() == null ? "" : QUERY_STRING_SEP + req.getQueryString()); @@ -1141,7 +1145,7 @@ public TimelineEntity getFlowRun( entities = timelineReaderManager.getEntities(context, TimelineReaderWebServicesUtils.createTimelineEntityFilters( limit, createdTimeStart, createdTimeEnd, null, null, null, - null, null, null, null, null), + null, null, null, null, fromId), TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( null, metricsToRetrieve, fields, null)); } catch (Exception e) { @@ -1183,6 +1187,9 @@ public TimelineEntity getFlowRun( * METRICS makes sense for flow runs hence only ALL or METRICS are * supported as fields for fetching flow runs. Other fields will lead to * HTTP 400 (Bad Request) response. (Optional query param). + * @param fromid Defines the flow run id. If specified, retrieve the next + * set of earlier entities from specified id. 
The set also includes + * specified fromId. * * @return If successful, a HTTP 200(OK) response having a JSON representing a * set of FlowRunEntity instances for the given flow are @@ -1205,9 +1212,10 @@ public TimelineEntity getFlowRun( @QueryParam("createdtimestart") String createdTimeStart, @QueryParam("createdtimeend") String createdTimeEnd, @QueryParam("metricstoretrieve") String metricsToRetrieve, - @QueryParam("fields") String fields) { + @QueryParam("fields") String fields, + @QueryParam("fromid") String fromId) { return getFlowRuns(req, res, null, userId, flowName, limit, - createdTimeStart, createdTimeEnd, metricsToRetrieve, fields); + createdTimeStart, createdTimeEnd, metricsToRetrieve, fields, fromId); } /** @@ -1261,7 +1269,8 @@ public TimelineEntity getFlowRun( @QueryParam("createdtimestart") String createdTimeStart, @QueryParam("createdtimeend") String createdTimeEnd, @QueryParam("metricstoretrieve") String metricsToRetrieve, - @QueryParam("fields") String fields) { + @QueryParam("fields") String fields, + @QueryParam("fromid") String fromId) { String url = req.getRequestURI() + (req.getQueryString() == null ? "" : QUERY_STRING_SEP + req.getQueryString()); @@ -1280,7 +1289,7 @@ public TimelineEntity getFlowRun( TimelineEntityType.YARN_FLOW_RUN.toString(), null, null), TimelineReaderWebServicesUtils.createTimelineEntityFilters( limit, createdTimeStart, createdTimeEnd, null, null, null, - null, null, null, null, null), + null, null, null, null, fromId), TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( null, metricsToRetrieve, fields, null)); } catch (Exception e) { @@ -1720,6 +1729,9 @@ public TimelineEntity getApp( * or has a value less than 1, and metrics have to be retrieved, then * metricsLimit will be considered as 1 i.e. latest single value of * metric(s) will be returned. (Optional query param). + * @param fromid Defines the application id. If specified, retrieve the next + * set of earlier entities from specified id. 
The set also includes + * specified fromId. * * @return If successful, a HTTP 200(OK) response having a JSON representing * a set of TimelineEntity instances representing apps is @@ -1749,7 +1761,8 @@ public TimelineEntity getApp( @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("fromid") String fromId) { String url = req.getRequestURI() + (req.getQueryString() == null ? "" : QUERY_STRING_SEP + req.getQueryString()); @@ -1772,7 +1785,7 @@ public TimelineEntity getApp( TimelineReaderWebServicesUtils.createTimelineEntityFilters( limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, null, - null), + fromId), TimelineReaderWebServicesUtils.createTimelineDataToRetrieve( confsToRetrieve, metricsToRetrieve, fields, metricsLimit)); } catch (Exception e) { @@ -1848,6 +1861,9 @@ public TimelineEntity getApp( * or has a value less than 1, and metrics have to be retrieved, then * metricsLimit will be considered as 1 i.e. latest single value of * metric(s) will be returned. (Optional query param). + * @param fromid Defines the application id. If specified, retrieve the next + * set of earlier entities from specified id. The set also includes + * specified fromId. 
* * @return If successful, a HTTP 200(OK) response having a JSON representing * a set of TimelineEntity instances representing apps is @@ -1879,12 +1895,13 @@ public TimelineEntity getApp( @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("fromid") String fromId) { return getEntities(req, res, null, null, TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, - confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, null); + confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, fromId); } /** @@ -1948,6 +1965,9 @@ public TimelineEntity getApp( * or has a value less than 1, and metrics have to be retrieved, then * metricsLimit will be considered as 1 i.e. latest single value of * metric(s) will be returned. (Optional query param). + * @param fromid Defines the application id. If specified, retrieve the next + * set of earlier entities from specified id. The set also includes + * specified fromId. 
* * @return If successful, a HTTP 200(OK) response having a JSON representing * a set of TimelineEntity instances representing apps is @@ -1981,12 +2001,13 @@ public TimelineEntity getApp( @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("fromid") String fromId) { return getEntities(req, res, clusterId, null, TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName, flowRunId, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, - confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, null); + confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, fromId); } /** @@ -2047,6 +2068,9 @@ public TimelineEntity getApp( * or has a value less than 1, and metrics have to be retrieved, then * metricsLimit will be considered as 1 i.e. latest single value of * metric(s) will be returned. (Optional query param). + * @param fromid Defines the application id. If specified, retrieve the next + * set of earlier entities from specified id. The set also includes + * specified fromId. 
* * @return If successful, a HTTP 200(OK) response having a JSON representing * a set of TimelineEntity instances representing apps is @@ -2077,12 +2101,13 @@ public TimelineEntity getApp( @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("fromid") String fromId) { return getEntities(req, res, null, null, TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName, null, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, - confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, null); + confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, fromId); } /** @@ -2144,6 +2169,9 @@ public TimelineEntity getApp( * or has a value less than 1, and metrics have to be retrieved, then * metricsLimit will be considered as 1 i.e. latest single value of * metric(s) will be returned. (Optional query param). + * @param fromid Defines the application id. If specified, retrieve the next + * set of earlier entities from specified id. The set also includes + * specified fromId. 
* * @return If successful, a HTTP 200(OK) response having a JSON representing * a set of TimelineEntity instances representing apps is @@ -2175,12 +2203,13 @@ public TimelineEntity getApp( @QueryParam("confstoretrieve") String confsToRetrieve, @QueryParam("metricstoretrieve") String metricsToRetrieve, @QueryParam("fields") String fields, - @QueryParam("metricslimit") String metricsLimit) { + @QueryParam("metricslimit") String metricsLimit, + @QueryParam("fromid") String fromId) { return getEntities(req, res, clusterId, null, TimelineEntityType.YARN_APPLICATION.toString(), userId, flowName, null, limit, createdTimeStart, createdTimeEnd, relatesTo, isRelatedTo, infofilters, conffilters, metricfilters, eventfilters, - confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, null); + confsToRetrieve, metricsToRetrieve, fields, metricsLimit, null, fromId); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyPrefix.java index f61b0e9..0ad71dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyPrefix.java @@ -48,10 +48,11 @@ public ApplicationRowKeyPrefix(String clusterId, String userId, * @param userId identifying the user * @param flowName identifying the flow * @param flowRunId identifying the instance of this flow + * @param appId used in pagination mode. 
*/ public ApplicationRowKeyPrefix(String clusterId, String userId, - String flowName, Long flowRunId) { - super(clusterId, userId, flowName, flowRunId, null); + String flowName, Long flowRunId, String appId) { + super(clusterId, userId, flowName, flowRunId, appId); } /* diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java index 23ebc66..43bd7f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java @@ -32,10 +32,11 @@ * @param clusterId identifying the cluster * @param userId identifying the user * @param flowName identifying the flow + * @param fromRunId used in pagination mode */ public FlowRunRowKeyPrefix(String clusterId, String userId, - String flowName) { - super(clusterId, userId, flowName, null); + String flowName, Long fromRunId) { + super(clusterId, userId, flowName, fromRunId); } /* diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java index 1667f61..a5c666f 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java @@ -48,7 +48,9 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; @@ -359,13 +361,44 @@ protected ResultScanner getResults(Configuration hbaseConf, Connection conn, FilterList filterList) throws IOException { Scan scan = new Scan(); TimelineReaderContext context = getContext(); + RowKeyPrefix applicationRowKeyPrefix = null; + // Whether or not flowRunID is null doesn't matter, the // ApplicationRowKeyPrefix will do the right thing. - RowKeyPrefix applicationRowKeyPrefix = - new ApplicationRowKeyPrefix(context.getClusterId(), - context.getUserId(), context.getFlowName(), - context.getFlowRunId()); - scan.setRowPrefixFilter(applicationRowKeyPrefix.getRowKeyPrefix()); + // default mode: always scans from the beginning of the entity type. 
+ if (getFilters() == null || getFilters().getFromId() == null) { + applicationRowKeyPrefix = new ApplicationRowKeyPrefix( + context.getClusterId(), context.getUserId(), context.getFlowName(), + context.getFlowRunId(), getFilters().getFromId()); + scan.setRowPrefixFilter(applicationRowKeyPrefix.getRowKeyPrefix()); + } else { + Long flowRunId = context.getFlowRunId(); + if (flowRunId == null) { + AppToFlowRowKey appToFlowRowKey = new AppToFlowRowKey( + context.getClusterId(), getFilters().getFromId()); + FlowContext flowContext = + lookupFlowContext(appToFlowRowKey, hbaseConf, conn); + flowRunId = flowContext.getFlowRunId(); + } + + applicationRowKeyPrefix = new ApplicationRowKeyPrefix( + context.getClusterId(), context.getUserId(), context.getFlowName(), + flowRunId, getFilters().getFromId()); + + // set start row + scan.setStartRow(applicationRowKeyPrefix.getRowKeyPrefix()); + + // get the bytes for stop row + applicationRowKeyPrefix = new ApplicationRowKeyPrefix( + context.getClusterId(), context.getUserId(), context.getFlowName(), + context.getFlowRunId(), null); + + // set stop row + scan.setStopRow( + HBaseTimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix( + applicationRowKeyPrefix.getRowKeyPrefix())); + } + FilterList newList = new FilterList(); newList.addFilter(new PageFilter(getFilters().getLimit())); if (filterList != null && !filterList.getFilters().isEmpty()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java index 9b8482c..e945450 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java @@ -42,7 +42,9 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily; @@ -210,10 +212,30 @@ protected ResultScanner getResults(Configuration hbaseConf, Connection conn, FilterList filterList) throws IOException { Scan scan = new Scan(); TimelineReaderContext context = getContext(); - RowKeyPrefix flowRunRowKeyPrefix = - new FlowRunRowKeyPrefix(context.getClusterId(), context.getUserId(), - context.getFlowName()); - scan.setRowPrefixFilter(flowRunRowKeyPrefix.getRowKeyPrefix()); + RowKeyPrefix flowRunRowKeyPrefix = null; + if (getFilters() == null || getFilters().getFromId() == null) { + flowRunRowKeyPrefix = new FlowRunRowKeyPrefix(context.getClusterId(), + context.getUserId(), context.getFlowName(), null); + scan.setRowPrefixFilter(flowRunRowKeyPrefix.getRowKeyPrefix()); + } else { + + flowRunRowKeyPrefix = + new FlowRunRowKeyPrefix(context.getClusterId(), 
context.getUserId(), + context.getFlowName(), Long.parseLong(getFilters().getFromId())); + + // set start row + scan.setStartRow(flowRunRowKeyPrefix.getRowKeyPrefix()); + + // get the bytes for stop row + flowRunRowKeyPrefix = new FlowRunRowKeyPrefix(context.getClusterId(), + context.getUserId(), context.getFlowName(), null); + + // set stop row + scan.setStopRow( + HBaseTimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix( + flowRunRowKeyPrefix.getRowKeyPrefix())); + } + FilterList newList = new FilterList(); newList.addFilter(new PageFilter(getFilters().getLimit())); if (filterList != null && !filterList.getFilters().isEmpty()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java index f6904c5..b50b63b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java @@ -498,15 +498,16 @@ protected ResultScanner getResults(Configuration hbaseConf, Connection conn, scan.setStopRow( HBaseTimelineStorageUtils.calculateTheClosestNextRowKeyForPrefix( entityRowKeyPrefix.getRowKeyPrefix())); - - // set page filter to limit. This filter has to set only in pagination - // mode. 
- filterList.addFilter(new PageFilter(getFilters().getLimit())); } + scan.setMaxVersions(getDataToRetrieve().getMetricsLimit()); + FilterList newList = new FilterList(); + newList.addFilter(new PageFilter(getFilters().getLimit())); if (filterList != null && !filterList.getFilters().isEmpty()) { - scan.setFilter(filterList); + newList.addFilter(filterList); } + scan.setFilter(newList); + return getTable().getResultScanner(hbaseConf, conn, scan); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java index 7560f33..e98c5e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java @@ -93,7 +93,7 @@ public void testApplicationRowKey() { assertEquals(APPLICATION_ID, rowKey.getAppId()); byte[] byteRowKeyPrefix = - new ApplicationRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID) + new ApplicationRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, null) .getRowKeyPrefix(); byte[][] splits = Separator.QUALIFIERS.split(byteRowKeyPrefix,