diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowEntity.java
index 4554778..cc1a62f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowEntity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/FlowEntity.java
@@ -17,11 +17,11 @@
  */
 package org.apache.hadoop.yarn.api.records.timelineservice;
 
+import javax.xml.bind.annotation.XmlElement;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
-import javax.xml.bind.annotation.XmlElement;
-
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
 public class FlowEntity extends HierarchicalTimelineEntity {
@@ -33,6 +33,10 @@
       TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_VERSION";
   public static final String FLOW_RUN_ID_INFO_KEY =
       TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_RUN_ID";
+  public static final String FLOW_RUN_END_TIME =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_RUN_END_TIME";
+  public static final String FLOW_RUN_SNAPSHOT_TIME =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_RUN_SNAPSHOT_TIME";
 
   public FlowEntity() {
     super(TimelineEntityType.YARN_FLOW.toString());
@@ -48,7 +52,7 @@ public FlowEntity(TimelineEntity entity) {
   @XmlElement(name = "id")
   @Override
   public String getId() {
-    //Flow id schema: user@flow_name(or id)/version/run_id
+    //Flow id schema: user@flow_name(or id)/run_id
     String id = super.getId();
     if (id == null) {
       StringBuilder sb = new StringBuilder();
@@ -56,8 +60,6 @@ public String getId() {
       sb.append('@');
       sb.append(getInfo().get(FLOW_NAME_INFO_KEY).toString());
       sb.append('/');
-      sb.append(getInfo().get(FLOW_VERSION_INFO_KEY).toString());
-      sb.append('/');
       sb.append(getInfo().get(FLOW_RUN_ID_INFO_KEY).toString());
       id = sb.toString();
       setId(id);
@@ -66,8 +68,7 @@ public String getId() {
   }
 
   public String getUser() {
-    Object user = getInfo().get(USER_INFO_KEY);
-    return user == null ? null : user.toString();
+    return (String)getInfo().get(USER_INFO_KEY);
   }
 
   public void setUser(String user) {
@@ -75,8 +76,7 @@ public void setUser(String user) {
   }
 
   public String getName() {
-    Object name = getInfo().get(FLOW_NAME_INFO_KEY);
-    return name == null ? null : name.toString();
+    return (String)getInfo().get(FLOW_NAME_INFO_KEY);
   }
 
   public void setName(String name) {
@@ -84,8 +84,7 @@ public void setName(String name) {
   }
 
   public String getVersion() {
-    Object version = getInfo().get(FLOW_VERSION_INFO_KEY);
-    return version == null ? null : version.toString();
+    return (String)getInfo().get(FLOW_VERSION_INFO_KEY);
   }
 
   public void setVersion(String version) {
@@ -100,4 +99,30 @@ public long getRunId() {
   public void setRunId(long runId) {
     addInfo(FLOW_RUN_ID_INFO_KEY, runId);
   }
+
+  public long getStartTime() {
+    return getCreatedTime();
+  }
+
+  public void setStartTime(long startTime) {
+    setCreatedTime(startTime);
+  }
+
+  public long getLatestAppEndTime() {
+    Object time = getInfo().get(FLOW_RUN_END_TIME);
+    return time == null ? 0L : (Long)time;
+  }
+
+  public void setLatestAppEndTime(long endTime) {
+    addInfo(FLOW_RUN_END_TIME, endTime);
+  }
+
+  public long getSnapshotTime() {
+    Object time = getInfo().get(FLOW_RUN_SNAPSHOT_TIME);
+    return time == null ? 0L : (Long)time;
+  }
+
+  public void setSnapshotTime(long snapshotTime) {
+    addInfo(FLOW_RUN_SNAPSHOT_TIME, snapshotTime);
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java
index 6062fe1..031ee9f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java
@@ -29,8 +29,12 @@
   YARN_APPLICATION_ATTEMPT,
   YARN_CONTAINER,
   YARN_USER,
-  YARN_QUEUE;
+  YARN_QUEUE,
+  YARN_FLOW_ACTIVITY;
 
+  /**
+   * Whether the input type can be a parent of this entity.
+   */
   public boolean isParent(TimelineEntityType type) {
     switch (this) {
     case YARN_CLUSTER:
@@ -50,6 +54,9 @@ public boolean isParent(TimelineEntityType type) {
     }
   }
 
+  /**
+   * Whether the input type can be a child of this entity.
+   */
   public boolean isChild(TimelineEntityType type) {
     switch (this) {
     case YARN_CLUSTER:
@@ -68,4 +75,12 @@ public boolean isChild(TimelineEntityType type) {
       return false;
     }
   }
+
+  /**
+   * Whether the type of this entity matches the type indicated by the input
+   * argument
+   */
+  public boolean matches(String typeString) {
+    return toString().equals(typeString);
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/api/FlowActivityEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/api/FlowActivityEntity.java
new file mode 100644
index 0000000..56cc96d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/api/FlowActivityEntity.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.api;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+import javax.xml.bind.annotation.XmlElement;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+
+/**
+ * Entity that represents a record for flow activity. It's essentially a
+ * container entity for flow runs with limited information.
+ */
+public class FlowActivityEntity extends TimelineEntity {
+  public static final String CLUSTER_INFO_KEY =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "CLUSTER";
+  public static final String DATE_INFO_KEY =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "DATE";
+  public static final String USER_INFO_KEY =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "USER";
+  public static final String FLOW_NAME_INFO_KEY =
+      TimelineEntity.SYSTEM_INFO_KEY_PREFIX + "FLOW_NAME";
+
+  private final NavigableSet<FlowEntity> flowRuns = new TreeSet<>();
+
+  public FlowActivityEntity() {
+    super(TimelineEntityType.YARN_FLOW_ACTIVITY.toString());
+  }
+
+  public FlowActivityEntity(String cluster, long time, String user,
+      String flowName) {
+    super();
+    setCluster(cluster);
+    setDate(time);
+    setUser(user);
+    setFlowName(flowName);
+  }
+
+  public FlowActivityEntity(TimelineEntity entity) {
+    super(entity);
+    if (!TimelineEntityType.YARN_FLOW_ACTIVITY.matches(entity.getType())) {
+      throw new IllegalArgumentException("Incompatible entity type: "
+          + getId());
+    }
+  }
+
+  @XmlElement(name = "id")
+  @Override
+  public String getId() {
+    // flow activity: cluster/day/user@flow_name
+    String id = super.getId();
+    if (id == null) {
+      StringBuilder sb = new StringBuilder();
+      sb.append(getCluster());
+      sb.append('/');
+      sb.append(getDate().toString());
+      sb.append('/');
+      sb.append(getUser());
+      sb.append('@');
+      sb.append(getFlowName());
+      id = sb.toString();
+      setId(id);
+    }
+    return id;
+  }
+
+  public String getCluster() {
+    return (String)getInfo().get(CLUSTER_INFO_KEY);
+  }
+
+  public void setCluster(String cluster) {
+    addInfo(CLUSTER_INFO_KEY, cluster);
+  }
+
+  public Date getDate() {
+    return (Date)getInfo().get(DATE_INFO_KEY);
+  }
+
+  public void setDate(long time) {
+    Date date = new Date(time);
+    addInfo(DATE_INFO_KEY, date);
+  }
+
+  public String getUser() {
+    return (String)getInfo().get(USER_INFO_KEY);
+  }
+
+  public void setUser(String user) {
+    addInfo(USER_INFO_KEY, user);
+  }
+
+  public String getFlowName() {
+    return (String)getInfo().get(FLOW_NAME_INFO_KEY);
+  }
+
+  public void setFlowName(String flowName) {
+    addInfo(FLOW_NAME_INFO_KEY, flowName);
+  }
+
+  public void addFlowRun(FlowEntity run) {
+    flowRuns.add(run);
+  }
+
+  public void addFlowRuns(Collection<FlowEntity> runs) {
+    flowRuns.addAll(runs);
+  }
+
+  public NavigableSet<FlowEntity> getFlowRuns() {
+    return flowRuns;
+  }
+
+  public int getNumberOfRuns() {
+    return flowRuns.size();
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/api/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/api/package-info.java
new file mode 100644
index 0000000..8b8d93b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/api/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.api;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
index c514c20..6c3004d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
@@ -40,10 +40,12 @@
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.FlowEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.timelineservice.api.FlowActivityEntity;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
@@ -59,6 +61,9 @@
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
 
 import com.google.common.base.Preconditions;
 
@@ -75,6 +80,7 @@
   private EntityTable entityTable;
   private AppToFlowTable appToFlowTable;
   private ApplicationTable applicationTable;
+  private FlowActivityTable flowActivityTable;
 
   public HBaseTimelineReaderImpl() {
     super(HBaseTimelineReaderImpl.class.getName());
@@ -88,6 +94,7 @@ public void serviceInit(Configuration conf) throws Exception {
     entityTable = new EntityTable();
     appToFlowTable = new AppToFlowTable();
     applicationTable = new ApplicationTable();
+    flowActivityTable = new FlowActivityTable();
   }
 
   @Override
@@ -132,7 +139,7 @@ public TimelineEntity getEntity(String userId, String clusterId,
   }
 
   private static boolean isApplicationEntity(String entityType) {
-    return TimelineEntityType.YARN_APPLICATION.toString().equals(entityType);
+    return TimelineEntityType.YARN_APPLICATION.matches(entityType);
   }
 
   @Override
@@ -144,6 +151,14 @@ private static boolean isApplicationEntity(String entityType) {
       Map<String, Object> infoFilters, Map<String, String> configFilters,
       Set<String> metricFilters, Set<String> eventFilters,
       EnumSet<Field> fieldsToRetrieve) throws IOException {
+    if (isFlowActivityEntity(entityType)) {
+      validateParamsForFlowActivityQuery(clusterId);
+      if (limit == null) {
+        limit = TimelineReader.DEFAULT_LIMIT;
+      }
+      return getFlowActivityEntities(clusterId, limit);
+    }
+
     validateParams(userId, clusterId, appId, entityType, null, false);
     // In reality both should be null or neither should be null
     if (flowId == null || flowRunId == null) {
@@ -501,4 +516,96 @@ private static void readMetrics(TimelineEntity entity, Result result,
       entity.addMetric(metric);
     }
   }
+
+  private Set<TimelineEntity> getFlowActivityEntities(String clusterId,
+      long limit) throws IOException {
+    // limit the size of the scan to the specified limit and rely on the sorted
+    // rows
+    NavigableSet<TimelineEntity> entities = new TreeSet<>();
+    Scan scan = new Scan();
+    scan.setRowPrefixFilter(FlowActivityRowKey.getRowKeyPrefix(clusterId));
+    scan.setMaxResultSize(limit);
+    ResultScanner scanner =
+        flowActivityTable.getResultScanner(hbaseConf, conn, scan);
+    for (Result result : scanner) {
+      Object[] tokens = FlowActivityRowKey.parseRowKey(result.getRow());
+
+      long time = (Long)tokens[1];
+      String user = (String)tokens[2];
+      String flowName = (String)tokens[3];
+      TimelineEntity entity =
+          readFlowActivityEntity(result, clusterId, time, user, flowName);
+      if (entity == null) {
+        continue;
+      }
+      entities.add(entity);
+    }
+    return entities;
+  }
+
+  private static boolean isFlowActivityEntity(String type) {
+    return TimelineEntityType.YARN_FLOW_ACTIVITY.matches(type);
+  }
+
+  private static void validateParamsForFlowActivityQuery(String clusterId) {
+    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
+  }
+
+  private static TimelineEntity readFlowActivityEntity(Result result,
+      String cluster, long time, String user, String flowName)
+      throws IOException {
+    // review if this is the most efficient way of doing it
+    Map<Long, FlowEntity> flowEntityMap = new HashMap<>();
+    // get all the run ids
+    Map<String, Object> runIdsMap =
+        FlowActivityColumnPrefix.RUN_ID.readResults(result);
+    for (String str : runIdsMap.keySet()) {
+      // TODO check if the run id is correctly translated
+      Long runId = Long.valueOf(str);
+      FlowEntity flowRun = new FlowEntity();
+      flowRun.setUser(user);
+      flowRun.setName(flowName);
+      flowRun.setRunId(runId);
+      flowEntityMap.put(runId, flowRun);
+    }
+    // process the start time
+    Map<String, Object> startTimeMap =
+        FlowActivityColumnPrefix.START_TIME.readResults(result);
+    for (Map.Entry<String, Object> e : startTimeMap.entrySet()) {
+      Long runId = Long.valueOf(e.getKey());
+      FlowEntity flowRun = flowEntityMap.get(runId);
+      if (flowRun != null) {
+        Long startTime = (Long)e.getValue();
+        flowRun.setStartTime(startTime);
+      }
+    }
+    // process the end time
+    Map<String, Object> endTimeMap =
+        FlowActivityColumnPrefix.END_TIME.readResults(result);
+    for (Map.Entry<String, Object> e : endTimeMap.entrySet()) {
+      Long runId = Long.valueOf(e.getKey());
+      FlowEntity flowRun = flowEntityMap.get(runId);
+      if (flowRun != null) {
+        Long endTime = (Long)e.getValue();
+        flowRun.setLatestAppEndTime(endTime);
+      }
+    }
+    // process the snapshot time
+    Map<String, Object> snapshotTimeMap =
+        FlowActivityColumnPrefix.IN_PROGRESS_TIME.readResults(result);
+    for (Map.Entry<String, Object> e : snapshotTimeMap.entrySet()) {
+      Long runId = Long.valueOf(e.getKey());
+      FlowEntity flowRun = flowEntityMap.get(runId);
+      if (flowRun != null) {
+        Long snapshotTime = (Long)e.getValue();
+        flowRun.setSnapshotTime(snapshotTime);
+      }
+    }
+
+    // create the flow activity entity and return it
+    FlowActivityEntity flowActivity =
+        new FlowActivityEntity(cluster, time, user, flowName);
+    flowActivity.addFlowRuns(flowEntityMap.values());
+    return flowActivity;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
index 2a09f41..fb95496 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
@@ -27,6 +27,10 @@
 public class FlowActivityRowKey {
   // TODO: more methods are needed for this class.
 
+  public static byte[] getRowKeyPrefix(String clusterId) {
+    return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, ""));
+  }
+
   // TODO: API needs to be cleaned up.
 
   /**
@@ -62,4 +66,29 @@
         Bytes.toBytes(Separator.QUALIFIERS.encode(flowId)));
   }
 
+  /**
+   * Given the raw row key as bytes, returns tokens of the row key as objects.
+   * It consists of the cluster id (String), day timestamp (long), the user id
+   * (String), and the flow id (String) in that order.
+   */
+  public static Object[] parseRowKey(byte[] rowKey) {
+    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
+
+    if (rowKeyComponents.length < 4) {
+      // TODO better response?
+      throw new IllegalArgumentException("the row key is not valid for " +
+          "a flow activity");
+    }
+
+    Object[] tokens = new Object[4];
+    tokens[0] =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[0]));
+    tokens[1] =
+        TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[1]));
+    tokens[2] =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[2]));
+    tokens[3] =
+        Separator.QUALIFIERS.decode(Bytes.toString(rowKeyComponents[3]));
+    return tokens;
+  }
 }
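
The sketch below is not part of the patch; the class name, cluster name, and timestamp values are made up. It only illustrates how the pieces added here are meant to fit together: a FlowActivityEntity keyed by cluster/day/user@flow_name that aggregates one FlowEntity per run, which is roughly what readFlowActivityEntity() assembles from a flow activity row.

// Hypothetical usage sketch, assuming the classes added by this patch are on
// the classpath. Names and values below are illustrative only.
import org.apache.hadoop.yarn.api.records.timelineservice.FlowEntity;
import org.apache.hadoop.yarn.server.timelineservice.api.FlowActivityEntity;

public class FlowActivityExample {
  public static void main(String[] args) {
    // One activity record per (cluster, day, user, flow name).
    FlowActivityEntity activity =
        new FlowActivityEntity("test-cluster", 1436307200000L, "alice",
            "word-count");

    // Each run of the flow on that day is attached as a FlowEntity.
    FlowEntity run1 = new FlowEntity();
    run1.setUser("alice");
    run1.setName("word-count");
    run1.setRunId(1436307301000L);
    run1.setStartTime(1436307301000L);       // maps to created time
    run1.setLatestAppEndTime(1436307450000L); // FLOW_RUN_END_TIME info key

    FlowEntity run2 = new FlowEntity();
    run2.setUser("alice");
    run2.setName("word-count");
    run2.setRunId(1436309999000L);
    run2.setStartTime(1436309999000L);

    activity.addFlowRun(run1);
    activity.addFlowRun(run2);

    // Id is derived lazily as cluster/day/user@flow_name.
    System.out.println(activity.getId());
    System.out.println("runs: " + activity.getNumberOfRuns());
  }
}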