diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java index 0923105..c395f62 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java @@ -204,6 +204,72 @@ public void testWriteFlowRunMinMax() throws Exception { } /** + * Write 1 application entity and retrieve the flow activity record + * as a non-existent user. 
+ */ + @Test + public void testGetUserFlowActivityWithWrongUser() throws Exception { + String cluster = "testWriteFlowActivityOneFlow_cluster1"; + String user = "testWriteFlowActivityOneFlow_user1"; + String flow = "flow_activity_test_flow_name"; + String flowVersion = "A122110F135BC4"; + long runid = 1001111178919L; + + TimelineEntities te = new TimelineEntities(); + long appCreatedTime = 1425016501000L; + TimelineEntity entityApp1 = + TestFlowDataGenerator.getFlowApp1(appCreatedTime); + te.addEntity(entityApp1); + + HBaseTimelineWriterImpl hbi = null; + Configuration c1 = util.getConfiguration(); + try { + hbi = new HBaseTimelineWriterImpl(); + hbi.init(c1); + String appName = "application_1111999999_1234"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.flush(); + } finally { + if (hbi != null) { + hbi.close(); + } + } + // check flow activity + checkFlowActivityTable(cluster, user, flow, flowVersion, runid, c1, + appCreatedTime); + + // use the reader to verify the data + HBaseTimelineReaderImpl hbr = null; + try { + hbr = new HBaseTimelineReaderImpl(); + hbr.init(c1); + hbr.start(); + + // retrieve flow activities with a nonexistent user name + Set entities = hbr.getEntities( + new TimelineReaderContext(cluster, "nobody", flow, null, null, + TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null), + new TimelineEntityFilters.Builder().entityLimit(10L).build(), + new TimelineDataToRetrieve()); + assertEquals("nobody should get no flow activity",0, entities.size()); + + // retrieve flow activities with a nonexistent user name + // that is a substring of the correct one + String userSubstring = user.substring(0, user.length() - 2); + entities = hbr.getEntities( + new TimelineReaderContext(cluster, userSubstring, flow, null, null, + TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null), + new TimelineEntityFilters.Builder().entityLimit(10L).build(), + new TimelineDataToRetrieve()); + assertEquals(userSubstring + " should get no flow 
activity", + 0, entities.size()); + } finally { + if (hbr != null) { + hbr.close(); + } + } + } + /** * Write 1 application entity and checks the record for today in the flow * activity table. */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java index b8a5dba..9017735 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java @@ -19,6 +19,7 @@ import java.util.List; +import org.apache.hadoop.hbase.filter.SubstringComparator; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; @@ -27,6 +28,8 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverterToString; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import java.nio.charset.Charset; + /** * Represents a rowkey for the flow activity table. */ @@ -126,6 +129,19 @@ public static FlowActivityRowKey parseRowKeyFromString(String encodedRowKey) { } /** + * Get an instance of SubstringComparator for a given user id. 
+ * @param userId the user id to match + * @return an instance of SubstringComparator + */ + public static SubstringComparator getSubstringComparatorForUserId( + final String userId) { + byte[] userIdSegment = + FlowActivityRowKeyConverter.encodeUserIdSegment(userId); + return new SubstringComparator( + new String(userIdSegment, Charset.forName("UTF-8"))); + } + + /** * Encodes and decodes row key for flow activity table. The row key is of the * form : clusterId!dayTimestamp!user!flowName. dayTimestamp(top of the day * timestamp) is a long and rest are strings. @@ -243,5 +259,18 @@ public FlowActivityRowKey decodeFromString(String encodedRowKey) { return new FlowActivityRowKey(split.get(0), dayTs, split.get(2), split.get(3)); } + + /** + * Return the encoded user id segment of a flow activity row key ( + * the encoded user id followed by the separator). + * @param userId the user id + * @return the encoded user id segment + */ + public static byte[] encodeUserIdSegment(final String userId) { + return Separator.QUALIFIERS.join( + Separator.encode(userId, Separator.SPACE, + Separator.TAB, Separator.QUALIFIERS), + new byte[0]); + } } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java index a1cdb29..3fe8cea 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java @@ -28,6 
+28,9 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.hbase.filter.RowFilter; +import org.apache.hadoop.hbase.filter.SubstringComparator; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; @@ -143,10 +146,23 @@ protected ResultScanner getResults(Configuration hbaseConf, .getCreatedTimeBegin() <= 0 ? 0 : (getFilters().getCreatedTimeBegin() - 1))).getRowKeyPrefix()); } + + FilterList filters = new FilterList(); // use the page filter to limit the result to the page size // the scanner may still return more than the limit; therefore we need to // read the right number as we iterate - scan.setFilter(new PageFilter(getFilters().getLimit())); + filters.addFilter(new PageFilter(getFilters().getLimit())); + + final String userId = getContext().getUserId(); + if (userId != null && !userId.isEmpty()) { + // set a RowFilter if user id is provided + SubstringComparator userIdComparator = + FlowActivityRowKey.getSubstringComparatorForUserId(userId); + RowFilter rowFilter = new RowFilter(CompareOp.EQUAL, userIdComparator); + filters.addFilter(rowFilter); + } + + scan.setFilter(filters); return getTable().getResultScanner(hbaseConf, conn, scan); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java index 453f3b6..a634f01 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java @@ -1285,6 +1285,7 @@ public TimelineEntity getFlowRun( * * @param req Servlet request. * @param res Servlet response. + * @param userId the user who ran the flows (Optional query param) * @param limit If specified, defines the number of flows to return. The * maximum possible value for limit can be {@link Long#MAX_VALUE}. If it * is not specified or has a value less than 0, then limit will be @@ -1323,10 +1324,11 @@ public TimelineEntity getFlowRun( public Set getFlows( @Context HttpServletRequest req, @Context HttpServletResponse res, + @QueryParam("userid") String userId, @QueryParam("limit") String limit, @QueryParam("daterange") String dateRange, @QueryParam("fromid") String fromId) { - return getFlows(req, res, null, limit, dateRange, fromId); + return getFlows(req, res, null, userId, limit, dateRange, fromId); } /** @@ -1336,6 +1338,7 @@ public TimelineEntity getFlowRun( * @param req Servlet request. * @param res Servlet response. * @param clusterId Cluster id to which the flows to be queried belong to( * Mandatory path param). + * @param userId the user who ran the flows (Optional query param) * @param limit If specified, defines the number of flows to return. The * maximum possible value for limit can be {@link Long#MAX_VALUE}. 
If it * is not specified or has a value less than 0, then limit will be @@ -1375,6 +1378,7 @@ public TimelineEntity getFlowRun( @Context HttpServletRequest req, @Context HttpServletResponse res, @PathParam("clusterid") String clusterId, + @QueryParam("userid") String userId, @QueryParam("limit") String limit, @QueryParam("daterange") String dateRange, @QueryParam("fromid") String fromId) { @@ -1397,7 +1401,7 @@ public TimelineEntity getFlowRun( null, null, null, null, null, null, fromId); entities = timelineReaderManager.getEntities( TimelineReaderWebServicesUtils.createTimelineReaderContext( - clusterId, null, null, null, null, + clusterId, userId, null, null, null, TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null, null), entityFilters, TimelineReaderWebServicesUtils. createTimelineDataToRetrieve(null, null, null, null)); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md index 7435201..52a20db 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md @@ -500,6 +500,7 @@ predicates, an empty list will be returned. #### Query Parameters Supported: +1. `userid` - If specified, only the flows that the given user ran are returned. 1. `limit` - If specified, defines the number of flows to return. The maximum possible value for limit is maximum value of Long. If it is not specified or has a value less than 0, then limit will be considered as 100.