diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
index 81ce54c..6397a0e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
@@ -100,7 +100,7 @@ void publishApplicationCreatedEvent(ApplicationCreatedEvent event) {
     tEvent.setTimestamp(event.getTimestamp());
     entity.addEvent(tEvent);
 
-    putEntity(entity, event.getApplicationId());
+    putEntity(entity, event.getApplicationId(), true);
   }
 
   @Override
@@ -264,6 +264,11 @@ private static ContainerEntity createContainerEntity(ContainerId containerId) {
   }
 
   private void putEntity(TimelineEntity entity, ApplicationId appId) {
+    putEntity(entity, appId, false);
+  }
+
+  private void putEntity(TimelineEntity entity, ApplicationId appId,
+      boolean newApp) {
     try {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Publishing the entity " + entity + ", JSON-style content: "
@@ -273,8 +278,10 @@ private void putEntity(TimelineEntity entity, ApplicationId appId) {
           rmTimelineCollectorManager.get(appId);
       TimelineEntities entities = new TimelineEntities();
       entities.addEntity(entity);
-      timelineCollector.putEntities(entities,
+      timelineCollector.putEntities(entities, newApp,
           UserGroupInformation.getCurrentUser());
+      LOG.info("Put entities:");
+      LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(entities, true));
     } catch (Exception e) {
       LOG.error("Error when publishing entity " + entity, e);
     }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
index bb7db12..a6bb10d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
@@ -68,6 +68,11 @@ protected void setWriter(TimelineWriter w) {
     this.writer = w;
   }
 
+  public TimelineWriteResponse putEntities(TimelineEntities entities,
+      UserGroupInformation callerUgi) throws IOException {
+    return putEntities(entities, false, callerUgi);
+  }
+
   /**
    * Handles entity writes. These writes are synchronous and are written to the
    * backing storage without buffering/batching. If any entity already exists,
@@ -75,24 +80,30 @@ protected void setWriter(TimelineWriter w) {
    *
    * This method should be reserved for selected critical entities and events.
    * For normal voluminous writes one should use the async method
-   * {@link #putEntitiesAsync(TimelineEntities, UserGroupInformation)}.
+   * {@link #putEntitiesAsync(TimelineEntities, boolean, UserGroupInformation)}.
    *
    * @param entities entities to post
+   * @param newApp the flag indicating the start of a new app
    * @param callerUgi the caller UGI
    * @return the response that contains the result of the post.
    */
   public TimelineWriteResponse putEntities(TimelineEntities entities,
-      UserGroupInformation callerUgi) throws IOException {
+      boolean newApp, UserGroupInformation callerUgi) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("SUCCESS - TIMELINE V2 PROTOTYPE");
-      LOG.debug("putEntities(entities=" + entities + ", callerUgi="
-          + callerUgi + ")");
+      LOG.debug("putEntities(entities=" + entities + ", newApp=" + newApp
+          + ", callerUgi=" + callerUgi + ")");
     }
 
     TimelineCollectorContext context = getTimelineEntityContext();
     return writer.write(context.getClusterId(), context.getUserId(),
         context.getFlowName(), context.getFlowVersion(), context.getFlowRunId(),
-        context.getAppId(), entities);
+        context.getAppId(), newApp, entities);
+  }
+
+  public void putEntitiesAsync(TimelineEntities entities,
+      UserGroupInformation callerUgi) {
+    putEntitiesAsync(entities, false, callerUgi);
   }
 
   /**
@@ -104,14 +115,15 @@ public TimelineWriteResponse putEntities(TimelineEntities entities,
    * storage.
    *
    * @param entities entities to post
+   * @param newApp the flag indicating the start of a new app
    * @param callerUgi the caller UGI
    */
   public void putEntitiesAsync(TimelineEntities entities,
-      UserGroupInformation callerUgi) {
+      boolean newApp, UserGroupInformation callerUgi) {
     // TODO implement
     if (LOG.isDebugEnabled()) {
-      LOG.debug("putEntitiesAsync(entities=" + entities + ", callerUgi=" +
-          callerUgi + ")");
+      LOG.debug("putEntitiesAsync(entities=" + entities + ", newApp=" +
+          newApp + ", callerUgi=" + callerUgi + ")");
     }
   }
 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java
index 5573185..69ca080 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java
@@ -20,9 +20,17 @@
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
 
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
 @Private
 @Unstable
 public class TimelineReaderManager extends AbstractService {
@@ -33,4 +41,45 @@ public TimelineReaderManager(TimelineReader timelineReader) {
     super(TimelineReaderManager.class.getName());
     this.reader = timelineReader;
   }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    reader.init(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    super.serviceStop();
+  }
+
+  public TimelineEntity getEntity(String userId, String clusterId, String flowId,
+      Long flowRunId, String appId, String entityType, String entityId,
+      EnumSet<TimelineReader.Field> fieldsToRetrieve,
+      UserGroupInformation ugi) throws IOException {
+    // TODO: apply UGI
+    return reader.getEntity(userId, clusterId, flowId, flowRunId, appId,
+        entityType, entityId, fieldsToRetrieve);
+  }
+
+  public Set<TimelineEntity> getEntities(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      EnumSet<TimelineReader.Field> fieldsToRetrieve,
+      UserGroupInformation ugi) throws IOException {
+    // TODO: apply UGI
+    return reader.getEntities(userId, clusterId, flowId, flowRunId, appId,
+        entityType, limit, createdTimeBegin, createdTimeEnd, modifiedTimeBegin,
+        modifiedTimeEnd, relatesTo, isRelatedTo, infoFilters, configFilters,
+        metricFilters, eventFilters, fieldsToRetrieve);
+  }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java
index 874112c..319cfb0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java
@@ -54,7 +54,7 @@ public class TimelineReaderServer extends CompositeService {
   private static final Log LOG = LogFactory.getLog(TimelineReaderServer.class);
   private static final int SHUTDOWN_HOOK_PRIORITY = 30;
 
-  private static final String TIMELINE_READER_MANAGER_ATTR =
+  static final String TIMELINE_READER_MANAGER_ATTR =
       "timeline.reader.manager";
 
   private HttpServer2 readerWebServer;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
index 3655a72..072eb3c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderWebServices.java
@@ -18,20 +18,40 @@
 
 package org.apache.hadoop.yarn.server.timelineservice.reader;
 
+import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 
+import com.google.common.base.Splitter;
+import com.google.inject.Inject;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 
 import com.google.inject.Singleton;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
 
 /** REST end point for Timeline Reader */
@@ -41,19 +61,220 @@
 @Path("/ws/v2/timeline")
 public class TimelineReaderWebServices {
-  private void init(HttpServletResponse response) {
+
+  private static final Log LOG =
+      LogFactory.getLog(TimelineReaderWebServices.class);
+
+  private static final Splitter COMMA_SPLITTER =
+      Splitter.on(",").omitEmptyStrings().trimResults();
+  private static final Splitter COLON_SPLITTER =
+      Splitter.on(":").omitEmptyStrings().trimResults();
+
+  private static void init(HttpServletResponse response) {
     response.setContentType(null);
   }
 
+  private static UserGroupInformation getUser(HttpServletRequest req) {
+    String remoteUser = req.getRemoteUser();
+    UserGroupInformation callerUGI = null;
+    if (remoteUser != null) {
+      callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
+    }
+    return callerUGI;
+  }
+
+  @Context
+  private ServletContext context;
+
+
+  @Inject
+  public TimelineReaderWebServices() {
+  }
 
   /**
    * Return the description of the timeline reader web services.
    */
   @GET
-  @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+  @Produces(MediaType.APPLICATION_JSON)
   public TimelineAbout about(
       @Context HttpServletRequest req,
       @Context HttpServletResponse res) {
     init(res);
     return TimelineUtils.createTimelineAbout("Timeline Reader API");
   }
+
+  @GET
+  @Path("/entities/{clusterId}/{appId}/{entityType}/{entityId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  public TimelineEntity getEntity(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("clusterId") String clusterId,
+      @PathParam("appId") String appId,
+      @PathParam("entityType") String entityType,
+      @PathParam("entityId") String entityId,
+      @QueryParam("userId") String userId,
+      @QueryParam("flowId") String flowId,
+      @QueryParam("flowRunId") Long flowRunId,
+      @QueryParam("fields") String fields) throws IOException {
+    init(res);
+    UserGroupInformation callerUGI = getUser(req);
+    TimelineEntity entity = getTimelineReaderManager().getEntity(
+        callerUGI != null && (userId == null || userId.isEmpty()) ?
+            callerUGI.getUserName() : userId,
+        clusterId, flowId, flowRunId, appId, entityType, entityId,
+        parseFieldsStr(fields), callerUGI);
+    if (entity == null) {
+      throw new NotFoundException("Entity is not found for userId=" + userId +
+          ", clusterId=" + clusterId + ", appId=" + appId + ", entityType=" +
+          entityType + ", entityId=" + entityId);
+    } else {
+      return entity;
+    }
+  }
+
+  @GET
+  @Path("/entities/{clusterId}/{appId}/{entityType}")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Set<TimelineEntity> getEntities(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("clusterId") String clusterId,
+      @PathParam("appId") String appId,
+      @PathParam("entityType") String entityType,
+      @QueryParam("userId") String userId,
+      @QueryParam("flowId") String flowId,
+      @QueryParam("flowRunId") Long flowRunId,
+      @QueryParam("limit") Long limit,
+      @QueryParam("createdTimeBegin") Long createdTimeBegin,
+      @QueryParam("createdTimeEnd") Long createdTimeEnd,
+      @QueryParam("modifiedTimeBegin") Long modifiedTimeBegin,
+      @QueryParam("modifiedTimeEnd") Long modifiedTimeEnd,
+      @QueryParam("relatesTo") String relatesTo,
+      @QueryParam("isRelatedTo") String isRelatedTo,
+      @QueryParam("infoFilters") String infoFilters,
+      @QueryParam("configFilters") String configFilters,
+      @QueryParam("metricFilters") String metricFilters,
+      @QueryParam("eventFilters") String eventFilters,
+      @QueryParam("fields") String fields) throws IOException {
+    init(res);
+    UserGroupInformation callerUGI = getUser(req);
+    return getTimelineReaderManager().getEntities(
+        callerUGI != null && (userId == null || userId.isEmpty()) ?
+            callerUGI.getUserName() : userId, clusterId, flowId,
+        flowRunId, appId, entityType, limit, createdTimeBegin, createdTimeEnd,
+        modifiedTimeBegin, modifiedTimeEnd, parseKeyStrValuesStr(relatesTo),
+        parseKeyStrValuesStr(isRelatedTo), parseKeyObjValueStr(infoFilters),
+        parseKeyStrValueStr(configFilters), parseValuesStr(metricFilters),
+        parseValuesStr(eventFilters), parseFieldsStr(fields), callerUGI);
+  }
+
+  private static EnumSet<Field> parseFieldsStr(String str) {
+    EnumSet<Field> enums = EnumSet.noneOf(Field.class);
+    if (str == null) {
+      return enums;
+    }
+    Iterator<String> fieldStrs = COMMA_SPLITTER.split(str).iterator();
+    while (fieldStrs.hasNext()) {
+      enums.add(Field.valueOf(fieldStrs.next()));
+    }
+    return enums;
+  }
+
+  private static Map<String, String> parseKeyStrValueStr(String str) {
+    Map<String, String> map = new HashMap<>();
+    if (str == null) {
+      return map;
+    }
+    Iterator<String> pairs = COMMA_SPLITTER.split(str).iterator();
+    while (pairs.hasNext()) {
+      String pair = pairs.next();
+      Iterator<String> tokens = COLON_SPLITTER.split(pair).iterator();
+      map.put(tokens.next(), tokens.next());
+    }
+    return map;
+  }
+
+  private static Map<String, Object> parseKeyObjValueStr(String str) {
+    Map<String, Object> map = new HashMap<>();
+    if (str == null) {
+      return map;
+    }
+    Iterator<String> pairs = COMMA_SPLITTER.split(str).iterator();
+    while (pairs.hasNext()) {
+      String pair = pairs.next();
+      Iterator<String> tokens = COLON_SPLITTER.split(pair).iterator();
+      String key = tokens.next();
+      String value = tokens.next();
+      try {
+        map.put(key, GenericObjectMapper.OBJECT_READER.readValue(value));
+      } catch (IOException e) {
+        map.put(key, value);
+      }
+    }
+    return map;
+  }
+
+  private static Map<String, Set<String>> parseKeyStrValuesStr(String str) {
+    Map<String, Set<String>> map = new HashMap<>();
+    if (str == null) {
+      return map;
+    }
+    Iterator<String> pairs = COMMA_SPLITTER.split(str).iterator();
+    while (pairs.hasNext()) {
+      String pair = pairs.next();
+      Iterator<String> tokens = COLON_SPLITTER.split(pair).iterator();
+      String key = tokens.next();
+      String value = tokens.next();
+      Set<String> values = map.get(key);
+      if (values == null) {
+        values = new HashSet<>();
+        map.put(key, values);
+      }
+      values.add(value);
+    }
+    return map;
+  }
+
+  private static Map<String, Set<Object>> parseKeyObjValuesStr(String str) {
+    Map<String, Set<Object>> map = new HashMap<>();
+    if (str == null) {
+      return map;
+    }
+    Iterator<String> pairs = COMMA_SPLITTER.split(str).iterator();
+    while (pairs.hasNext()) {
+      String pair = pairs.next();
+      Iterator<String> tokens = COLON_SPLITTER.split(pair).iterator();
+      String key = tokens.next();
+      String value = tokens.next();
+      Set<Object> values = map.get(key);
+      if (values == null) {
+        values = new HashSet<>();
+        map.put(key, values);
+      }
+      try {
+        values.add(GenericObjectMapper.OBJECT_READER.readValue(value));
+      } catch (IOException e) {
+        values.add(value);
+      }
+    }
+    return map;
+  }
+
+  private static Set<String> parseValuesStr(String str) {
+    Set<String> set = new HashSet<>();
+    if (str == null) {
+      return set;
+    }
+    Iterator<String> values = COMMA_SPLITTER.split(str).iterator();
+    while (values.hasNext()) {
+      set.add(values.next());
+    }
+    return set;
+  }
+
+  private TimelineReaderManager getTimelineReaderManager() {
+    return (TimelineReaderManager) context.getAttribute(
+        TimelineReaderServer.TIMELINE_READER_MANAGER_ATTR);
+  }
 }
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
index f9f1d1d..f6b657f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
@@ -43,6 +43,7 @@
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
 import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
 import org.codehaus.jackson.JsonGenerationException;
 import org.codehaus.jackson.map.JsonMappingException;
@@ -153,27 +154,6 @@ private static void fillFields(TimelineEntity finalEntity,
     }
   }
 
-  private static boolean matchFilter(Object infoValue, Object filterValue) {
-    return infoValue.equals(filterValue);
-  }
-
-  private static boolean matchFilters(Map<String, ? extends Object> entityInfo,
-      Map<String, ? extends Object> filters) {
-    if (entityInfo == null || entityInfo.isEmpty()) {
-      return false;
-    }
-    for (Map.Entry<String, ? extends Object> filter : filters.entrySet()) {
-      Object infoValue = entityInfo.get(filter.getKey());
-      if (infoValue == null) {
-        return false;
-      }
-      if (!matchFilter(infoValue, filter.getValue())) {
-        return false;
-      }
-    }
-    return true;
-  }
-
   private String getFlowRunPath(String userId, String clusterId, String flowId,
       Long flowRunId, String appId)
       throws IOException {
@@ -207,36 +187,6 @@ private String getFlowRunPath(String userId, String clusterId, String flowId,
     throw new IOException("Unable to get flow info");
   }
 
-  private static boolean matchMetricFilters(Set<TimelineMetric> metrics,
-      Set<String> metricFilters) {
-    Set<String> tempMetrics = new HashSet<String>();
-    for (TimelineMetric metric : metrics) {
-      tempMetrics.add(metric.getId());
-    }
-
-    for (String metricFilter : metricFilters) {
-      if (!tempMetrics.contains(metricFilter)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  private static boolean matchEventFilters(Set<TimelineEvent> entityEvents,
-      Set<String> eventFilters) {
-    Set<String> tempEvents = new HashSet<String>();
-    for (TimelineEvent event : entityEvents) {
-      tempEvents.add(event.getId());
-    }
-
-    for (String eventFilter : eventFilters) {
-      if (!tempEvents.contains(eventFilter)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
   private static TimelineEntity createEntityToBeReturned(TimelineEntity entity,
       EnumSet<Field> fieldsToRetrieve) {
     TimelineEntity entityToBeReturned = new TimelineEntity();
@@ -254,23 +204,6 @@ private static boolean isTimeInRange(Long time, Long timeBegin,
     return (time >= timeBegin) && (time <= timeEnd);
   }
 
-  private static boolean matchRelations(
-      Map<String, Set<String>> entityRelations,
-      Map<String, Set<String>> relations) {
-    for (Map.Entry<String, Set<String>> relation : relations.entrySet()) {
-      Set<String> ids = entityRelations.get(relation.getKey());
-      if (ids == null) {
-        return false;
-      }
-      for (String id : relation.getValue()) {
-        if (!ids.contains(id)) {
-          return false;
-        }
-      }
-    }
-    return true;
-  }
-
   private static void mergeEntities(TimelineEntity entity1,
       TimelineEntity entity2) {
     // Ideally created time wont change except in the case of issue from client.
@@ -393,27 +326,32 @@ public int compare(Long l1, Long l2) {
           continue;
         }
         if (relatesTo != null && !relatesTo.isEmpty() &&
-            !matchRelations(entity.getRelatesToEntities(), relatesTo)) {
+            !TimelineReaderUtils
+                .matchRelations(entity.getRelatesToEntities(), relatesTo)) {
           continue;
         }
         if (isRelatedTo != null && !isRelatedTo.isEmpty() &&
-            !matchRelations(entity.getIsRelatedToEntities(), isRelatedTo)) {
+            !TimelineReaderUtils
+                .matchRelations(entity.getIsRelatedToEntities(), isRelatedTo)) {
           continue;
         }
         if (infoFilters != null && !infoFilters.isEmpty() &&
-            !matchFilters(entity.getInfo(), infoFilters)) {
+            !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
           continue;
         }
         if (configFilters != null && !configFilters.isEmpty() &&
-            !matchFilters(entity.getConfigs(), configFilters)) {
+            !TimelineReaderUtils.matchFilters(
+                entity.getConfigs(), configFilters)) {
           continue;
         }
         if (metricFilters != null && !metricFilters.isEmpty() &&
-            !matchMetricFilters(entity.getMetrics(), metricFilters)) {
+            !TimelineReaderUtils.matchMetricFilters(
+                entity.getMetrics(), metricFilters)) {
          continue;
         }
         if (eventFilters != null && !eventFilters.isEmpty() &&
-            !matchEventFilters(entity.getEvents(), eventFilters)) {
+            !TimelineReaderUtils.matchEventFilters(
+                entity.getEvents(), eventFilters)) {
           continue;
         }
         TimelineEntity entityToBeReturned =
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
index 2fff98d..129ae77 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
@@ -67,7 +67,7 @@
   @Override
   public TimelineWriteResponse write(String clusterId, String userId,
       String flowName, String flowVersion, long flowRunId, String appId,
-      TimelineEntities entities) throws IOException {
+      boolean newApp, TimelineEntities entities) throws IOException {
     TimelineWriteResponse response = new TimelineWriteResponse();
     for (TimelineEntity entity : entities.getEntities()) {
       write(clusterId, userId, flowName, flowVersion, flowRunId, appId, entity,
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
new file mode 100644
index 0000000..ec356b0
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java
@@ -0,0 +1,381 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.server.timelineservice.storage.app2flow.App2FlowColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.app2flow.App2FlowRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.app2flow.App2FlowTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineReaderUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+
+public class HBaseTimelineReaderImpl
+    extends AbstractService implements TimelineReader {
+
+  private static final Log LOG = LogFactory
+      .getLog(HBaseTimelineReaderImpl.class);
+
+  private Configuration hbaseConf = null;
+  private Connection conn;
+  private EntityTable entityTable;
+  private App2FlowTable app2FlowTable;
+
+  public HBaseTimelineReaderImpl() {
+    super(HBaseTimelineReaderImpl.class.getName());
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    hbaseConf = HBaseConfiguration.create(conf);
+    conn = ConnectionFactory.createConnection(hbaseConf);
+    entityTable = new EntityTable();
+    app2FlowTable = new App2FlowTable();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (conn != null) {
+      LOG.info("closing the hbase Connection");
+      conn.close();
+    }
+    super.serviceStop();
+  }
+
+  @Override
+  public TimelineEntity getEntity(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      String entityId, EnumSet<Field> fieldsToRetrieve)
+      throws IOException {
+    validateParams(userId, clusterId, appId, entityType, entityId, true);
+    // In reality both should be null or neither should be null
+    if (flowId == null || flowRunId == null) {
+      FlowContext context = lookupFlowContext(clusterId, appId);
+      flowId = context.flowId;
+      flowRunId = context.flowRunId;
+    }
+    if (fieldsToRetrieve == null) {
+      fieldsToRetrieve = EnumSet.noneOf(Field.class);
+    }
+
+    byte[] rowKey = EntityRowKey.getRowKey(
+        clusterId, userId, flowId, flowRunId, appId, entityType, entityId);
+    Get get = new Get(rowKey);
+    return getEntity(
+        entityTable.getResult(hbaseConf, conn, get), fieldsToRetrieve,
+        false, 0L, 0L, false, 0L, 0L, null, null, null, null, null, null);
+  }
+
+  @Override
+  public Set<TimelineEntity> getEntities(String userId, String clusterId,
+      String flowId, Long flowRunId, String appId, String entityType,
+      Long limit, Long createdTimeBegin, Long createdTimeEnd,
+      Long modifiedTimeBegin, Long modifiedTimeEnd,
+      Map<String, Set<String>> relatesTo, Map<String, Set<String>> isRelatedTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> metricFilters, Set<String> eventFilters,
+      EnumSet<Field> fieldsToRetrieve) throws IOException {
+    validateParams(userId, clusterId, appId, entityType, null, false);
+    // In reality both should be null or neither should be null
+    if (flowId == null || flowRunId == null) {
+      FlowContext context = lookupFlowContext(clusterId, appId);
+      flowId = context.flowId;
+      flowRunId = context.flowRunId;
+    }
+    if (limit == null) {
+      limit = TimelineReader.DEFAULT_LIMIT;
+    }
+    if (createdTimeBegin == null) {
+      createdTimeBegin = 0L;
+    }
+    if (createdTimeEnd == null) {
+      createdTimeEnd = Long.MAX_VALUE;
+    }
+    if (modifiedTimeBegin == null) {
+      modifiedTimeBegin = 0L;
+    }
+    if (modifiedTimeEnd == null) {
+      modifiedTimeEnd = Long.MAX_VALUE;
+    }
+    if (fieldsToRetrieve == null) {
+      fieldsToRetrieve = EnumSet.noneOf(Field.class);
+    }
+
+    NavigableSet<TimelineEntity> entities = new TreeSet<>();
+    // Scan through part of the table to find the entities belonging to one
+    // app and one type
+    Scan scan = new Scan();
+    scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
+        userId, clusterId, flowId, flowRunId, appId, entityType));
+    scan.setMaxVersions(Integer.MAX_VALUE);
+    ResultScanner scanner = entityTable.getResultScanner(hbaseConf, conn, scan);
+    for (Result result : scanner) {
+      TimelineEntity entity = getEntity(result, fieldsToRetrieve,
+          true, createdTimeBegin, createdTimeEnd,
+          true, modifiedTimeBegin, modifiedTimeEnd,
+          isRelatedTo, relatesTo, infoFilters, configFilters, eventFilters,
+          metricFilters);
+      if (entity == null) {
+        continue;
+      }
+      if (entities.size() > limit) {
+        entities.pollLast();
+      }
+      entities.add(entity);
+    }
+    return entities;
+  }
+
+  private FlowContext lookupFlowContext(String clusterId, String appId)
+      throws IOException {
+    byte[] rowKey = App2FlowRowKey.getRowKey(clusterId, appId);
+    Get get = new Get(rowKey);
+    Result result = app2FlowTable.getResult(hbaseConf, conn, get);
+    if (result != null) {
+      return new FlowContext(
+          App2FlowColumn.FLOW_ID.readResult(result).toString(),
+          ((Number) App2FlowColumn.FLOW_RUN_ID.readResult(result)).longValue());
+    } else {
+      throw new IOException(
+          "Unable to find the context flow ID and flow run ID for clusterId=" +
+          clusterId + ", appId=" + appId);
+    }
+  }
+
+  private static class FlowContext {
+    private String flowId;
+    private Long flowRunId;
+    public FlowContext(String flowId, Long flowRunId) {
+      this.flowId = flowId;
+      this.flowRunId = flowRunId;
+    }
+  }
+
+  private static void validateParams(String userId, String clusterId,
+      String appId, String entityType, String entityId, boolean checkEntityId) {
+    Preconditions.checkNotNull(userId, "userId shouldn't be null");
+    Preconditions.checkNotNull(clusterId, "clusterId shouldn't be null");
+    Preconditions.checkNotNull(appId, "appId shouldn't be null");
+    Preconditions.checkNotNull(entityType, "entityType shouldn't be null");
+    if (checkEntityId) {
+      Preconditions.checkNotNull(entityId, "entityId shouldn't be null");
+    }
+  }
+
+  private static TimelineEntity getEntity(
+      Result result, EnumSet<Field> fieldsToRetrieve,
+      boolean checkCreatedTime, long createdTimeBegin, long createdTimeEnd,
+      boolean checkModifiedTime, long modifiedTimeBegin, long modifiedTimeEnd,
+      Map<String, Set<String>> isRelatedTo, Map<String, Set<String>> relatesTo,
+      Map<String, Object> infoFilters, Map<String, String> configFilters,
+      Set<String> eventFilters, Set<String> metricFilters)
+      throws IOException {
+    if (result == null || result.isEmpty()) {
+      return null;
+    }
+    TimelineEntity entity = new TimelineEntity();
+    entity.setType(EntityColumn.TYPE.readResult(result).toString());
+    entity.setId(EntityColumn.ID.readResult(result).toString());
+
+    // fetch created time
+    entity.setCreatedTime(
+        ((Number) EntityColumn.CREATED_TIME.readResult(result)).longValue());
+    if (checkCreatedTime && (entity.getCreatedTime() < createdTimeBegin ||
+        entity.getCreatedTime() > createdTimeEnd)) {
+      return null;
+    }
+
+    // fetch modified time
+    entity.setModifiedTime(
+        ((Number) EntityColumn.MODIFIED_TIME.readResult(result)).longValue());
+    if (checkModifiedTime && (entity.getModifiedTime() < modifiedTimeBegin ||
+        entity.getModifiedTime() > modifiedTimeEnd)) {
+      return null;
+    }
+
+    // fetch is related to entities
+    boolean checkIsRelatedTo = isRelatedTo != null && isRelatedTo.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.IS_RELATED_TO) || checkIsRelatedTo) {
+      readConnections(entity, result, EntityColumnPrefix.IS_RELATED_TO);
+      if (checkIsRelatedTo && !TimelineReaderUtils.matchRelations(
+          entity.getIsRelatedToEntities(), isRelatedTo)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.IS_RELATED_TO)) {
+        entity.getIsRelatedToEntities().clear();
+      }
+    }
+
+    // fetch relates to entities
+    boolean checkRelatesTo = relatesTo != null && relatesTo.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.RELATES_TO) || checkRelatesTo) {
+      readConnections(entity, result, EntityColumnPrefix.RELATES_TO);
+      if (checkRelatesTo && !TimelineReaderUtils.matchRelations(
+          entity.getRelatesToEntities(), relatesTo)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.RELATES_TO)) {
+        entity.getRelatesToEntities().clear();
+      }
+    }
+
+    // fetch info
+    boolean checkInfo = infoFilters != null && infoFilters.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.INFO) || checkInfo) {
+      // TODO
+      if (checkInfo &&
+          !TimelineReaderUtils.matchFilters(entity.getInfo(), infoFilters)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.INFO)) {
+        entity.getInfo().clear();
+      }
+    }
+
+    // fetch configs
+    boolean checkConfigs = configFilters != null && configFilters.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.CONFIGS) || checkConfigs) {
+      readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG);
+      if (checkConfigs && !TimelineReaderUtils.matchFilters(
+          entity.getConfigs(), configFilters)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.CONFIGS)) {
+        entity.getConfigs().clear();
+      }
+    }
+
+    // fetch events
+    boolean checkEvents = eventFilters != null && eventFilters.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.EVENTS) || checkEvents) {
+      readEvents(entity, result);
+      if (checkEvents && !TimelineReaderUtils.matchEventFilters(
+          entity.getEvents(), eventFilters)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.EVENTS)) {
+        entity.getEvents().clear();
+      }
+    }
+
+    // fetch metrics
+    boolean checkMetrics = metricFilters != null && metricFilters.size() > 0;
+    if (fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(Field.METRICS) || checkMetrics) {
+      readMetrics(entity, result);
+      if (checkMetrics && !TimelineReaderUtils.matchMetricFilters(
+          entity.getMetrics(), metricFilters)) {
+        return null;
+      }
+      if (!fieldsToRetrieve.contains(Field.ALL) &&
+          !fieldsToRetrieve.contains(Field.METRICS)) {
+        entity.getMetrics().clear();
+      }
+    }
+    return entity;
+  }
+
+  private static void readConnections(
+      TimelineEntity entity, Result result, EntityColumnPrefix prefix)
+      throws IOException {
+    // isRelatedTo and relatesTo are of type Map<String, Set<String>>
+    Map<String, Object> columns = prefix.readResults(result);
+    for (Map.Entry<String, Object> column : columns.entrySet()) {
+      for (String id : Separator.VALUES.splitEncoded(
+          column.getValue().toString())) {
+        if (prefix.equals(EntityColumnPrefix.IS_RELATED_TO)) {
+          entity.addIsRelatedToEntity(column.getKey(), id);
+        } else {
+          entity.addRelatesToEntity(column.getKey(), id);
+        }
+      }
+    }
+  }
+
+  private static void readKeyValuePairs(
+      TimelineEntity entity, Result result, EntityColumnPrefix prefix)
+      throws IOException {
+    // info and configuration are of type Map<String, Object>
+    Map<String, Object> columns = prefix.readResults(result);
+    if (prefix.equals(EntityColumnPrefix.CONFIG)) {
+      for (Map.Entry<String, Object> column : columns.entrySet()) {
+        entity.addConfig(column.getKey(), column.getValue().toString());
+      }
+    } else {
+      entity.addInfo(columns);
+    }
+  }
+
+  private static void readEvents(TimelineEntity entity, Result result)
+      throws IOException {
+    // TODO
+  }
+
+  private static void readMetrics(TimelineEntity entity, Result result)
+      throws IOException {
+    NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
+        EntityColumnPrefix.METRIC.readTimeseriesResults(result);
+    for (Map.Entry<String, NavigableMap<Long, Number>> metricResult:
+        metricsResult.entrySet()) {
+      TimelineMetric metric = new TimelineMetric();
+      metric.setId(metricResult.getKey());
+      // Simply assume that if the value set contains more than one element,
+      // the metric is a TIME_SERIES metric; otherwise, it's a SINGLE_VALUE
+      // metric
+      metric.setType(metricResult.getValue().size() > 1 ?
+          TimelineMetric.Type.TIME_SERIES : TimelineMetric.Type.SINGLE_VALUE);
+      metric.addValues(metricResult.getValue());
+      entity.addMetric(metric);
+    }
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index e48ca60..7d908d1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -36,6 +36,9 @@
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.server.timelineservice.storage.app2flow.App2FlowColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.app2flow.App2FlowRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.app2flow.App2FlowTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
@@ -54,6 +57,7 @@
 
   private Connection conn;
   private TypedBufferedMutator<EntityTable> entityTable;
+  private TypedBufferedMutator<App2FlowTable> app2FlowTable;
 
   private static final Log LOG = LogFactory
       .getLog(HBaseTimelineWriterImpl.class);
@@ -76,6 +80,7 @@ protected void serviceInit(Configuration conf) throws Exception {
     Configuration hbaseConf = HBaseConfiguration.create(conf);
     conn = ConnectionFactory.createConnection(hbaseConf);
     entityTable = new EntityTable().getTableMutator(hbaseConf, conn);
+    app2FlowTable = new App2FlowTable().getTableMutator(hbaseConf, conn);
   }
 
   /**
@@ -84,7 +89,7 @@ protected void serviceInit(Configuration conf) throws Exception {
   @Override
   public TimelineWriteResponse write(String clusterId, String userId,
       String flowName, String flowVersion, long flowRunId, String appId,
-      TimelineEntities data) throws IOException {
+      boolean newApp, TimelineEntities data) throws IOException {
 
     TimelineWriteResponse putStatus = new TimelineWriteResponse();
     for (TimelineEntity te : data.getEntities()) {
@@ -96,7 +101,7 @@ public TimelineWriteResponse write(String clusterId, String userId,
 
       byte[] rowKey =
           EntityRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId,
-              te);
+              te.getType(), te.getId());
 
       storeInfo(rowKey, te, flowVersion);
       storeEvents(rowKey, te.getEvents());
@@ -106,8 +111,18 @@ public TimelineWriteResponse write(String clusterId, String userId,
           EntityColumnPrefix.IS_RELATED_TO);
       storeRelations(rowKey, te.getRelatesToEntities(),
           EntityColumnPrefix.RELATES_TO);
+
+      LOG.info(te.getIdentifier() + " is written");
     }
+
+    if (newApp) {
+      byte[] rowKey = App2FlowRowKey.getRowKey(clusterId, appId);
+      App2FlowColumn.FLOW_ID.store(rowKey, app2FlowTable, null, flowName);
+      App2FlowColumn.FLOW_RUN_ID.store(rowKey, app2FlowTable, null, flowRunId);
+      LOG.info("clusterId=" + clusterId + ", appId=" + appId + ", flowId=" +
+          flowName + ", flowRunId=" + flowRunId + " is written");
flowRunId=" + flowRunId +" is written"); + } return putStatus; } @@ -225,6 +240,11 @@ protected void serviceStop() throws Exception { // The close API performs flushing and releases any resources held entityTable.close(); } + if (app2FlowTable != null) { + LOG.info("closing app_flow table"); + // The close API performs flushing and releases any resources held + app2FlowTable.close(); + } if (conn != null) { LOG.info("closing the hbase Connection"); conn.close(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java index 5b4442c..26cf2e8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java @@ -126,7 +126,7 @@ protected void serviceStop() throws Exception { @Override public TimelineWriteResponse write(String clusterId, String userId, String flowName, String flowVersion, long flowRunId, String appId, - TimelineEntities entities) throws IOException { + boolean newApp, TimelineEntities entities) throws IOException { TimelineWriteResponse response = new TimelineWriteResponse(); TimelineCollectorContext currContext = new TimelineCollectorContext( clusterId, userId, flowName, flowVersion, flowRunId, appId); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java index a5cc2ab..000701e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.yarn.server.timelineservice.storage.app2flow.App2FlowTable; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; /** @@ -120,6 +121,7 @@ private static void createAllTables(Configuration hbaseConf) throw new IOException("Cannot create table since admin is null"); } new EntityTable().createTable(admin, hbaseConf); + new App2FlowTable().createTable(admin, hbaseConf); } finally { if (conn != null) { conn.close(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java 
index 494e8ad..24fc41a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriter.java
@@ -44,6 +44,7 @@
    * @param flowVersion context flow version
    * @param flowRunId
    * @param appId context app ID
+   * @param newApp the flag to indicate if the entity is associated with a new app
    * @param data
    *          a {@link TimelineEntities} object.
    * @return a {@link TimelineWriteResponse} object.
@@ -51,7 +52,7 @@
    */
   TimelineWriteResponse write(String clusterId, String userId, String flowName,
       String flowVersion, long flowRunId, String appId,
-      TimelineEntities data) throws IOException;
+      boolean newApp, TimelineEntities data) throws IOException;
 
   /**
    * Aggregates the entity information to the timeline store based on which
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/app2flow/App2FlowColumn.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/app2flow/App2FlowColumn.java
new file mode 100644
index 0000000..4690186
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/app2flow/App2FlowColumn.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.app2flow;
+
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+
+import java.io.IOException;
+
+/**
+ * Identifies fully qualified columns for the {@link App2FlowTable}.
+ */
+public enum App2FlowColumn implements Column<App2FlowTable> {
+
+  /**
+   * The flow ID
+   */
+  FLOW_ID(App2FlowColumnFamily.MAPPING, "flow_id"),
+
+  /**
+   * The flow run ID
+   */
+  FLOW_RUN_ID(App2FlowColumnFamily.MAPPING, "flow_run_id");
+
+  private final ColumnHelper<App2FlowTable> column;
+  private final ColumnFamily<App2FlowTable> columnFamily;
+  private final String columnQualifier;
+  private final byte[] columnQualifierBytes;
+
+  private App2FlowColumn(ColumnFamily<App2FlowTable> columnFamily,
+      String columnQualifier) {
+    this.columnFamily = columnFamily;
+    this.columnQualifier = columnQualifier;
+    // Future-proof by ensuring the right column prefix hygiene.
+    this.columnQualifierBytes =
+        Bytes.toBytes(Separator.SPACE.encode(columnQualifier));
+    this.column = new ColumnHelper<App2FlowTable>(columnFamily);
+  }
+
+  /**
+   * @return the column name value
+   */
+  private String getColumnQualifier() {
+    return columnQualifier;
+  }
+
+  public void store(byte[] rowKey,
+      TypedBufferedMutator<App2FlowTable> tableMutator, Long timestamp,
+      Object inputValue) throws IOException {
+    column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
+        inputValue);
+  }
+
+  public Object readResult(Result result) throws IOException {
+    return column.readResult(result, columnQualifierBytes);
+  }
+
+  /**
+   * Retrieve an {@link App2FlowColumn} given a name, or null if there is no
+   * match. The following holds true: {@code columnFor(x) == columnFor(y)} if
+   * and only if {@code x.equals(y)} or {@code (x == y == null)}
+   *
+   * @param columnQualifier Name of the column to retrieve
+   * @return the corresponding {@link App2FlowColumn} or null
+   */
+  public static final App2FlowColumn columnFor(String columnQualifier) {
+
+    // Match column based on value, assume column family matches.
+    for (App2FlowColumn ec : App2FlowColumn.values()) {
+      // Find a match based only on name.
+      if (ec.getColumnQualifier().equals(columnQualifier)) {
+        return ec;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+
+  /**
+   * Retrieve an {@link App2FlowColumn} given a name, or null if there is no
+   * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)}
+   * if and only if {@code a.equals(b) & x.equals(y)} or
+   * {@code (x == y == null)}
+   *
+   * @param columnFamily The columnFamily for which to retrieve the column.
+   * @param name Name of the column to retrieve
+   * @return the corresponding {@link App2FlowColumn} or null if both arguments
+   *         don't match.
+   */
+  public static final App2FlowColumn columnFor(
+      App2FlowColumnFamily columnFamily, String name) {
+
+    for (App2FlowColumn ec : App2FlowColumn.values()) {
+      // Find a match based on column family and name.
+      if (ec.columnFamily.equals(columnFamily)
+          && ec.getColumnQualifier().equals(name)) {
+        return ec;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/app2flow/App2FlowColumnFamily.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/app2flow/App2FlowColumnFamily.java
new file mode 100644
index 0000000..08692b5
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/app2flow/App2FlowColumnFamily.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.app2flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Represents the app_flow table column families.
+ */
+public enum App2FlowColumnFamily implements ColumnFamily<App2FlowTable> {
+  /**
+   * Mapping column family houses known columns such as flowId and flowRunId
+   */
+  MAPPING("m");
+
+  /**
+   * Byte representation of this column family.
+   */
+  private final byte[] bytes;
+
+  /**
+   * @param value create a column family with this name. Must be lower case and
+   *          without spaces.
+   */
+  private App2FlowColumnFamily(String value) {
+    // column families should be lower case and not contain any spaces.
+    this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
+  }
+
+  public byte[] getBytes() {
+    return Bytes.copy(bytes);
+  }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/app2flow/App2FlowRowKey.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/app2flow/App2FlowRowKey.java
new file mode 100644
index 0000000..15015e8
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/app2flow/App2FlowRowKey.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.app2flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Represents a rowkey for the app_flow table.
+ */
+public class App2FlowRowKey {
+  /**
+   * Constructs a row key prefix for the app_flow table as follows:
+   * {@code clusterId!AppId}
+   *
+   * @param clusterId
+   * @param appId
+   * @return byte array with the row key
+   */
+  public static byte[] getRowKey(String clusterId, String appId) {
+    return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, appId));
+  }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/app2flow/App2FlowTable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/app2flow/App2FlowTable.java
new file mode 100644
index 0000000..df08c3a
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/app2flow/App2FlowTable.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.app2flow;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants;
+
+import java.io.IOException;
+
+/**
+ * The app_flow table as column families mapping. Mapping stores
+ * appId to flowId and flowRunId mapping information
+ *
+ * Example app_flow table record:
+ *
+ * <pre>
+ * |--------------------------------------|
+ * |  Row       | Column Family           |
+ * |  key       | mapping                 |
+ * |--------------------------------------|
+ * | clusterId! | flowId:                 |
+ * | AppId      | foo@daily_hive_report   |
+ * |            |                         |
+ * |            | flowRunId:              |
+ * |            | 1452828720457           |
+ * |            |                         |
+ * |            |                         |
+ * |            |                         |
+ * |--------------------------------------|
+ * </pre>
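+ *
+ * The flowId and flowRunId values shown above are illustrative. Both
+ * columns live in the single mapping column family ({@code m}), so a
+ * single {@code Get} on the {@code clusterId!AppId} row key resolves an
+ * application to its flow and flow run.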
+ */
+public class App2FlowTable extends BaseTable<App2FlowTable> {
+  /** app_flow prefix */
+  private static final String PREFIX =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".app-flow";
+
+  /** config param name that specifies the app_flow table name */
+  public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+  /** default value for app_flow table name */
+  private static final String DEFAULT_TABLE_NAME = "timelineservice.app_flow";
+
+  private static final Log LOG = LogFactory.getLog(App2FlowTable.class);
+
+  public App2FlowTable() {
+    super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable#
+   * createTable(org.apache.hadoop.hbase.client.Admin,
+   * org.apache.hadoop.conf.Configuration)
+   */
+  public void createTable(Admin admin, Configuration hbaseConf)
+      throws IOException {
+
+    TableName table = getTableName(hbaseConf);
+    if (admin.tableExists(table)) {
+      // do not disable / delete existing table
+      // similar to the approach taken by map-reduce jobs when
+      // output directory exists
+      throw new IOException("Table " + table.getNameAsString()
+          + " already exists.");
+    }
+
+    HTableDescriptor app2FlowTableDescp = new HTableDescriptor(table);
+    HColumnDescriptor mappCF =
+        new HColumnDescriptor(App2FlowColumnFamily.MAPPING.getBytes());
+    mappCF.setBloomFilterType(BloomType.ROWCOL);
+    app2FlowTableDescp.addFamily(mappCF);
+
+    app2FlowTableDescp
+        .setRegionSplitPolicyClassName(
+            "org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
+    app2FlowTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
+        TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
+    admin.createTable(app2FlowTableDescp,
+        TimelineHBaseSchemaConstants.getUsernameSplits());
+    LOG.info("Status of table creation for " + table.getNameAsString() + "="
+        + admin.tableExists(table));
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/app2flow/package-info.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/app2flow/package-info.java
new file mode 100644
index 0000000..ec31b70
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/app2flow/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage.app2flow;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
index e8d8b5c..abba79a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
@@ -24,6 +24,8 @@
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.BufferedMutator;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
@@ -94,6 +96,22 @@ public ResultScanner getResultScanner(Configuration hbaseConf,
   }
 
   /**
+   * Retrieves a single row from this table by its key.
+   *
+   * @param hbaseConf used to read settings that override defaults
+   * @param conn used to create a table from
+   * @param get that specifies the single row to fetch from this table
+   * @return the result of the get operation
+   * @throws IOException if the get operation fails
+   */
+  public Result getResult(Configuration hbaseConf, Connection conn, Get get)
+      throws IOException {
+    try (Table table = conn.getTable(getTableName(hbaseConf))) {
+      return table.get(get);
+    }
+  }
+
+  /**
    * Get the table name for this table.
    *
    * @param hbaseConf
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineEntitySchemaConstants.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineEntitySchemaConstants.java
deleted file mode 100644
index 5518a27..0000000
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineEntitySchemaConstants.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage.common; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hbase.util.Bytes; - -/** - * contains the constants used in the context of schema accesses for - * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity} - * information - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -public class TimelineEntitySchemaConstants { - - /** - * Used to create a pre-split for tables starting with a username in the - * prefix. TODO: this may have to become a config variable (string with - * separators) so that different installations can presplit based on their own - * commonly occurring names. - */ - private final static byte[][] USERNAME_SPLITS = { Bytes.toBytes("a"), - Bytes.toBytes("ad"), Bytes.toBytes("an"), Bytes.toBytes("b"), - Bytes.toBytes("ca"), Bytes.toBytes("cl"), Bytes.toBytes("d"), - Bytes.toBytes("e"), Bytes.toBytes("f"), Bytes.toBytes("g"), - Bytes.toBytes("h"), Bytes.toBytes("i"), Bytes.toBytes("j"), - Bytes.toBytes("k"), Bytes.toBytes("l"), Bytes.toBytes("m"), - Bytes.toBytes("n"), Bytes.toBytes("o"), Bytes.toBytes("q"), - Bytes.toBytes("r"), Bytes.toBytes("s"), Bytes.toBytes("se"), - Bytes.toBytes("t"), Bytes.toBytes("u"), Bytes.toBytes("v"), - Bytes.toBytes("w"), Bytes.toBytes("x"), Bytes.toBytes("y"), - Bytes.toBytes("z") }; - - /** - * The length at which keys auto-split - */ - public static final String USERNAME_SPLIT_KEY_PREFIX_LENGTH = "4"; - - /** - * @return splits for splits where a user is a prefix. - */ - public final static byte[][] getUsernameSplits() { - byte[][] kloon = USERNAME_SPLITS.clone(); - // Deep copy. - for (int row = 0; row < USERNAME_SPLITS.length; row++) { - kloon[row] = Bytes.copy(USERNAME_SPLITS[row]); - } - return kloon; - } - -} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java new file mode 100644 index 0000000..bbf498a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * contains the constants used in the context of schema accesses for + * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity} + * information + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class TimelineHBaseSchemaConstants { + + /** + * Used to create a pre-split for tables starting with a username in the + * prefix. TODO: this may have to become a config variable (string with + * separators) so that different installations can presplit based on their own + * commonly occurring names. + */ + private final static byte[][] USERNAME_SPLITS = { Bytes.toBytes("a"), + Bytes.toBytes("ad"), Bytes.toBytes("an"), Bytes.toBytes("b"), + Bytes.toBytes("ca"), Bytes.toBytes("cl"), Bytes.toBytes("d"), + Bytes.toBytes("e"), Bytes.toBytes("f"), Bytes.toBytes("g"), + Bytes.toBytes("h"), Bytes.toBytes("i"), Bytes.toBytes("j"), + Bytes.toBytes("k"), Bytes.toBytes("l"), Bytes.toBytes("m"), + Bytes.toBytes("n"), Bytes.toBytes("o"), Bytes.toBytes("q"), + Bytes.toBytes("r"), Bytes.toBytes("s"), Bytes.toBytes("se"), + Bytes.toBytes("t"), Bytes.toBytes("u"), Bytes.toBytes("v"), + Bytes.toBytes("w"), Bytes.toBytes("x"), Bytes.toBytes("y"), + Bytes.toBytes("z") }; + + /** + * The length at which keys auto-split + */ + public static final String USERNAME_SPLIT_KEY_PREFIX_LENGTH = "4"; + + /** + * @return splits for splits where a user is a prefix. + */ + public final static byte[][] getUsernameSplits() { + byte[][] kloon = USERNAME_SPLITS.clone(); + // Deep copy. + for (int row = 0; row < USERNAME_SPLITS.length; row++) { + kloon[row] = Bytes.copy(USERNAME_SPLITS[row]); + } + return kloon; + } + +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java new file mode 100644 index 0000000..809aa00 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+
+/**
+ * Utility methods for matching timeline entities against reader-side
+ * filters.
+ */
+public class TimelineReaderUtils {
+  /**
+   * Checks whether the relations of an entity match all the given relation
+   * filters.
+   *
+   * @param entityRelations the relations of an entity
+   * @param relationFilters the relations for filtering
+   * @return true if every relation filter is satisfied, false otherwise
+   */
+  public static boolean matchRelations(
+      Map<String, Set<String>> entityRelations,
+      Map<String, Set<String>> relationFilters) {
+    for (Map.Entry<String, Set<String>> relation :
+        relationFilters.entrySet()) {
+      Set<String> ids = entityRelations.get(relation.getKey());
+      if (ids == null) {
+        return false;
+      }
+      for (String id : relation.getValue()) {
+        if (!ids.contains(id)) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Checks whether the key/value pairs of an entity match all the given
+   * filters.
+   *
+   * @param map the map of key/value pairs in an entity
+   * @param filters the map of key/value pairs for filtering
+   * @return true if every filter is matched, false otherwise
+   */
+  public static boolean matchFilters(Map<String, ? extends Object> map,
+      Map<String, ? extends Object> filters) {
+    for (Map.Entry<String, ? extends Object> filter : filters.entrySet()) {
+      Object value = map.get(filter.getKey());
+      if (value == null) {
+        return false;
+      }
+      if (!value.equals(filter.getValue())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Checks whether the events of an entity contain all the given event ids.
+   *
+   * @param entityEvents the set of event objects in an entity
+   * @param eventFilters the set of event Ids for filtering
+   * @return true if every event filter is matched, false otherwise
+   */
+  public static boolean matchEventFilters(Set<TimelineEvent> entityEvents,
+      Set<String> eventFilters) {
+    Set<String> eventIds = new HashSet<String>();
+    for (TimelineEvent event : entityEvents) {
+      eventIds.add(event.getId());
+    }
+    for (String eventFilter : eventFilters) {
+      if (!eventIds.contains(eventFilter)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Checks whether the metrics of an entity contain all the given metric ids.
+   *
+   * @param metrics the set of metric objects in an entity
+   * @param metricFilters the set of metric Ids for filtering
+   * @return true if every metric filter is matched, false otherwise
+   */
+  public static boolean matchMetricFilters(Set<TimelineMetric> metrics,
+      Set<String> metricFilters) {
+    Set<String> metricIds = new HashSet<String>();
+    for (TimelineMetric metric : metrics) {
+      metricIds.add(metric.getId());
+    }
+
+    for (String metricFilter : metricFilters) {
+      if (!metricIds.contains(metricFilter)) {
+        return false;
+      }
+    }
+    return true;
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
index 61958c2..75b68a5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
@@ -54,17 +54,45 @@
 
   /**
    * Constructs a row key prefix for the entity table as follows:
-   * {@code userName!clusterId!flowId!flowRunId!AppId}
+   * {@code
userName!clusterId!flowId!flowRunId!AppId!entityType} * * @param clusterId * @param userId * @param flowId * @param flowRunId * @param appId + * @param entityType * @return byte array with the row key prefix */ + public static byte[] getRowKeyPrefix(String clusterId, String userId, + String flowId, Long flowRunId, String appId, String entityType) { + byte[] first = + Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId, + flowId)); + // Note that flowRunId is a long, so we can't encode them all at the same + // time. + byte[] second = Bytes.toBytes(EntityRowKey.invert(flowRunId)); + byte[] third = + Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, entityType)); + return Separator.QUALIFIERS.join(first, second, third); + } + + /** + * Constructs a row key for the entity table as follows: + * {@code userName!clusterId!flowId!flowRunId!AppId!entityType!entityId} + * + * @param clusterId + * @param userId + * @param flowId + * @param flowRunId + * @param appId + * @param entityType + * @param entityId + * @return byte array with the row key + */ public static byte[] getRowKey(String clusterId, String userId, - String flowId, Long flowRunId, String appId, TimelineEntity te) { + String flowId, Long flowRunId, String appId, String entityType, + String entityId) { byte[] first = Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId, flowId)); @@ -72,8 +100,8 @@ // time. byte[] second = Bytes.toBytes(EntityRowKey.invert(flowRunId)); byte[] third = - Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, te.getType(), - te.getId())); + Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, entityType, + entityId)); return Separator.QUALIFIERS.join(first, second, third); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java index 61f7c4c..a138b90 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java @@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineEntitySchemaConstants; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants; /** * The entity table as column families info, config and metrics. 
Info stores @@ -142,9 +142,9 @@ public void createTable(Admin admin, Configuration hbaseConf) entityTableDescp .setRegionSplitPolicyClassName("org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy"); entityTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length", - TimelineEntitySchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH); + TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH); admin.createTable(entityTableDescp, - TimelineEntitySchemaConstants.getUsernameSplits()); + TimelineHBaseSchemaConstants.getUsernameSplits()); LOG.info("Status of table creation for " + table.getNameAsString() + "=" + admin.tableExists(table)); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java index 50a9f60..9d3b975 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java @@ -58,7 +58,7 @@ public void testWriteEntityToFile() throws Exception { fsi.init(new YarnConfiguration()); fsi.start(); fsi.write("cluster_id", "user_id", "flow_name", "flow_version", 12345678L, - "app_id", te); + "app_id", false, te); String fileName = fsi.getOutputRoot() + "/entities/cluster_id/user_id/flow_name/flow_version/12345678/app_id/" + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java index 6abf240..48d7086 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -43,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; +import org.apache.hadoop.yarn.server.timelineservice.storage.app2flow.App2FlowTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; @@ -69,6 +71,8 @@ public static void setupBeforeClass() throws Exception { private static void createSchema() throws IOException { new EntityTable() 
.createTable(util.getHBaseAdmin(), util.getConfiguration());
+    new App2FlowTable()
+        .createTable(util.getHBaseAdmin(), util.getConfiguration());
   }
 
   @Test
@@ -130,6 +134,7 @@ public void testWriteEntityToHBase() throws Exception {
     te.addEntity(entity);
 
     HBaseTimelineWriterImpl hbi = null;
+    HBaseTimelineReaderImpl hbr = null;
     try {
       Configuration c1 = util.getConfiguration();
       hbi = new HBaseTimelineWriterImpl(c1);
@@ -140,9 +145,30 @@
       String flowVersion = "AB7822C10F1111";
       long runid = 1002345678919L;
       String appName = "some app name";
-      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, true, te);
       hbi.stop();
+      hbr = new HBaseTimelineReaderImpl();
+      hbr.init(c1);
+      hbr.start();
+      TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
+          entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL));
+      TimelineEntity e2 = hbr.getEntity(user, cluster, null, null, appName,
+          entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL));
+      Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
+          appName, entity.getType(), null, null, null, null, null, null, null,
+          null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
+      Set<TimelineEntity> es2 = hbr.getEntities(user, cluster, null, null,
+          appName, entity.getType(), null, null, null, null, null, null, null,
+          null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
+      hbr.stop();
+      assertNotNull(e1);
+      assertNotNull(e2);
+      assertEquals(e1, e2);
+      assertEquals(1, es1.size());
+      assertEquals(1, es2.size());
+      assertEquals(es1, es2);
+
       // scan the table and see that entity exists
       Scan s = new Scan();
       byte[] startRow =
@@ -289,7 +315,8 @@ private void testAdditionalEntity() throws IOException {
     String flowVersion = "1111F01C2287BA";
     long runid = 1009876543218L;
     String appName = "some app name";
-    hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
+    hbi.write(cluster, user, flow, flowVersion, runid, appName, false,
+        entities);
     hbi.stop();
     // scan the table and see that entity exists
     Scan s = new Scan();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java
index dece83d..397a596 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java
@@ -59,7 +59,8 @@ public void testPhoenixWriterBasic() throws Exception {
     int numEntity = 12;
     TimelineEntities te =
         TestTimelineWriterImpl.getStandardTestTimelineEntities(numEntity);
-    writer.write("cluster_1", "user1", "testFlow", "version1", 1l, "app_test_1", te);
+    writer.write("cluster_1", "user1", "testFlow", "version1", 1l, "app_test_1",
+        false, te);
     // Verify if we're storing all entities
     String sql = "SELECT COUNT(entity_id) FROM "
         + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME;
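
Taken together, App2FlowRowKey, App2FlowColumnFamily and the new BaseTable#getResult give a reader what it needs to resolve an application to its flow context with a single point lookup. The sketch below illustrates that read path; it is not part of this patch, and the qualifier names "flowId"/"flowRunId" plus the long encoding of the run id are assumptions, since the column enum for the mapping family is not included in this diff.

package org.apache.hadoop.yarn.server.timelineservice.storage.app2flow;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Illustrative sketch only, not part of this patch: resolves an appId to
 * its flowId/flowRunId via one Get on the app_flow table.
 */
public class App2FlowLookupSketch {
  public static void main(String[] args) throws IOException {
    Configuration hbaseConf = HBaseConfiguration.create();
    String clusterId = "cluster1";                    // example value
    String appId = "application_1452828720457_0001";  // example value
    try (Connection conn = ConnectionFactory.createConnection(hbaseConf)) {
      // Row key is the Separator-encoded clusterId!appId (App2FlowRowKey).
      Get get = new Get(App2FlowRowKey.getRowKey(clusterId, appId));
      Result result = new App2FlowTable().getResult(hbaseConf, conn, get);
      if (result != null && !result.isEmpty()) {
        byte[] mapping = App2FlowColumnFamily.MAPPING.getBytes();
        // Qualifier names and the long encoding of flowRunId are assumed;
        // the column enum for the mapping family is not in this diff.
        byte[] flowIdBytes = result.getValue(mapping, Bytes.toBytes("flowId"));
        byte[] runIdBytes = result.getValue(mapping, Bytes.toBytes("flowRunId"));
        if (flowIdBytes != null && runIdBytes != null) {
          System.out.println(appId + " -> " + Bytes.toString(flowIdBytes)
              + " / " + Bytes.toLong(runIdBytes));
        }
      }
    }
  }
}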