diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java deleted file mode 100644 index 6d1a2ff..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/ApplicationEntityReader.java +++ /dev/null @@ -1,272 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage; - -import java.io.IOException; -import java.util.EnumSet; -import java.util.Map; -import java.util.Set; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.PageFilter; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; - -import com.google.common.base.Preconditions; - -/** - * Timeline entity reader for application entities that are stored in the - * application table. 
- */
-class ApplicationEntityReader extends GenericEntityReader {
-  private static final ApplicationTable APPLICATION_TABLE =
-      new ApplicationTable();
-
-  public ApplicationEntityReader(String userId, String clusterId,
-      String flowId, Long flowRunId, String appId, String entityType,
-      Long limit, Long createdTimeBegin, Long createdTimeEnd,
-      Long modifiedTimeBegin, Long modifiedTimeEnd,
-      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
-      Map<String, Object> infoFilters, Map<String, String> configFilters,
-      Set<String> metricFilters, Set<String> eventFilters,
-      EnumSet<Field> fieldsToRetrieve) {
-    super(userId, clusterId, flowId, flowRunId, appId, entityType, limit,
-        createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd,
-        relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters,
-        eventFilters, fieldsToRetrieve, true);
-  }
-
-  public ApplicationEntityReader(String userId, String clusterId,
-      String flowId, Long flowRunId, String appId, String entityType,
-      String entityId, EnumSet<Field> fieldsToRetrieve) {
-    super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId,
-        fieldsToRetrieve);
-  }
-
-  /**
-   * Uses the {@link ApplicationTable}.
-   */
-  protected BaseTable<?> getTable() {
-    return APPLICATION_TABLE;
-  }
-
-  @Override
-  protected Result getResult(Configuration hbaseConf, Connection conn)
-      throws IOException {
-    byte[] rowKey =
-        ApplicationRowKey.getRowKey(clusterId, userId, flowId, flowRunId,
-            appId);
-    Get get = new Get(rowKey);
-    get.setMaxVersions(Integer.MAX_VALUE);
-    return table.getResult(hbaseConf, conn, get);
-  }
-
-  @Override
-  protected void validateParams() {
-    Preconditions.checkNotNull(userId, "userId shouldn't be null");
-    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
-    Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
-    if (singleEntityRead) {
-      Preconditions.checkNotNull(appId, "appId shouldn't be null");
-    } else {
-      Preconditions.checkNotNull(flowId, "flowId shouldn't be null");
-    }
-  }
-
-  @Override
-  protected void augmentParams(Configuration hbaseConf, Connection conn)
-      throws IOException {
-    if (singleEntityRead) {
-      if (flowId == null || flowRunId == null) {
-        FlowContext context =
-            lookupFlowContext(clusterId, appId, hbaseConf, conn);
-        flowId = context.flowId;
-        flowRunId = context.flowRunId;
-      }
-    }
-    if (fieldsToRetrieve == null) {
-      fieldsToRetrieve = EnumSet.noneOf(Field.class);
-    }
-    if (!singleEntityRead) {
-      if (limit == null || limit < 0) {
-        limit = TimelineReader.DEFAULT_LIMIT;
-      }
-      if (createdTimeBegin == null) {
-        createdTimeBegin = DEFAULT_BEGIN_TIME;
-      }
-      if (createdTimeEnd == null) {
-        createdTimeEnd = DEFAULT_END_TIME;
-      }
-      if (modifiedTimeBegin == null) {
-        modifiedTimeBegin = DEFAULT_BEGIN_TIME;
-      }
-      if (modifiedTimeEnd == null) {
-        modifiedTimeEnd = DEFAULT_END_TIME;
-      }
-    }
-  }
-
-  @Override
-  protected ResultScanner getResults(Configuration hbaseConf,
-      Connection conn) throws IOException {
-    Scan scan = new Scan();
-    if (flowRunId != null) {
-      scan.setRowPrefixFilter(ApplicationRowKey.
-          getRowKeyPrefix(clusterId, userId, flowId, flowRunId));
-    } else {
-      scan.setRowPrefixFilter(ApplicationRowKey.
- getRowKeyPrefix(clusterId, userId, flowId)); - } - scan.setFilter(new PageFilter(limit)); - return table.getResultScanner(hbaseConf, conn, scan); - } - - @Override - protected TimelineEntity parseEntity(Result result) throws IOException { - if (result == null || result.isEmpty()) { - return null; - } - TimelineEntity entity = new TimelineEntity(); - entity.setType(TimelineEntityType.YARN_APPLICATION.toString()); - String entityId = ApplicationColumn.ID.readResult(result).toString(); - entity.setId(entityId); - - // fetch created time - Number createdTime = - (Number)ApplicationColumn.CREATED_TIME.readResult(result); - entity.setCreatedTime(createdTime.longValue()); - if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin || - entity.getCreatedTime() > createdTimeEnd)) { - return null; - } - - // fetch modified time - Number modifiedTime = - (Number)ApplicationColumn.MODIFIED_TIME.readResult(result); - entity.setModifiedTime(modifiedTime.longValue()); - if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin || - entity.getModifiedTime() > modifiedTimeEnd)) { - return null; - } - - // fetch is related to entities - boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) { - readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO, - true); - if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations( - entity.getIsRelatedToEntities(), isRelatedTo)) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) { - entity.getIsRelatedToEntities().clear(); - } - } - - // fetch relates to entities - boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) { - readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO, - false); - if (checkRelatesTo && !TimelineStorageUtils.matchRelations( - entity.getRelatesToEntities(), relatesTo)) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.RELATES_TO)) { - entity.getRelatesToEntities().clear(); - } - } - - // fetch info - boolean checkInfo = infoFilters != null && infoFilters.size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.INFO) || checkInfo) { - readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false); - if (checkInfo && - !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.INFO)) { - entity.getInfo().clear(); - } - } - - // fetch configs - boolean checkConfigs = configFilters != null && configFilters.size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) { - readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true); - if (checkConfigs && !TimelineStorageUtils.matchFilters( - entity.getConfigs(), configFilters)) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.CONFIGS)) { - entity.getConfigs().clear(); - } - } - - // fetch events - boolean checkEvents = eventFilters != null && eventFilters.size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) { - readEvents(entity, result, 
true); - if (checkEvents && !TimelineStorageUtils.matchEventFilters( - entity.getEvents(), eventFilters)) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.EVENTS)) { - entity.getEvents().clear(); - } - } - - // fetch metrics - boolean checkMetrics = metricFilters != null && metricFilters.size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) { - readMetrics(entity, result, ApplicationColumnPrefix.METRIC); - if (checkMetrics && !TimelineStorageUtils.matchMetricFilters( - entity.getMetrics(), metricFilters)) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.METRICS)) { - entity.getMetrics().clear(); - } - } - return entity; - } -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java deleted file mode 100644 index 70a0915..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowActivityEntityReader.java +++ /dev/null @@ -1,142 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage; - -import java.io.IOException; -import java.util.EnumSet; -import java.util.Map; -import java.util.Set; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.PageFilter; -import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; - -import com.google.common.base.Preconditions; - -/** - * Timeline entity reader for flow activity entities that are stored in the - * flow activity table. 
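Both the application reader above and the flow activity reader that follows cap multi-entity scans with HBase's PageFilter on top of a row-key prefix. PageFilter is evaluated independently on each region server, so a scan can still hand back more rows than requested, which is why the readers also enforce the limit client-side. A minimal, self-contained sketch of the pattern (the class and method names here are illustrative, not part of the patch):

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.PageFilter;

public class PrefixScanSketch {
  /**
   * Scans all rows sharing a prefix, asking the server to page results.
   * PageFilter is applied per region server, so the scan may still deliver
   * more than 'limit' rows; the client-side cap is the authoritative one.
   */
  static List<Result> scanWithLimit(Table table, byte[] prefix, long limit)
      throws IOException {
    Scan scan = new Scan();
    scan.setRowPrefixFilter(prefix);       // bounds the scan to one key range
    scan.setFilter(new PageFilter(limit)); // best-effort server-side paging
    List<Result> rows = new ArrayList<>();
    try (ResultScanner scanner = table.getScanner(scan)) {
      for (Result r : scanner) {
        rows.add(r);
        if (rows.size() >= limit) {        // enforce the limit on the client
          break;
        }
      }
    }
    return rows;
  }
}
```

The filter only reduces how much each server returns; the readers' per-iteration accounting is what actually guarantees the limit.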
- */ -class FlowActivityEntityReader extends TimelineEntityReader { - private static final FlowActivityTable FLOW_ACTIVITY_TABLE = - new FlowActivityTable(); - - public FlowActivityEntityReader(String userId, String clusterId, - String flowId, Long flowRunId, String appId, String entityType, - Long limit, Long createdTimeBegin, Long createdTimeEnd, - Long modifiedTimeBegin, Long modifiedTimeEnd, - Map> relatesTo, Map> isRelatedTo, - Map infoFilters, Map configFilters, - Set metricFilters, Set eventFilters, - EnumSet fieldsToRetrieve) { - super(userId, clusterId, flowId, flowRunId, appId, entityType, limit, - createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, - relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, - eventFilters, fieldsToRetrieve, true); - } - - public FlowActivityEntityReader(String userId, String clusterId, - String flowId, Long flowRunId, String appId, String entityType, - String entityId, EnumSet fieldsToRetrieve) { - super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId, - fieldsToRetrieve); - } - - /** - * Uses the {@link FlowActivityTable}. - */ - @Override - protected BaseTable getTable() { - return FLOW_ACTIVITY_TABLE; - } - - @Override - protected void validateParams() { - Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); - } - - @Override - protected void augmentParams(Configuration hbaseConf, Connection conn) - throws IOException { - if (limit == null || limit < 0) { - limit = TimelineReader.DEFAULT_LIMIT; - } - } - - @Override - protected Result getResult(Configuration hbaseConf, Connection conn) - throws IOException { - throw new UnsupportedOperationException( - "we don't support a single entity query"); - } - - @Override - protected ResultScanner getResults(Configuration hbaseConf, - Connection conn) throws IOException { - Scan scan = new Scan(); - scan.setRowPrefixFilter(FlowActivityRowKey.getRowKeyPrefix(clusterId)); - // use the page filter to limit the result to the page size - // the scanner may still return more than the limit; therefore we need to - // read the right number as we iterate - scan.setFilter(new PageFilter(limit)); - return table.getResultScanner(hbaseConf, conn, scan); - } - - @Override - protected TimelineEntity parseEntity(Result result) throws IOException { - FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(result.getRow()); - - long time = rowKey.getDayTimestamp(); - String user = rowKey.getUserId(); - String flowName = rowKey.getFlowId(); - - FlowActivityEntity flowActivity = - new FlowActivityEntity(clusterId, time, user, flowName); - // set the id - flowActivity.setId(flowActivity.getId()); - // get the list of run ids along with the version that are associated with - // this flow on this day - Map runIdsMap = - FlowActivityColumnPrefix.RUN_ID.readResults(result); - for (Map.Entry e : runIdsMap.entrySet()) { - Long runId = Long.valueOf(e.getKey()); - String version = (String)e.getValue(); - FlowRunEntity flowRun = new FlowRunEntity(); - flowRun.setUser(user); - flowRun.setName(flowName); - flowRun.setRunId(runId); - flowRun.setVersion(version); - // set the id - flowRun.setId(flowRun.getId()); - flowActivity.addFlowRun(flowRun); - } - - return flowActivity; - } -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java deleted file mode 100644 index 90ce28f..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FlowRunEntityReader.java +++ /dev/null @@ -1,137 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage; - -import java.io.IOException; -import java.util.EnumSet; -import java.util.Map; -import java.util.Set; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; - -import com.google.common.base.Preconditions; - -/** - * Timeline entity reader for flow run entities that are stored in the flow run - * table. 
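The flow activity rows parsed above are keyed per day under a cluster prefix, with recent activity expected to come back first. Timeline storage commonly gets that ordering by writing inverted timestamps into the key; assuming TimelineStorageUtils.invertLong is the usual Long.MAX_VALUE minus value transform (as its use in readEvents later in this patch suggests), the idea looks like this:

```java
import java.util.Arrays;

public class InvertedTimestampSketch {
  // Storing Long.MAX_VALUE - ts makes lexicographically ascending HBase keys
  // correspond to descending timestamps, so the newest day sorts first.
  static long invert(long ts) {
    return Long.MAX_VALUE - ts;
  }

  public static void main(String[] args) {
    long day1 = 1_600_000_000_000L;           // older day
    long day2 = day1 + 86_400_000L;           // newer day
    long[] keys = {invert(day1), invert(day2)};
    Arrays.sort(keys);                        // HBase sorts keys ascending
    // the first key decodes back to the newer day
    System.out.println(Long.MAX_VALUE - keys[0] == day2); // true
  }
}
```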
- */ -class FlowRunEntityReader extends TimelineEntityReader { - private static final FlowRunTable FLOW_RUN_TABLE = new FlowRunTable(); - - public FlowRunEntityReader(String userId, String clusterId, - String flowId, Long flowRunId, String appId, String entityType, - Long limit, Long createdTimeBegin, Long createdTimeEnd, - Long modifiedTimeBegin, Long modifiedTimeEnd, - Map> relatesTo, Map> isRelatedTo, - Map infoFilters, Map configFilters, - Set metricFilters, Set eventFilters, - EnumSet fieldsToRetrieve) { - super(userId, clusterId, flowId, flowRunId, appId, entityType, limit, - createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, - relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, - eventFilters, fieldsToRetrieve, false); - } - - public FlowRunEntityReader(String userId, String clusterId, - String flowId, Long flowRunId, String appId, String entityType, - String entityId, EnumSet fieldsToRetrieve) { - super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId, - fieldsToRetrieve); - } - - /** - * Uses the {@link FlowRunTable}. - */ - @Override - protected BaseTable getTable() { - return FLOW_RUN_TABLE; - } - - @Override - protected void validateParams() { - Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); - Preconditions.checkNotNull(userId, "userId shouldn't be null"); - Preconditions.checkNotNull(flowId, "flowId shouldn't be null"); - Preconditions.checkNotNull(flowRunId, "flowRunId shouldn't be null"); - } - - @Override - protected void augmentParams(Configuration hbaseConf, Connection conn) { - } - - @Override - protected Result getResult(Configuration hbaseConf, Connection conn) - throws IOException { - byte[] rowKey = - FlowRunRowKey.getRowKey(clusterId, userId, flowId, flowRunId); - Get get = new Get(rowKey); - get.setMaxVersions(Integer.MAX_VALUE); - return table.getResult(hbaseConf, conn, get); - } - - @Override - protected ResultScanner getResults(Configuration hbaseConf, - Connection conn) throws IOException { - throw new UnsupportedOperationException( - "multiple entity query is not supported"); - } - - @Override - protected TimelineEntity parseEntity(Result result) throws IOException { - FlowRunEntity flowRun = new FlowRunEntity(); - flowRun.setUser(userId); - flowRun.setName(flowId); - flowRun.setRunId(flowRunId); - - // read the start time - Number startTime = (Number)FlowRunColumn.MIN_START_TIME.readResult(result); - if (startTime != null) { - flowRun.setStartTime(startTime.longValue()); - } - // read the end time if available - Number endTime = (Number)FlowRunColumn.MAX_END_TIME.readResult(result); - if (endTime != null) { - flowRun.setMaxEndTime(endTime.longValue()); - } - - // read the flow version - String version = (String)FlowRunColumn.FLOW_VERSION.readResult(result); - if (version != null) { - flowRun.setVersion(version); - } - - // read metrics - readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC); - - // set the id - flowRun.setId(flowRun.getId()); - return flowRun; - } -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java deleted file mode 100644 index c18966f..0000000 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/GenericEntityReader.java +++ /dev/null @@ -1,389 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage; - -import java.io.IOException; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; - -import com.google.common.base.Preconditions; - -/** - * Timeline entity reader for generic entities that are stored in the entity - * table. - */ -class GenericEntityReader extends TimelineEntityReader { - private static final EntityTable ENTITY_TABLE = new EntityTable(); - private static final Log LOG = LogFactory.getLog(GenericEntityReader.class); - - protected static final long DEFAULT_BEGIN_TIME = 0L; - protected static final long DEFAULT_END_TIME = Long.MAX_VALUE; - - /** - * Used to look up the flow context. 
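The lookup this table backs, lookupFlowContext just below, is a single point Get keyed by cluster and application id that yields the flow name and run id for an app. A hedged sketch with a made-up row-key layout and column names (the real ones live in AppToFlowRowKey and AppToFlowColumn):

```java
import java.io.IOException;

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class AppToFlowLookupSketch {
  // Illustrative column coordinates only; not the patch's actual schema.
  private static final byte[] INFO = Bytes.toBytes("info");
  private static final byte[] FLOW_ID = Bytes.toBytes("flow_id");
  private static final byte[] FLOW_RUN_ID = Bytes.toBytes("flow_run_id");

  static final class FlowContext {
    final String flowId;
    final long flowRunId;
    FlowContext(String flowId, long flowRunId) {
      this.flowId = flowId;
      this.flowRunId = flowRunId;
    }
  }

  /** Resolves the flow context for an app with one point read. */
  static FlowContext lookup(Table appToFlow, String clusterId, String appId)
      throws IOException {
    byte[] rowKey = Bytes.toBytes(clusterId + "!" + appId); // illustrative
    Result result = appToFlow.get(new Get(rowKey));
    if (result == null || result.isEmpty()) {
      // mirrors the reader: a missing mapping is a hard error
      throw new IOException("no flow context for " + clusterId + "/" + appId);
    }
    return new FlowContext(
        Bytes.toString(result.getValue(INFO, FLOW_ID)),
        Bytes.toLong(result.getValue(INFO, FLOW_RUN_ID)));
  }
}
```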
- */ - private final AppToFlowTable appToFlowTable = new AppToFlowTable(); - - public GenericEntityReader(String userId, String clusterId, - String flowId, Long flowRunId, String appId, String entityType, - Long limit, Long createdTimeBegin, Long createdTimeEnd, - Long modifiedTimeBegin, Long modifiedTimeEnd, - Map> relatesTo, Map> isRelatedTo, - Map infoFilters, Map configFilters, - Set metricFilters, Set eventFilters, - EnumSet fieldsToRetrieve, boolean sortedKeys) { - super(userId, clusterId, flowId, flowRunId, appId, entityType, limit, - createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, - relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, - eventFilters, fieldsToRetrieve, sortedKeys); - } - - public GenericEntityReader(String userId, String clusterId, - String flowId, Long flowRunId, String appId, String entityType, - String entityId, EnumSet fieldsToRetrieve) { - super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId, - fieldsToRetrieve); - } - - /** - * Uses the {@link EntityTable}. - */ - protected BaseTable getTable() { - return ENTITY_TABLE; - } - - protected FlowContext lookupFlowContext(String clusterId, String appId, - Configuration hbaseConf, Connection conn) throws IOException { - byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId); - Get get = new Get(rowKey); - Result result = appToFlowTable.getResult(hbaseConf, conn, get); - if (result != null && !result.isEmpty()) { - return new FlowContext( - AppToFlowColumn.FLOW_ID.readResult(result).toString(), - ((Number)AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue()); - } else { - throw new IOException( - "Unable to find the context flow ID and flow run ID for clusterId=" + - clusterId + ", appId=" + appId); - } - } - - protected static class FlowContext { - protected final String flowId; - protected final Long flowRunId; - public FlowContext(String flowId, Long flowRunId) { - this.flowId = flowId; - this.flowRunId = flowRunId; - } - } - - @Override - protected void validateParams() { - Preconditions.checkNotNull(userId, "userId shouldn't be null"); - Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); - Preconditions.checkNotNull(appId, "appId shouldn't be null"); - Preconditions.checkNotNull(entityType, "entityType shouldn't be null"); - if (singleEntityRead) { - Preconditions.checkNotNull(entityId, "entityId shouldn't be null"); - } - } - - @Override - protected void augmentParams(Configuration hbaseConf, Connection conn) - throws IOException { - // In reality both should be null or neither should be null - if (flowId == null || flowRunId == null) { - FlowContext context = - lookupFlowContext(clusterId, appId, hbaseConf, conn); - flowId = context.flowId; - flowRunId = context.flowRunId; - } - if (fieldsToRetrieve == null) { - fieldsToRetrieve = EnumSet.noneOf(Field.class); - } - if (!singleEntityRead) { - if (limit == null || limit < 0) { - limit = TimelineReader.DEFAULT_LIMIT; - } - if (createdTimeBegin == null) { - createdTimeBegin = DEFAULT_BEGIN_TIME; - } - if (createdTimeEnd == null) { - createdTimeEnd = DEFAULT_END_TIME; - } - if (modifiedTimeBegin == null) { - modifiedTimeBegin = DEFAULT_BEGIN_TIME; - } - if (modifiedTimeEnd == null) { - modifiedTimeEnd = DEFAULT_END_TIME; - } - } - } - - @Override - protected Result getResult(Configuration hbaseConf, Connection conn) - throws IOException { - byte[] rowKey = - EntityRowKey.getRowKey(clusterId, userId, flowId, flowRunId, appId, - entityType, entityId); - Get get = new Get(rowKey); - 
get.setMaxVersions(Integer.MAX_VALUE); - return table.getResult(hbaseConf, conn, get); - } - - @Override - protected ResultScanner getResults(Configuration hbaseConf, - Connection conn) throws IOException { - // Scan through part of the table to find the entities belong to one app - // and one type - Scan scan = new Scan(); - scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix( - clusterId, userId, flowId, flowRunId, appId, entityType)); - scan.setMaxVersions(Integer.MAX_VALUE); - return table.getResultScanner(hbaseConf, conn, scan); - } - - @Override - protected TimelineEntity parseEntity(Result result) throws IOException { - if (result == null || result.isEmpty()) { - return null; - } - TimelineEntity entity = new TimelineEntity(); - String entityType = EntityColumn.TYPE.readResult(result).toString(); - entity.setType(entityType); - String entityId = EntityColumn.ID.readResult(result).toString(); - entity.setId(entityId); - - // fetch created time - Number createdTime = (Number)EntityColumn.CREATED_TIME.readResult(result); - entity.setCreatedTime(createdTime.longValue()); - if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin || - entity.getCreatedTime() > createdTimeEnd)) { - return null; - } - - // fetch modified time - Number modifiedTime = (Number)EntityColumn.MODIFIED_TIME.readResult(result); - entity.setModifiedTime(modifiedTime.longValue()); - if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin || - entity.getModifiedTime() > modifiedTimeEnd)) { - return null; - } - - // fetch is related to entities - boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) { - readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true); - if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations( - entity.getIsRelatedToEntities(), isRelatedTo)) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) { - entity.getIsRelatedToEntities().clear(); - } - } - - // fetch relates to entities - boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) { - readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false); - if (checkRelatesTo && !TimelineStorageUtils.matchRelations( - entity.getRelatesToEntities(), relatesTo)) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.RELATES_TO)) { - entity.getRelatesToEntities().clear(); - } - } - - // fetch info - boolean checkInfo = infoFilters != null && infoFilters.size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.INFO) || checkInfo) { - readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false); - if (checkInfo && - !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.INFO)) { - entity.getInfo().clear(); - } - } - - // fetch configs - boolean checkConfigs = configFilters != null && configFilters.size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) { - readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true); - if (checkConfigs && !TimelineStorageUtils.matchFilters( - entity.getConfigs(), configFilters)) 
{ - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.CONFIGS)) { - entity.getConfigs().clear(); - } - } - - // fetch events - boolean checkEvents = eventFilters != null && eventFilters.size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) { - readEvents(entity, result, false); - if (checkEvents && !TimelineStorageUtils.matchEventFilters( - entity.getEvents(), eventFilters)) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.EVENTS)) { - entity.getEvents().clear(); - } - } - - // fetch metrics - boolean checkMetrics = metricFilters != null && metricFilters.size() > 0; - if (fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) { - readMetrics(entity, result, EntityColumnPrefix.METRIC); - if (checkMetrics && !TimelineStorageUtils.matchMetricFilters( - entity.getMetrics(), metricFilters)) { - return null; - } - if (!fieldsToRetrieve.contains(Field.ALL) && - !fieldsToRetrieve.contains(Field.METRICS)) { - entity.getMetrics().clear(); - } - } - return entity; - } - - /** - * Helper method for reading relationship. - */ - protected void readRelationship( - TimelineEntity entity, Result result, ColumnPrefix prefix, - boolean isRelatedTo) throws IOException { - // isRelatedTo and relatesTo are of type Map> - Map columns = prefix.readResults(result); - for (Map.Entry column : columns.entrySet()) { - for (String id : Separator.VALUES.splitEncoded( - column.getValue().toString())) { - if (isRelatedTo) { - entity.addIsRelatedToEntity(column.getKey(), id); - } else { - entity.addRelatesToEntity(column.getKey(), id); - } - } - } - } - - /** - * Helper method for reading key-value pairs for either info or config. - */ - protected void readKeyValuePairs( - TimelineEntity entity, Result result, ColumnPrefix prefix, - boolean isConfig) throws IOException { - // info and configuration are of type Map - Map columns = prefix.readResults(result); - if (isConfig) { - for (Map.Entry column : columns.entrySet()) { - entity.addConfig(column.getKey(), column.getValue().toString()); - } - } else { - entity.addInfo(columns); - } - } - - /** - * Read events from the entity table or the application table. The column name - * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted - * if there is no info associated with the event. - * - * See {@link EntityTable} and {@link ApplicationTable} for a more detailed - * schema description. - */ - protected void readEvents(TimelineEntity entity, Result result, - boolean isApplication) throws IOException { - Map eventsMap = new HashMap<>(); - Map eventsResult = isApplication ? - ApplicationColumnPrefix.EVENT. - readResultsHavingCompoundColumnQualifiers(result) : - EntityColumnPrefix.EVENT. - readResultsHavingCompoundColumnQualifiers(result); - for (Map.Entry eventResult : eventsResult.entrySet()) { - byte[][] karr = (byte[][])eventResult.getKey(); - // the column name is of the form "eventId=timestamp=infoKey" - if (karr.length == 3) { - String id = Bytes.toString(karr[0]); - long ts = TimelineStorageUtils.invertLong(Bytes.toLong(karr[1])); - String key = Separator.VALUES.joinEncoded(id, Long.toString(ts)); - TimelineEvent event = eventsMap.get(key); - if (event == null) { - event = new TimelineEvent(); - event.setId(id); - event.setTimestamp(ts); - eventsMap.put(key, event); - } - // handle empty info - String infoKey = karr[2].length == 0 ? 
null : Bytes.toString(karr[2]); - if (infoKey != null) { - event.addInfo(infoKey, eventResult.getValue()); - } - } else { - LOG.warn("incorrectly formatted column name: it will be discarded"); - continue; - } - } - Set eventsSet = new HashSet<>(eventsMap.values()); - entity.addEvents(eventsSet); - } -} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java index 889ae19..d62c2ba 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java @@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReader; +import org.apache.hadoop.yarn.server.timelineservice.storage.reader.TimelineEntityReaderFactory; public class HBaseTimelineReaderImpl extends AbstractService implements TimelineReader { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java deleted file mode 100644 index d4a659c..0000000 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReader.java +++ /dev/null @@ -1,251 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
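readEvents above splits a compound column qualifier of the form "eventId=timestamp=infoKey", tolerating a missing info key and discarding malformed names. A simplified, JDK-only sketch of that parsing, again assuming the Long.MAX_VALUE minus value inversion (the parts arrive pre-split, as they do in the reader):

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class EventQualifierSketch {
  /** Returns a small event map, or null for a malformed qualifier. */
  static Map<String, Object> parse(byte[][] parts) {
    Map<String, Object> event = new HashMap<>();
    if (parts.length != 3) {
      return null;                        // malformed qualifier is skipped
    }
    event.put("id", new String(parts[0], StandardCharsets.UTF_8));
    // timestamps are stored inverted so newer events sort first
    long ts = Long.MAX_VALUE - toLong(parts[1]);
    event.put("timestamp", ts);
    if (parts[2].length > 0) {            // the info key may be empty
      event.put("infoKey", new String(parts[2], StandardCharsets.UTF_8));
    }
    return event;
  }

  private static long toLong(byte[] b) {
    long v = 0;
    for (byte x : b) {
      v = (v << 8) | (x & 0xFF);          // big-endian, like Bytes.toLong
    }
    return v;
  }
}
```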
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.IOException;
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.NavigableSet;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
-
-/**
- * The base class for reading and deserializing timeline entities from the
- * HBase storage. Different types can be defined for different types of the
- * entities that are being requested.
- */
-abstract class TimelineEntityReader {
-  private static final Log LOG = LogFactory.getLog(TimelineEntityReader.class);
-  protected final boolean singleEntityRead;
-
-  protected String userId;
-  protected String clusterId;
-  protected String flowId;
-  protected Long flowRunId;
-  protected String appId;
-  protected String entityType;
-  protected EnumSet<Field> fieldsToRetrieve;
-  // used only for a single entity read mode
-  protected String entityId;
-  // used only for multiple entity read mode
-  protected Long limit;
-  protected Long createdTimeBegin;
-  protected Long createdTimeEnd;
-  protected Long modifiedTimeBegin;
-  protected Long modifiedTimeEnd;
-  protected Map<String, Set<String>> relatesTo;
-  protected Map<String, Set<String>> isRelatedTo;
-  protected Map<String, Object> infoFilters;
-  protected Map<String, String> configFilters;
-  protected Set<String> metricFilters;
-  protected Set<String> eventFilters;
-
-  /**
-   * Main table the entity reader uses.
-   */
-  protected BaseTable<?> table;
-
-  /**
-   * Specifies whether keys for this table are sorted in a manner where
-   * entities can be retrieved by created time. If true, it will be sufficient
-   * to collect the first results as specified by the limit. Otherwise all
-   * matched entities will be fetched and then the limit applied.
-   */
-  private boolean sortedKeys = false;
-
-  /**
-   * Instantiates a reader for multiple-entity reads.
- */ - protected TimelineEntityReader(String userId, String clusterId, - String flowId, Long flowRunId, String appId, String entityType, - Long limit, Long createdTimeBegin, Long createdTimeEnd, - Long modifiedTimeBegin, Long modifiedTimeEnd, - Map> relatesTo, Map> isRelatedTo, - Map infoFilters, Map configFilters, - Set metricFilters, Set eventFilters, - EnumSet fieldsToRetrieve, boolean sortedKeys) { - this.singleEntityRead = false; - this.sortedKeys = sortedKeys; - this.userId = userId; - this.clusterId = clusterId; - this.flowId = flowId; - this.flowRunId = flowRunId; - this.appId = appId; - this.entityType = entityType; - this.fieldsToRetrieve = fieldsToRetrieve; - this.limit = limit; - this.createdTimeBegin = createdTimeBegin; - this.createdTimeEnd = createdTimeEnd; - this.modifiedTimeBegin = modifiedTimeBegin; - this.modifiedTimeEnd = modifiedTimeEnd; - this.relatesTo = relatesTo; - this.isRelatedTo = isRelatedTo; - this.infoFilters = infoFilters; - this.configFilters = configFilters; - this.metricFilters = metricFilters; - this.eventFilters = eventFilters; - - this.table = getTable(); - } - - /** - * Instantiates a reader for single-entity reads. - */ - protected TimelineEntityReader(String userId, String clusterId, - String flowId, Long flowRunId, String appId, String entityType, - String entityId, EnumSet fieldsToRetrieve) { - this.singleEntityRead = true; - this.userId = userId; - this.clusterId = clusterId; - this.flowId = flowId; - this.flowRunId = flowRunId; - this.appId = appId; - this.entityType = entityType; - this.fieldsToRetrieve = fieldsToRetrieve; - this.entityId = entityId; - - this.table = getTable(); - } - - /** - * Reads and deserializes a single timeline entity from the HBase storage. - */ - public TimelineEntity readEntity(Configuration hbaseConf, Connection conn) - throws IOException { - validateParams(); - augmentParams(hbaseConf, conn); - - Result result = getResult(hbaseConf, conn); - if (result == null || result.isEmpty()) { - // Could not find a matching row. - LOG.info("Cannot find matching entity of type " + entityType); - return null; - } - return parseEntity(result); - } - - /** - * Reads and deserializes a set of timeline entities from the HBase storage. - * It goes through all the results available, and returns the number of - * entries as specified in the limit in the entity's natural sort order. - */ - public Set readEntities(Configuration hbaseConf, - Connection conn) throws IOException { - validateParams(); - augmentParams(hbaseConf, conn); - - NavigableSet entities = new TreeSet<>(); - ResultScanner results = getResults(hbaseConf, conn); - try { - for (Result result : results) { - TimelineEntity entity = parseEntity(result); - if (entity == null) { - continue; - } - entities.add(entity); - if (!sortedKeys) { - if (entities.size() > limit) { - entities.pollLast(); - } - } else { - if (entities.size() == limit) { - break; - } - } - } - return entities; - } finally { - results.close(); - } - } - - /** - * Returns the main table to be used by the entity reader. - */ - protected abstract BaseTable getTable(); - - /** - * Validates the required parameters to read the entities. - */ - protected abstract void validateParams(); - - /** - * Sets certain parameters to defaults if the values are not provided. - */ - protected abstract void augmentParams(Configuration hbaseConf, - Connection conn) throws IOException; - - /** - * Fetches a {@link Result} instance for a single-entity read. 
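readEntities above applies the limit two ways: when row keys are sorted by created time it can simply stop once the limit is reached, otherwise it keeps a bounded sorted set and evicts the last element as better candidates arrive. A sketch of both strategies with plain integers standing in for entities:

```java
import java.util.NavigableSet;
import java.util.TreeSet;

public class LimitSketch {
  /**
   * Mirrors readEntities: with sorted keys the first 'limit' rows are final;
   * without, a bounded sorted set keeps the best 'limit' seen so far.
   */
  static NavigableSet<Integer> takeLimit(Iterable<Integer> rows, int limit,
      boolean sortedKeys) {
    NavigableSet<Integer> kept = new TreeSet<>();
    for (Integer row : rows) {
      kept.add(row);
      if (sortedKeys) {
        if (kept.size() == limit) {
          break;               // safe: no later row can rank earlier
        }
      } else if (kept.size() > limit) {
        kept.pollLast();       // evict the worst candidate so far
      }
    }
    return kept;
  }
}
```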
-   *
-   * @return the {@link Result} instance or null if no such record is found.
-   */
-  protected abstract Result getResult(Configuration hbaseConf, Connection conn)
-      throws IOException;
-
-  /**
-   * Fetches a {@link ResultScanner} for a multi-entity read.
-   */
-  protected abstract ResultScanner getResults(Configuration hbaseConf,
-      Connection conn) throws IOException;
-
-  /**
-   * Given a {@link Result} instance, deserializes and creates a
-   * {@link TimelineEntity}.
-   *
-   * @return the {@link TimelineEntity} instance, or null if the {@link Result}
-   * is null or empty.
-   */
-  protected abstract TimelineEntity parseEntity(Result result)
-      throws IOException;
-
-  /**
-   * Helper method for reading and deserializing {@link TimelineMetric} objects
-   * using the specified column prefix. The timeline metrics then are added to
-   * the given timeline entity.
-   */
-  protected void readMetrics(TimelineEntity entity, Result result,
-      ColumnPrefix<?> columnPrefix) throws IOException {
-    NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
-        columnPrefix.readResultsWithTimestamps(result);
-    for (Map.Entry<String, NavigableMap<Long, Number>> metricResult:
-        metricsResult.entrySet()) {
-      TimelineMetric metric = new TimelineMetric();
-      metric.setId(metricResult.getKey());
-      // Simply assume that if the value set contains more than 1 elements, the
-      // metric is a TIME_SERIES metric, otherwise, it's a SINGLE_VALUE metric
-      metric.setType(metricResult.getValue().size() > 1 ?
-          TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE);
-      metric.addValues(metricResult.getValue());
-      entity.addMetric(metric);
-    }
-  }
-}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
deleted file mode 100644
index f5341c2..0000000
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntityReaderFactory.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
-
-/**
- * Factory methods for instantiating a timeline entity reader.
- */
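The factory that follows routes each entity type to its table-specific reader and falls back to the generic entity table for everything else. Stripped to its shape (string literals stand in for TimelineEntityType.matches, and the reader classes are placeholders):

```java
public class ReaderDispatchSketch {
  interface Reader { }
  static class ApplicationReader implements Reader { }
  static class FlowRunReader implements Reader { }
  static class FlowActivityReader implements Reader { }
  static class GenericReader implements Reader { }

  /**
   * Same dispatch shape as the factory below: special-cased entity types
   * get table-specific readers; everything else falls through to the
   * generic entity table reader.
   */
  static Reader create(String entityType) {
    if ("YARN_APPLICATION".equals(entityType)) {
      return new ApplicationReader();
    } else if ("YARN_FLOW_RUN".equals(entityType)) {
      return new FlowRunReader();
    } else if ("YARN_FLOW_ACTIVITY".equals(entityType)) {
      return new FlowActivityReader();
    }
    return new GenericReader();          // default: generic entity read
  }
}
```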
-class TimelineEntityReaderFactory {
-  /**
-   * Creates a timeline entity reader instance for reading a single entity with
-   * the specified input.
-   */
-  public static TimelineEntityReader createSingleEntityReader(String userId,
-      String clusterId, String flowId, Long flowRunId, String appId,
-      String entityType, String entityId, EnumSet<Field> fieldsToRetrieve) {
-    // currently the types that are handled separate from the generic entity
-    // table are application, flow run, and flow activity entities
-    if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) {
-      return new ApplicationEntityReader(userId, clusterId, flowId, flowRunId,
-          appId, entityType, entityId, fieldsToRetrieve);
-    } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) {
-      return new FlowRunEntityReader(userId, clusterId, flowId, flowRunId,
-          appId, entityType, entityId, fieldsToRetrieve);
-    } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
-      return new FlowActivityEntityReader(userId, clusterId, flowId, flowRunId,
-          appId, entityType, entityId, fieldsToRetrieve);
-    } else {
-      // assume we're dealing with a generic entity read
-      return new GenericEntityReader(userId, clusterId, flowId, flowRunId,
-          appId, entityType, entityId, fieldsToRetrieve);
-    }
-  }
-
-  /**
-   * Creates a timeline entity reader instance for reading set of entities with
-   * the specified input and predicates.
-   */
-  public static TimelineEntityReader createMultipleEntitiesReader(String userId,
-      String clusterId, String flowId, Long flowRunId, String appId,
-      String entityType, Long limit, Long createdTimeBegin, Long createdTimeEnd,
-      Long modifiedTimeBegin, Long modifiedTimeEnd,
-      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
-      Map<String, Object> infoFilters, Map<String, String> configFilters,
-      Set<String> metricFilters, Set<String> eventFilters,
-      EnumSet<Field> fieldsToRetrieve) {
-    // currently the types that are handled separate from the generic entity
-    // table are application, flow run, and flow activity entities
-    if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) {
-      return new ApplicationEntityReader(userId, clusterId, flowId, flowRunId,
-          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
-          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
-          infoFilters, configFilters, metricFilters, eventFilters,
-          fieldsToRetrieve);
-    } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) {
-      return new FlowActivityEntityReader(userId, clusterId, flowId, flowRunId,
-          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
-          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
-          infoFilters, configFilters, metricFilters, eventFilters,
-          fieldsToRetrieve);
-    } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) {
-      return new FlowRunEntityReader(userId, clusterId, flowId, flowRunId,
-          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
-          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
-          infoFilters, configFilters, metricFilters, eventFilters,
-          fieldsToRetrieve);
-    } else {
-      // assume we're dealing with a generic entity read
-      return new GenericEntityReader(userId, clusterId, flowId, flowRunId,
-          appId, entityType, limit, createdTimeBegin, createdTimeEnd,
-          modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo,
-          infoFilters, configFilters, metricFilters, eventFilters,
-          fieldsToRetrieve, false);
-    }
-  }
-}
diff --git
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java new file mode 100644 index 0000000..262ccab --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java @@ -0,0 +1,273 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.reader; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; + +import com.google.common.base.Preconditions; + +/** + * Timeline entity reader for application entities that are stored in the + * application table. 
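The parseEntity implementation in this class (and in GenericEntityReader) repeats one pattern per field: read a column family if the caller asked for it or if a filter needs it, drop the entity when a filter fails, and clear data that was fetched only to evaluate a filter. A distilled sketch for the configs field alone, treating filters as exact key/value requirements (the patch's matchFilters may be more permissive):

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;

public class FieldFilterSketch {
  enum Field { ALL, CONFIGS }

  /**
   * Returns the configs to attach to the entity, an empty map when configs
   * were never requested, or null when a config filter rejects the entity.
   */
  static Map<String, String> readConfigsIfNeeded(EnumSet<Field> toRetrieve,
      Map<String, String> configFilters, Map<String, String> stored) {
    boolean checkConfigs = configFilters != null && !configFilters.isEmpty();
    if (!toRetrieve.contains(Field.ALL)
        && !toRetrieve.contains(Field.CONFIGS) && !checkConfigs) {
      return new HashMap<>();            // never read: nothing requested
    }
    Map<String, String> configs = new HashMap<>(stored); // the "read" step
    if (checkConfigs
        && !configs.entrySet().containsAll(configFilters.entrySet())) {
      return null;                       // entity filtered out
    }
    if (!toRetrieve.contains(Field.ALL)
        && !toRetrieve.contains(Field.CONFIGS)) {
      configs.clear();                   // fetched only for filtering
    }
    return configs;
  }
}
```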
+ */ +class ApplicationEntityReader extends GenericEntityReader { + private static final ApplicationTable APPLICATION_TABLE = + new ApplicationTable(); + + public ApplicationEntityReader(String userId, String clusterId, + String flowId, Long flowRunId, String appId, String entityType, + Long limit, Long createdTimeBegin, Long createdTimeEnd, + Long modifiedTimeBegin, Long modifiedTimeEnd, + Map> relatesTo, Map> isRelatedTo, + Map infoFilters, Map configFilters, + Set metricFilters, Set eventFilters, + EnumSet fieldsToRetrieve) { + super(userId, clusterId, flowId, flowRunId, appId, entityType, limit, + createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, + relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, + eventFilters, fieldsToRetrieve, true); + } + + public ApplicationEntityReader(String userId, String clusterId, + String flowId, Long flowRunId, String appId, String entityType, + String entityId, EnumSet fieldsToRetrieve) { + super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId, + fieldsToRetrieve); + } + + /** + * Uses the {@link ApplicationTable}. + */ + protected BaseTable getTable() { + return APPLICATION_TABLE; + } + + @Override + protected Result getResult(Configuration hbaseConf, Connection conn) + throws IOException { + byte[] rowKey = + ApplicationRowKey.getRowKey(clusterId, userId, flowId, flowRunId, + appId); + Get get = new Get(rowKey); + get.setMaxVersions(Integer.MAX_VALUE); + return table.getResult(hbaseConf, conn, get); + } + + @Override + protected void validateParams() { + Preconditions.checkNotNull(userId, "userId shouldn't be null"); + Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); + Preconditions.checkNotNull(entityType, "entityType shouldn't be null"); + if (singleEntityRead) { + Preconditions.checkNotNull(appId, "appId shouldn't be null"); + } else { + Preconditions.checkNotNull(flowId, "flowId shouldn't be null"); + } + } + + @Override + protected void augmentParams(Configuration hbaseConf, Connection conn) + throws IOException { + if (singleEntityRead) { + if (flowId == null || flowRunId == null) { + FlowContext context = + lookupFlowContext(clusterId, appId, hbaseConf, conn); + flowId = context.flowId; + flowRunId = context.flowRunId; + } + } + if (fieldsToRetrieve == null) { + fieldsToRetrieve = EnumSet.noneOf(Field.class); + } + if (!singleEntityRead) { + if (limit == null || limit < 0) { + limit = TimelineReader.DEFAULT_LIMIT; + } + if (createdTimeBegin == null) { + createdTimeBegin = DEFAULT_BEGIN_TIME; + } + if (createdTimeEnd == null) { + createdTimeEnd = DEFAULT_END_TIME; + } + if (modifiedTimeBegin == null) { + modifiedTimeBegin = DEFAULT_BEGIN_TIME; + } + if (modifiedTimeEnd == null) { + modifiedTimeEnd = DEFAULT_END_TIME; + } + } + } + + @Override + protected ResultScanner getResults(Configuration hbaseConf, + Connection conn) throws IOException { + Scan scan = new Scan(); + if (flowRunId != null) { + scan.setRowPrefixFilter(ApplicationRowKey. + getRowKeyPrefix(clusterId, userId, flowId, flowRunId)); + } else { + scan.setRowPrefixFilter(ApplicationRowKey. 
+ getRowKeyPrefix(clusterId, userId, flowId)); + } + scan.setFilter(new PageFilter(limit)); + return table.getResultScanner(hbaseConf, conn, scan); + } + + @Override + protected TimelineEntity parseEntity(Result result) throws IOException { + if (result == null || result.isEmpty()) { + return null; + } + TimelineEntity entity = new TimelineEntity(); + entity.setType(TimelineEntityType.YARN_APPLICATION.toString()); + String entityId = ApplicationColumn.ID.readResult(result).toString(); + entity.setId(entityId); + + // fetch created time + Number createdTime = + (Number)ApplicationColumn.CREATED_TIME.readResult(result); + entity.setCreatedTime(createdTime.longValue()); + if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin || + entity.getCreatedTime() > createdTimeEnd)) { + return null; + } + + // fetch modified time + Number modifiedTime = + (Number)ApplicationColumn.MODIFIED_TIME.readResult(result); + entity.setModifiedTime(modifiedTime.longValue()); + if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin || + entity.getModifiedTime() > modifiedTimeEnd)) { + return null; + } + + // fetch is related to entities + boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) { + readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO, + true); + if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations( + entity.getIsRelatedToEntities(), isRelatedTo)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) { + entity.getIsRelatedToEntities().clear(); + } + } + + // fetch relates to entities + boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) { + readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO, + false); + if (checkRelatesTo && !TimelineStorageUtils.matchRelations( + entity.getRelatesToEntities(), relatesTo)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.RELATES_TO)) { + entity.getRelatesToEntities().clear(); + } + } + + // fetch info + boolean checkInfo = infoFilters != null && infoFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.INFO) || checkInfo) { + readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false); + if (checkInfo && + !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.INFO)) { + entity.getInfo().clear(); + } + } + + // fetch configs + boolean checkConfigs = configFilters != null && configFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) { + readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true); + if (checkConfigs && !TimelineStorageUtils.matchFilters( + entity.getConfigs(), configFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.CONFIGS)) { + entity.getConfigs().clear(); + } + } + + // fetch events + boolean checkEvents = eventFilters != null && eventFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) { + readEvents(entity, result, 
+ true); + if (checkEvents && !TimelineStorageUtils.matchEventFilters( + entity.getEvents(), eventFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.EVENTS)) { + entity.getEvents().clear(); + } + } + + // fetch metrics + boolean checkMetrics = metricFilters != null && metricFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) { + readMetrics(entity, result, ApplicationColumnPrefix.METRIC); + if (checkMetrics && !TimelineStorageUtils.matchMetricFilters( + entity.getMetrics(), metricFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.METRICS)) { + entity.getMetrics().clear(); + } + } + return entity; + } +}
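How this reader is exercised is easiest to see through TimelineEntityReaderFactory, added at the end of this patch. A minimal single-application read might look like the following sketch (hbaseConf, conn and all IDs are hypothetical; flowId/flowRunId are passed as null and resolved by augmentParams() through the app_flow lookup):

    TimelineEntityReader reader = TimelineEntityReaderFactory.createSingleEntityReader(
        "user1", "cluster1", null, null, "application_1430000000000_0001",
        TimelineEntityType.YARN_APPLICATION.toString(),
        "application_1430000000000_0001", EnumSet.of(Field.INFO, Field.METRICS));
    TimelineEntity app = reader.readEntity(hbaseConf, conn);  // null if no matching row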
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java new file mode 100644 index 0000000..ddddda8 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.reader; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; + +import com.google.common.base.Preconditions; + +/** + * Timeline entity reader for flow activity entities that are stored in the + * flow activity table. + */ +class FlowActivityEntityReader extends TimelineEntityReader { + private static final FlowActivityTable FLOW_ACTIVITY_TABLE = + new FlowActivityTable(); + + public FlowActivityEntityReader(String userId, String clusterId, + String flowId, Long flowRunId, String appId, String entityType, + Long limit, Long createdTimeBegin, Long createdTimeEnd, + Long modifiedTimeBegin, Long modifiedTimeEnd, + Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo, + Map<String, Object> infoFilters, Map<String, String> configFilters, + Set<String> metricFilters, Set<String> eventFilters, + EnumSet<Field> fieldsToRetrieve) { + super(userId, clusterId, flowId, flowRunId, appId, entityType, limit, + createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, + relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, + eventFilters, fieldsToRetrieve, true); + } + + public FlowActivityEntityReader(String userId, String clusterId, + String flowId, Long flowRunId, String appId, String entityType, + String entityId, EnumSet<Field> fieldsToRetrieve) { + super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId, + fieldsToRetrieve); + } + + /** + * Uses the {@link FlowActivityTable}.
+ */ + @Override + protected BaseTable<?> getTable() { + return FLOW_ACTIVITY_TABLE; + } + + @Override + protected void validateParams() { + Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); + } + + @Override + protected void augmentParams(Configuration hbaseConf, Connection conn) + throws IOException { + if (limit == null || limit < 0) { + limit = TimelineReader.DEFAULT_LIMIT; + } + } + + @Override + protected Result getResult(Configuration hbaseConf, Connection conn) + throws IOException { + throw new UnsupportedOperationException( + "we don't support a single entity query"); + } + + @Override + protected ResultScanner getResults(Configuration hbaseConf, + Connection conn) throws IOException { + Scan scan = new Scan(); + scan.setRowPrefixFilter(FlowActivityRowKey.getRowKeyPrefix(clusterId)); + // use the page filter to limit the result to the page size + // the scanner may still return more than the limit; therefore we need to + // read the right number as we iterate + scan.setFilter(new PageFilter(limit)); + return table.getResultScanner(hbaseConf, conn, scan); + } + + @Override + protected TimelineEntity parseEntity(Result result) throws IOException { + FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(result.getRow()); + + long time = rowKey.getDayTimestamp(); + String user = rowKey.getUserId(); + String flowName = rowKey.getFlowId(); + + FlowActivityEntity flowActivity = + new FlowActivityEntity(clusterId, time, user, flowName); + // set the id + flowActivity.setId(flowActivity.getId()); + // get the list of run ids along with the versions that are associated + // with this flow on this day + Map<String, Object> runIdsMap = + FlowActivityColumnPrefix.RUN_ID.readResults(result); + for (Map.Entry<String, Object> e : runIdsMap.entrySet()) { + Long runId = Long.valueOf(e.getKey()); + String version = (String)e.getValue(); + FlowRunEntity flowRun = new FlowRunEntity(); + flowRun.setUser(user); + flowRun.setName(flowName); + flowRun.setRunId(runId); + flowRun.setVersion(version); + // set the id + flowRun.setId(flowRun.getId()); + flowActivity.addFlowRun(flowRun); + } + + return flowActivity; + } +}
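Because validateParams() only requires clusterId here, a flow activity query is essentially the prefix scan plus PageFilter shown above. A sketch of a multi-entity read (the predicate arguments are all null; the limit is defaulted by augmentParams() when null, and the filters are simply not consulted by this reader's parseEntity()):

    TimelineEntityReader reader = TimelineEntityReaderFactory.createMultipleEntitiesReader(
        null, "cluster1", null, null, null,
        TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), 10L,
        null, null, null, null, null, null, null, null, null, null, null);
    // each result is a FlowActivityEntity carrying one FlowRunEntity per run id column
    Set<TimelineEntity> activity = reader.readEntities(hbaseConf, conn);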
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java new file mode 100644 index 0000000..a532fa5 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.reader; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; + +import com.google.common.base.Preconditions; + +/** + * Timeline entity reader for flow run entities that are stored in the flow run + * table. + */ +class FlowRunEntityReader extends TimelineEntityReader { + private static final FlowRunTable FLOW_RUN_TABLE = new FlowRunTable(); + + public FlowRunEntityReader(String userId, String clusterId, + String flowId, Long flowRunId, String appId, String entityType, + Long limit, Long createdTimeBegin, Long createdTimeEnd, + Long modifiedTimeBegin, Long modifiedTimeEnd, + Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo, + Map<String, Object> infoFilters, Map<String, String> configFilters, + Set<String> metricFilters, Set<String> eventFilters, + EnumSet<Field> fieldsToRetrieve) { + super(userId, clusterId, flowId, flowRunId, appId, entityType, limit, + createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, + relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, + eventFilters, fieldsToRetrieve, false); + } + + public FlowRunEntityReader(String userId, String clusterId, + String flowId, Long flowRunId, String appId, String entityType, + String entityId, EnumSet<Field> fieldsToRetrieve) { + super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId, + fieldsToRetrieve); + } + + /** + * Uses the {@link FlowRunTable}.
+ */ + @Override + protected BaseTable<?> getTable() { + return FLOW_RUN_TABLE; + } + + @Override + protected void validateParams() { + Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); + Preconditions.checkNotNull(userId, "userId shouldn't be null"); + Preconditions.checkNotNull(flowId, "flowId shouldn't be null"); + Preconditions.checkNotNull(flowRunId, "flowRunId shouldn't be null"); + } + + @Override + protected void augmentParams(Configuration hbaseConf, Connection conn) { + } + + @Override + protected Result getResult(Configuration hbaseConf, Connection conn) + throws IOException { + byte[] rowKey = + FlowRunRowKey.getRowKey(clusterId, userId, flowId, flowRunId); + Get get = new Get(rowKey); + get.setMaxVersions(Integer.MAX_VALUE); + return table.getResult(hbaseConf, conn, get); + } + + @Override + protected ResultScanner getResults(Configuration hbaseConf, + Connection conn) throws IOException { + throw new UnsupportedOperationException( + "multiple entity query is not supported"); + } + + @Override + protected TimelineEntity parseEntity(Result result) throws IOException { + FlowRunEntity flowRun = new FlowRunEntity(); + flowRun.setUser(userId); + flowRun.setName(flowId); + flowRun.setRunId(flowRunId); + + // read the start time + Number startTime = (Number)FlowRunColumn.MIN_START_TIME.readResult(result); + if (startTime != null) { + flowRun.setStartTime(startTime.longValue()); + } + // read the end time if available + Number endTime = (Number)FlowRunColumn.MAX_END_TIME.readResult(result); + if (endTime != null) { + flowRun.setMaxEndTime(endTime.longValue()); + } + + // read the flow version + String version = (String)FlowRunColumn.FLOW_VERSION.readResult(result); + if (version != null) { + flowRun.setVersion(version); + } + + // read metrics + readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC); + + // set the id + flowRun.setId(flowRun.getId()); + return flowRun; + } +}
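Since getResults() throws UnsupportedOperationException, this reader serves single-entity queries only, and the four IDs checked in validateParams() fully determine the row. A sketch (all values hypothetical):

    TimelineEntityReader reader = TimelineEntityReaderFactory.createSingleEntityReader(
        "user1", "cluster1", "frequent_flow", 1002345678919L, null,
        TimelineEntityType.YARN_FLOW_RUN.toString(), null,
        EnumSet.noneOf(Field.class));
    FlowRunEntity run = (FlowRunEntity) reader.readEntity(hbaseConf, conn);
    // start/end times, version and flow-level metrics are populated from the
    // MIN_START_TIME, MAX_END_TIME, FLOW_VERSION and METRIC columns read above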
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java new file mode 100644 index 0000000..958a5ec --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java @@ -0,0 +1,390 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.reader; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; + +import com.google.common.base.Preconditions; + +/** + * Timeline entity reader for generic entities that are stored in the entity + * table. + */ +class GenericEntityReader extends TimelineEntityReader { + private static final EntityTable ENTITY_TABLE = new EntityTable(); + private static final Log LOG = LogFactory.getLog(GenericEntityReader.class); + + protected static final long DEFAULT_BEGIN_TIME = 0L; + protected static final long DEFAULT_END_TIME = Long.MAX_VALUE; + + /** + * Used to look up the flow context.
+ */ + private final AppToFlowTable appToFlowTable = new AppToFlowTable(); + + public GenericEntityReader(String userId, String clusterId, + String flowId, Long flowRunId, String appId, String entityType, + Long limit, Long createdTimeBegin, Long createdTimeEnd, + Long modifiedTimeBegin, Long modifiedTimeEnd, + Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo, + Map<String, Object> infoFilters, Map<String, String> configFilters, + Set<String> metricFilters, Set<String> eventFilters, + EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) { + super(userId, clusterId, flowId, flowRunId, appId, entityType, limit, + createdTimeBegin, createdTimeEnd, modifiedTimeBegin, modifiedTimeEnd, + relatesTo, isRelatedTo, infoFilters, configFilters, metricFilters, + eventFilters, fieldsToRetrieve, sortedKeys); + } + + public GenericEntityReader(String userId, String clusterId, + String flowId, Long flowRunId, String appId, String entityType, + String entityId, EnumSet<Field> fieldsToRetrieve) { + super(userId, clusterId, flowId, flowRunId, appId, entityType, entityId, + fieldsToRetrieve); + } + + /** + * Uses the {@link EntityTable}. + */ + protected BaseTable<?> getTable() { + return ENTITY_TABLE; + } + + protected FlowContext lookupFlowContext(String clusterId, String appId, + Configuration hbaseConf, Connection conn) throws IOException { + byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId); + Get get = new Get(rowKey); + Result result = appToFlowTable.getResult(hbaseConf, conn, get); + if (result != null && !result.isEmpty()) { + return new FlowContext( + AppToFlowColumn.FLOW_ID.readResult(result).toString(), + ((Number)AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue()); + } else { + throw new IOException( + "Unable to find the context flow ID and flow run ID for clusterId=" + + clusterId + ", appId=" + appId); + } + } + + protected static class FlowContext { + protected final String flowId; + protected final Long flowRunId; + public FlowContext(String flowId, Long flowRunId) { + this.flowId = flowId; + this.flowRunId = flowRunId; + } + } + + @Override + protected void validateParams() { + Preconditions.checkNotNull(userId, "userId shouldn't be null"); + Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null"); + Preconditions.checkNotNull(appId, "appId shouldn't be null"); + Preconditions.checkNotNull(entityType, "entityType shouldn't be null"); + if (singleEntityRead) { + Preconditions.checkNotNull(entityId, "entityId shouldn't be null"); + } + } + + @Override + protected void augmentParams(Configuration hbaseConf, Connection conn) + throws IOException { + // In reality both should be null or neither should be null + if (flowId == null || flowRunId == null) { + FlowContext context = + lookupFlowContext(clusterId, appId, hbaseConf, conn); + flowId = context.flowId; + flowRunId = context.flowRunId; + } + if (fieldsToRetrieve == null) { + fieldsToRetrieve = EnumSet.noneOf(Field.class); + } + if (!singleEntityRead) { + if (limit == null || limit < 0) { + limit = TimelineReader.DEFAULT_LIMIT; + } + if (createdTimeBegin == null) { + createdTimeBegin = DEFAULT_BEGIN_TIME; + } + if (createdTimeEnd == null) { + createdTimeEnd = DEFAULT_END_TIME; + } + if (modifiedTimeBegin == null) { + modifiedTimeBegin = DEFAULT_BEGIN_TIME; + } + if (modifiedTimeEnd == null) { + modifiedTimeEnd = DEFAULT_END_TIME; + } + } + } + + @Override + protected Result getResult(Configuration hbaseConf, Connection conn) + throws IOException { + byte[] rowKey = + EntityRowKey.getRowKey(clusterId, userId, flowId, flowRunId, appId, + entityType, entityId); + Get get = new Get(rowKey); +
get.setMaxVersions(Integer.MAX_VALUE); + return table.getResult(hbaseConf, conn, get); + } + + @Override + protected ResultScanner getResults(Configuration hbaseConf, + Connection conn) throws IOException { + // Scan through part of the table to find the entities that belong to one + // app and one type + Scan scan = new Scan(); + scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix( + clusterId, userId, flowId, flowRunId, appId, entityType)); + scan.setMaxVersions(Integer.MAX_VALUE); + return table.getResultScanner(hbaseConf, conn, scan); + } + + @Override + protected TimelineEntity parseEntity(Result result) throws IOException { + if (result == null || result.isEmpty()) { + return null; + } + TimelineEntity entity = new TimelineEntity(); + String entityType = EntityColumn.TYPE.readResult(result).toString(); + entity.setType(entityType); + String entityId = EntityColumn.ID.readResult(result).toString(); + entity.setId(entityId); + + // fetch created time + Number createdTime = (Number)EntityColumn.CREATED_TIME.readResult(result); + entity.setCreatedTime(createdTime.longValue()); + if (!singleEntityRead && (entity.getCreatedTime() < createdTimeBegin || + entity.getCreatedTime() > createdTimeEnd)) { + return null; + } + + // fetch modified time + Number modifiedTime = (Number)EntityColumn.MODIFIED_TIME.readResult(result); + entity.setModifiedTime(modifiedTime.longValue()); + if (!singleEntityRead && (entity.getModifiedTime() < modifiedTimeBegin || + entity.getModifiedTime() > modifiedTimeEnd)) { + return null; + } + + // fetch is related to entities + boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) { + readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true); + if (checkIsRelatedTo && !TimelineStorageUtils.matchRelations( + entity.getIsRelatedToEntities(), isRelatedTo)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) { + entity.getIsRelatedToEntities().clear(); + } + } + + // fetch relates to entities + boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) { + readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false); + if (checkRelatesTo && !TimelineStorageUtils.matchRelations( + entity.getRelatesToEntities(), relatesTo)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.RELATES_TO)) { + entity.getRelatesToEntities().clear(); + } + } + + // fetch info + boolean checkInfo = infoFilters != null && infoFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.INFO) || checkInfo) { + readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false); + if (checkInfo && + !TimelineStorageUtils.matchFilters(entity.getInfo(), infoFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.INFO)) { + entity.getInfo().clear(); + } + } + + // fetch configs + boolean checkConfigs = configFilters != null && configFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) { + readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true); + if (checkConfigs && !TimelineStorageUtils.matchFilters( + entity.getConfigs(), configFilters))
{ + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.CONFIGS)) { + entity.getConfigs().clear(); + } + } + + // fetch events + boolean checkEvents = eventFilters != null && eventFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) { + readEvents(entity, result, false); + if (checkEvents && !TimelineStorageUtils.matchEventFilters( + entity.getEvents(), eventFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.EVENTS)) { + entity.getEvents().clear(); + } + } + + // fetch metrics + boolean checkMetrics = metricFilters != null && metricFilters.size() > 0; + if (fieldsToRetrieve.contains(Field.ALL) || + fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) { + readMetrics(entity, result, EntityColumnPrefix.METRIC); + if (checkMetrics && !TimelineStorageUtils.matchMetricFilters( + entity.getMetrics(), metricFilters)) { + return null; + } + if (!fieldsToRetrieve.contains(Field.ALL) && + !fieldsToRetrieve.contains(Field.METRICS)) { + entity.getMetrics().clear(); + } + } + return entity; + } + + /** + * Helper method for reading relationship. + */ + protected <T> void readRelationship( + TimelineEntity entity, Result result, ColumnPrefix<T> prefix, + boolean isRelatedTo) throws IOException { + // isRelatedTo and relatesTo are of type Map<String, Set<String>> + Map<String, Object> columns = prefix.readResults(result); + for (Map.Entry<String, Object> column : columns.entrySet()) { + for (String id : Separator.VALUES.splitEncoded( + column.getValue().toString())) { + if (isRelatedTo) { + entity.addIsRelatedToEntity(column.getKey(), id); + } else { + entity.addRelatesToEntity(column.getKey(), id); + } + } + } + } + + /** + * Helper method for reading key-value pairs for either info or config. + */ + protected <T> void readKeyValuePairs( + TimelineEntity entity, Result result, ColumnPrefix<T> prefix, + boolean isConfig) throws IOException { + // info and configuration are of type Map<String, Object> + Map<String, Object> columns = prefix.readResults(result); + if (isConfig) { + for (Map.Entry<String, Object> column : columns.entrySet()) { + entity.addConfig(column.getKey(), column.getValue().toString()); + } + } else { + entity.addInfo(columns); + } + } + + /** + * Read events from the entity table or the application table. The column name + * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted + * if there is no info associated with the event. + * + * See {@link EntityTable} and {@link ApplicationTable} for a more detailed + * schema description. + */ + protected void readEvents(TimelineEntity entity, Result result, + boolean isApplication) throws IOException { + Map<String, TimelineEvent> eventsMap = new HashMap<>(); + Map<?, Object> eventsResult = isApplication ? + ApplicationColumnPrefix.EVENT. + readResultsHavingCompoundColumnQualifiers(result) : + EntityColumnPrefix.EVENT. + readResultsHavingCompoundColumnQualifiers(result); + for (Map.Entry<?, Object> eventResult : eventsResult.entrySet()) { + byte[][] karr = (byte[][])eventResult.getKey(); + // the column name is of the form "eventId=timestamp=infoKey" + if (karr.length == 3) { + String id = Bytes.toString(karr[0]); + long ts = TimelineStorageUtils.invertLong(Bytes.toLong(karr[1])); + String key = Separator.VALUES.joinEncoded(id, Long.toString(ts)); + TimelineEvent event = eventsMap.get(key); + if (event == null) { + event = new TimelineEvent(); + event.setId(id); + event.setTimestamp(ts); + eventsMap.put(key, event); + } + // handle empty info + String infoKey = karr[2].length == 0 ?
null : Bytes.toString(karr[2]); + if (infoKey != null) { + event.addInfo(infoKey, eventResult.getValue()); + } + } else { + LOG.warn("incorrectly formatted column name: it will be discarded"); + continue; + } + } + Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values()); + entity.addEvents(eventsSet); + } +}
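The merging in readEvents() is easiest to follow with a concrete cell layout. A hypothetical example (separator written as '='; the real qualifiers are byte-encoded):

    // cell 1 qualifier: "YARN_APPLICATION_FINISHED=<inverted ts>=final_state" -> "SUCCEEDED"
    // cell 2 qualifier: "YARN_APPLICATION_FINISHED=<inverted ts>=exit_code"   -> 0
    // Both cells produce the same "eventId=timestamp" key, so the loop creates one
    // TimelineEvent and attaches both info entries to it. A cell whose infoKey
    // segment is empty (karr[2].length == 0) stands for an event without info.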
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java new file mode 100644 index 0000000..5ec670d --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java @@ -0,0 +1,251 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.reader; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.Map; +import java.util.NavigableMap; +import java.util.NavigableSet; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; + +/** + * The base class for reading and deserializing timeline entities from the + * HBase storage. Different implementations can be defined for different types + * of entities that are being requested. + */ +public abstract class TimelineEntityReader { + private static final Log LOG = LogFactory.getLog(TimelineEntityReader.class); + protected final boolean singleEntityRead; + + protected String userId; + protected String clusterId; + protected String flowId; + protected Long flowRunId; + protected String appId; + protected String entityType; + protected EnumSet<Field> fieldsToRetrieve; + // used only for a single entity read mode + protected String entityId; + // used only for multiple entity read mode + protected Long limit; + protected Long createdTimeBegin; + protected Long createdTimeEnd; + protected Long modifiedTimeBegin; + protected Long modifiedTimeEnd; + protected Map<String, Set<String>> relatesTo; + protected Map<String, Set<String>> isRelatedTo; + protected Map<String, Object> infoFilters; + protected Map<String, String> configFilters; + protected Set<String> metricFilters; + protected Set<String> eventFilters; + + /** + * Main table the entity reader uses. + */ + protected BaseTable<?> table; + + /** + * Specifies whether keys for this table are sorted in a manner where entities + * can be retrieved by created time. If true, it will be sufficient to collect + * the first results as specified by the limit. Otherwise all matched entities + * will be fetched and then the limit applied. + */ + private boolean sortedKeys = false; + + /** + * Instantiates a reader for multiple-entity reads. + */ + protected TimelineEntityReader(String userId, String clusterId, + String flowId, Long flowRunId, String appId, String entityType, + Long limit, Long createdTimeBegin, Long createdTimeEnd, + Long modifiedTimeBegin, Long modifiedTimeEnd, + Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo, + Map<String, Object> infoFilters, Map<String, String> configFilters, + Set<String> metricFilters, Set<String> eventFilters, + EnumSet<Field> fieldsToRetrieve, boolean sortedKeys) { + this.singleEntityRead = false; + this.sortedKeys = sortedKeys; + this.userId = userId; + this.clusterId = clusterId; + this.flowId = flowId; + this.flowRunId = flowRunId; + this.appId = appId; + this.entityType = entityType; + this.fieldsToRetrieve = fieldsToRetrieve; + this.limit = limit; + this.createdTimeBegin = createdTimeBegin; + this.createdTimeEnd = createdTimeEnd; + this.modifiedTimeBegin = modifiedTimeBegin; + this.modifiedTimeEnd = modifiedTimeEnd; + this.relatesTo = relatesTo; + this.isRelatedTo = isRelatedTo; + this.infoFilters = infoFilters; + this.configFilters = configFilters; + this.metricFilters = metricFilters; + this.eventFilters = eventFilters; + + this.table = getTable(); + } + + /** + * Instantiates a reader for single-entity reads. + */ + protected TimelineEntityReader(String userId, String clusterId, + String flowId, Long flowRunId, String appId, String entityType, + String entityId, EnumSet<Field> fieldsToRetrieve) { + this.singleEntityRead = true; + this.userId = userId; + this.clusterId = clusterId; + this.flowId = flowId; + this.flowRunId = flowRunId; + this.appId = appId; + this.entityType = entityType; + this.fieldsToRetrieve = fieldsToRetrieve; + this.entityId = entityId; + + this.table = getTable(); + } + + /** + * Reads and deserializes a single timeline entity from the HBase storage. + */ + public TimelineEntity readEntity(Configuration hbaseConf, Connection conn) + throws IOException { + validateParams(); + augmentParams(hbaseConf, conn); + + Result result = getResult(hbaseConf, conn); + if (result == null || result.isEmpty()) { + // Could not find a matching row. + LOG.info("Cannot find matching entity of type " + entityType); + return null; + } + return parseEntity(result); + } + + /** + * Reads and deserializes a set of timeline entities from the HBase storage.
+ * It iterates through all the available results and returns up to the + * given limit of entries, in the entities' natural sort order. + */ + public Set<TimelineEntity> readEntities(Configuration hbaseConf, + Connection conn) throws IOException { + validateParams(); + augmentParams(hbaseConf, conn); + + NavigableSet<TimelineEntity> entities = new TreeSet<>(); + ResultScanner results = getResults(hbaseConf, conn); + try { + for (Result result : results) { + TimelineEntity entity = parseEntity(result); + if (entity == null) { + continue; + } + entities.add(entity); + if (!sortedKeys) { + if (entities.size() > limit) { + entities.pollLast(); + } + } else { + if (entities.size() == limit) { + break; + } + } + } + return entities; + } finally { + results.close(); + } + } + + /** + * Returns the main table to be used by the entity reader. + */ + protected abstract BaseTable<?> getTable(); + + /** + * Validates the required parameters to read the entities. + */ + protected abstract void validateParams(); + + /** + * Sets certain parameters to defaults if the values are not provided. + */ + protected abstract void augmentParams(Configuration hbaseConf, + Connection conn) throws IOException; + + /** + * Fetches a {@link Result} instance for a single-entity read. + * + * @return the {@link Result} instance or null if no such record is found. + */ + protected abstract Result getResult(Configuration hbaseConf, Connection conn) + throws IOException; + + /** + * Fetches a {@link ResultScanner} for a multi-entity read. + */ + protected abstract ResultScanner getResults(Configuration hbaseConf, + Connection conn) throws IOException; + + /** + * Given a {@link Result} instance, deserializes and creates a + * {@link TimelineEntity}. + * + * @return the {@link TimelineEntity} instance, or null if the {@link Result} + * is null or empty. + */ + protected abstract TimelineEntity parseEntity(Result result) + throws IOException; + + /** + * Helper method for reading and deserializing {@link TimelineMetric} objects + * using the specified column prefix. The timeline metrics are then added to + * the given timeline entity. + */ + protected <T> void readMetrics(TimelineEntity entity, Result result, + ColumnPrefix<T> columnPrefix) throws IOException { + NavigableMap<String, NavigableMap<Long, Number>> metricsResult = + columnPrefix.readResultsWithTimestamps(result); + for (Map.Entry<String, NavigableMap<Long, Number>> metricResult: + metricsResult.entrySet()) { + TimelineMetric metric = new TimelineMetric(); + metric.setId(metricResult.getKey()); + // Simply assume that if the value set contains more than one element, the + // metric is a TIME_SERIES metric; otherwise, it's a SINGLE_VALUE metric + metric.setType(metricResult.getValue().size() > 1 ?
+ TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE); + metric.addValues(metricResult.getValue()); + entity.addMetric(metric); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java new file mode 100644 index 0000000..a43d182 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReaderFactory.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.reader; + +import java.util.EnumSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; + +/** + * Factory methods for instantiating a timeline entity reader. + */ +public class TimelineEntityReaderFactory { + /** + * Creates a timeline entity reader instance for reading a single entity with + * the specified input. + */ + public static TimelineEntityReader createSingleEntityReader(String userId, + String clusterId, String flowId, Long flowRunId, String appId, + String entityType, String entityId, EnumSet<Field> fieldsToRetrieve) { + // currently the types that are handled separately from the generic entity + // table are application, flow run, and flow activity entities + if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) { + return new ApplicationEntityReader(userId, clusterId, flowId, flowRunId, + appId, entityType, entityId, fieldsToRetrieve); + } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) { + return new FlowRunEntityReader(userId, clusterId, flowId, flowRunId, + appId, entityType, entityId, fieldsToRetrieve); + } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) { + return new FlowActivityEntityReader(userId, clusterId, flowId, flowRunId, + appId, entityType, entityId, fieldsToRetrieve); + } else { + // assume we're dealing with a generic entity read + return new GenericEntityReader(userId, clusterId, flowId, flowRunId, + appId, entityType, entityId, fieldsToRetrieve); + } + }
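Any entity type other than the three YARN_* values above falls through to GenericEntityReader, so custom types stored in the entity table need no factory changes. For example (the type and IDs are hypothetical, and the app_flow mapping is assumed to be populated so the flow context lookup succeeds):

    TimelineEntityReader reader = TimelineEntityReaderFactory.createSingleEntityReader(
        "user1", "cluster1", null, null, "application_1430000000000_0001",
        "MAPREDUCE_TASK", "task_1430000000000_0001_m_000000",
        EnumSet.noneOf(Field.class));
    TimelineEntity task = reader.readEntity(hbaseConf, conn);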
+ + /** + * Creates a timeline entity reader instance for reading a set of entities + * with the specified input and predicates. + */ + public static TimelineEntityReader createMultipleEntitiesReader(String userId, + String clusterId, String flowId, Long flowRunId, String appId, + String entityType, Long limit, Long createdTimeBegin, Long createdTimeEnd, + Long modifiedTimeBegin, Long modifiedTimeEnd, + Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo, + Map<String, Object> infoFilters, Map<String, String> configFilters, + Set<String> metricFilters, Set<String> eventFilters, + EnumSet<Field> fieldsToRetrieve) { + // currently the types that are handled separately from the generic entity + // table are application, flow run, and flow activity entities + if (TimelineEntityType.YARN_APPLICATION.matches(entityType)) { + return new ApplicationEntityReader(userId, clusterId, flowId, flowRunId, + appId, entityType, limit, createdTimeBegin, createdTimeEnd, + modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, + infoFilters, configFilters, metricFilters, eventFilters, + fieldsToRetrieve); + } else if (TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entityType)) { + return new FlowActivityEntityReader(userId, clusterId, flowId, flowRunId, + appId, entityType, limit, createdTimeBegin, createdTimeEnd, + modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, + infoFilters, configFilters, metricFilters, eventFilters, + fieldsToRetrieve); + } else if (TimelineEntityType.YARN_FLOW_RUN.matches(entityType)) { + return new FlowRunEntityReader(userId, clusterId, flowId, flowRunId, + appId, entityType, limit, createdTimeBegin, createdTimeEnd, + modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, + infoFilters, configFilters, metricFilters, eventFilters, + fieldsToRetrieve); + } else { + // assume we're dealing with a generic entity read + return new GenericEntityReader(userId, clusterId, flowId, flowRunId, + appId, entityType, limit, createdTimeBegin, createdTimeEnd, + modifiedTimeBegin, modifiedTimeEnd, relatesTo, isRelatedTo, + infoFilters, configFilters, metricFilters, eventFilters, + fieldsToRetrieve, false); + } + } +}
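The predicate arguments map one-to-one onto the checks in each parseEntity() implementation: an entity failing any filter is dropped by returning null, and fields fetched only to evaluate a filter are cleared again unless explicitly requested. A sketch of a filtered application query (values hypothetical):

    Map<String, String> configFilters = new HashMap<>();
    configFilters.put("mapreduce.map.memory.mb", "2048");
    TimelineEntityReader reader = TimelineEntityReaderFactory.createMultipleEntitiesReader(
        "user1", "cluster1", "frequent_flow", null, null,
        TimelineEntityType.YARN_APPLICATION.toString(), 50L,
        null, null, null, null, null, null, null, configFilters, null, null,
        EnumSet.of(Field.CONFIGS));
    Set<TimelineEntity> apps = reader.readEntities(hbaseConf, conn);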