diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java index 2778f50..7ba0dca 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java @@ -208,6 +208,61 @@ public void testWriteFlowRunMinMax() throws Exception { } /** + * Write 1 application entity and retrieve the flow activity record + * as a non-existent user. 
+ */ + @Test + public void testGetUserFlowActivity() throws Exception { + String cluster = "testWriteFlowActivityOneFlow_cluster1"; + String user = "testWriteFlowActivityOneFlow_user1"; + String flow = "flow_activity_test_flow_name"; + String flowVersion = "A122110F135BC4"; + long runid = 1001111178919L; + + TimelineEntities te = new TimelineEntities(); + long appCreatedTime = 1425016501000L; + TimelineEntity entityApp1 = + TestFlowDataGenerator.getFlowApp1(appCreatedTime); + te.addEntity(entityApp1); + + HBaseTimelineWriterImpl hbi = null; + Configuration c1 = util.getConfiguration(); + try { + hbi = new HBaseTimelineWriterImpl(); + hbi.init(c1); + String appName = "application_1111999999_1234"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.flush(); + } finally { + if (hbi != null) { + hbi.close(); + } + } + // check flow activity + checkFlowActivityTable(cluster, user, flow, flowVersion, runid, c1, + appCreatedTime); + + // use the reader to verify the data + HBaseTimelineReaderImpl hbr = null; + try { + hbr = new HBaseTimelineReaderImpl(); + hbr.init(c1); + hbr.start(); + + Set<TimelineEntity> entities = hbr.getEntities( + new TimelineReaderContext(cluster, "Nobody", flow, null, null, + TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null), + new TimelineEntityFilters(10L, null, null, null, null, null, + null, null, null), + new TimelineDataToRetrieve()); + assertEquals(0, entities.size()); + } finally { + if (hbr != null) { + hbr.close(); + } + } + } + /** * Write 1 application entity and checks the record for today in the flow * activity table. 
*/ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java index 9ba5e38..f403cec 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java @@ -26,7 +26,10 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.hbase.filter.RegexStringComparator; +import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; @@ -122,10 +125,22 @@ protected ResultScanner getResults(Configuration hbaseConf, .getCreatedTimeBegin() <= 0 ?
0 : (getFilters().getCreatedTimeBegin() - 1))).getRowKeyPrefix()); } + + FilterList filters = new FilterList(); // use the page filter to limit the result to the page size // the scanner may still return more than the limit; therefore we need to // read the right number as we iterate - scan.setFilter(new PageFilter(getFilters().getLimit())); + filters.addFilter(new PageFilter(getFilters().getLimit())); + + final String userId = getContext().getUserId(); + if (userId != null && !userId.isEmpty()) { + // set a RowFilter if user id is provided + RowFilter rowFilter = new RowFilter( + CompareOp.EQUAL, new RegexStringComparator(userId)); + filters.addFilter(rowFilter); + } + + scan.setFilter(filters); return getTable().getResultScanner(hbaseConf, conn, scan); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java index 139a1be..7a7677e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java @@ -1242,6 +1242,7 @@ public TimelineEntity getFlowRun( * * @param req Servlet request. * @param res Servlet response. + * @param userId the user who ran the flows (Optional query param) * @param limit If specified, defines the number of flows to return. The * maximum possible value for limit can be {@link Long#MAX_VALUE}.
If it * is not specified or has a value less than 0, then limit will be @@ -1276,9 +1277,10 @@ public TimelineEntity getFlowRun( public Set<TimelineEntity> getFlows( @Context HttpServletRequest req, @Context HttpServletResponse res, + @QueryParam("userid") String userId, @QueryParam("limit") String limit, @QueryParam("daterange") String dateRange) { - return getFlows(req, res, null, limit, dateRange); + return getFlows(req, res, null, userId, limit, dateRange); } /** @@ -1288,6 +1290,7 @@ public TimelineEntity getFlowRun( * @param res Servlet response. * @param clusterId Cluster id to which the flows to be queried belong to( * Mandatory path param). + * @param userId the user who ran the flows (Optional query param) * @param limit If specified, defines the number of flows to return. The * maximum possible value for limit can be {@link Long#MAX_VALUE}. If it * is not specified or has a value less than 0, then limit will be @@ -1323,6 +1326,7 @@ public TimelineEntity getFlowRun( @Context HttpServletRequest req, @Context HttpServletResponse res, @PathParam("clusterid") String clusterId, + @QueryParam("userid") String userId, @QueryParam("limit") String limit, @QueryParam("daterange") String dateRange) { String url = req.getRequestURI() + @@ -1345,7 +1349,7 @@ public TimelineEntity getFlowRun( entityFilters.setCreatedTimeEnd(range.dateEnd); entities = timelineReaderManager.getEntities( TimelineReaderWebServicesUtils.createTimelineReaderContext( - clusterId, userId, null, null, null, TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null), entityFilters, TimelineReaderWebServicesUtils. createTimelineDataToRetrieve(null, null, null, null));