diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
index f9f1d1d..f6b657f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
@@ -43,6 +43,7 @@
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
 import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
 import org.codehaus.jackson.JsonGenerationException;
 import org.codehaus.jackson.map.JsonMappingException;
@@ -153,27 +154,6 @@ private static void fillFields(TimelineEntity finalEntity,
     }
   }
 
-  private static boolean matchFilter(Object infoValue, Object filterValue) {
-    return infoValue.equals(filterValue);
-  }
-
-  private static boolean matchFilters(Map<String, Object> entityInfo,
-      Map<String, Object> filters) {
-    if (entityInfo == null || entityInfo.isEmpty()) {
-      return false;
-    }
-    for (Map.Entry<String, Object> filter : filters.entrySet()) {
-      Object infoValue = entityInfo.get(filter.getKey());
-      if (infoValue == null) {
-        return false;
-      }
-      if (!matchFilter(infoValue, filter.getValue())) {
-        return false;
-      }
-    }
-    return true;
-  }
-
   private String getFlowRunPath(String userId, String clusterId, String flowId,
       Long flowRunId, String appId)
       throws IOException {
@@ -207,36 +187,6 @@ private String getFlowRunPath(String userId, String clusterId, String flowId,
     throw new IOException("Unable to get flow info");
   }
 
-  private static boolean matchMetricFilters(Set<TimelineMetric> metrics,
-      Set<String> metricFilters) {
-    Set<String> tempMetrics = new HashSet<String>();
-    for (TimelineMetric metric : metrics) {
-      tempMetrics.add(metric.getId());
-    }
-
-    for (String metricFilter : metricFilters) {
-      if (!tempMetrics.contains(metricFilter)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  private static boolean matchEventFilters(Set<TimelineEvent> entityEvents,
-      Set<String> eventFilters) {
-    Set<String> tempEvents = new HashSet<String>();
-    for (TimelineEvent event : entityEvents) {
-      tempEvents.add(event.getId());
-    }
-
-    for (String eventFilter : eventFilters) {
-      if (!tempEvents.contains(eventFilter)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
   private static TimelineEntity createEntityToBeReturned(TimelineEntity entity,
       EnumSet<Field> fieldsToRetrieve) {
     TimelineEntity entityToBeReturned = new TimelineEntity();
@@ -254,23 +204,6 @@ private static boolean isTimeInRange(Long time, Long timeBegin,
     return (time >= timeBegin) && (time <= timeEnd);
   }
 
-  private static boolean matchRelations(
-      Map<String, Set<String>> entityRelations,
-      Map<String, Set<String>> relations) {
-    for (Map.Entry<String, Set<String>> relation : relations.entrySet()) {
-      Set<String> ids = entityRelations.get(relation.getKey());
-      if (ids == null) {
-        return false;
-      }
-      for (String id : relation.getValue()) {
-        if (!ids.contains(id)) {
-          return false;
-        }
-      }
-    }
-    return true;
-  }
-
   private static void mergeEntities(TimelineEntity entity1, TimelineEntity
      entity2) {
    // Ideally created time won't change except in the case of issue from client.
@@ -393,27 +326,32 @@ public int compare(Long l1, Long l2) {
          continue;
        }
        if (relatesTo != null && !relatesTo.isEmpty() &&
-           !matchRelations(entity.getRelatesToEntities(), relatesTo)) {
+           !TimelineReaderUtils
+               .matchRelations(entity.getRelatesToEntities(), relatesTo)) {
          continue;
        }
        if (isRelatedTo != null && !isRelatedTo.isEmpty() &&
-           !matchRelations(entity.getIsRelatedToEntities(), isRelatedTo)) {
+           !TimelineReaderUtils
+               .matchRelations(entity.getIsRelatedToEntities(), isRelatedTo)) {
          continue;
        }
        if (infoFilters != null && !infoFilters.isEmpty() &&
-           !matchFilters(entity.getInfo(), infoFilters)) {
+           !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
          continue;
        }
        if (configFilters != null && !configFilters.isEmpty() &&
-           !matchFilters(entity.getConfigs(), configFilters)) {
+           !TimelineReaderUtils.matchFilters(
+               entity.getConfigs(), configFilters)) {
          continue;
        }
        if (metricFilters != null && !metricFilters.isEmpty() &&
-           !matchMetricFilters(entity.getMetrics(), metricFilters)) {
+           !TimelineReaderUtils.matchMetricFilters(
+               entity.getMetrics(), metricFilters)) {
          continue;
        }
        if (eventFilters != null && !eventFilters.isEmpty() &&
-           !matchEventFilters(entity.getEvents(), eventFilters)) {
+           !TimelineReaderUtils.matchEventFilters(
+               entity.getEvents(), eventFilters)) {
          continue;
        }
        TimelineEntity entityToBeReturned =
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
new file mode 100644
index 0000000..5925bbc
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+
+public class HBaseTimelineReaderImpl
+    extends AbstractService implements TimelineReader {
+
+  private static final Log LOG = LogFactory
+      .getLog(HBaseTimelineReaderImpl.class);
+
+  private Configuration hbaseConf = null;
+  private Connection conn;
+  private EntityTable entityTable;
+
+  public HBaseTimelineReaderImpl() {
+    super(HBaseTimelineReaderImpl.class.getName());
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    hbaseConf = HBaseConfiguration.create(conf);
+    conn = ConnectionFactory.createConnection(hbaseConf);
+    entityTable = new EntityTable();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (conn != null) {
+      LOG.info("closing the hbase Connection");
+      conn.close();
+    }
+    super.serviceStop();
+  }
+
+  @Override
+  public TimelineEntity getEntity(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      String entityId, EnumSet<Field> fieldsToRetrieve)
+      throws IOException {
+    validateParams(userId, clusterId, appId, entityType, entityId, true);
+    // In reality both should be null or neither should be null
+    if (flowId == null || flowRunId == null) {
+      FlowContext context = lookupFlowContext(clusterId, appId);
+      flowId = context.flowId;
+      flowRunId = context.flowRunId;
+    }
+    if (fieldsToRetrieve == null) {
+      fieldsToRetrieve = EnumSet.noneOf(Field.class);
+    }
+
+    byte[] rowKey = EntityRowKey.getRowKey(
+        clusterId, userId, flowId, flowRunId, appId, entityType, entityId);
+    Get get = new Get(rowKey);
+    return getEntity(
+        entityTable.getResult(hbaseConf, conn, get), fieldsToRetrieve,
+        false, 0L, 0L, false, 0L, 0L, null, null, null, null, null, null);
+  }
+
+  @Override
+  public Set<TimelineEntity> getEntities(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, Object> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      EnumSet<Field> fieldsToRetrieve) throws IOException {
+    validateParams(userId, clusterId, appId, entityType, null, false);
+    // In reality both should be null or neither should be null
+    if (flowId == null || flowRunId == null) {
+      FlowContext context = lookupFlowContext(clusterId, appId);
+      flowId = context.flowId;
+      flowRunId = context.flowRunId;
+    }
+    if (limit == null) {
+      limit = TimelineReader.DEFAULT_LIMIT;
+    }
+    if (createdTimeBegin == null) {
+      createdTimeBegin = 0L;
+    }
+    if (createdTimeEnd == null) {
+      createdTimeEnd = Long.MAX_VALUE;
+    }
+    if (modifiedTimeBegin == null) {
+      modifiedTimeBegin = 0L;
+    }
+    if (modifiedTimeEnd == null) {
+      modifiedTimeEnd = Long.MAX_VALUE;
+    }
+    if (fieldsToRetrieve == null) {
+      fieldsToRetrieve = EnumSet.noneOf(Field.class);
+    }
+
+    NavigableSet<TimelineEntity> entities = new TreeSet<>();
+    // Scan through part of the table to find the entities that belong to one
+    // app and one type
+    Scan scan = new Scan();
+    scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
+        clusterId, userId, flowId, flowRunId, appId, entityType));
+    scan.setMaxVersions(Integer.MAX_VALUE);
+    ResultScanner scanner =
+        entityTable.getResultScanner(hbaseConf, conn, scan);
+    for (Result result : scanner) {
+      TimelineEntity entity = getEntity(result, fieldsToRetrieve,
+          true, createdTimeBegin, createdTimeEnd,
+          true, modifiedTimeBegin, modifiedTimeEnd,
+          isRelatedTo, relatesTo, infoFilters, configFilters, eventFilters,
+          metricFilters);
+      if (entity == null) {
+        continue;
+      }
+      // Keep at most limit entities in the sorted set.
+      entities.add(entity);
+      if (entities.size() > limit) {
+        entities.pollLast();
+      }
+    }
+    return entities;
+  }
+
+  private FlowContext lookupFlowContext(String clusterId, String appId) {
+    // TODO: implement the flow context lookup
+    return null;
+  }
+
+  private static class FlowContext {
+    private String flowId;
+    private Long flowRunId;
+    public FlowContext(String flowId, Long flowRunId) {
+      this.flowId = flowId;
+      this.flowRunId = flowRunId;
+    }
+  }
+
+  private static void validateParams(String userId, String clusterId,
+      String appId, String entityType, String entityId,
+      boolean checkEntityId) {
+    Preconditions.checkNotNull(userId, "userId shouldn't be null");
+    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
+    Preconditions.checkNotNull(appId, "appId shouldn't be null");
+    Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
+    if (checkEntityId) {
+      Preconditions.checkNotNull(entityId, "entityId shouldn't be null");
+    }
+  }
+
+  private static TimelineEntity getEntity(
+      Result result, EnumSet<Field> fieldsToRetrieve,
+      boolean checkCreatedTime, long createdTimeBegin, long createdTimeEnd,
+      boolean checkModifiedTime, long modifiedTimeBegin, long modifiedTimeEnd,
+      Map<String, Set<String>> isRelatedTo, Map<String, Set<String>> relatesTo,
+      Map<String, Object> infoFilters, Map<String, Object> configFilters,
+      Set<String> eventFilters, Set<String> metricFilters)
+      throws IOException {
+    if (result == null || result.isEmpty()) {
+      return null;
+    }
+    TimelineEntity entity = new TimelineEntity();
+    entity.setType(EntityColumn.TYPE.readResult(result).toString());
+    entity.setId(EntityColumn.ID.readResult(result).toString());
+
+    // fetch created time
+    entity.setCreatedTime(
+        ((Number) EntityColumn.CREATED_TIME.readResult(result)).longValue());
+    if (checkCreatedTime && (entity.getCreatedTime() < createdTimeBegin ||
+        entity.getCreatedTime() > createdTimeEnd)) {
+      return null;
+    }
+
+    // fetch modified time
+    entity.setModifiedTime(
+        ((Number) EntityColumn.MODIFIED_TIME.readResult(result)).longValue());
+    if (checkModifiedTime && (entity.getModifiedTime() < modifiedTimeBegin ||
+        entity.getModifiedTime() > modifiedTimeEnd)) {
+      return null;
+    }
+
+    // fetch is related to entities
+    boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
+      readConnections(entity, result, EntityColumnPrefix.IS_RELATED_TO);
+      if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations(
+          entity.getIsRelatedToEntities(), isRelatedTo)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) {
+        entity.getIsRelatedToEntities().clear();
+      }
+    }
+
+    // fetch relates to entities
+    boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
+      readConnections(entity, result, EntityColumnPrefix.RELATES_TO);
+      if (checkRelatesTo && !TimelineReaderUtils.matchRelations(
+          entity.getRelatesToEntities(), relatesTo)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.RELATES_TO)) {
+        entity.getRelatesToEntities().clear();
+      }
+    }
+
+    // fetch info
+    boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
+      // TODO
+      if (checkInfo &&
+          !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.INFO)) {
+        entity.getInfo().clear();
+      }
+    }
+
+    // fetch configs
+    boolean checkConfigs = configFilters != null && configFilters.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
+      readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG);
+      if (checkConfigs && !TimelineReaderUtils.matchFilters(
+          entity.getConfigs(), configFilters)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.CONFIGS)) {
+        entity.getConfigs().clear();
+      }
+    }
+
+    // fetch events
+    boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
+      readEvents(entity, result);
+      if (checkEvents && !TimelineReaderUtils.matchEventFilters(
+          entity.getEvents(), eventFilters)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.EVENTS)) {
+        entity.getEvents().clear();
+      }
+    }
+
+    // fetch metrics
+    boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
+      readMetrics(entity, result);
+      if (checkMetrics && !TimelineReaderUtils.matchMetricFilters(
+          entity.getMetrics(), metricFilters)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.METRICS)) {
+        entity.getMetrics().clear();
+      }
+    }
+    return entity;
+  }
+
+  private static void readConnections(
+      TimelineEntity entity, Result result, EntityColumnPrefix prefix)
+      throws IOException {
+    // isRelatedTo and relatesTo are of type Map<String, Set<String>>
+    Map<String, Object> columns = prefix.readResults(result);
+    for (Map.Entry<String, Object> column : columns.entrySet()) {
+      for (String id : Separator.VALUES.splitEncoded(
+          column.getValue().toString())) {
+        if (prefix.equals(EntityColumnPrefix.IS_RELATED_TO)) {
+          entity.addIsRelatedToEntity(column.getKey(), id);
+        } else {
+          entity.addRelatesToEntity(column.getKey(), id);
+        }
} + } + } + + private static void readKeyValuePairs( + TimelineEntity entity, Result result, EntityColumnPrefix prefix) + throws IOException { + // info and configuration are of type Map + Map columns = prefix.readResults(result); + if (prefix.equals(EntityColumnPrefix.CONFIG)) { + for (Map.Entry column : columns.entrySet()) { + entity.addConfig(column.getKey(), column.getKey().toString()); + } + } else { + entity.addInfo(columns); + } + } + + private static void readEvents(TimelineEntity entity, Result result) + throws IOException { + // TODO + } + + private static void readMetrics(TimelineEntity entity, Result result) + throws IOException { + NavigableMap> metricsResult = + EntityColumnPrefix.METRIC.readTimeseriesResults(result); + for (Map.Entry> metricResult: + metricsResult.entrySet()) { + TimelineMetric metric = new TimelineMetric(); + metric.setId(metricResult.getKey()); + // Simply assume that if the value set contains more than 1 elements, the + // metric is a TIME_SERIES metric, otherwise, it's a TIME_SERIES metric + metric.setType(metricResult.getValue().size() > 1 ? + TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.TIME_SERIES); + metric.addValues(metricResult.getValue()); + entity.addMetric(metric); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index e48ca60..b6a4bd8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -96,7 +96,7 @@ public TimelineWriteResponse write(String clusterId, String userId, byte[] rowKey = EntityRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId, - te); + te.getType(), te.getId()); storeInfo(rowKey, te, flowVersion); storeEvents(rowKey, te.getEvents()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java index e8d8b5c..abba79a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java @@ -24,6 +24,8 @@ import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; @@ -94,6 +96,20 @@ public ResultScanner getResultScanner(Configuration hbaseConf, } /** + 
+   * Get the result of a single Get query against this table.
+   *
+   * @param hbaseConf used to read settings that override defaults
+   * @param conn used to create table from
+   * @param get that specifies what single row you want to get from this table
+   * @return result of the get operation
+   * @throws IOException if the get operation fails
+   */
+  public Result getResult(Configuration hbaseConf, Connection conn, Get get)
+      throws IOException {
+    Table table = conn.getTable(getTableName(hbaseConf));
+    return table.get(get);
+  }
+
+  /**
    * Get the table name for this table.
    *
    * @param hbaseConf
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java
new file mode 100644
index 0000000..809aa00
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+
+public class TimelineReaderUtils {
+  /**
+   * Checks if an entity's relations contain every relation in the filters.
+   *
+   * @param entityRelations the relations of an entity
+   * @param relationFilters the relations for filtering
+   * @return true if all relation filters are satisfied, false otherwise
+   */
+  public static boolean matchRelations(
+      Map<String, Set<String>> entityRelations,
+      Map<String, Set<String>> relationFilters) {
+    for (Map.Entry<String, Set<String>> relation :
+        relationFilters.entrySet()) {
+      Set<String> ids = entityRelations.get(relation.getKey());
+      if (ids == null) {
+        return false;
+      }
+      for (String id : relation.getValue()) {
+        if (!ids.contains(id)) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Checks if an entity's key/value pairs contain every filter entry with an
+   * equal value.
+   *
+   * @param map the map of key/value pairs in an entity
+   * @param filters the map of key/value pairs for filtering
+   * @return true if all filters are matched, false otherwise
+   */
+  public static boolean matchFilters(Map<String, Object> map,
+      Map<String, Object> filters) {
+    for (Map.Entry<String, Object> filter : filters.entrySet()) {
+      Object value = map.get(filter.getKey());
+      if (value == null) {
+        return false;
+      }
+      if (!value.equals(filter.getValue())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Checks if an entity's events contain every event id to filter on.
+   *
+   * @param entityEvents the set of event objects in an entity
+   * @param eventFilters the set of event ids for filtering
+   * @return true if all event filters are matched, false otherwise
+   */
+  public static boolean matchEventFilters(Set<TimelineEvent> entityEvents,
+      Set<String> eventFilters) {
+    Set<String> eventIds = new HashSet<String>();
+    for (TimelineEvent event : entityEvents) {
+      eventIds.add(event.getId());
+    }
+    for (String eventFilter : eventFilters) {
+      if (!eventIds.contains(eventFilter)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Checks if an entity's metrics contain every metric id to filter on.
+   *
+   * @param metrics the set of metric objects in an entity
+   * @param metricFilters the set of metric ids for filtering
+   * @return true if all metric filters are matched, false otherwise
+   */
+  public static boolean matchMetricFilters(Set<TimelineMetric> metrics,
+      Set<String> metricFilters) {
+    Set<String> metricIds = new HashSet<String>();
+    for (TimelineMetric metric : metrics) {
+      metricIds.add(metric.getId());
+    }
+
+    for (String metricFilter : metricFilters) {
+      if (!metricIds.contains(metricFilter)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
index 61958c2..75b68a5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
@@ -54,17 +54,45 @@
   /**
    * Constructs a row key prefix for the entity table as follows:
-   * {@code userName!clusterId!flowId!flowRunId!AppId}
+   * {@code userName!clusterId!flowId!flowRunId!AppId!entityType}
    *
    * @param clusterId
    * @param userId
    * @param flowId
    * @param flowRunId
    * @param appId
+   * @param entityType
    * @return byte array with the row key prefix
    */
+  public static byte[] getRowKeyPrefix(String clusterId, String userId,
+      String flowId, Long flowRunId, String appId, String entityType) {
+    byte[] first =
+        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId,
+            flowId));
+    // Note that flowRunId is a long, so we can't encode them all at the same
+    // time.
+    byte[] second = Bytes.toBytes(EntityRowKey.invert(flowRunId));
+    byte[] third =
+        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, entityType));
+    return Separator.QUALIFIERS.join(first, second, third);
+  }
+
+  /**
+   * Constructs a row key for the entity table as follows:
+   * {@code userName!clusterId!flowId!flowRunId!AppId!entityType!entityId}
+   *
+   * @param clusterId
+   * @param userId
+   * @param flowId
+   * @param flowRunId
+   * @param appId
+   * @param entityType
+   * @param entityId
+   * @return byte array with the row key
+   */
   public static byte[] getRowKey(String clusterId, String userId,
-      String flowId, Long flowRunId, String appId, TimelineEntity te) {
+      String flowId, Long flowRunId, String appId, String entityType,
+      String entityId) {
     byte[] first =
         Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId,
             flowId));
@@ -72,8 +100,8 @@
     // time.
     byte[] second = Bytes.toBytes(EntityRowKey.invert(flowRunId));
     byte[] third =
-        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, te.getType(),
-            te.getId()));
+        Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, entityType,
+            entityId));
     return Separator.QUALIFIERS.join(first, second, third);
   }
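
A quick illustration of the matcher semantics this patch centralizes in TimelineReaderUtils. The sketch below is not part of the patch; the class name and the sample keys/values are made up, but the calls target the utility methods added above. Every matcher applies AND semantics: each filter entry must be present (and, for key/value filters, equal) or the entity is rejected.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;

// Hypothetical demo class, not part of the patch.
public class MatcherSemanticsDemo {
  public static void main(String[] args) {
    // Entity-side data, as a reader implementation would assemble it.
    Map<String, Object> configs = new HashMap<>();
    configs.put("mapreduce.job.queuename", "default");
    configs.put("mapreduce.map.memory.mb", "2048");

    // Filter-side data, as passed to getEntities().
    Map<String, Object> configFilters = new HashMap<>();
    configFilters.put("mapreduce.job.queuename", "default");
    // true: the single filter key is present with an equal value
    System.out.println(TimelineReaderUtils.matchFilters(configs, configFilters));

    configFilters.put("mapreduce.map.memory.mb", "4096");
    // false: one filter value differs, and every filter must match
    System.out.println(TimelineReaderUtils.matchFilters(configs, configFilters));

    // Relation filters behave the same way over Map<String, Set<String>>:
    // every filtered id must appear under the corresponding relation key.
    Map<String, Set<String>> relations = new HashMap<>();
    relations.put("YARN_CONTAINER",
        new HashSet<>(Arrays.asList("container_1", "container_2")));
    Map<String, Set<String>> relationFilters = new HashMap<>();
    relationFilters.put("YARN_CONTAINER",
        new HashSet<>(Arrays.asList("container_1")));
    // true: container_1 is among the entity's YARN_CONTAINER relations
    System.out.println(
        TimelineReaderUtils.matchRelations(relations, relationFilters));
  }
}
```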
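The row-key change is what makes the reader's scan cheap: with entityType appended to the prefix, one prefix-bounded scan covers exactly one application and one entity type. A minimal sketch of that usage, assuming a reachable HBase cluster and assuming the default entity table name timelineservice.entity; the cluster, user, flow, run, and app identifiers are illustrative.

```java
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;

// Hypothetical demo class, not part of the patch.
public class PrefixScanDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table =
            conn.getTable(TableName.valueOf("timelineservice.entity"))) {
      // flowRunId is inverted inside getRowKeyPrefix, so rows for later runs
      // sort first; all values below are made-up examples.
      byte[] prefix = EntityRowKey.getRowKeyPrefix("cluster1", "alice",
          "flow_wordcount", 1425016501000L, "application_1425016501020_1001",
          "YARN_APPLICATION_ATTEMPT");
      Scan scan = new Scan();
      // Bound the scan to rows sharing the prefix: one app, one entity type.
      scan.setRowPrefixFilter(prefix);
      try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result result : scanner) {
          System.out.println("row: " + Arrays.toString(result.getRow()));
        }
      }
    }
  }
}
```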
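Finally, the rule readMetrics applies when rebuilding TimelineMetric objects from the stored time-series cells, shown standalone under made-up values: more than one timestamped value means TIME_SERIES, a single value means SINGLE_VALUE.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;

// Hypothetical demo class, not part of the patch.
public class MetricTypeDemo {
  public static void main(String[] args) {
    // Two timestamped values, shaped like readTimeseriesResults() output.
    NavigableMap<Long, Number> values = new TreeMap<>();
    values.put(1425016501000L, 1024L);
    values.put(1425016502000L, 2048L);

    TimelineMetric metric = new TimelineMetric();
    metric.setId("MAP_SLOT_MILLIS");
    // The same decision readMetrics makes: more than one value => TIME_SERIES.
    metric.setType(values.size() > 1
        ? TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE);
    metric.addValues(values);
    System.out.println(metric.getType()); // TIME_SERIES
  }
}
```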