diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
deleted file mode 100644
index d812a6c..0000000
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.BinaryComparator;
-import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.FamilyFilter;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.FilterList.Operator;
-import org.apache.hadoop.hbase.filter.PageFilter;
-import org.apache.hadoop.hbase.filter.QualifierFilter;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Timeline entity reader for application entities that are stored in the
- * application table.
- */
-class ApplicationEntityReader extends GenericEntityReader {
-  private static final ApplicationTable APPLICATION_TABLE =
-      new ApplicationTable();
-
-  public ApplicationEntityReader(String userId, String clusterId,
-      String flowName, Long flowRunId, String appId, String entityType,
-      Long limit, Long createdTimeBegin, Long createdTimeEnd,
-      Long modifiedTimeBegin, Long modifiedTimeEnd,
-      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
-      Map<String, Object> infoFilters, Map<String, String> configFilters,
-      Set<String> metricFilters, Set<String> eventFilters,
-      TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
-      EnumSet<Field> fieldsToRetrieve) {
-    super(userId, clusterId, flowName, flowRunId, appId, entityType, limit,
-        createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
-        relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
-        eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve,
-        true);
-  }
-
-  public ApplicationEntityReader(String userId, String clusterId,
-      String flowName, Long flowRunId, String appId, String entityType,
-      String entityId, TimelineFilterList confsToRetrieve,
-      TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
-    super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId,
-        confsToRetrieve, metricsToRetrieve, fieldsToRetrieve);
-  }
-
-  /**
-   * Uses the {@link ApplicationTable}.
-   */
-  protected BaseTable<?> getTable() {
-    return APPLICATION_TABLE;
-  }
-
-  @Override
-  protected FilterList constructFilterListBasedOnFields() {
-    FilterList list = new FilterList(Operator.MUST_PASS_ONE);
-    // Fetch all the columns.
-    if (fieldsToRetrieve.contains(Field.ALL) &&
-        (confsToRetrieve == null ||
-        confsToRetrieve.getFilterList().isEmpty()) &&
-        (metricsToRetrieve == null ||
-        metricsToRetrieve.getFilterList().isEmpty())) {
-      return list;
-    }
-    FilterList infoColFamilyList = new FilterList();
-    // By default fetch everything in INFO column family.
-    FamilyFilter infoColumnFamily =
-        new FamilyFilter(CompareOp.EQUAL,
-            new BinaryComparator(ApplicationColumnFamily.INFO.getBytes()));
-    infoColFamilyList.addFilter(infoColumnFamily);
-    // Events not required.
-    if (!fieldsToRetrieve.contains(Field.EVENTS) &&
-        !fieldsToRetrieve.contains(Field.ALL) && eventFilters == null) {
-      infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-              new BinaryPrefixComparator(
-                  ApplicationColumnPrefix.EVENT.getColumnPrefixBytes(""))));
-    }
-    // info not required.
-    if (!fieldsToRetrieve.contains(Field.INFO) &&
-        !fieldsToRetrieve.contains(Field.ALL) && infoFilters == null) {
-      infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-              new BinaryPrefixComparator(
-                  ApplicationColumnPrefix.INFO.getColumnPrefixBytes(""))));
-    }
-    // is related to not required.
-    if (!fieldsToRetrieve.contains(Field.IS_RELATED_TO) &&
-        !fieldsToRetrieve.contains(Field.ALL) && isRelatedTo == null) {
-      infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-              new BinaryPrefixComparator(
-                  ApplicationColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes(""))));
-    }
-    // relates to not required.
-    if (!fieldsToRetrieve.contains(Field.RELATES_TO) &&
-        !fieldsToRetrieve.contains(Field.ALL) && relatesTo == null) {
-      infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-              new BinaryPrefixComparator(
-                  ApplicationColumnPrefix.RELATES_TO.getColumnPrefixBytes(""))));
-    }
-    list.addFilter(infoColFamilyList);
-    if ((fieldsToRetrieve.contains(Field.CONFIGS) || configFilters != null) ||
-        (confsToRetrieve != null &&
-        !confsToRetrieve.getFilterList().isEmpty())) {
-      FilterList filterCfg =
-          new FilterList(new FamilyFilter(CompareOp.EQUAL,
-              new BinaryComparator(ApplicationColumnFamily.CONFIGS.getBytes())));
-      if (confsToRetrieve != null &&
-          !confsToRetrieve.getFilterList().isEmpty()) {
-        filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList(
-            ApplicationColumnPrefix.CONFIG, confsToRetrieve));
-      }
-      list.addFilter(filterCfg);
-    }
-    if ((fieldsToRetrieve.contains(Field.METRICS) || metricFilters != null) ||
-        (metricsToRetrieve != null &&
-        !metricsToRetrieve.getFilterList().isEmpty())) {
-      FilterList filterMetrics =
-          new FilterList(new FamilyFilter(CompareOp.EQUAL,
-              new BinaryComparator(ApplicationColumnFamily.METRICS.getBytes())));
-      if (metricsToRetrieve != null &&
-          !metricsToRetrieve.getFilterList().isEmpty()) {
-        filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList(
-            ApplicationColumnPrefix.METRIC, metricsToRetrieve));
-      }
-      list.addFilter(filterMetrics);
-    }
-    return list;
-  }
-
-  @Override
-  protected Result getResult(Configuration hbaseConf, Connection conn,
-      FilterList filterList) throws IOException {
-    byte[] rowKey =
-        ApplicationRowKey.getRowKey(clusterId, userId, flowName, flowRunId,
-            appId);
-    Get get = new Get(rowKey);
-    get.setMaxVersions(Integer.MAX_VALUE);
-    if (filterList != null && !filterList.getFilters().isEmpty()) {
-      get.setFilter(filterList);
-    }
-    return table.getResult(hbaseConf, conn, get);
-  }
-
-  @Override
-  protected void validateParams() {
-    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
-    Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
-    if (singleEntityRead) {
-      Preconditions.checkNotNull(appId, "appId shouldn't be null");
-    } else {
-      Preconditions.checkNotNull(userId, "userId shouldn't be null");
-      Preconditions.checkNotNull(flowName, "flowName shouldn't be null");
-    }
-  }
-
-  @Override
-  protected void augmentParams(Configuration hbaseConf, Connection conn)
-      throws IOException {
-    if (singleEntityRead) {
-      if (flowName == null || flowRunId == null || userId == null) {
-        FlowContext context =
-            lookupFlowContext(clusterId, appId, hbaseConf, conn);
-        flowName = context.flowName;
-        flowRunId = context.flowRunId;
-        userId = context.userId;
-      }
-    }
-    if (fieldsToRetrieve == null) {
-      fieldsToRetrieve = EnumSet.noneOf(Field.class);
-    }
-    if (!fieldsToRetrieve.contains(Field.CONFIGS) &&
-        confsToRetrieve != null && !confsToRetrieve.getFilterList().isEmpty()) {
-      fieldsToRetrieve.add(Field.CONFIGS);
-    }
-    if (!fieldsToRetrieve.contains(Field.METRICS) &&
-        metricsToRetrieve != null &&
-        !metricsToRetrieve.getFilterList().isEmpty()) {
-      fieldsToRetrieve.add(Field.METRICS);
-    }
-    if (!singleEntityRead) {
-      if (limit == null || limit < 0) {
-        limit = TimelineReader.DEFAULT_LIMIT;
-      }
-      if (createdTimeBegin == null) {
-        createdTimeBegin = DEFAULT_BEGIN_TIME;
-      }
-      if (createdTimeEnd == null) {
-        createdTimeEnd = DEFAULT_END_TIME;
-      }
-      if (modifiedTimeBegin == null) {
-        modifiedTimeBegin = DEFAULT_BEGIN_TIME;
-      }
-      if (modifiedTimeEnd == null) {
-        modifiedTimeEnd = DEFAULT_END_TIME;
-      }
-    }
-  }
-
-  @Override
-  protected ResultScanner getResults(Configuration hbaseConf,
-      Connection conn, FilterList filterList) throws IOException {
-    Scan scan = new Scan();
-    if (flowRunId != null) {
-      scan.setRowPrefixFilter(ApplicationRowKey.
-          getRowKeyPrefix(clusterId, userId, flowName, flowRunId));
-    } else {
-      scan.setRowPrefixFilter(ApplicationRowKey.
-          getRowKeyPrefix(clusterId, userId, flowName));
-    }
-    FilterList newList = new FilterList();
-    newList.addFilter(new PageFilter(limit));
-    if (filterList != null && !filterList.getFilters().isEmpty()) {
-      newList.addFilter(filterList);
-    }
-    scan.setFilter(newList);
-    return table.getResultScanner(hbaseConf, conn, scan);
-  }
-
-  @Override
-  protected TimelineEntity parseEntity(Result result) throws IOException {
-    if (result == null || result.isEmpty()) {
-      return null;
-    }
-    TimelineEntity entity = new TimelineEntity();
-    entity.setType(TimelineEntityType.YARN_APPLICATION.toString());
-    String entityId = ApplicationColumn.ID.readResult(result).toString();
-    entity.setId(entityId);
-
-    // fetch created time
-    Number createdTime =
-        (Number)ApplicationColumn.CREATED_TIME.readResult(result);
-    entity.setCreatedTime(createdTime.longValue());
-    if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin ||
-        entity.getCreatedTime() > createdTimeEnd)) {
-      return null;
-    }
-
-    // fetch modified time
-    Number modifiedTime =
-        (Number)ApplicationColumn.MODIFIED_TIME.readResult(result);
-    entity.setModifiedTime(modifiedTime.longValue());
-    if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin ||
-        entity.getModifiedTime() > modifiedTimeEnd)) {
-      return null;
-    }
-
-    // fetch is related to entities
-    boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
-      readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO,
-          true);
-      if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations(
-          entity.getIsRelatedToEntities(), isRelatedTo)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) {
-        entity.getIsRelatedToEntities().clear();
-      }
-    }
-
-    // fetch relates to entities
-    boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
-      readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO,
-          false);
-      if (checkRelatesTo && !TimelineStorageUtils.matchRelations(
-          entity.getRelatesToEntities(), relatesTo)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.RELATES_TO)) {
-        entity.getRelatesToEntities().clear();
-      }
-    }
-
-    // fetch info
-    boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
-      readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false);
-      if (checkInfo &&
-          !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.INFO)) {
-        entity.getInfo().clear();
-      }
-    }
-
-    // fetch configs
-    boolean checkConfigs = configFilters != null && configFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
-      readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true);
-      if (checkConfigs && !TimelineStorageUtils.matchFilters(
-          entity.getConfigs(), configFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.CONFIGS)) {
-        entity.getConfigs().clear();
-      }
-    }
-
-    // fetch events
-    boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
-      readEvents(entity, result, true);
-      if (checkEvents && !TimelineStorageUtils.matchEventFilters(
-          entity.getEvents(), eventFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.EVENTS)) {
-        entity.getEvents().clear();
-      }
-    }
-
-    // fetch metrics
-    boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
-      readMetrics(entity, result, ApplicationColumnPrefix.METRIC);
-      if (checkMetrics && !TimelineStorageUtils.matchMetricFilters(
-          entity.getMetrics(), metricFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.METRICS)) {
-        entity.getMetrics().clear();
-      }
-    }
-    return entity;
-  }
-}
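[Editor's note: the `constructFilterListBasedOnFields()` method above prunes results server-side by combining family and qualifier filters. As an illustration only (not part of this patch), a minimal, self-contained sketch of the same HBase 1.x filter composition follows; the `"i"` family name and `"e!"` qualifier prefix are hypothetical stand-ins for the schema constants used by the real code.]

```java
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.util.Bytes;

public final class FieldProjectionSketch {
  // Keeps only cells in the "i" family whose qualifier does NOT start with
  // the excluded prefix. FilterList defaults to MUST_PASS_ALL, so both
  // conditions must hold for a cell to be returned.
  static FilterList infoFamilyWithoutPrefix(byte[] excludedQualifierPrefix) {
    FilterList list = new FilterList();
    list.addFilter(new FamilyFilter(CompareOp.EQUAL,
        new BinaryComparator(Bytes.toBytes("i"))));
    list.addFilter(new QualifierFilter(CompareOp.NOT_EQUAL,
        new BinaryPrefixComparator(excludedQualifierPrefix)));
    return list;
  }

  public static void main(String[] args) {
    FilterList filter = infoFamilyWithoutPrefix(Bytes.toBytes("e!"));
    System.out.println(filter); // would be set on a Get/Scan via setFilter()
  }
}
```
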
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
deleted file mode 100644
index 7e8d4ba..0000000
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.PageFilter;
-import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Timeline entity reader for flow activity entities that are stored in the
- * flow activity table.
- */
-class FlowActivityEntityReader extends TimelineEntityReader {
-  private static final FlowActivityTable FLOW_ACTIVITY_TABLE =
-      new FlowActivityTable();
-
-  public FlowActivityEntityReader(String userId, String clusterId,
-      String flowName, Long flowRunId, String appId, String entityType,
-      Long limit, Long createdTimeBegin, Long createdTimeEnd,
-      Long modifiedTimeBegin, Long modifiedTimeEnd,
-      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
-      Map<String, Object> infoFilters, Map<String, String> configFilters,
-      Set<String> metricFilters, Set<String> eventFilters,
-      EnumSet<Field> fieldsToRetrieve) {
-    super(userId, clusterId, flowName, flowRunId, appId, entityType, limit,
-        createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
-        relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
-        eventFilters, null, null, fieldsToRetrieve, true);
-  }
-
-  public FlowActivityEntityReader(String userId, String clusterId,
-      String flowName, Long flowRunId, String appId, String entityType,
-      String entityId, EnumSet<Field> fieldsToRetrieve) {
-    super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId,
-        null, null, fieldsToRetrieve);
-  }
-
-  /**
-   * Uses the {@link FlowActivityTable}.
-   */
-  @Override
-  protected BaseTable<?> getTable() {
-    return FLOW_ACTIVITY_TABLE;
-  }
-
-  @Override
-  protected void validateParams() {
-    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
-  }
-
-  @Override
-  protected void augmentParams(Configuration hbaseConf, Connection conn)
-      throws IOException {
-    if (limit == null || limit < 0) {
-      limit = TimelineReader.DEFAULT_LIMIT;
-    }
-    if (createdTimeBegin == null) {
-      createdTimeBegin = DEFAULT_BEGIN_TIME;
-    }
-    if (createdTimeEnd == null) {
-      createdTimeEnd = DEFAULT_END_TIME;
-    }
-  }
-
-  @Override
-  protected FilterList constructFilterListBasedOnFields() {
-    return null;
-  }
-
-  @Override
-  protected Result getResult(Configuration hbaseConf, Connection conn,
-      FilterList filterList) throws IOException {
-    throw new UnsupportedOperationException(
-        "we don't support a single entity query");
-  }
-
-  @Override
-  protected ResultScanner getResults(Configuration hbaseConf,
-      Connection conn, FilterList filterList) throws IOException {
-    Scan scan = new Scan();
-    if (createdTimeBegin == DEFAULT_BEGIN_TIME &&
-        createdTimeEnd == DEFAULT_END_TIME) {
-      scan.setRowPrefixFilter(FlowActivityRowKey.getRowKeyPrefix(clusterId));
-    } else {
-      scan.setStartRow(
-          FlowActivityRowKey.getRowKeyPrefix(clusterId, createdTimeEnd));
-      scan.setStopRow(
-          FlowActivityRowKey.getRowKeyPrefix(clusterId,
-              (createdTimeBegin <= 0 ? 0 : (createdTimeBegin - 1))));
-    }
-    // use the page filter to limit the result to the page size
-    // the scanner may still return more than the limit; therefore we need to
-    // read the right number as we iterate
-    scan.setFilter(new PageFilter(limit));
-    return table.getResultScanner(hbaseConf, conn, scan);
-  }
-
-  @Override
-  protected TimelineEntity parseEntity(Result result) throws IOException {
-    FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(result.getRow());
-
-    long time = rowKey.getDayTimestamp();
-    String user = rowKey.getUserId();
-    String flowName = rowKey.getFlowName();
-
-    FlowActivityEntity flowActivity =
-        new FlowActivityEntity(clusterId, time, user, flowName);
-    // set the id
-    flowActivity.setId(flowActivity.getId());
-    // get the list of run ids along with the version that are associated with
-    // this flow on this day
-    Map<String, Object> runIdsMap =
-        FlowActivityColumnPrefix.RUN_ID.readResults(result);
-    for (Map.Entry<String, Object> e : runIdsMap.entrySet()) {
-      Long runId = Long.valueOf(e.getKey());
-      String version = (String)e.getValue();
-      FlowRunEntity flowRun = new FlowRunEntity();
-      flowRun.setUser(user);
-      flowRun.setName(flowName);
-      flowRun.setRunId(runId);
-      flowRun.setVersion(version);
-      // set the id
-      flowRun.setId(flowRun.getId());
-      flowActivity.addFlowRun(flowRun);
-    }
-
-    return flowActivity;
-  }
-}
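[Editor's note: the comment in `getResults()` above alludes to a known HBase behavior — `PageFilter` caps rows per region server, not globally, so a scan spanning several regions can return more than `limit` rows. A minimal sketch of the client-side counting the readers rely on, using plain HBase 1.x client APIs (table and limit are caller-supplied):]

```java
import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.PageFilter;

public final class PageLimitSketch {
  // PageFilter trims what each region server sends back, but the global
  // limit must still be enforced as we iterate on the client.
  static int scanWithLimit(Table table, long limit) throws IOException {
    Scan scan = new Scan();
    scan.setFilter(new PageFilter(limit));
    int count = 0;
    try (ResultScanner scanner = table.getScanner(scan)) {
      for (Result r : scanner) {
        count++;
        if (count >= limit) {
          break; // stop once the requested page size is reached
        }
      }
    }
    return count;
  }
}
```
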
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
deleted file mode 100644
index c9076ee..0000000
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.BinaryComparator;
-import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
-import org.apache.hadoop.hbase.filter.FamilyFilter;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.PageFilter;
-import org.apache.hadoop.hbase.filter.QualifierFilter;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.FilterList.Operator;
-import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Timeline entity reader for flow run entities that are stored in the flow run
- * table.
- */
-class FlowRunEntityReader extends TimelineEntityReader {
-  private static final FlowRunTable FLOW_RUN_TABLE = new FlowRunTable();
-
-  public FlowRunEntityReader(String userId, String clusterId,
-      String flowName, Long flowRunId, String appId, String entityType,
-      Long limit, Long createdTimeBegin, Long createdTimeEnd,
-      Long modifiedTimeBegin, Long modifiedTimeEnd,
-      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
-      Map<String, Object> infoFilters, Map<String, String> configFilters,
-      Set<String> metricFilters, Set<String> eventFilters,
-      TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
-      EnumSet<Field> fieldsToRetrieve) {
-    super(userId, clusterId, flowName, flowRunId, appId, entityType, limit,
-        createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
-        relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
-        eventFilters, null, metricsToRetrieve, fieldsToRetrieve, true);
-  }
-
-  public FlowRunEntityReader(String userId, String clusterId,
-      String flowName, Long flowRunId, String appId, String entityType,
-      String entityId, TimelineFilterList confsToRetrieve,
-      TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
-    super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId,
-        null, metricsToRetrieve, fieldsToRetrieve);
-  }
-
-  /**
-   * Uses the {@link FlowRunTable}.
-   */
-  @Override
-  protected BaseTable<?> getTable() {
-    return FLOW_RUN_TABLE;
-  }
-
-  @Override
-  protected void validateParams() {
-    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
-    Preconditions.checkNotNull(userId, "userId shouldn't be null");
-    Preconditions.checkNotNull(flowName, "flowName shouldn't be null");
-    if (singleEntityRead) {
-      Preconditions.checkNotNull(flowRunId, "flowRunId shouldn't be null");
-    }
-  }
-
-  @Override
-  protected void augmentParams(Configuration hbaseConf, Connection conn) {
-    if (!singleEntityRead) {
-      if (fieldsToRetrieve == null) {
-        fieldsToRetrieve = EnumSet.noneOf(Field.class);
-      }
-      if (limit == null || limit < 0) {
-        limit = TimelineReader.DEFAULT_LIMIT;
-      }
-      if (createdTimeBegin == null) {
-        createdTimeBegin = DEFAULT_BEGIN_TIME;
-      }
-      if (createdTimeEnd == null) {
-        createdTimeEnd = DEFAULT_END_TIME;
-      }
-      if (!fieldsToRetrieve.contains(Field.METRICS) &&
-          metricsToRetrieve != null &&
-          !metricsToRetrieve.getFilterList().isEmpty()) {
-        fieldsToRetrieve.add(Field.METRICS);
-      }
-    }
-  }
-
-  @Override
-  protected FilterList constructFilterListBasedOnFields() {
-    FilterList list = new FilterList(Operator.MUST_PASS_ONE);
-
-    // By default fetch everything in INFO column family.
-    FamilyFilter infoColumnFamily =
-        new FamilyFilter(CompareOp.EQUAL,
-            new BinaryComparator(FlowRunColumnFamily.INFO.getBytes()));
-    // Metrics not required.
-    if (!singleEntityRead && !fieldsToRetrieve.contains(Field.METRICS) &&
-        !fieldsToRetrieve.contains(Field.ALL)) {
-      FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE);
-      infoColFamilyList.addFilter(infoColumnFamily);
-      infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-              new BinaryPrefixComparator(
-                  FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(""))));
-      list.addFilter(infoColFamilyList);
-    }
-    if (metricsToRetrieve != null &&
-        !metricsToRetrieve.getFilterList().isEmpty()) {
-      FilterList infoColFamilyList = new FilterList();
-      infoColFamilyList.addFilter(infoColumnFamily);
-      infoColFamilyList.addFilter(TimelineFilterUtils.createHBaseFilterList(
-          FlowRunColumnPrefix.METRIC, metricsToRetrieve));
-      list.addFilter(infoColFamilyList);
-    }
-    return list;
-  }
-
-  @Override
-  protected Result getResult(Configuration hbaseConf, Connection conn,
-      FilterList filterList) throws IOException {
-    byte[] rowKey =
-        FlowRunRowKey.getRowKey(clusterId, userId, flowName, flowRunId);
-    Get get = new Get(rowKey);
-    get.setMaxVersions(Integer.MAX_VALUE);
-    if (filterList != null && !filterList.getFilters().isEmpty()) {
-      get.setFilter(filterList);
-    }
-    return table.getResult(hbaseConf, conn, get);
-  }
-
-  @Override
-  protected ResultScanner getResults(Configuration hbaseConf,
-      Connection conn, FilterList filterList) throws IOException {
-    Scan scan = new Scan();
-    scan.setRowPrefixFilter(
-        FlowRunRowKey.getRowKeyPrefix(clusterId, userId, flowName));
-    FilterList newList = new FilterList();
-    newList.addFilter(new PageFilter(limit));
-    if (filterList != null && !filterList.getFilters().isEmpty()) {
-      newList.addFilter(filterList);
-    }
-    scan.setFilter(newList);
-    return table.getResultScanner(hbaseConf, conn, scan);
-  }
-
-  @Override
-  protected TimelineEntity parseEntity(Result result) throws IOException {
-    FlowRunEntity flowRun = new FlowRunEntity();
-    flowRun.setUser(userId);
-    flowRun.setName(flowName);
-    if (singleEntityRead) {
-      flowRun.setRunId(flowRunId);
-    } else {
-      FlowRunRowKey rowKey = FlowRunRowKey.parseRowKey(result.getRow());
-      flowRun.setRunId(rowKey.getFlowRunId());
-    }
-
-    // read the start time
-    Long startTime = (Long)FlowRunColumn.MIN_START_TIME.readResult(result);
-    if (startTime != null) {
-      flowRun.setStartTime(startTime.longValue());
-    }
-    if (!singleEntityRead && (flowRun.getStartTime() < createdTimeBegin ||
-        flowRun.getStartTime() > createdTimeEnd)) {
-      return null;
-    }
-
-    // read the end time if available
-    Long endTime = (Long)FlowRunColumn.MAX_END_TIME.readResult(result);
-    if (endTime != null) {
-      flowRun.setMaxEndTime(endTime.longValue());
-    }
-
-    // read the flow version
-    String version = (String)FlowRunColumn.FLOW_VERSION.readResult(result);
-    if (version != null) {
-      flowRun.setVersion(version);
-    }
-
-    // read metrics
-    if (singleEntityRead || fieldsToRetrieve.contains(Field.METRICS)) {
-      readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC);
-    }
-
-    // set the id
-    flowRun.setId(flowRun.getId());
-    return flowRun;
-  }
-}
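[Editor's note: the single-entity `getResult()` methods above call `get.setMaxVersions(Integer.MAX_VALUE)` because metric time series are stored as multiple cell versions of one column. A minimal sketch of reading every version of a cell with the plain HBase 1.x client; the assumption that values decode as longs is illustrative, and row/family/qualifier are caller-supplied:]

```java
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public final class VersionedReadSketch {
  // By default a Get returns only the latest cell version; asking for all
  // versions is what lets the reader reconstruct a metric time series.
  static void printAllVersions(Table table, byte[] row, byte[] family,
      byte[] qualifier) throws IOException {
    Get get = new Get(row);
    get.setMaxVersions(Integer.MAX_VALUE);
    Result result = table.get(get);
    List<Cell> cells = result.getColumnCells(family, qualifier);
    for (Cell cell : cells) {
      // each version carries its own timestamp -> one time-series point
      System.out.println(cell.getTimestamp() + " -> "
          + Bytes.toLong(cell.getValueArray(), cell.getValueOffset()));
    }
  }
}
```
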
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
deleted file mode 100644
index 784dfd5..0000000
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java
+++ /dev/null
@@ -1,496 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.BinaryComparator;
-import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
-import org.apache.hadoop.hbase.filter.FamilyFilter;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.QualifierFilter;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.FilterList.Operator;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Timeline entity reader for generic entities that are stored in the entity
- * table.
- */
-class GenericEntityReader extends TimelineEntityReader {
-  private static final EntityTable ENTITY_TABLE = new EntityTable();
-  private static final Log LOG = LogFactory.getLog(GenericEntityReader.class);
-
-  /**
-   * Used to look up the flow context.
-   */
-  private final AppToFlowTable appToFlowTable = new AppToFlowTable();
-
-  public GenericEntityReader(String userId, String clusterId,
-      String flowName, Long flowRunId, String appId, String entityType,
-      Long limit, Long createdTimeBegin, Long createdTimeEnd,
-      Long modifiedTimeBegin, Long modifiedTimeEnd,
-      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
-      Map<String, Object> infoFilters, Map<String, String> configFilters,
-      Set<String> metricFilters, Set<String> eventFilters,
-      TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
-      EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) {
-    super(userId, clusterId, flowName, flowRunId, appId, entityType, limit,
-        createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
-        relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
-        eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve,
-        sortedKeys);
-  }
-
-  public GenericEntityReader(String userId, String clusterId,
-      String flowName, Long flowRunId, String appId, String entityType,
-      String entityId, TimelineFilterList confsToRetrieve,
-      TimelineFilterList metricsToRetrieve, EnumSet<Field> fieldsToRetrieve) {
-    super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId,
-        confsToRetrieve, metricsToRetrieve, fieldsToRetrieve);
-  }
-
-  /**
-   * Uses the {@link EntityTable}.
-   */
-  protected BaseTable<?> getTable() {
-    return ENTITY_TABLE;
-  }
-
-  @Override
-  protected FilterList constructFilterListBasedOnFields() {
-    FilterList list = new FilterList(Operator.MUST_PASS_ONE);
-    // Fetch all the columns.
-    if (fieldsToRetrieve.contains(Field.ALL) &&
-        (confsToRetrieve == null ||
-        confsToRetrieve.getFilterList().isEmpty()) &&
-        (metricsToRetrieve == null ||
-        metricsToRetrieve.getFilterList().isEmpty())) {
-      return list;
-    }
-    FilterList infoColFamilyList = new FilterList();
-    // By default fetch everything in INFO column family.
-    FamilyFilter infoColumnFamily =
-        new FamilyFilter(CompareOp.EQUAL,
-            new BinaryComparator(EntityColumnFamily.INFO.getBytes()));
-    infoColFamilyList.addFilter(infoColumnFamily);
-    // Events not required.
-    if (!fieldsToRetrieve.contains(Field.EVENTS) &&
-        !fieldsToRetrieve.contains(Field.ALL) && eventFilters == null) {
-      infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-              new BinaryPrefixComparator(
-                  EntityColumnPrefix.EVENT.getColumnPrefixBytes(""))));
-    }
-    // info not required.
-    if (!fieldsToRetrieve.contains(Field.INFO) &&
-        !fieldsToRetrieve.contains(Field.ALL) && infoFilters == null) {
-      infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-              new BinaryPrefixComparator(
-                  EntityColumnPrefix.INFO.getColumnPrefixBytes(""))));
-    }
-    // is related to not required.
-    if (!fieldsToRetrieve.contains(Field.IS_RELATED_TO) &&
-        !fieldsToRetrieve.contains(Field.ALL) && isRelatedTo == null) {
-      infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-              new BinaryPrefixComparator(
-                  EntityColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes(""))));
-    }
-    // relates to not required.
-    if (!fieldsToRetrieve.contains(Field.RELATES_TO) &&
-        !fieldsToRetrieve.contains(Field.ALL) && relatesTo == null) {
-      infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-              new BinaryPrefixComparator(
-                  EntityColumnPrefix.RELATES_TO.getColumnPrefixBytes(""))));
-    }
-    list.addFilter(infoColFamilyList);
-    if ((fieldsToRetrieve.contains(Field.CONFIGS) || configFilters != null) ||
-        (confsToRetrieve != null &&
-        !confsToRetrieve.getFilterList().isEmpty())) {
-      FilterList filterCfg =
-          new FilterList(new FamilyFilter(CompareOp.EQUAL,
-              new BinaryComparator(EntityColumnFamily.CONFIGS.getBytes())));
-      if (confsToRetrieve != null &&
-          !confsToRetrieve.getFilterList().isEmpty()) {
-        filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList(
-            EntityColumnPrefix.CONFIG, confsToRetrieve));
-      }
-      list.addFilter(filterCfg);
-    }
-    if ((fieldsToRetrieve.contains(Field.METRICS) || metricFilters != null) ||
-        (metricsToRetrieve != null &&
-        !metricsToRetrieve.getFilterList().isEmpty())) {
-      FilterList filterMetrics =
-          new FilterList(new FamilyFilter(CompareOp.EQUAL,
-              new BinaryComparator(EntityColumnFamily.METRICS.getBytes())));
-      if (metricsToRetrieve != null &&
-          !metricsToRetrieve.getFilterList().isEmpty()) {
-        filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList(
-            EntityColumnPrefix.METRIC, metricsToRetrieve));
-      }
-      list.addFilter(filterMetrics);
-    }
-    return list;
-  }
-
-  protected FlowContext lookupFlowContext(String clusterId, String appId,
-      Configuration hbaseConf, Connection conn) throws IOException {
-    byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
-    Get get = new Get(rowKey);
-    Result result = appToFlowTable.getResult(hbaseConf, conn, get);
-    if (result != null && !result.isEmpty()) {
-      return new FlowContext(
-          AppToFlowColumn.USER_ID.readResult(result).toString(),
-          AppToFlowColumn.FLOW_ID.readResult(result).toString(),
-          ((Number)AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue());
-    } else {
-      throw new IOException(
-          "Unable to find the context flow ID and flow run ID for clusterId=" +
-          clusterId + ", appId=" + appId);
-    }
-  }
-
-  protected static class FlowContext {
-    protected final String userId;
-    protected final String flowName;
-    protected final Long flowRunId;
-    public FlowContext(String user, String flowName, Long flowRunId) {
-      this.userId = user;
-      this.flowName = flowName;
-      this.flowRunId = flowRunId;
-    }
-  }
-
-  @Override
-  protected void validateParams() {
-    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
-    Preconditions.checkNotNull(appId, "appId shouldn't be null");
-    Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
-    if (singleEntityRead) {
-      Preconditions.checkNotNull(entityId, "entityId shouldn't be null");
-    }
-  }
-
-  @Override
-  protected void augmentParams(Configuration hbaseConf, Connection conn)
-      throws IOException {
-    // In reality, either all three should be null or none should be null.
-    if (flowName == null || flowRunId == null || userId == null) {
-      FlowContext context =
-          lookupFlowContext(clusterId, appId, hbaseConf, conn);
-      flowName = context.flowName;
-      flowRunId = context.flowRunId;
-      userId = context.userId;
-    }
-    if (fieldsToRetrieve == null) {
-      fieldsToRetrieve = EnumSet.noneOf(Field.class);
-    }
-    if (!fieldsToRetrieve.contains(Field.CONFIGS) &&
-        confsToRetrieve != null && !confsToRetrieve.getFilterList().isEmpty()) {
-      fieldsToRetrieve.add(Field.CONFIGS);
-    }
-    if (!fieldsToRetrieve.contains(Field.METRICS) &&
-        metricsToRetrieve != null &&
-        !metricsToRetrieve.getFilterList().isEmpty()) {
-      fieldsToRetrieve.add(Field.METRICS);
-    }
-    if (!singleEntityRead) {
-      if (limit == null || limit < 0) {
-        limit = TimelineReader.DEFAULT_LIMIT;
-      }
-      if (createdTimeBegin == null) {
-        createdTimeBegin = DEFAULT_BEGIN_TIME;
-      }
-      if (createdTimeEnd == null) {
-        createdTimeEnd = DEFAULT_END_TIME;
-      }
-      if (modifiedTimeBegin == null) {
-        modifiedTimeBegin = DEFAULT_BEGIN_TIME;
-      }
-      if (modifiedTimeEnd == null) {
-        modifiedTimeEnd = DEFAULT_END_TIME;
-      }
-    }
-  }
-
-  @Override
-  protected Result getResult(Configuration hbaseConf, Connection conn,
-      FilterList filterList) throws IOException {
-    byte[] rowKey =
-        EntityRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId,
-            entityType, entityId);
-    Get get = new Get(rowKey);
-    get.setMaxVersions(Integer.MAX_VALUE);
-    if (filterList != null && !filterList.getFilters().isEmpty()) {
-      get.setFilter(filterList);
-    }
-    return table.getResult(hbaseConf, conn, get);
-  }
-
-  @Override
-  protected ResultScanner getResults(Configuration hbaseConf,
-      Connection conn, FilterList filterList) throws IOException {
-    // Scan through part of the table to find the entities that belong to one
-    // app and one type
-    Scan scan = new Scan();
-    scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
-        clusterId, userId, flowName, flowRunId, appId, entityType));
-    scan.setMaxVersions(Integer.MAX_VALUE);
-    if (filterList != null && !filterList.getFilters().isEmpty()) {
-      scan.setFilter(filterList);
-    }
-    return table.getResultScanner(hbaseConf, conn, scan);
-  }
-
-  @Override
-  protected TimelineEntity parseEntity(Result result) throws IOException {
-    if (result == null || result.isEmpty()) {
-      return null;
-    }
-    TimelineEntity entity = new TimelineEntity();
-    String entityType = EntityColumn.TYPE.readResult(result).toString();
-    entity.setType(entityType);
-    String entityId = EntityColumn.ID.readResult(result).toString();
-    entity.setId(entityId);
-
-    // fetch created time
-    Number createdTime = (Number)EntityColumn.CREATED_TIME.readResult(result);
-    entity.setCreatedTime(createdTime.longValue());
-    if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin ||
-        entity.getCreatedTime() > createdTimeEnd)) {
-      return null;
-    }
-
-    // fetch modified time
-    Number modifiedTime = (Number)EntityColumn.MODIFIED_TIME.readResult(result);
-    entity.setModifiedTime(modifiedTime.longValue());
-    if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin ||
-        entity.getModifiedTime() > modifiedTimeEnd)) {
-      return null;
-    }
-
-    // fetch is related to entities
-    boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
-      readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true);
-      if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations(
-          entity.getIsRelatedToEntities(), isRelatedTo)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) {
-        entity.getIsRelatedToEntities().clear();
-      }
-    }
-
-    // fetch relates to entities
-    boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
-      readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false);
-      if (checkRelatesTo && !TimelineStorageUtils.matchRelations(
-          entity.getRelatesToEntities(), relatesTo)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.RELATES_TO)) {
-        entity.getRelatesToEntities().clear();
-      }
-    }
-
-    // fetch info
-    boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
-      readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false);
-      if (checkInfo &&
-          !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.INFO)) {
-        entity.getInfo().clear();
-      }
-    }
-
-    // fetch configs
-    boolean checkConfigs = configFilters != null && configFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
-      readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true);
-      if (checkConfigs && !TimelineStorageUtils.matchFilters(
-          entity.getConfigs(), configFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.CONFIGS)) {
-        entity.getConfigs().clear();
-      }
-    }
-
-    // fetch events
-    boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
-      readEvents(entity, result, false);
-      if (checkEvents && !TimelineStorageUtils.matchEventFilters(
-          entity.getEvents(), eventFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.EVENTS)) {
-        entity.getEvents().clear();
-      }
-    }
-
-    // fetch metrics
-    boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
-    if (fieldsToRetrieve.contains(Field.ALL) ||
-        fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
-      readMetrics(entity, result, EntityColumnPrefix.METRIC);
-      if (checkMetrics && !TimelineStorageUtils.matchMetricFilters(
-          entity.getMetrics(), metricFilters)) {
-        return null;
-      }
-      if (!fieldsToRetrieve.contains(Field.ALL) &&
-          !fieldsToRetrieve.contains(Field.METRICS)) {
-        entity.getMetrics().clear();
-      }
-    }
-    return entity;
-  }
-
-  /**
-   * Helper method for reading relationship.
-   */
-  protected void readRelationship(
-      TimelineEntity entity, Result result, ColumnPrefix<?> prefix,
-      boolean isRelatedTo) throws IOException {
-    // isRelatedTo and relatesTo are of type Map<String, Set<String>>
-    Map<String, Object> columns = prefix.readResults(result);
-    for (Map.Entry<String, Object> column : columns.entrySet()) {
-      for (String id : Separator.VALUES.splitEncoded(
-          column.getValue().toString())) {
-        if (isRelatedTo) {
-          entity.addIsRelatedToEntity(column.getKey(), id);
-        } else {
-          entity.addRelatesToEntity(column.getKey(), id);
-        }
-      }
-    }
-  }
-
-  /**
-   * Helper method for reading key-value pairs for either info or config.
-   */
-  protected void readKeyValuePairs(
-      TimelineEntity entity, Result result, ColumnPrefix<?> prefix,
-      boolean isConfig) throws IOException {
-    // info and configuration are of type Map<String, Object>
-    Map<String, Object> columns = prefix.readResults(result);
-    if (isConfig) {
-      for (Map.Entry<String, Object> column : columns.entrySet()) {
-        entity.addConfig(column.getKey(), column.getValue().toString());
-      }
-    } else {
-      entity.addInfo(columns);
-    }
-  }
-
-  /**
-   * Read events from the entity table or the application table. The column
-   * name is of the form "eventId=timestamp=infoKey" where "infoKey" may be
-   * omitted if there is no info associated with the event.
-   *
-   * See {@link EntityTable} and {@link ApplicationTable} for a more detailed
-   * schema description.
-   */
-  protected void readEvents(TimelineEntity entity, Result result,
-      boolean isApplication) throws IOException {
-    Map<String, TimelineEvent> eventsMap = new HashMap<>();
-    Map<?, Object> eventsResult = isApplication ?
-        ApplicationColumnPrefix.EVENT.
-            readResultsHavingCompoundColumnQualifiers(result) :
-        EntityColumnPrefix.EVENT.
-            readResultsHavingCompoundColumnQualifiers(result);
-    for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) {
-      byte[][] karr = (byte[][])eventResult.getKey();
-      // the column name is of the form "eventId=timestamp=infoKey"
-      if (karr.length == 3) {
-        String id = Bytes.toString(karr[0]);
-        long ts = TimelineStorageUtils.invertLong(Bytes.toLong(karr[1]));
-        String key = Separator.VALUES.joinEncoded(id, Long.toString(ts));
-        TimelineEvent event = eventsMap.get(key);
-        if (event == null) {
-          event = new TimelineEvent();
-          event.setId(id);
-          event.setTimestamp(ts);
-          eventsMap.put(key, event);
-        }
-        // handle empty info
-        String infoKey = karr[2].length == 0 ? null : Bytes.toString(karr[2]);
-        if (infoKey != null) {
-          event.addInfo(infoKey, eventResult.getValue());
-        }
-      } else {
-        LOG.warn("incorrectly formatted column name: it will be discarded");
-        continue;
-      }
-    }
-    Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
-    entity.addEvents(eventsSet);
-  }
-}
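[Editor's note: `readEvents()` above parses compound qualifiers of the form "eventId=timestamp=infoKey", where the timestamp is stored inverted so newer events sort first. The real code splits encoded byte arrays via `Separator`; the sketch below uses a plain `=`-joined string and a hypothetical event name purely to illustrate the three-part structure and the inversion round trip:]

```java
public final class EventQualifierSketch {
  // Inverting twice recovers the original timestamp; storing the inverted
  // value makes lexicographic row/column order equal to newest-first order.
  static long invertLong(long key) {
    return Long.MAX_VALUE - key;
  }

  public static void main(String[] args) {
    long ts = 1234567890L;
    // Hypothetical flattened form of "eventId=invertedTimestamp=infoKey".
    String qualifier = "APP_LAUNCHED=" + invertLong(ts) + "=someInfoKey";
    String[] parts = qualifier.split("=", 3);
    // prints: APP_LAUNCHED @ 1234567890 info=someInfoKey
    System.out.println(parts[0] + " @ "
        + invertLong(Long.parseLong(parts[1])) + " info=" + parts[2]);
  }
}
```
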
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java deleted file mode 100644 index a26c0c2..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java +++ /dev/null @@ -1,274 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage; - -import java.io.IOException; -import java.util.EnumSet; -import java.util.Map; -import java.util.NavigableMap; -import java.util.NavigableSet; -import java.util.Set; -import java.util.TreeSet; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.filter.FilterList; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; - -/** - * The base class for reading and deserializing timeline entities from the - * HBase storage. Different types can be defined for different types of the - * entities that are being requested. 
- */ -abstract class TimelineEntityReader { - private static final Log LOG = LogFactory.getLog(TimelineEntityReader.class); - protected static final long DEFAULT_BEGIN_TIME = 0L; - protected static final long DEFAULT_END_TIME = Long.MAX_VALUE; - - protected final boolean singleEntityRead; - - protected String userId; - protected String clusterId; - protected String flowName; - protected Long flowRunId; - protected String appId; - protected String entityType; - protected EnumSet fieldsToRetrieve; - // used only for a single entity read mode - protected String entityId; - // used only for multiple entity read mode - protected Long limit; - protected Long createdTimeBegin; - protected Long createdTimeEnd; - protected Long modifiedTimeBegin; - protected Long modifiedTimeEnd; - protected Map> relatesTo; - protected Map> isRelatedTo; - protected Map infoFilters; - protected Map configFilters; - protected Set metricFilters; - protected Set eventFilters; - protected TimelineFilterList confsToRetrieve; - protected TimelineFilterList metricsToRetrieve; - - /** - * Main table the entity reader uses. - */ - protected BaseTable table; - - /** - * Specifies whether keys for this table are sorted in a manner where entities - * can be retrieved by created time. If true, it will be sufficient to collect - * the first results as specified by the limit. Otherwise all matched entities - * will be fetched and then limit applied. - */ - private boolean sortedKeys = false; - - /** - * Instantiates a reader for multiple-entity reads. - */ - protected TimelineEntityReader(String userId, String clusterId, - String flowName, Long flowRunId, String appId, String entityType, - Long limit, Long createdTimeBegin, Long createdTimeEnd, - Long modifiedTimeBegin, Long modifiedTimeEnd, - Map> relatesTo, Map> isRelatedTo, - Map infoFilters, Map configFilters, - Set metricFilters, Set eventFilters, - TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, - EnumSet fieldsToRetrieve, boolean sortedKeys) { - this.singleEntityRead = false; - this.sortedKeys = sortedKeys; - this.userId = userId; - this.clusterId = clusterId; - this.flowName = flowName; - this.flowRunId = flowRunId; - this.appId = appId; - this.entityType = entityType; - this.fieldsToRetrieve = fieldsToRetrieve; - this.limit = limit; - this.createdTimeBegin = createdTimeBegin; - this.createdTimeEnd = createdTimeEnd; - this.modifiedTimeBegin = modifiedTimeBegin; - this.modifiedTimeEnd = modifiedTimeEnd; - this.relatesTo = relatesTo; - this.isRelatedTo = isRelatedTo; - this.infoFilters = infoFilters; - this.configFilters = configFilters; - this.metricFilters = metricFilters; - this.eventFilters = eventFilters; - this.confsToRetrieve = confsToRetrieve; - this.metricsToRetrieve = metricsToRetrieve; - - this.table = getTable(); - } - - /** - * Instantiates a reader for single-entity reads. 
- */ - protected TimelineEntityReader(String userId, String clusterId, - String flowName, Long flowRunId, String appId, String entityType, - String entityId, TimelineFilterList confsToRetrieve, - TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve) { - this.singleEntityRead = true; - this.userId = userId; - this.clusterId = clusterId; - this.flowName = flowName; - this.flowRunId = flowRunId; - this.appId = appId; - this.entityType = entityType; - this.fieldsToRetrieve = fieldsToRetrieve; - this.entityId = entityId; - this.confsToRetrieve = confsToRetrieve; - this.metricsToRetrieve = metricsToRetrieve; - - this.table = getTable(); - } - - /** - * Creates a {@link FilterList} based on fields, confs and metrics to - * retrieve. This filter list will be set in Scan/Get objects to trim down - * results fetched from HBase back-end storage. - * @return a {@link FilterList} object. - */ - protected abstract FilterList constructFilterListBasedOnFields(); - - /** - * Reads and deserializes a single timeline entity from the HBase storage. - */ - public TimelineEntity readEntity(Configuration hbaseConf, Connection conn) - throws IOException { - validateParams(); - augmentParams(hbaseConf, conn); - - FilterList filterList = constructFilterListBasedOnFields(); - Result result = getResult(hbaseConf, conn, filterList); - if (result == null || result.isEmpty()) { - // Could not find a matching row. - LOG.info("Cannot find matching entity of type " + entityType); - return null; - } - return parseEntity(result); - } - - /** - * Reads and deserializes a set of timeline entities from the HBase storage. - * It goes through all the results available, and returns the number of - * entries as specified in the limit in the entity's natural sort order. - */ - public Set readEntities(Configuration hbaseConf, - Connection conn) throws IOException { - validateParams(); - augmentParams(hbaseConf, conn); - - NavigableSet entities = new TreeSet<>(); - FilterList filterList = constructFilterListBasedOnFields(); - ResultScanner results = getResults(hbaseConf, conn, filterList); - try { - for (Result result : results) { - TimelineEntity entity = parseEntity(result); - if (entity == null) { - continue; - } - entities.add(entity); - if (!sortedKeys) { - if (entities.size() > limit) { - entities.pollLast(); - } - } else { - if (entities.size() == limit) { - break; - } - } - } - return entities; - } finally { - results.close(); - } - } - - /** - * Returns the main table to be used by the entity reader. - */ - protected abstract BaseTable getTable(); - - /** - * Validates the required parameters to read the entities. - */ - protected abstract void validateParams(); - - /** - * Sets certain parameters to defaults if the values are not provided. - */ - protected abstract void augmentParams(Configuration hbaseConf, - Connection conn) throws IOException; - - /** - * Fetches a {@link Result} instance for a single-entity read. - * - * @return the {@link Result} instance or null if no such record is found. - */ - protected abstract Result getResult(Configuration hbaseConf, Connection conn, - FilterList filterList) throws IOException; - - /** - * Fetches a {@link ResultScanner} for a multi-entity read. - */ - protected abstract ResultScanner getResults(Configuration hbaseConf, - Connection conn, FilterList filterList) throws IOException; - - /** - * Given a {@link Result} instance, deserializes and creates a - * {@link TimelineEntity}. 
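The getResult()/getResults() pair declared in this class maps onto HBase's two read primitives: a point Get for single-entity reads and a prefix Scan for multi-entity reads. A short sketch using the same client API the patch uses; the row-key strings are illustrative stand-ins for the encoded keys that ApplicationRowKey and EntityRowKey actually build:

    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class GetVsScan {
      public static void main(String[] args) {
        // Single-entity read: a point lookup by the fully specified row key.
        Get get = new Get(Bytes.toBytes("cluster!user!flow!run!app"));
        get.setMaxVersions(Integer.MAX_VALUE); // keep all cell versions; metric time series live in versions

        // Multi-entity read: a scan over every row sharing the key prefix.
        Scan scan = new Scan();
        scan.setRowPrefixFilter(Bytes.toBytes("cluster!user!flow!"));
        scan.setMaxVersions(Integer.MAX_VALUE);

        System.out.println(get + "\n" + scan);
      }
    }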
- * - * @return the {@link TimelineEntity} instance, or null if the {@link Result} - * is null or empty. - */ - protected abstract TimelineEntity parseEntity(Result result) - throws IOException; - - /** - * Helper method for reading and deserializing {@link TimelineMetric} objects - * using the specified column prefix. The timeline metrics then are added to - * the given timeline entity. - */ - protected void readMetrics(TimelineEntity entity, Result result, - ColumnPrefix columnPrefix) throws IOException { - NavigableMap> metricsResult = - columnPrefix.readResultsWithTimestamps(result); - for (Map.Entry> metricResult: - metricsResult.entrySet()) { - TimelineMetric metric = new TimelineMetric(); - metric.setId(metricResult.getKey()); - // Simply assume that if the value set contains more than 1 elements, the - // metric is a TIME_SERIES metric, otherwise, it's a SINGLE_VALUE metric - metric.setType(metricResult.getValue().size() > 1 ? - TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE); - metric.addValues(metricResult.getValue()); - entity.addMetric(metric); - } - } -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java deleted file mode 100644 index 36ed4ca..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage; - -import java.util.EnumSet; -import java.util.Map; -import java.util.Set; - -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; - -/** - * Factory methods for instantiating a timeline entity reader. - */ -class TimelineEntityReaderFactory { - /** - * Creates a timeline entity reader instance for reading a single entity with - * the specified input. 
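The factory method bodies follow in the next hunk; their dispatch logic reduces to a type switch with a generic fallback. A dependency-free sketch, with plain string comparison standing in for TimelineEntityType.matches() and class names in place of constructor calls:

    public class ReaderDispatch {
      // Sketch of TimelineEntityReaderFactory's type switch: special-cased
      // types get dedicated readers, everything else falls through to the
      // generic entity table.
      static String readerFor(String entityType) {
        if ("YARN_APPLICATION".equals(entityType)) {
          return "ApplicationEntityReader";
        } else if ("YARN_FLOW_RUN".equals(entityType)) {
          return "FlowRunEntityReader";
        } else if ("YARN_FLOW_ACTIVITY".equals(entityType)) {
          return "FlowActivityEntityReader";
        } else {
          return "GenericEntityReader";
        }
      }

      public static void main(String[] args) {
        System.out.println(readerFor("YARN_APPLICATION")); // ApplicationEntityReader
        System.out.println(readerFor("YARN_CONTAINER"));   // GenericEntityReader
      }
    }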
- */ - public static TimelineEntityReader createSingleEntityReader(String userId, - String clusterId, String flowName, Long flowRunId, String appId, - String entityType, String entityId, TimelineFilterList confs, - TimelineFilterList metrics, EnumSet fieldsToRetrieve) { - // currently the types that are handled separate from the generic entity - // table are application, flow run, and flow activity entities - if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) { - return new ApplicationEntityReader(userId, clusterId, flowName, flowRunId, - appId, entityType, entityId, confs, metrics, fieldsToRetrieve); - } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) { - return new FlowRunEntityReader(userId, clusterId, flowName, flowRunId, - appId, entityType, entityId, confs, metrics, fieldsToRetrieve); - } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) { - return new FlowActivityEntityReader(userId, clusterId, flowName, flowRunId, - appId, entityType, entityId, fieldsToRetrieve); - } else { - // assume we're dealing with a generic entity read - return new GenericEntityReader(userId, clusterId, flowName, flowRunId, - appId, entityType, entityId, confs, metrics, fieldsToRetrieve); - } - } - - /** - * Creates a timeline entity reader instance for reading set of entities with - * the specified input and predicates. - */ - public static TimelineEntityReader createMultipleEntitiesReader(String userId, - String clusterId, String flowName, Long flowRunId, String appId, - String entityType, Long limit, Long createdTimeBegin, Long createdTimeEnd, - Long modifiedTimeBegin, Long modifiedTimeEnd, - Map> relatesTo, Map> isRelatedTo, - Map infoFilters, Map configFilters, - Set metricFilters, Set eventFilters, - TimelineFilterList confs, TimelineFilterList metrics, - EnumSet fieldsToRetrieve) { - // currently the types that are handled separate from the generic entity - // table are application, flow run, and flow activity entities - if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) { - return new ApplicationEntityReader(userId, clusterId, flowName, flowRunId, - appId, entityType, limit, createdTimeBegin, createdTimeEnd, - modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, - infoFilters, configFilters, metricFilters, eventFilters, confs, - metrics, fieldsToRetrieve); - } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) { - return new FlowActivityEntityReader(userId, clusterId, flowName, flowRunId, - appId, entityType, limit, createdTimeBegin, createdTimeEnd, - modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, - infoFilters, configFilters, metricFilters, eventFilters, - fieldsToRetrieve); - } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) { - return new FlowRunEntityReader(userId, clusterId, flowName, flowRunId, - appId, entityType, limit, createdTimeBegin, createdTimeEnd, - modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, - infoFilters, configFilters, metricFilters, eventFilters, confs, - metrics, fieldsToRetrieve); - } else { - // assume we're dealing with a generic entity read - return new GenericEntityReader(userId, clusterId, flowName, flowRunId, - appId, entityType, limit, createdTimeBegin, createdTimeEnd, - modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, - infoFilters, configFilters, metricFilters, eventFilters, confs, - metrics, fieldsToRetrieve, false); - } - } -} diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java new file mode 100644 index 0000000..181ec81 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java @@ -0,0 +1,383 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.reader; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.FamilyFilter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FilterList.Operator; +import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; + +import 
com.google.common.base.Preconditions; + +/** + * Timeline entity reader for application entities that are stored in the + * application table. + */ +class ApplicationEntityReader extends GenericEntityReader { + private static final ApplicationTable APPLICATION_TABLE = + new ApplicationTable(); + + public ApplicationEntityReader(String userId, String clusterId, + String flowName, Long flowRunId, String appId, String entityType, + Long limit, Long createdTimeBegin, Long createdTimeEnd, + Long modifiedTimeBegin, Long modifiedTimeEnd, + Map> relatesTo, Map> isRelatedTo, + Map infoFilters, Map configFilters, + Set metricFilters, Set eventFilters, + TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, + EnumSet fieldsToRetrieve) { + super(userId, clusterId, flowName, flowRunId, appId, entityType, limit, + createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, + relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, + eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve, + true); + } + + public ApplicationEntityReader(String userId, String clusterId, + String flowName, Long flowRunId, String appId, String entityType, + String entityId, TimelineFilterList confsToRetrieve, + TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve) { + super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId, + confsToRetrieve, metricsToRetrieve, fieldsToRetrieve); + } + + /** + * Uses the {@link ApplicationTable}. + */ + protected BaseTable getTable() { + return APPLICATION_TABLE; + } + + @Override + protected FilterList constructFilterListBasedOnFields() { + FilterList list = new FilterList(Operator.MUST_PASS_ONE); + // Fetch all the columns. + if (fieldsToRetrieve.contains(Field.ALL) && + (confsToRetrieve == null || + confsToRetrieve.getFilterList().isEmpty()) && + (metricsToRetrieve == null || + metricsToRetrieve.getFilterList().isEmpty())) { + return list; + } + FilterList infoColFamilyList = new FilterList(); + // By default fetch everything in INFO column family. + FamilyFilter infoColumnFamily = + new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(ApplicationColumnFamily.INFO.getBytes())); + infoColFamilyList.addFilter(infoColumnFamily); + // Events not required. + if (!fieldsToRetrieve.contains(Field.EVENTS) && + !fieldsToRetrieve.contains(Field.ALL) && eventFilters == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + ApplicationColumnPrefix.EVENT.getColumnPrefixBytes("")))); + } + // info not required. + if (!fieldsToRetrieve.contains(Field.INFO) && + !fieldsToRetrieve.contains(Field.ALL) && infoFilters == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + ApplicationColumnPrefix.INFO.getColumnPrefixBytes("")))); + } + // is related to not required. + if (!fieldsToRetrieve.contains(Field.IS_RELATED_TO) && + !fieldsToRetrieve.contains(Field.ALL) && isRelatedTo == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + ApplicationColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes("")))); + } + // relates to not required.
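(The matching RELATES_TO branch continues just below.) Every exclusion here has the same shape: stay inside one column family but drop qualifiers carrying a given prefix. A self-contained sketch with the same HBase filter API; the family and prefix bytes are illustrative stand-ins for the real schema constants:

    import org.apache.hadoop.hbase.filter.BinaryComparator;
    import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
    import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
    import org.apache.hadoop.hbase.filter.FamilyFilter;
    import org.apache.hadoop.hbase.filter.FilterList;
    import org.apache.hadoop.hbase.filter.QualifierFilter;
    import org.apache.hadoop.hbase.util.Bytes;

    public class FieldTrimFilter {
      // Builds an AND list: stay in the given column family, but drop any
      // column whose qualifier starts with the given prefix (e.g. event columns
      // when EVENTS was not requested).
      static FilterList excludePrefix(byte[] family, byte[] qualifierPrefix) {
        FilterList list = new FilterList(); // default operator is MUST_PASS_ALL
        list.addFilter(new FamilyFilter(CompareOp.EQUAL,
            new BinaryComparator(family)));
        list.addFilter(new QualifierFilter(CompareOp.NOT_EQUAL,
            new BinaryPrefixComparator(qualifierPrefix)));
        return list;
      }

      public static void main(String[] args) {
        // "i" and "e!" are illustrative stand-ins for the real family/prefix bytes.
        System.out.println(excludePrefix(Bytes.toBytes("i"), Bytes.toBytes("e!")));
      }
    }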
+ if (!fieldsToRetrieve.contains(Field.RELATES_TO) && + !fieldsToRetrieve.contains(Field.ALL) && relatesTo == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + ApplicationColumnPrefix.RELATES_TO.getColumnPrefixBytes("")))); + } + list.addFilter(infoColFamilyList); + if ((fieldsToRetrieve.contains(Field.CONFIGS) || configFilters != null) || + (confsToRetrieve != null && + !confsToRetrieve.getFilterList().isEmpty())) { + FilterList filterCfg = + new FilterList(new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(ApplicationColumnFamily.CONFIGS.getBytes()))); + if (confsToRetrieve != null && + !confsToRetrieve.getFilterList().isEmpty()) { + filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList( + ApplicationColumnPrefix.CONFIG, confsToRetrieve)); + } + list.addFilter(filterCfg); + } + if ((fieldsToRetrieve.contains(Field.METRICS) || metricFilters != null) || + (metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty())) { + FilterList filterMetrics = + new FilterList(new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(ApplicationColumnFamily.METRICS.getBytes()))); + if (metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty()) { + filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList( + ApplicationColumnPrefix.METRIC, metricsToRetrieve)); + } + list.addFilter(filterMetrics); + } + return list; + } + + @Override + protected Result getResult(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { + byte[] rowKey = + ApplicationRowKey.getRowKey(clusterId, userId, flowName, flowRunId, + appId); + Get get = new Get(rowKey); + get.setMaxVersions(Integer.MAX_VALUE); + if (filterList != null && !filterList.getFilters().isEmpty()) { + get.setFilter(filterList); + } + return table.getResult(hbaseConf, conn, get); + } + + @Override + protected void validateParams() { + Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); + Preconditions.checkNotNull(entityType, "entityType shouldn't be null"); + if (singleEntityRead) { + Preconditions.checkNotNull(appId, "appId shouldn't be null"); + } else { + Preconditions.checkNotNull(userId, "userId shouldn't be null"); + Preconditions.checkNotNull(flowName, "flowName shouldn't be null"); + } + } + + @Override + protected void augmentParams(Configuration hbaseConf, Connection conn) + throws IOException { + if (singleEntityRead) { + if (flowName == null || flowRunId == null || userId == null) { + FlowContext context = + lookupFlowContext(clusterId, appId, hbaseConf, conn); + flowName = context.flowName; + flowRunId = context.flowRunId; + userId = context.userId; + } + } + if (fieldsToRetrieve == null) { + fieldsToRetrieve = EnumSet.noneOf(Field.class); + } + if (!fieldsToRetrieve.contains(Field.CONFIGS) && + confsToRetrieve != null && !confsToRetrieve.getFilterList().isEmpty()) { + fieldsToRetrieve.add(Field.CONFIGS); + } + if (!fieldsToRetrieve.contains(Field.METRICS) && + metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty()) { + fieldsToRetrieve.add(Field.METRICS); + } + if (!singleEntityRead) { + if (limit == null || limit < 0) { + limit = TimelineReader.DEFAULT_LIMIT; + } + if (createdTimeBegin == null) { + createdTimeBegin = DEFAULT_BEGIN_TIME; + } + if (createdTimeEnd == null) { + createdTimeEnd = DEFAULT_END_TIME; + } + if (modifiedTimeBegin == null) { + modifiedTimeBegin = DEFAULT_BEGIN_TIME; + } + if (modifiedTimeEnd == null) { + modifiedTimeEnd = 
DEFAULT_END_TIME; + } + } + } + + @Override + protected ResultScanner getResults(Configuration hbaseConf, + Connection conn, FilterList filterList) throws IOException { + Scan scan = new Scan(); + if (flowRunId != null) { + scan.setRowPrefixFilter(ApplicationRowKey. + getRowKeyPrefix(clusterId, userId, flowName, flowRunId)); + } else { + scan.setRowPrefixFilter(ApplicationRowKey. + getRowKeyPrefix(clusterId, userId, flowName)); + } + FilterList newList = new FilterList(); + newList.addFilter(new PageFilter(limit)); + if (filterList != null && !filterList.getFilters().isEmpty()) { + newList.addFilter(filterList); + } + scan.setFilter(newList); + return table.getResultScanner(hbaseConf, conn, scan); + } + + @Override + protected TimelineEntity parseEntity(Result result) throws IOException { + if (result == null || result.isEmpty()) { + return null; + } + TimelineEntity entity = new TimelineEntity(); + entity.setType(TimelineEntityType.YARN_APPLICATION.toString()); + String entityId = ApplicationColumn.ID.readResult(result).toString(); + entity.setId(entityId); + + // fetch created time + Number createdTime = + (Number)ApplicationColumn.CREATED_TIME.readResult(result); + entity.setCreatedTime(createdTime.longValue()); + if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin || + entity.getCreatedTime() > createdTimeEnd)) { + return null; + } + + // fetch modified time + Number modifiedTime = + (Number)ApplicationColumn.MODIFIED_TIME.readResult(result); + entity.setModifiedTime(modifiedTime.longValue()); + if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin || + entity.getModifiedTime() > modifiedTimeEnd)) { + return null; + } + + // fetch is related to entities + boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) { + readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO, + true); + if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations( + entity.getIsRelatedToEntities(), isRelatedTo)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) { + entity.getIsRelatedToEntities().clear(); + } + } + + // fetch relates to entities + boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) { + readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO, + false); + if (checkRelatesTo && !TimelineStorageUtils.matchRelations( + entity.getRelatesToEntities(), relatesTo)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.RELATES_TO)) { + entity.getRelatesToEntities().clear(); + } + } + + // fetch info + boolean checkInfo = infoFilters != null && infoFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.INFO) || checkInfo) { + readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false); + if (checkInfo && + !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.INFO)) { + entity.getInfo().clear(); + } + } + + // fetch configs + boolean checkConfigs = configFilters != null && configFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) 
{ + readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true); + if (checkConfigs && !TimelineStorageUtils.matchFilters( + entity.getConfigs(), configFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.CONFIGS)) { + entity.getConfigs().clear(); + } + } + + // fetch events + boolean checkEvents = eventFilters != null && eventFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) { + readEvents(entity, result, true); + if (checkEvents && !TimelineStorageUtils.matchEventFilters( + entity.getEvents(), eventFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.EVENTS)) { + entity.getEvents().clear(); + } + } + + // fetch metrics + boolean checkMetrics = metricFilters != null && metricFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) { + readMetrics(entity, result, ApplicationColumnPrefix.METRIC); + if (checkMetrics && !TimelineStorageUtils.matchMetricFilters( + entity.getMetrics(), metricFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.METRICS)) { + entity.getMetrics().clear(); + } + } + return entity; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java new file mode 100644 index 0000000..52ceef8 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.timelineservice.storage.reader; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; + +import com.google.common.base.Preconditions; + +/** + * Timeline entity reader for flow activity entities that are stored in the + * flow activity table. + */ +class FlowActivityEntityReader extends TimelineEntityReader { + private static final FlowActivityTable FLOW_ACTIVITY_TABLE = + new FlowActivityTable(); + + public FlowActivityEntityReader(String userId, String clusterId, + String flowName, Long flowRunId, String appId, String entityType, + Long limit, Long createdTimeBegin, Long createdTimeEnd, + Long modifiedTimeBegin, Long modifiedTimeEnd, + Map> relatesTo, Map> isRelatedTo, + Map infoFilters, Map configFilters, + Set metricFilters, Set eventFilters, + EnumSet fieldsToRetrieve) { + super(userId, clusterId, flowName, flowRunId, appId, entityType, limit, + createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, + relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, + eventFilters, null, null, fieldsToRetrieve, true); + } + + public FlowActivityEntityReader(String userId, String clusterId, + String flowName, Long flowRunId, String appId, String entityType, + String entityId, EnumSet fieldsToRetrieve) { + super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId, + null, null, fieldsToRetrieve); + } + + /** + * Uses the {@link FlowActivityTable}. 
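getResults() in this reader (next hunk) turns a created-time range into start and stop row keys, scanning from createdTimeEnd down to createdTimeBegin. That ordering suggests the row key stores an inverted day timestamp so newer days sort first, consistent with the TimelineStorageUtils.invertLong helper used elsewhere in this patch; a sketch of the idea, labeled as an assumption:

    public class InvertedTimestamp {
      // Assumption based on the start/stop rows in getResults() below: row keys
      // store Long.MAX_VALUE - timestamp so that newer rows sort first under
      // HBase's ascending byte order.
      static long invert(long ts) {
        return Long.MAX_VALUE - ts;
      }

      public static void main(String[] args) {
        long older = invert(1_000L);
        long newer = invert(2_000L);
        // After inversion the newer timestamp becomes the smaller key,
        // so a forward scan visits the most recent day first.
        System.out.println(newer < older); // true
      }
    }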
+ */ + @Override + protected BaseTable getTable() { + return FLOW_ACTIVITY_TABLE; + } + + @Override + protected void validateParams() { + Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); + } + + @Override + protected void augmentParams(Configuration hbaseConf, Connection conn) + throws IOException { + if (limit == null || limit < 0) { + limit = TimelineReader.DEFAULT_LIMIT; + } + if (createdTimeBegin == null) { + createdTimeBegin = DEFAULT_BEGIN_TIME; + } + if (createdTimeEnd == null) { + createdTimeEnd = DEFAULT_END_TIME; + } + } + + @Override + protected FilterList constructFilterListBasedOnFields() { + return null; + } + + @Override + protected Result getResult(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { + throw new UnsupportedOperationException( + "we don't support a single entity query"); + } + + @Override + protected ResultScanner getResults(Configuration hbaseConf, + Connection conn, FilterList filterList) throws IOException { + Scan scan = new Scan(); + if (createdTimeBegin == DEFAULT_BEGIN_TIME && + createdTimeEnd == DEFAULT_END_TIME) { + scan.setRowPrefixFilter(FlowActivityRowKey.getRowKeyPrefix(clusterId)); + } else { + scan.setStartRow( + FlowActivityRowKey.getRowKeyPrefix(clusterId, createdTimeEnd)); + scan.setStopRow( + FlowActivityRowKey.getRowKeyPrefix(clusterId, + (createdTimeBegin <= 0 ? 0: (createdTimeBegin - 1)))); + } + // use the page filter to limit the result to the page size + // the scanner may still return more than the limit; therefore we need to + // read the right number as we iterate + scan.setFilter(new PageFilter(limit)); + return table.getResultScanner(hbaseConf, conn, scan); + } + + @Override + protected TimelineEntity parseEntity(Result result) throws IOException { + FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(result.getRow()); + + long time = rowKey.getDayTimestamp(); + String user = rowKey.getUserId(); + String flowName = rowKey.getFlowName(); + + FlowActivityEntity flowActivity = + new FlowActivityEntity(clusterId, time, user, flowName); + // set the id + flowActivity.setId(flowActivity.getId()); + // get the list of run ids along with the version that are associated with + // this flow on this day + Map runIdsMap = + FlowActivityColumnPrefix.RUN_ID.readResults(result); + for (Map.Entry e : runIdsMap.entrySet()) { + Long runId = Long.valueOf(e.getKey()); + String version = (String)e.getValue(); + FlowRunEntity flowRun = new FlowRunEntity(); + flowRun.setUser(user); + flowRun.setName(flowName); + flowRun.setRunId(runId); + flowRun.setVersion(version); + // set the id + flowRun.setId(flowRun.getId()); + flowActivity.addFlowRun(flowRun); + } + + return flowActivity; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java new file mode 100644 index 0000000..6286ee1 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.reader; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; +import org.apache.hadoop.hbase.filter.FamilyFilter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.FilterList.Operator; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; + +import com.google.common.base.Preconditions; + +/** + * Timeline entity reader for flow run entities that are stored in the flow run + * table. 
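Both flow readers cap their scans with a PageFilter, and the comment in FlowActivityEntityReader above warns that the scanner may still return more than the limit: PageFilter is enforced per region server, so results concatenated from several servers can exceed the page size and the client-side loop must re-check the count. A minimal sketch of the pattern:

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.PageFilter;

    public class PageLimit {
      public static void main(String[] args) {
        long limit = 100L;
        Scan scan = new Scan();
        // PageFilter stops each region server after 'limit' rows, but results
        // from several region servers are concatenated, so the caller must
        // still enforce the cap while iterating over the ResultScanner.
        scan.setFilter(new PageFilter(limit));
        System.out.println(scan.getFilter());
      }
    }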
+ */ +class FlowRunEntityReader extends TimelineEntityReader { + private static final FlowRunTable FLOW_RUN_TABLE = new FlowRunTable(); + + public FlowRunEntityReader(String userId, String clusterId, + String flowName, Long flowRunId, String appId, String entityType, + Long limit, Long createdTimeBegin, Long createdTimeEnd, + Long modifiedTimeBegin, Long modifiedTimeEnd, + Map> relatesTo, Map> isRelatedTo, + Map infoFilters, Map configFilters, + Set metricFilters, Set eventFilters, + TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, + EnumSet fieldsToRetrieve) { + super(userId, clusterId, flowName, flowRunId, appId, entityType, limit, + createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, + relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, + eventFilters, null, metricsToRetrieve, fieldsToRetrieve, true); + } + + public FlowRunEntityReader(String userId, String clusterId, + String flowName, Long flowRunId, String appId, String entityType, + String entityId, TimelineFilterList confsToRetrieve, + TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve) { + super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId, + null, metricsToRetrieve, fieldsToRetrieve); + } + + /** + * Uses the {@link FlowRunTable}. + */ + @Override + protected BaseTable getTable() { + return FLOW_RUN_TABLE; + } + + @Override + protected void validateParams() { + Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); + Preconditions.checkNotNull(userId, "userId shouldn't be null"); + Preconditions.checkNotNull(flowName, "flowName shouldn't be null"); + if (singleEntityRead) { + Preconditions.checkNotNull(flowRunId, "flowRunId shouldn't be null"); + } + } + + @Override + protected void augmentParams(Configuration hbaseConf, Connection conn) { + if (!singleEntityRead) { + if (fieldsToRetrieve == null) { + fieldsToRetrieve = EnumSet.noneOf(Field.class); + } + if (limit == null || limit < 0) { + limit = TimelineReader.DEFAULT_LIMIT; + } + if (createdTimeBegin == null) { + createdTimeBegin = DEFAULT_BEGIN_TIME; + } + if (createdTimeEnd == null) { + createdTimeEnd = DEFAULT_END_TIME; + } + if (!fieldsToRetrieve.contains(Field.METRICS) && + metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty()) { + fieldsToRetrieve.add(Field.METRICS); + } + } + } + + @Override + protected FilterList constructFilterListBasedOnFields() { + FilterList list = new FilterList(Operator.MUST_PASS_ONE); + + // By default fetch everything in INFO column family. + FamilyFilter infoColumnFamily = + new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(FlowRunColumnFamily.INFO.getBytes())); + // Metrics not required. 
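(The metrics-exclusion branch for that comment continues in the next hunk.) Earlier in this class, augmentParams() quietly promotes fields: asking for specific metrics implies the METRICS field must be fetched even when the caller did not request it. That promotion in isolation, with a hypothetical boolean standing in for the metricsToRetrieve checks:

    import java.util.EnumSet;

    public class FieldPromotion {
      enum Field { ALL, METRICS, CONFIGS }

      public static void main(String[] args) {
        EnumSet<Field> fields = EnumSet.noneOf(Field.class);
        boolean metricsFilterPresent = true; // stand-in for the metricsToRetrieve checks

        // Mirrors augmentParams(): a metrics-to-retrieve filter implies the
        // METRICS field, otherwise the filtered columns would never be read.
        if (!fields.contains(Field.METRICS) && metricsFilterPresent) {
          fields.add(Field.METRICS);
        }
        System.out.println(fields); // [METRICS]
      }
    }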
+ if (!singleEntityRead && !fieldsToRetrieve.contains(Field.METRICS) && + !fieldsToRetrieve.contains(Field.ALL)) { + FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE); + infoColFamilyList.addFilter(infoColumnFamily); + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + FlowRunColumnPrefix.METRIC.getColumnPrefixBytes("")))); + list.addFilter(infoColFamilyList); + } + if (metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty()) { + FilterList infoColFamilyList = new FilterList(); + infoColFamilyList.addFilter(infoColumnFamily); + infoColFamilyList.addFilter(TimelineFilterUtils.createHBaseFilterList( + FlowRunColumnPrefix.METRIC, metricsToRetrieve)); + list.addFilter(infoColFamilyList); + } + return list; + } + + @Override + protected Result getResult(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { + byte[] rowKey = + FlowRunRowKey.getRowKey(clusterId, userId, flowName, flowRunId); + Get get = new Get(rowKey); + get.setMaxVersions(Integer.MAX_VALUE); + if (filterList != null && !filterList.getFilters().isEmpty()) { + get.setFilter(filterList); + } + return table.getResult(hbaseConf, conn, get); + } + + @Override + protected ResultScanner getResults(Configuration hbaseConf, + Connection conn, FilterList filterList) throws IOException { + Scan scan = new Scan(); + scan.setRowPrefixFilter( + FlowRunRowKey.getRowKeyPrefix(clusterId, userId, flowName)); + FilterList newList = new FilterList(); + newList.addFilter(new PageFilter(limit)); + if (filterList != null && !filterList.getFilters().isEmpty()) { + newList.addFilter(filterList); + } + scan.setFilter(newList); + return table.getResultScanner(hbaseConf, conn, scan); + } + + @Override + protected TimelineEntity parseEntity(Result result) throws IOException { + FlowRunEntity flowRun = new FlowRunEntity(); + flowRun.setUser(userId); + flowRun.setName(flowName); + if (singleEntityRead) { + flowRun.setRunId(flowRunId); + } else { + FlowRunRowKey rowKey = FlowRunRowKey.parseRowKey(result.getRow()); + flowRun.setRunId(rowKey.getFlowRunId()); + } + + // read the start time + Long startTime = (Long)FlowRunColumn.MIN_START_TIME.readResult(result); + if (startTime != null) { + flowRun.setStartTime(startTime.longValue()); + } + if (!singleEntityRead && (flowRun.getStartTime() < createdTimeBegin || + flowRun.getStartTime() > createdTimeEnd)) { + return null; + } + + // read the end time if available + Long endTime = (Long)FlowRunColumn.MAX_END_TIME.readResult(result); + if (endTime != null) { + flowRun.setMaxEndTime(endTime.longValue()); + } + + // read the flow version + String version = (String)FlowRunColumn.FLOW_VERSION.readResult(result); + if (version != null) { + flowRun.setVersion(version); + } + + // read metrics + if (singleEntityRead || fieldsToRetrieve.contains(Field.METRICS)) { + readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC); + } + + // set the id + flowRun.setId(flowRun.getId()); + return flowRun; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java new file mode 100644 index 0000000..f3f380c --- /dev/null +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java @@ -0,0 +1,497 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.reader; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; +import org.apache.hadoop.hbase.filter.FamilyFilter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.QualifierFilter; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.FilterList.Operator; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily; +import 
org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; + +import com.google.common.base.Preconditions; + +/** + * Timeline entity reader for generic entities that are stored in the entity + * table. + */ +class GenericEntityReader extends TimelineEntityReader { + private static final EntityTable ENTITY_TABLE = new EntityTable(); + private static final Log LOG = LogFactory.getLog(GenericEntityReader.class); + + /** + * Used to look up the flow context. + */ + private final AppToFlowTable appToFlowTable = new AppToFlowTable(); + + public GenericEntityReader(String userId, String clusterId, + String flowName, Long flowRunId, String appId, String entityType, + Long limit, Long createdTimeBegin, Long createdTimeEnd, + Long modifiedTimeBegin, Long modifiedTimeEnd, + Map> relatesTo, Map> isRelatedTo, + Map infoFilters, Map configFilters, + Set metricFilters, Set eventFilters, + TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve, + EnumSet fieldsToRetrieve, boolean sortedKeys) { + super(userId, clusterId, flowName, flowRunId, appId, entityType, limit, + createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, + relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, + eventFilters, confsToRetrieve, metricsToRetrieve, fieldsToRetrieve, + sortedKeys); + } + + public GenericEntityReader(String userId, String clusterId, + String flowName, Long flowRunId, String appId, String entityType, + String entityId, TimelineFilterList confsToRetrieve, + TimelineFilterList metricsToRetrieve, EnumSet fieldsToRetrieve) { + super(userId, clusterId, flowName, flowRunId, appId, entityType, entityId, + confsToRetrieve, metricsToRetrieve, fieldsToRetrieve); + } + + /** + * Uses the {@link EntityTable}. + */ + protected BaseTable getTable() { + return ENTITY_TABLE; + } + + @Override + protected FilterList constructFilterListBasedOnFields() { + FilterList list = new FilterList(Operator.MUST_PASS_ONE); + // Fetch all the columns. + if (fieldsToRetrieve.contains(Field.ALL) && + (confsToRetrieve == null || + confsToRetrieve.getFilterList().isEmpty()) && + (metricsToRetrieve == null || + metricsToRetrieve.getFilterList().isEmpty())) { + return list; + } + FilterList infoColFamilyList = new FilterList(); + // By default fetch everything in INFO column family. + FamilyFilter infoColumnFamily = + new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(EntityColumnFamily.INFO.getBytes())); + infoColFamilyList.addFilter(infoColumnFamily); + // Events not required. + if (!fieldsToRetrieve.contains(Field.EVENTS) && + !fieldsToRetrieve.contains(Field.ALL) && eventFilters == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + EntityColumnPrefix.EVENT.getColumnPrefixBytes("")))); + } + // info not required. + if (!fieldsToRetrieve.contains(Field.INFO) && + !fieldsToRetrieve.contains(Field.ALL) && infoFilters == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + EntityColumnPrefix.INFO.getColumnPrefixBytes("")))); + } + // is related to not required. 
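(The IS_RELATED_TO exclusion continues just below.) Unlike the application reader, this generic reader may be invoked with only clusterId and appId; lookupFlowContext() further down fills in the user, flow name, and flow run id from the app-to-flow table before the entity row key can be built. A dependency-free analogue with sample data:

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    public class FlowContextLookup {
      // Stand-in for one app_flow table row: appId -> (user, flow name, run id).
      static final class Context {
        final String userId; final String flowName; final long flowRunId;
        Context(String u, String f, long r) { userId = u; flowName = f; flowRunId = r; }
        public String toString() { return userId + "/" + flowName + "/" + flowRunId; }
      }

      public static void main(String[] args) throws IOException {
        Map<String, Context> appToFlow = new HashMap<>();
        appToFlow.put("application_1", new Context("alice", "daily_etl", 42L)); // sample data

        // Mirrors lookupFlowContext(): entity row keys need user, flow and run
        // id, so a missing flow context is resolved from (clusterId, appId)
        // first, and a missing mapping is an error rather than an empty result.
        Context ctx = appToFlow.get("application_1");
        if (ctx == null) {
          throw new IOException("Unable to find the flow context");
        }
        System.out.println(ctx); // alice/daily_etl/42
      }
    }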
+ if (!fieldsToRetrieve.contains(Field.IS_RELATED_TO) && + !fieldsToRetrieve.contains(Field.ALL) && isRelatedTo == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + EntityColumnPrefix.IS_RELATED_TO.getColumnPrefixBytes("")))); + } + // relates to not required. + if (!fieldsToRetrieve.contains(Field.RELATES_TO) && + !fieldsToRetrieve.contains(Field.ALL) && relatesTo == null) { + infoColFamilyList.addFilter( + new QualifierFilter(CompareOp.NOT_EQUAL, + new BinaryPrefixComparator( + EntityColumnPrefix.RELATES_TO.getColumnPrefixBytes("")))); + } + list.addFilter(infoColFamilyList); + if ((fieldsToRetrieve.contains(Field.CONFIGS) || configFilters != null) || + (confsToRetrieve != null && + !confsToRetrieve.getFilterList().isEmpty())) { + FilterList filterCfg = + new FilterList(new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(EntityColumnFamily.CONFIGS.getBytes()))); + if (confsToRetrieve != null && + !confsToRetrieve.getFilterList().isEmpty()) { + filterCfg.addFilter(TimelineFilterUtils.createHBaseFilterList( + EntityColumnPrefix.CONFIG, confsToRetrieve)); + } + list.addFilter(filterCfg); + } + if ((fieldsToRetrieve.contains(Field.METRICS) || metricFilters != null) || + (metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty())) { + FilterList filterMetrics = + new FilterList(new FamilyFilter(CompareOp.EQUAL, + new BinaryComparator(EntityColumnFamily.METRICS.getBytes()))); + if (metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty()) { + filterMetrics.addFilter(TimelineFilterUtils.createHBaseFilterList( + EntityColumnPrefix.METRIC, metricsToRetrieve)); + } + list.addFilter(filterMetrics); + } + return list; + } + + protected FlowContext lookupFlowContext(String clusterId, String appId, + Configuration hbaseConf, Connection conn) throws IOException { + byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId); + Get get = new Get(rowKey); + Result result = appToFlowTable.getResult(hbaseConf, conn, get); + if (result != null && !result.isEmpty()) { + return new FlowContext( + AppToFlowColumn.USER_ID.readResult(result).toString(), + AppToFlowColumn.FLOW_ID.readResult(result).toString(), + ((Number)AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue()); + } else { + throw new IOException( + "Unable to find the context flow ID and flow run ID for clusterId=" + + clusterId + ", appId=" + appId); + } + } + + protected static class FlowContext { + protected final String userId; + protected final String flowName; + protected final Long flowRunId; + public FlowContext(String user, String flowName, Long flowRunId) { + this.userId = user; + this.flowName = flowName; + this.flowRunId = flowRunId; + } + } + + @Override + protected void validateParams() { + Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); + Preconditions.checkNotNull(appId, "appId shouldn't be null"); + Preconditions.checkNotNull(entityType, "entityType shouldn't be null"); + if (singleEntityRead) { + Preconditions.checkNotNull(entityId, "entityId shouldn't be null"); + } + } + + @Override + protected void augmentParams(Configuration hbaseConf, Connection conn) + throws IOException { + // In reality all three should be null or neither should be null + if (flowName == null || flowRunId == null || userId == null) { + FlowContext context = + lookupFlowContext(clusterId, appId, hbaseConf, conn); + flowName = context.flowName; + flowRunId = context.flowRunId; + userId = context.userId; + } + if 
(fieldsToRetrieve == null) { + fieldsToRetrieve = EnumSet.noneOf(Field.class); + } + if (!fieldsToRetrieve.contains(Field.CONFIGS) && + confsToRetrieve != null && !confsToRetrieve.getFilterList().isEmpty()) { + fieldsToRetrieve.add(Field.CONFIGS); + } + if (!fieldsToRetrieve.contains(Field.METRICS) && + metricsToRetrieve != null && + !metricsToRetrieve.getFilterList().isEmpty()) { + fieldsToRetrieve.add(Field.METRICS); + } + if (!singleEntityRead) { + if (limit == null || limit < 0) { + limit = TimelineReader.DEFAULT_LIMIT; + } + if (createdTimeBegin == null) { + createdTimeBegin = DEFAULT_BEGIN_TIME; + } + if (createdTimeEnd == null) { + createdTimeEnd = DEFAULT_END_TIME; + } + if (modifiedTimeBegin == null) { + modifiedTimeBegin = DEFAULT_BEGIN_TIME; + } + if (modifiedTimeEnd == null) { + modifiedTimeEnd = DEFAULT_END_TIME; + } + } + } + + @Override + protected Result getResult(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { + byte[] rowKey = + EntityRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId, + entityType, entityId); + Get get = new Get(rowKey); + get.setMaxVersions(Integer.MAX_VALUE); + if (filterList != null && !filterList.getFilters().isEmpty()) { + get.setFilter(filterList); + } + return table.getResult(hbaseConf, conn, get); + } + + @Override + protected ResultScanner getResults(Configuration hbaseConf, + Connection conn, FilterList filterList) throws IOException { + // Scan through part of the table to find the entities belong to one app + // and one type + Scan scan = new Scan(); + scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix( + clusterId, userId, flowName, flowRunId, appId, entityType)); + scan.setMaxVersions(Integer.MAX_VALUE); + if (filterList != null && !filterList.getFilters().isEmpty()) { + scan.setFilter(filterList); + } + return table.getResultScanner(hbaseConf, conn, scan); + } + + @Override + protected TimelineEntity parseEntity(Result result) throws IOException { + if (result == null || result.isEmpty()) { + return null; + } + TimelineEntity entity = new TimelineEntity(); + String entityType = EntityColumn.TYPE.readResult(result).toString(); + entity.setType(entityType); + String entityId = EntityColumn.ID.readResult(result).toString(); + entity.setId(entityId); + + // fetch created time + Number createdTime = (Number)EntityColumn.CREATED_TIME.readResult(result); + entity.setCreatedTime(createdTime.longValue()); + if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin || + entity.getCreatedTime() > createdTimeEnd)) { + return null; + } + + // fetch modified time + Number modifiedTime = (Number)EntityColumn.MODIFIED_TIME.readResult(result); + entity.setModifiedTime(modifiedTime.longValue()); + if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin || + entity.getModifiedTime() > modifiedTimeEnd)) { + return null; + } + + // fetch is related to entities + boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) { + readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true); + if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations( + entity.getIsRelatedToEntities(), isRelatedTo)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) { + entity.getIsRelatedToEntities().clear(); + } + } + + // fetch relates to entities + boolean checkRelatesTo = relatesTo != null 
&& relatesTo.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) { + readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false); + if (checkRelatesTo && !TimelineStorageUtils.matchRelations( + entity.getRelatesToEntities(), relatesTo)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.RELATES_TO)) { + entity.getRelatesToEntities().clear(); + } + } + + // fetch info + boolean checkInfo = infoFilters != null && infoFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.INFO) || checkInfo) { + readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false); + if (checkInfo && + !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.INFO)) { + entity.getInfo().clear(); + } + } + + // fetch configs + boolean checkConfigs = configFilters != null && configFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) { + readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true); + if (checkConfigs && !TimelineStorageUtils.matchFilters( + entity.getConfigs(), configFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.CONFIGS)) { + entity.getConfigs().clear(); + } + } + + // fetch events + boolean checkEvents = eventFilters != null && eventFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) { + readEvents(entity, result, false); + if (checkEvents && !TimelineStorageUtils.matchEventFilters( + entity.getEvents(), eventFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.EVENTS)) { + entity.getEvents().clear(); + } + } + + // fetch metrics + boolean checkMetrics = metricFilters != null && metricFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) { + readMetrics(entity, result, EntityColumnPrefix.METRIC); + if (checkMetrics && !TimelineStorageUtils.matchMetricFilters( + entity.getMetrics(), metricFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.METRICS)) { + entity.getMetrics().clear(); + } + } + return entity; + } + + /** + * Helper method for reading relationship. + */ + protected void readRelationship( + TimelineEntity entity, Result result, ColumnPrefix prefix, + boolean isRelatedTo) throws IOException { + // isRelatedTo and relatesTo are of type Map> + Map columns = prefix.readResults(result); + for (Map.Entry column : columns.entrySet()) { + for (String id : Separator.VALUES.splitEncoded( + column.getValue().toString())) { + if (isRelatedTo) { + entity.addIsRelatedToEntity(column.getKey(), id); + } else { + entity.addRelatesToEntity(column.getKey(), id); + } + } + } + } + + /** + * Helper method for reading key-value pairs for either info or config. 
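readEvents() in the next hunk decodes compound column qualifiers of the form eventId=timestamp=infoKey. A sketch of that decoding, using '=' as a readable stand-in for the binary separator, and assuming the stored timestamp is inverted (Long.MAX_VALUE minus the value) as the invertLong call in readEvents() indicates:

    public class EventQualifier {
      public static void main(String[] args) {
        // Compound qualifier layout: eventId=timestamp=infoKey. The infoKey
        // part may be empty when the event carries no extra info, in which
        // case split() yields only two parts.
        String[] parts = "APP_LAUNCHED=9223372036854774807=detail".split("=");
        String eventId = parts[0];
        long ts = Long.MAX_VALUE - Long.parseLong(parts[1]); // un-invert
        String infoKey = parts.length == 3 && !parts[2].isEmpty() ? parts[2] : null;
        System.out.println(eventId + " @ " + ts + " info=" + infoKey);
        // prints: APP_LAUNCHED @ 1000 info=detail
      }
    }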
+
+  /**
+   * Helper method for reading key-value pairs for either info or config.
+   */
+  protected void readKeyValuePairs(
+      TimelineEntity entity, Result result, ColumnPrefix<?> prefix,
+      boolean isConfig) throws IOException {
+    // info and configuration are of type Map<String, Object>
+    Map<String, Object> columns = prefix.readResults(result);
+    if (isConfig) {
+      for (Map.Entry<String, Object> column : columns.entrySet()) {
+        entity.addConfig(column.getKey(), column.getValue().toString());
+      }
+    } else {
+      entity.addInfo(columns);
+    }
+  }
+
+  /**
+   * Reads events from the entity table or the application table. The column
+   * name is of the form "eventId=timestamp=infoKey", where "infoKey" may be
+   * omitted if there is no info associated with the event.
+   *
+   * See {@link EntityTable} and {@link ApplicationTable} for a more detailed
+   * schema description.
+   */
+  protected void readEvents(TimelineEntity entity, Result result,
+      boolean isApplication) throws IOException {
+    Map<String, TimelineEvent> eventsMap = new HashMap<>();
+    Map<?, Object> eventsResult = isApplication ?
+        ApplicationColumnPrefix.EVENT.
+            readResultsHavingCompoundColumnQualifiers(result) :
+        EntityColumnPrefix.EVENT.
+            readResultsHavingCompoundColumnQualifiers(result);
+    for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) {
+      byte[][] karr = (byte[][]) eventResult.getKey();
+      // the column name is of the form "eventId=timestamp=infoKey"
+      if (karr.length == 3) {
+        String id = Bytes.toString(karr[0]);
+        long ts = TimelineStorageUtils.invertLong(Bytes.toLong(karr[1]));
+        String key = Separator.VALUES.joinEncoded(id, Long.toString(ts));
+        TimelineEvent event = eventsMap.get(key);
+        if (event == null) {
+          event = new TimelineEvent();
+          event.setId(id);
+          event.setTimestamp(ts);
+          eventsMap.put(key, event);
+        }
+        // handle empty info
+        String infoKey = karr[2].length == 0 ? null : Bytes.toString(karr[2]);
+        if (infoKey != null) {
+          event.addInfo(infoKey, eventResult.getValue());
+        }
+      } else {
+        LOG.warn("incorrectly formatted column name: it will be discarded");
+        continue;
+      }
+    }
+    Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values());
+    entity.addEvents(eventsSet);
+  }
+}
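readEvents above recovers the event timestamp by re-inverting the long stored
in the compound qualifier. A minimal sketch of that round trip, assuming (as
the name suggests) that TimelineStorageUtils.invertLong(v) computes
Long.MAX_VALUE - v so that newer events sort first under HBase's ascending
byte order:

    // Sketch only; the writer stores the inverted timestamp in the qualifier.
    long ts = 1234567890L;
    long stored = TimelineStorageUtils.invertLong(ts);       // Long.MAX_VALUE - ts
    long restored = TimelineStorageUtils.invertLong(stored); // == ts; self-inverse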
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
new file mode 100644
index 0000000..e801466
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+
+/**
+ * The base class for reading and deserializing timeline entities from the
+ * HBase storage. Different implementations can be defined for different types
+ * of entities that are requested.
+ */
+public abstract class TimelineEntityReader {
+  private static final Log LOG = LogFactory.getLog(TimelineEntityReader.class);
+  protected static final long DEFAULT_BEGIN_TIME = 0L;
+  protected static final long DEFAULT_END_TIME = Long.MAX_VALUE;
+
+  protected final boolean singleEntityRead;
+
+  protected String userId;
+  protected String clusterId;
+  protected String flowName;
+  protected Long flowRunId;
+  protected String appId;
+  protected String entityType;
+  protected EnumSet<Field> fieldsToRetrieve;
+  // used only for a single entity read mode
+  protected String entityId;
+  // used only for multiple entity read mode
+  protected Long limit;
+  protected Long createdTimeBegin;
+  protected Long createdTimeEnd;
+  protected Long modifiedTimeBegin;
+  protected Long modifiedTimeEnd;
+  protected Map<String, Set<String>> relatesTo;
+  protected Map<String, Set<String>> isRelatedTo;
+  protected Map<String, Object> infoFilters;
+  protected Map<String, String> configFilters;
+  protected Set<String> metricFilters;
+  protected Set<String> eventFilters;
+  protected TimelineFilterList confsToRetrieve;
+  protected TimelineFilterList metricsToRetrieve;
+
+  /**
+   * Main table the entity reader uses.
+   */
+  protected BaseTable<?> table;
+
+  /**
+   * Specifies whether keys for this table are sorted in a manner that allows
+   * entities to be retrieved by created time. If true, it is sufficient to
+   * collect the first results as specified by the limit. Otherwise, all
+   * matched entities are fetched and the limit is applied afterwards.
+   */
+  private boolean sortedKeys = false;
+
+  /**
+   * Instantiates a reader for multiple-entity reads.
+   */
+  protected TimelineEntityReader(String userId, String clusterId,
+      String flowName, Long flowRunId, String appId, String entityType,
+      Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      TimelineFilterList confsToRetrieve, TimelineFilterList metricsToRetrieve,
+      EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) {
+    this.singleEntityRead = false;
+    this.sortedKeys = sortedKeys;
+    this.userId = userId;
+    this.clusterId = clusterId;
+    this.flowName = flowName;
+    this.flowRunId = flowRunId;
+    this.appId = appId;
+    this.entityType = entityType;
+    this.fieldsToRetrieve = fieldsToRetrieve;
+    this.limit = limit;
+    this.createdTimeBegin = createdTimeBegin;
+    this.createdTimeEnd = createdTimeEnd;
+    this.modifiedTimeBegin = modifiedTimeBegin;
+    this.modifiedTimeEnd = modifiedTimeEnd;
+    this.relatesTo = relatesTo;
+    this.isRelatedTo = isRelatedTo;
+    this.infoFilters = infoFilters;
+    this.configFilters = configFilters;
+    this.metricFilters = metricFilters;
+    this.eventFilters = eventFilters;
+    this.confsToRetrieve = confsToRetrieve;
+    this.metricsToRetrieve = metricsToRetrieve;
+
+    this.table = getTable();
+  }
+
+  /**
+   * Instantiates a reader for single-entity reads.
+   */
+  protected TimelineEntityReader(String userId, String clusterId,
+      String flowName, Long flowRunId, String appId, String entityType,
+      String entityId, TimelineFilterList confsToRetrieve,
+      TimelineFilterList metricsToRetrieve,
+      EnumSet<Field> fieldsToRetrieve) {
+    this.singleEntityRead = true;
+    this.userId = userId;
+    this.clusterId = clusterId;
+    this.flowName = flowName;
+    this.flowRunId = flowRunId;
+    this.appId = appId;
+    this.entityType = entityType;
+    this.fieldsToRetrieve = fieldsToRetrieve;
+    this.entityId = entityId;
+    this.confsToRetrieve = confsToRetrieve;
+    this.metricsToRetrieve = metricsToRetrieve;
+
+    this.table = getTable();
+  }
+
+  /**
+   * Creates a {@link FilterList} based on fields, confs and metrics to
+   * retrieve. This filter list will be set in Scan/Get objects to trim down
+   * results fetched from the HBase back-end storage.
+   *
+   * @return a {@link FilterList} object.
+   */
+  protected abstract FilterList constructFilterListBasedOnFields();
+
+  /**
+   * Reads and deserializes a single timeline entity from the HBase storage.
+   */
+  public TimelineEntity readEntity(Configuration hbaseConf, Connection conn)
+      throws IOException {
+    validateParams();
+    augmentParams(hbaseConf, conn);
+
+    FilterList filterList = constructFilterListBasedOnFields();
+    Result result = getResult(hbaseConf, conn, filterList);
+    if (result == null || result.isEmpty()) {
+      // Could not find a matching row.
+      LOG.info("Cannot find matching entity of type " + entityType);
+      return null;
+    }
+    return parseEntity(result);
+  }
+
+  /**
+   * Reads and deserializes a set of timeline entities from the HBase storage.
+   * It iterates over all the available results and returns at most as many
+   * entries as specified by the limit, in the entities' natural sort order.
+   */
+  public Set<TimelineEntity> readEntities(Configuration hbaseConf,
+      Connection conn) throws IOException {
+    validateParams();
+    augmentParams(hbaseConf, conn);
+
+    NavigableSet<TimelineEntity> entities = new TreeSet<>();
+    FilterList filterList = constructFilterListBasedOnFields();
+    ResultScanner results = getResults(hbaseConf, conn, filterList);
+    try {
+      for (Result result : results) {
+        TimelineEntity entity = parseEntity(result);
+        if (entity == null) {
+          continue;
+        }
+        entities.add(entity);
+        if (!sortedKeys) {
+          if (entities.size() > limit) {
+            entities.pollLast();
+          }
+        } else {
+          if (entities.size() == limit) {
+            break;
+          }
+        }
+      }
+      return entities;
+    } finally {
+      results.close();
+    }
+  }
+
+  /**
+   * Returns the main table to be used by the entity reader.
+   */
+  protected abstract BaseTable<?> getTable();
+
+  /**
+   * Validates the required parameters to read the entities.
+   */
+  protected abstract void validateParams();
+
+  /**
+   * Sets certain parameters to defaults if the values are not provided.
+   */
+  protected abstract void augmentParams(Configuration hbaseConf,
+      Connection conn) throws IOException;
+
+  /**
+   * Fetches a {@link Result} instance for a single-entity read.
+   *
+   * @return the {@link Result} instance or null if no such record is found.
+   */
+  protected abstract Result getResult(Configuration hbaseConf, Connection conn,
+      FilterList filterList) throws IOException;
+
+  /**
+   * Fetches a {@link ResultScanner} for a multi-entity read.
+   */
+  protected abstract ResultScanner getResults(Configuration hbaseConf,
+      Connection conn, FilterList filterList) throws IOException;
+
+  /**
+   * Given a {@link Result} instance, deserializes and creates a
+   * {@link TimelineEntity}.
+   *
+   * @return the {@link TimelineEntity} instance, or null if the
+   *     {@link Result} is null or empty.
+   */
+  protected abstract TimelineEntity parseEntity(Result result)
+      throws IOException;
+
+  /**
+   * Helper method for reading and deserializing {@link TimelineMetric}
+   * objects using the specified column prefix. The timeline metrics are then
+   * added to the given timeline entity.
+   */
+  protected void readMetrics(TimelineEntity entity, Result result,
+      ColumnPrefix<?> columnPrefix) throws IOException {
+    NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
+        columnPrefix.readResultsWithTimestamps(result);
+    for (Map.Entry<String, NavigableMap<Long, Number>> metricResult :
+        metricsResult.entrySet()) {
+      TimelineMetric metric = new TimelineMetric();
+      metric.setId(metricResult.getKey());
+      // Assume that if the value set contains more than one element, the
+      // metric is a TIME_SERIES metric; otherwise it is a SINGLE_VALUE metric.
+      metric.setType(metricResult.getValue().size() > 1 ?
+          TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE);
+      metric.addValues(metricResult.getValue());
+      entity.addMetric(metric);
+    }
+  }
+}
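readEntities applies the limit in one of two ways: when row keys are already
sorted by created time (sortedKeys == true), it simply stops after `limit`
results; otherwise it keeps every parsed entity in a TreeSet and evicts past
the limit with pollLast(). A self-contained sketch of the latter strategy
(class name and values below are illustrative only, not part of the patch):

    import java.util.NavigableSet;
    import java.util.TreeSet;

    public class LimitTrimSketch {
      public static void main(String[] args) {
        NavigableSet<Integer> entities = new TreeSet<>();
        int limit = 3;
        for (int candidate : new int[] {5, 1, 4, 2, 3}) {
          entities.add(candidate);
          if (entities.size() > limit) {
            // Evict the largest element; the first `limit` elements in
            // natural sort order survive the scan.
            entities.pollLast();
          }
        }
        System.out.println(entities); // prints [1, 2, 3]
      }
    }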
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java
new file mode 100644
index 0000000..c77897a
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
+
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+
+/**
+ * Factory methods for instantiating a timeline entity reader.
+ */
+public class TimelineEntityReaderFactory {
+  /**
+   * Creates a timeline entity reader instance for reading a single entity
+   * with the specified input.
+   */
+  public static TimelineEntityReader createSingleEntityReader(String userId,
+      String clusterId, String flowName, Long flowRunId, String appId,
+      String entityType, String entityId, TimelineFilterList confs,
+      TimelineFilterList metrics, EnumSet<Field> fieldsToRetrieve) {
+    // currently the types that are handled separately from the generic
+    // entity table are application, flow run, and flow activity entities
+    if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) {
+      return new ApplicationEntityReader(userId, clusterId, flowName,
+          flowRunId, appId, entityType, entityId, confs, metrics,
+          fieldsToRetrieve);
+    } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) {
+      return new FlowRunEntityReader(userId, clusterId, flowName, flowRunId,
+          appId, entityType, entityId, confs, metrics, fieldsToRetrieve);
+    } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
+      return new FlowActivityEntityReader(userId, clusterId, flowName,
+          flowRunId, appId, entityType, entityId, fieldsToRetrieve);
+    } else {
+      // assume we're dealing with a generic entity read
+      return new GenericEntityReader(userId, clusterId, flowName, flowRunId,
+          appId, entityType, entityId, confs, metrics, fieldsToRetrieve);
+    }
+  }
+
+  /**
+   * Creates a timeline entity reader instance for reading a set of entities
+   * with the specified input and predicates.
+   */
+  public static TimelineEntityReader createMultipleEntitiesReader(
+      String userId, String clusterId, String flowName, Long flowRunId,
+      String appId, String entityType, Long limit, Long createdTimeBegin,
+      Long createdTimeEnd, Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      TimelineFilterList confs, TimelineFilterList metrics,
+      EnumSet<Field> fieldsToRetrieve) {
+    // currently the types that are handled separately from the generic
+    // entity table are application, flow run, and flow activity entities
+    if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) {
+      return new ApplicationEntityReader(userId, clusterId, flowName,
+          flowRunId, appId, entityType, limit, createdTimeBegin,
+          createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo,
+          isRelatedTo, infoFilters, configFilters, metricFilters,
+          eventFilters, confs, metrics, fieldsToRetrieve);
+    } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
+      return new FlowActivityEntityReader(userId, clusterId, flowName,
+          flowRunId, appId, entityType, limit, createdTimeBegin,
+          createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, relatesTo,
+          isRelatedTo, infoFilters, configFilters, metricFilters,
+          eventFilters, fieldsToRetrieve);
+    } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) {
+      return new FlowRunEntityReader(userId, clusterId, flowName, flowRunId,
+          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
+          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
+          infoFilters, configFilters, metricFilters, eventFilters, confs,
+          metrics, fieldsToRetrieve);
+    } else {
+      // assume we're dealing with a generic entity read
+      return new GenericEntityReader(userId, clusterId, flowName, flowRunId,
+          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
+          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
+          infoFilters, configFilters, metricFilters, eventFilters, confs,
+          metrics, fieldsToRetrieve, false);
+    }
+  }
+}
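To see how the factory dispatches, consider a hypothetical caller (all ids
below are made up; hbaseConf and conn stand for an HBase Configuration and
Connection created elsewhere):

    TimelineEntityReader reader =
        TimelineEntityReaderFactory.createSingleEntityReader(
            "user1", "cluster1", "flow_name", 1002345678919L,
            "application_1111111111_2222",
            TimelineEntityType.YARN_APPLICATION.toString(),
            "application_1111111111_2222", null, null,
            EnumSet.of(Field.INFO));
    // The type matches YARN_APPLICATION, so an ApplicationEntityReader backed
    // by the application table is returned.
    // TimelineEntity entity = reader.readEntity(hbaseConf, conn);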
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java
new file mode 100644
index 0000000..0b3fa38
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file