diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
index bee9618..67a65fa 100644
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
@@ -98,6 +98,12 @@
       <type>test-jar</type>
     </dependency>
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
       <groupId>org.hsqldb</groupId>
       <artifactId>hsqldb</artifactId>
       <scope>test</scope>
diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
index fde9e64..6fdf0db 100644
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
@@ -51,8 +51,8 @@
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.timeline.TimelineStore;
 import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
-import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
-import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TestFileSystemTimelineReaderImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TestFileSystemTimelineWriterImpl;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -242,7 +242,7 @@ public void testMRNewTimelineServiceEventHandling() throws Exception {
     }
     // Cleanup test file
     String testRoot =
-        FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT;
+        TestFileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT;
     File testRootFolder = new File(testRoot);
     if(testRootFolder.isDirectory()) {
       FileUtils.deleteDirectory(testRootFolder);
@@ -254,7 +254,7 @@ public void testMRNewTimelineServiceEventHandling() throws Exception {
   private void checkNewTimelineEvent(ApplicationId appId,
       ApplicationReport appReport) throws IOException {
     String tmpRoot =
-        FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
+        TestFileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
         + "/entities/";
 
     File tmpRootFolder = new File(tmpRoot);
@@ -276,7 +276,7 @@ private void checkNewTimelineEvent(ApplicationId appId,
 
     // check for job event file
     String jobEventFileName = appId.toString().replaceAll("application", "job")
-        + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+        + TestFileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
 
     String jobEventFilePath = outputDirJob + jobEventFileName;
     File jobEventFile = new File(jobEventFilePath);
@@ -299,7 +299,7 @@ private void checkNewTimelineEvent(ApplicationId appId,
 
     // check for job event file
     String appEventFileName = appId.toString()
-        + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+        + TestFileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
 
     String appEventFilePath = outputAppDir + appEventFileName;
     File appEventFile = new File(appEventFilePath);
@@ -318,7 +318,7 @@ private void checkNewTimelineEvent(ApplicationId appId,
         taskFolder.isDirectory());
 
     String taskEventFileName = appId.toString().replaceAll("application", "task")
-        + "_m_000000" + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+        + "_m_000000" + TestFileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
 
     String taskEventFilePath = outputDirTask + taskEventFileName;
     File taskEventFile = new File(taskEventFilePath);
@@ -336,7 +336,7 @@ private void checkNewTimelineEvent(ApplicationId appId,
 
     String taskAttemptEventFileName = appId.toString().replaceAll(
         "application", "attempt") + "_m_000000_0" +
-        FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+        TestFileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
 
     String taskAttemptEventFilePath = outputDirTaskAttempt +
         taskAttemptEventFileName;
@@ -370,7 +370,7 @@ private void verifyEntity(File entityFile, String eventId,
     while ((strLine = reader.readLine()) != null) {
       if (strLine.trim().length() > 0) {
         org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
-            FileSystemTimelineReaderImpl.getTimelineRecordFromJSON(
+            TestFileSystemTimelineReaderImpl.getTimelineRecordFromJSON(
                 strLine.trim(),
                 org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.class);
         if (eventId == null) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index 22c16e3..acd3e19 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -86,7 +86,7 @@
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
-import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TestFileSystemTimelineWriterImpl;
 import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
 import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
@@ -487,7 +487,7 @@ private void checkTimelineV2(boolean haveDomain, ApplicationId appId,
     LOG.info("Started checkTimelineV2 ");
     // For PoC check in /tmp/timeline_service_data YARN-3264
     String tmpRoot =
-        FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
+        TestFileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
         + "/entities/";
 
     File tmpRootFolder = new File(tmpRoot);
@@ -510,7 +510,7 @@ private void checkTimelineV2(boolean haveDomain, ApplicationId appId,
     String appTimestampFileName = "appattempt_" + appId.getClusterTimestamp()
         + "_000" + appId.getId() + "_000001"
-        + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+        + TestFileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
     verifyEntityTypeFileExists(basePath, "DS_APP_ATTEMPT",
         appTimestampFileName);
@@ -525,7 +525,7 @@ private void checkTimelineV2(boolean haveDomain, ApplicationId appId,
     String containerMetricsTimestampFileName = "container_"
         + appId.getClusterTimestamp() + "_000" + appId.getId()
         + "_01_000001"
-        + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+        + TestFileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
     File containerEntityFile = verifyEntityTypeFileExists(basePath,
         TimelineEntityType.YARN_CONTAINER.toString(),
         containerMetricsTimestampFileName);
@@ -556,7 +556,7 @@ private void checkTimelineV2(boolean haveDomain, ApplicationId appId,
     // Verify RM posting Application life cycle Events are getting published
     String appMetricsTimestampFileName = "application_"
         + appId.getClusterTimestamp() + "_000" + appId.getId()
-        + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+        + TestFileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
     File appEntityFile =
         verifyEntityTypeFileExists(basePath,
             TimelineEntityType.YARN_APPLICATION.toString(),
@@ -589,7 +589,7 @@ private void checkTimelineV2(boolean haveDomain, ApplicationId appId,
     String appAttemptMetricsTimestampFileName = "appattempt_"
         + appId.getClusterTimestamp() + "_000" + appId.getId()
         + "_000001"
-        + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+        + TestFileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
     File appAttemptEntityFile =
         verifyEntityTypeFileExists(basePath,
             TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(),
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
index f25fb48..2a39d46 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
@@ -158,6 +158,12 @@
           <groupId>log4j</groupId>
           <artifactId>log4j</artifactId>
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
     <dependency>
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
index 13c67f8..b03f258 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
@@ -64,8 +64,8 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
 import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
-import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
-import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TestFileSystemTimelineReaderImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TestFileSystemTimelineWriterImpl;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -75,7 +75,7 @@ public class TestSystemMetricsPublisherForV2 {
 
   /**
-   * is the folder where the FileSystemTimelineWriterImpl writes the entities
+   * is the folder where the TestFileSystemTimelineWriterImpl writes the entities
    */
   protected static File testRootDir = new File("target",
       TestSystemMetricsPublisherForV2.class.getName() + "-localDir")
@@ -143,7 +143,7 @@ private static Configuration getTimelineV2Conf() {
     conf.setBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_EVENTS_ENABLED,
         true);
     try {
-      conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+      conf.set(TestFileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
           testRootDir.getCanonicalPath());
     } catch (IOException e) {
       e.printStackTrace();
@@ -203,7 +203,7 @@ public void testPublishApplicationMetrics() throws Exception {
 
     // file name is <appId>.thist
     String timelineServiceFileName = appId.toString()
-        + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+        + TestFileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
     File appFile = new File(outputDirApp, timelineServiceFileName);
     Assert.assertTrue(appFile.exists());
     verifyEntity(appFile, 3, ApplicationMetricsConstants.CREATED_EVENT_TYPE);
@@ -237,7 +237,7 @@ public void testPublishAppAttemptMetrics() throws Exception {
 
     // file name is <appAttemptId>.thist
     String timelineServiceFileName = appAttemptId.toString()
-        + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+        + TestFileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
     File appFile = new File(outputDirApp, timelineServiceFileName);
     Assert.assertTrue(appFile.exists());
     verifyEntity(appFile,2, AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
@@ -268,7 +268,7 @@ public void testPublishContainerMetrics() throws Exception {
 
     // file name is <containerId>.thist
     String timelineServiceFileName = containerId.toString()
-        + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+        + TestFileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
     File appFile = new File(outputDirApp, timelineServiceFileName);
     Assert.assertTrue(appFile.exists());
     verifyEntity(appFile, 2,
@@ -294,7 +294,7 @@ private static void verifyEntity(File entityFile, long expectedEvents,
     reader = new BufferedReader(new FileReader(entityFile));
     while ((strLine = reader.readLine()) != null) {
       if (strLine.trim().length() > 0) {
-        TimelineEntity entity = FileSystemTimelineReaderImpl.
+        TimelineEntity entity = TestFileSystemTimelineReaderImpl.
             getTimelineRecordFromJSON(strLine.trim(), TimelineEntity.class);
         for (TimelineEvent event : entity.getEvents()) {
           if (event.getId().equals(eventForCreatedTime)) {
@@ -315,7 +315,7 @@ private static void verifyEntity(File entityFile, long expectedEvents,
 
   private String getTimelineEntityDir(RMApp app) {
     String outputDirApp = testRootDir.getAbsolutePath() + "/"
-        + FileSystemTimelineWriterImpl.ENTITIES_DIR + "/"
+        + TestFileSystemTimelineWriterImpl.ENTITIES_DIR + "/"
         + YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/"
         + app.getUser() + "/"
         + app.getName() + "/"
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
index a8f88e5..9758320 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
@@ -36,7 +36,7 @@
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -61,7 +61,7 @@ public void serviceInit(Configuration conf) throws Exception {
     writer = ReflectionUtils.newInstance(conf.getClass(
         YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
-        FileSystemTimelineWriterImpl.class,
+        HBaseTimelineWriterImpl.class,
         TimelineWriter.class), conf);
     writer.init(conf);
     // create a single dedicated thread for flushing the writer on a periodic
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java
index 97725e6..110d1dc 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java
@@ -41,7 +41,7 @@
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
 import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
 import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
@@ -81,7 +81,7 @@ protected void serviceInit(Configuration conf) throws Exception {
 
   private TimelineReader createTimelineReaderStore(Configuration conf) {
     TimelineReader readerStore = ReflectionUtils.newInstance(conf.getClass(
         YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
-        FileSystemTimelineReaderImpl.class, TimelineReader.class), conf);
+        HBaseTimelineReaderImpl.class, TimelineReader.class), conf);
     LOG.info("Using store " + readerStore.getClass().getName());
     readerStore.init(conf);
     return readerStore;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
deleted file mode 100644
index 00aa686..0000000
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.nio.charset.Charset;
-import java.util.Comparator;
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-
-import org.apache.commons.csv.CSVFormat;
-import org.apache.commons.csv.CSVParser;
-import org.apache.commons.csv.CSVRecord;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
-import org.codehaus.jackson.JsonGenerationException;
-import org.codehaus.jackson.map.JsonMappingException;
-import org.codehaus.jackson.map.ObjectMapper;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * File System based implementation for TimelineReader.
- */
-public class FileSystemTimelineReaderImpl extends AbstractService
-    implements TimelineReader {
-
-  private static final Log LOG =
-      LogFactory.getLog(FileSystemTimelineReaderImpl.class);
-
-  private String rootPath;
-  private static final String ENTITIES_DIR = "entities";
-
-  /** Default extension for output files. */
-  private static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";
-
-  @VisibleForTesting
-  /** Default extension for output files. */
-  static final String APP_FLOW_MAPPING_FILE = "app_flow_mapping.csv";
-
-  @VisibleForTesting
-  /** Config param for timeline service file system storage root. */
-  static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT =
-      YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir";
-
-  @VisibleForTesting
-  /** Default value for storage location on local disk. */
-  static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT =
-      "/tmp/timeline_service_data";
-
-  private final CSVFormat csvFormat =
-      CSVFormat.DEFAULT.withHeader("APP", "USER", "FLOW", "FLOWRUN");
-
-  public FileSystemTimelineReaderImpl() {
-    super(FileSystemTimelineReaderImpl.class.getName());
-  }
-
-  @VisibleForTesting
-  String getRootPath() {
-    return rootPath;
-  }
-
-  private static ObjectMapper mapper;
-
-  static {
-    mapper = new ObjectMapper();
-    YarnJacksonJaxbJsonProvider.configObjectMapper(mapper);
-  }
-
-  /**
-   * Deserialize a POJO object from a JSON string.
-   *
-   * @param <T> Describes the type of class to be returned.
-   * @param clazz class to be deserialized.
-   * @param jsonString JSON string to deserialize.
-   * @return An object based on class type. Used typically for
-   *     TimelineEntity object.
-   * @throws IOException if the underlying input source has problems during
-   *     parsing.
-   * @throws JsonMappingException if parser has problems parsing content.
-   * @throws JsonGenerationException if there is a problem in JSON writing.
-   */
-  public static <T> T getTimelineRecordFromJSON(
-      String jsonString, Class<T> clazz)
-      throws JsonGenerationException, JsonMappingException, IOException {
-    return mapper.readValue(jsonString, clazz);
-  }
-
-  private static void fillFields(TimelineEntity finalEntity,
-      TimelineEntity real, EnumSet<Field> fields) {
-    if (fields.contains(Field.ALL)) {
-      fields = EnumSet.allOf(Field.class);
-    }
-    for (Field field : fields) {
-      switch(field) {
-      case CONFIGS:
-        finalEntity.setConfigs(real.getConfigs());
-        break;
-      case METRICS:
-        finalEntity.setMetrics(real.getMetrics());
-        break;
-      case INFO:
-        finalEntity.setInfo(real.getInfo());
-        break;
-      case IS_RELATED_TO:
-        finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
-        break;
-      case RELATES_TO:
-        finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
-        break;
-      case EVENTS:
-        finalEntity.setEvents(real.getEvents());
-        break;
-      default:
-        continue;
-      }
-    }
-  }
-
-  private String getFlowRunPath(String userId, String clusterId,
-      String flowName, Long flowRunId, String appId) throws IOException {
-    if (userId != null && flowName != null && flowRunId != null) {
-      return userId + "/" + flowName + "/" + flowRunId;
-    }
-    if (clusterId == null || appId == null) {
-      throw new IOException("Unable to get flow info");
-    }
-    String appFlowMappingFile = rootPath + "/" + ENTITIES_DIR + "/" +
-        clusterId + "/" + APP_FLOW_MAPPING_FILE;
-    try (BufferedReader reader =
-        new BufferedReader(new InputStreamReader(
-            new FileInputStream(
-                appFlowMappingFile), Charset.forName("UTF-8")));
-        CSVParser parser = new CSVParser(reader, csvFormat)) {
-      for (CSVRecord record : parser.getRecords()) {
-        if (record.size() < 4) {
-          continue;
-        }
-        String applicationId = record.get("APP");
-        if (applicationId != null && !applicationId.trim().isEmpty() &&
-            !applicationId.trim().equals(appId)) {
-          continue;
-        }
-        return record.get(1).trim() + "/" + record.get(2).trim() + "/" +
-            record.get(3).trim();
-      }
-      parser.close();
-    }
-    throw new IOException("Unable to get flow info");
-  }
-
-  private static TimelineEntity createEntityToBeReturned(TimelineEntity entity,
-      EnumSet<Field> fieldsToRetrieve) {
-    TimelineEntity entityToBeReturned = new TimelineEntity();
-    entityToBeReturned.setIdentifier(entity.getIdentifier());
-    entityToBeReturned.setCreatedTime(entity.getCreatedTime());
-    if (fieldsToRetrieve != null) {
-      fillFields(entityToBeReturned, entity, fieldsToRetrieve);
-    }
-    return entityToBeReturned;
-  }
-
-  private static boolean isTimeInRange(Long time, Long timeBegin,
-      Long timeEnd) {
-    return (time >= timeBegin) && (time <= timeEnd);
-  }
-
-  private static void mergeEntities(TimelineEntity entity1,
-      TimelineEntity entity2) {
-    // Ideally created time wont change except in the case of issue from client.
-    if (entity2.getCreatedTime() != null && entity2.getCreatedTime() > 0) {
-      entity1.setCreatedTime(entity2.getCreatedTime());
-    }
-    for (Entry<String, String> configEntry : entity2.getConfigs().entrySet()) {
-      entity1.addConfig(configEntry.getKey(), configEntry.getValue());
-    }
-    for (Entry<String, Object> infoEntry : entity2.getInfo().entrySet()) {
-      entity1.addInfo(infoEntry.getKey(), infoEntry.getValue());
-    }
-    for (Entry<String, Set<String>> isRelatedToEntry :
-        entity2.getIsRelatedToEntities().entrySet()) {
-      String type = isRelatedToEntry.getKey();
-      for (String entityId : isRelatedToEntry.getValue()) {
-        entity1.addIsRelatedToEntity(type, entityId);
-      }
-    }
-    for (Entry<String, Set<String>> relatesToEntry :
-        entity2.getRelatesToEntities().entrySet()) {
-      String type = relatesToEntry.getKey();
-      for (String entityId : relatesToEntry.getValue()) {
-        entity1.addRelatesToEntity(type, entityId);
-      }
-    }
-    for (TimelineEvent event : entity2.getEvents()) {
-      entity1.addEvent(event);
-    }
-    for (TimelineMetric metric2 : entity2.getMetrics()) {
-      boolean found = false;
-      for (TimelineMetric metric1 : entity1.getMetrics()) {
-        if (metric1.getId().equals(metric2.getId())) {
-          metric1.addValues(metric2.getValues());
-          found = true;
-          break;
-        }
-      }
-      if (!found) {
-        entity1.addMetric(metric2);
-      }
-    }
-  }
-
-  private static TimelineEntity readEntityFromFile(BufferedReader reader)
-      throws IOException {
-    TimelineEntity entity =
-        getTimelineRecordFromJSON(reader.readLine(), TimelineEntity.class);
-    String entityStr = "";
-    while ((entityStr = reader.readLine()) != null) {
-      if (entityStr.trim().isEmpty()) {
-        continue;
-      }
-      TimelineEntity anotherEntity =
-          getTimelineRecordFromJSON(entityStr, TimelineEntity.class);
-      if (!entity.getId().equals(anotherEntity.getId()) ||
-          !entity.getType().equals(anotherEntity.getType())) {
-        continue;
-      }
-      mergeEntities(entity, anotherEntity);
-    }
-    return entity;
-  }
-
-  private Set<TimelineEntity> getEntities(File dir, String entityType,
-      TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
-      throws IOException {
-    // First sort the selected entities based on created/start time.
-    Map<Long, Set<TimelineEntity>> sortedEntities =
-        new TreeMap<>(
-            new Comparator<Long>() {
-              @Override
-              public int compare(Long l1, Long l2) {
-                return l2.compareTo(l1);
-              }
-            }
-        );
-    for (File entityFile : dir.listFiles()) {
-      if (!entityFile.getName().contains(TIMELINE_SERVICE_STORAGE_EXTENSION)) {
-        continue;
-      }
-      try (BufferedReader reader =
-          new BufferedReader(
-              new InputStreamReader(
-                  new FileInputStream(
-                      entityFile), Charset.forName("UTF-8")))) {
-        TimelineEntity entity = readEntityFromFile(reader);
-        if (!entity.getType().equals(entityType)) {
-          continue;
-        }
-        if (!isTimeInRange(entity.getCreatedTime(),
-            filters.getCreatedTimeBegin(), filters.getCreatedTimeEnd())) {
-          continue;
-        }
-        if (filters.getRelatesTo() != null &&
-            !filters.getRelatesTo().getFilterList().isEmpty() &&
-            !TimelineStorageUtils.matchRelatesTo(entity,
-                filters.getRelatesTo())) {
-          continue;
-        }
-        if (filters.getIsRelatedTo() != null &&
-            !filters.getIsRelatedTo().getFilterList().isEmpty() &&
-            !TimelineStorageUtils.matchIsRelatedTo(entity,
-                filters.getIsRelatedTo())) {
-          continue;
-        }
-        if (filters.getInfoFilters() != null &&
-            !filters.getInfoFilters().getFilterList().isEmpty() &&
-            !TimelineStorageUtils.matchInfoFilters(entity,
-                filters.getInfoFilters())) {
-          continue;
-        }
-        if (filters.getConfigFilters() != null &&
-            !filters.getConfigFilters().getFilterList().isEmpty() &&
-            !TimelineStorageUtils.matchConfigFilters(entity,
-                filters.getConfigFilters())) {
-          continue;
-        }
-        if (filters.getMetricFilters() != null &&
-            !filters.getMetricFilters().getFilterList().isEmpty() &&
-            !TimelineStorageUtils.matchMetricFilters(entity,
-                filters.getMetricFilters())) {
-          continue;
-        }
-        if (filters.getEventFilters() != null &&
-            !filters.getEventFilters().getFilterList().isEmpty() &&
-            !TimelineStorageUtils.matchEventFilters(entity,
-                filters.getEventFilters())) {
-          continue;
-        }
-        TimelineEntity entityToBeReturned = createEntityToBeReturned(
-            entity, dataToRetrieve.getFieldsToRetrieve());
-        Set<TimelineEntity> entitiesCreatedAtSameTime =
-            sortedEntities.get(entityToBeReturned.getCreatedTime());
-        if (entitiesCreatedAtSameTime == null) {
-          entitiesCreatedAtSameTime = new HashSet<TimelineEntity>();
-        }
-        entitiesCreatedAtSameTime.add(entityToBeReturned);
-        sortedEntities.put(
-            entityToBeReturned.getCreatedTime(), entitiesCreatedAtSameTime);
-      }
-    }
-
-    Set<TimelineEntity> entities = new HashSet<TimelineEntity>();
-    long entitiesAdded = 0;
-    for (Set<TimelineEntity> entitySet : sortedEntities.values()) {
-      for (TimelineEntity entity : entitySet) {
-        entities.add(entity);
-        ++entitiesAdded;
-        if (entitiesAdded >= filters.getLimit()) {
-          return entities;
-        }
-      }
-    }
-    return entities;
-  }
-
-  @Override
-  public void serviceInit(Configuration conf) throws Exception {
-    rootPath = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
-        DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT);
-    super.serviceInit(conf);
-  }
-
-  @Override
-  public TimelineEntity getEntity(TimelineReaderContext context,
-      TimelineDataToRetrieve dataToRetrieve) throws IOException {
-    String flowRunPath = getFlowRunPath(context.getUserId(),
-        context.getClusterId(), context.getFlowName(), context.getFlowRunId(),
-        context.getAppId());
-    File dir = new File(new File(rootPath, ENTITIES_DIR),
-        context.getClusterId() + "/" + flowRunPath + "/" + context.getAppId() +
-        "/" + context.getEntityType());
-    File entityFile = new File(
-        dir, context.getEntityId() + TIMELINE_SERVICE_STORAGE_EXTENSION);
-    try (BufferedReader reader =
-        new BufferedReader(new InputStreamReader(
-            new FileInputStream(entityFile), Charset.forName("UTF-8")))) {
-      TimelineEntity entity = readEntityFromFile(reader);
-      return createEntityToBeReturned(
-          entity, dataToRetrieve.getFieldsToRetrieve());
-    } catch (FileNotFoundException e) {
-      LOG.info("Cannot find entity {id:" + context.getEntityId() + " , type:" +
-          context.getEntityType() + "}. Will send HTTP 404 in response.");
-      return null;
-    }
-  }
-
-  @Override
-  public Set<TimelineEntity> getEntities(TimelineReaderContext context,
-      TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
-      throws IOException {
-    String flowRunPath = getFlowRunPath(context.getUserId(),
-        context.getClusterId(), context.getFlowName(), context.getFlowRunId(),
-        context.getAppId());
-    File dir =
-        new File(new File(rootPath, ENTITIES_DIR),
-            context.getClusterId() + "/" + flowRunPath + "/" +
-            context.getAppId() + "/" + context.getEntityType());
-    return getEntities(dir, context.getEntityType(), filters, dataToRetrieve);
-  }
-}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
deleted file mode 100644
index 74a03ac..0000000
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse.TimelineWriteError;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
-
-/**
- * This implements a local file based backend for storing application timeline
- * information.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class FileSystemTimelineWriterImpl extends AbstractService
-    implements TimelineWriter {
-
-  private String outputRoot;
-
-  /** Config param for timeline service storage tmp root for FILE YARN-3264. */
-  public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT
-      = YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir";
-
-  /** default value for storage location on local disk. */
-  public static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
-      = "/tmp/timeline_service_data";
-
-  public static final String ENTITIES_DIR = "entities";
-
-  /** Default extension for output files. */
-  public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";
-
-  FileSystemTimelineWriterImpl() {
-    super((FileSystemTimelineWriterImpl.class.getName()));
-  }
-
-  @Override
-  public TimelineWriteResponse write(String clusterId, String userId,
-      String flowName, String flowVersion, long flowRunId, String appId,
-      TimelineEntities entities) throws IOException {
-    TimelineWriteResponse response = new TimelineWriteResponse();
-    for (TimelineEntity entity : entities.getEntities()) {
-      write(clusterId, userId, flowName, flowVersion, flowRunId, appId, entity,
-          response);
-    }
-    return response;
-  }
-
-  private synchronized void write(String clusterId, String userId,
-      String flowName, String flowVersion, long flowRun, String appId,
-      TimelineEntity entity, TimelineWriteResponse response)
-      throws IOException {
-    PrintWriter out = null;
-    try {
-      String dir = mkdirs(outputRoot, ENTITIES_DIR, clusterId, userId,
-          escape(flowName), escape(flowVersion), String.valueOf(flowRun), appId,
-          entity.getType());
-      String fileName = dir + entity.getId() +
-          TIMELINE_SERVICE_STORAGE_EXTENSION;
-      out =
-          new PrintWriter(new BufferedWriter(new OutputStreamWriter(
-              new FileOutputStream(fileName, true), "UTF-8")));
-      out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity));
-      out.write("\n");
-    } catch (IOException ioe) {
-      TimelineWriteError error = new TimelineWriteError();
-      error.setEntityId(entity.getId());
-      error.setEntityType(entity.getType());
-      /*
-       * TODO: set an appropriate error code after PoC could possibly be:
-       * error.setErrorCode(TimelineWriteError.IO_EXCEPTION);
-       */
-      response.addError(error);
-    } finally {
-      if (out != null) {
-        out.close();
-      }
-    }
-  }
-
-  @Override
-  public TimelineWriteResponse aggregate(TimelineEntity data,
-      TimelineAggregationTrack track) throws IOException {
-    return null;
-
-  }
-
-  public String getOutputRoot() {
-    return outputRoot;
-  }
-
-  @Override
-  public void serviceInit(Configuration conf) throws Exception {
-    outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
-        DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT);
-  }
-
-  @Override
-  public void serviceStart() throws Exception {
-    mkdirs(outputRoot, ENTITIES_DIR);
-  }
-
-  @Override
-  public void flush() throws IOException {
-    // no op
-  }
-
-  private static String mkdirs(String... dirStrs) throws IOException {
-    StringBuilder path = new StringBuilder();
-    for (String dirStr : dirStrs) {
-      path.append(dirStr).append('/');
-      File dir = new File(path.toString());
-      if (!dir.exists()) {
-        if (!dir.mkdirs()) {
-          throw new IOException("Could not create directories for " + dir);
-        }
-      }
-    }
-    return path.toString();
-  }
-
-  // specifically escape the separator character
-  private static String escape(String str) {
-    return str.replace(File.separatorChar, '_');
-  }
-}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java
index 0bddf1b..41df895 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java
@@ -37,7 +37,7 @@
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TestFileSystemTimelineReaderImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImplTest;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -60,12 +60,12 @@
 
   @BeforeClass
   public static void setup() throws Exception {
-    TestFileSystemTimelineReaderImpl.setup();
+    FileSystemTimelineReaderImplTest.setup();
   }
 
   @AfterClass
   public static void tearDown() throws Exception {
-    TestFileSystemTimelineReaderImpl.tearDown();
+    FileSystemTimelineReaderImplTest.tearDown();
   }
 
   @Before
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImplTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImplTest.java
new file mode 100644
index 0000000..273cf88
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImplTest.java
@@ -0,0 +1,804 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class FileSystemTimelineReaderImplTest {
+
+  private static final String rootDir =
+      TestFileSystemTimelineReaderImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT;
+  TestFileSystemTimelineReaderImpl reader;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    loadEntityData();
+    // Create app flow mapping file.
+    CSVFormat format =
+        CSVFormat.DEFAULT.withHeader("APP", "USER", "FLOW", "FLOWRUN");
+    String appFlowMappingFile = rootDir + "/entities/cluster1/" +
+        TestFileSystemTimelineReaderImpl.APP_FLOW_MAPPING_FILE;
+    try (PrintWriter out =
+        new PrintWriter(new BufferedWriter(
+            new FileWriter(appFlowMappingFile, true)));
+        CSVPrinter printer = new CSVPrinter(out, format)) {
+      printer.printRecord("app1", "user1", "flow1", 1);
+      printer.printRecord("app2", "user1", "flow1,flow", 1);
+      printer.close();
+    }
+    (new File(rootDir)).deleteOnExit();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    FileUtils.deleteDirectory(new File(rootDir));
+  }
+
+  @Before
+  public void init() throws Exception {
+    reader = new TestFileSystemTimelineReaderImpl();
+    Configuration conf = new YarnConfiguration();
+    conf.set(
+        TestFileSystemTimelineReaderImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+        rootDir);
+    reader.init(conf);
+  }
+
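+  /*
+   * Appends the entity to <entityId>.thist in the given directory as one
+   * JSON record per line; this mirrors the on-disk layout produced by
+   * TestFileSystemTimelineWriterImpl, whose reader counterpart merges
+   * records that share an entity id and type.
+   */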
+  private static void writeEntityFile(TimelineEntity entity, File dir)
+      throws Exception {
+    if (!dir.exists()) {
+      if (!dir.mkdirs()) {
+        throw new IOException("Could not create directories for " + dir);
+      }
+    }
+    String fileName = dir.getAbsolutePath() + "/" + entity.getId() + ".thist";
+    try (PrintWriter out =
+        new PrintWriter(new BufferedWriter(new FileWriter(fileName, true)))) {
+      out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity));
+      out.write("\n");
+      out.close();
+    }
+  }
+
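+  /*
+   * Seeds the store with four "app" entities (id_1 to id_4) under
+   * cluster1/user1/flow1/1/app1, varying their created times, info, configs,
+   * metrics and events for the filter tests below, plus id_5 under the
+   * comma-containing flow "flow1,flow" exercised by testAppFlowMappingCsv.
+   */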
+  private static void loadEntityData() throws Exception {
+    File appDir = new File(rootDir +
+        "/entities/cluster1/user1/flow1/1/app1/app/");
+    TimelineEntity entity11 = new TimelineEntity();
+    entity11.setId("id_1");
+    entity11.setType("app");
+    entity11.setCreatedTime(1425016502000L);
+    Map<String, Object> info1 = new HashMap<>();
+    info1.put("info1", "val1");
+    info1.put("info2", "val5");
+    entity11.addInfo(info1);
+    TimelineEvent event = new TimelineEvent();
+    event.setId("event_1");
+    event.setTimestamp(1425016502003L);
+    entity11.addEvent(event);
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric metric1 = new TimelineMetric();
+    metric1.setId("metric1");
+    metric1.setType(TimelineMetric.Type.SINGLE_VALUE);
+    metric1.addValue(1425016502006L, 113);
+    metrics.add(metric1);
+    TimelineMetric metric2 = new TimelineMetric();
+    metric2.setId("metric2");
+    metric2.setType(TimelineMetric.Type.TIME_SERIES);
+    metric2.addValue(1425016502016L, 34);
+    metrics.add(metric2);
+    entity11.setMetrics(metrics);
+    Map<String, String> configs = new HashMap<>();
+    configs.put("config_1", "127");
+    entity11.setConfigs(configs);
+    entity11.addRelatesToEntity("flow", "flow1");
+    entity11.addIsRelatedToEntity("type1", "tid1_1");
+    writeEntityFile(entity11, appDir);
+
+    TimelineEntity entity12 = new TimelineEntity();
+    entity12.setId("id_1");
+    entity12.setType("app");
+    configs.clear();
+    configs.put("config_2", "23");
+    configs.put("config_3", "abc");
+    entity12.addConfigs(configs);
+    metrics.clear();
+    TimelineMetric metric12 = new TimelineMetric();
+    metric12.setId("metric2");
+    metric12.setType(TimelineMetric.Type.TIME_SERIES);
+    metric12.addValue(1425016502032L, 48);
+    metric12.addValue(1425016502054L, 51);
+    metrics.add(metric12);
+    TimelineMetric metric3 = new TimelineMetric();
+    metric3.setId("metric3");
+    metric3.setType(TimelineMetric.Type.SINGLE_VALUE);
+    metric3.addValue(1425016502060L, 23L);
+    metrics.add(metric3);
+    entity12.setMetrics(metrics);
+    entity12.addIsRelatedToEntity("type1", "tid1_2");
+    entity12.addIsRelatedToEntity("type2", "tid2_1`");
+    TimelineEvent event15 = new TimelineEvent();
+    event15.setId("event_5");
+    event15.setTimestamp(1425016502017L);
+    entity12.addEvent(event15);
+    writeEntityFile(entity12, appDir);
+
+    TimelineEntity entity2 = new TimelineEntity();
+    entity2.setId("id_2");
+    entity2.setType("app");
+    entity2.setCreatedTime(1425016501050L);
+    Map<String, Object> info2 = new HashMap<>();
+    info1.put("info2", 4);
+    entity2.addInfo(info2);
+    Map<String, String> configs2 = new HashMap<>();
+    configs2.put("config_1", "129");
+    configs2.put("config_3", "def");
+    entity2.setConfigs(configs2);
+    TimelineEvent event2 = new TimelineEvent();
+    event2.setId("event_2");
+    event2.setTimestamp(1425016501003L);
+    entity2.addEvent(event2);
+    Set<TimelineMetric> metrics2 = new HashSet<>();
+    TimelineMetric metric21 = new TimelineMetric();
+    metric21.setId("metric1");
+    metric21.setType(TimelineMetric.Type.SINGLE_VALUE);
+    metric21.addValue(1425016501006L, 300);
+    metrics2.add(metric21);
+    TimelineMetric metric22 = new TimelineMetric();
+    metric22.setId("metric2");
+    metric22.setType(TimelineMetric.Type.TIME_SERIES);
+    metric22.addValue(1425016501056L, 31);
+    metric22.addValue(1425016501084L, 70);
+    metrics2.add(metric22);
+    TimelineMetric metric23 = new TimelineMetric();
+    metric23.setId("metric3");
+    metric23.setType(TimelineMetric.Type.SINGLE_VALUE);
+    metric23.addValue(1425016502060L, 23L);
+    metrics2.add(metric23);
+    entity2.setMetrics(metrics2);
+    entity2.addRelatesToEntity("flow", "flow2");
+    writeEntityFile(entity2, appDir);
+
+    TimelineEntity entity3 = new TimelineEntity();
+    entity3.setId("id_3");
+    entity3.setType("app");
+    entity3.setCreatedTime(1425016501050L);
+    Map<String, Object> info3 = new HashMap<>();
+    info3.put("info2", 3.5);
+    info3.put("info4", 20);
+    entity3.addInfo(info3);
+    Map<String, String> configs3 = new HashMap<>();
+    configs3.put("config_1", "123");
+    configs3.put("config_3", "abc");
+    entity3.setConfigs(configs3);
+    TimelineEvent event3 = new TimelineEvent();
+    event3.setId("event_2");
+    event3.setTimestamp(1425016501003L);
+    entity3.addEvent(event3);
+    TimelineEvent event4 = new TimelineEvent();
+    event4.setId("event_4");
+    event4.setTimestamp(1425016502006L);
+    entity3.addEvent(event4);
+    Set<TimelineMetric> metrics3 = new HashSet<>();
+    TimelineMetric metric31 = new TimelineMetric();
+    metric31.setId("metric1");
+    metric31.setType(TimelineMetric.Type.SINGLE_VALUE);
+    metric31.addValue(1425016501006L, 124);
+    metrics3.add(metric31);
+    TimelineMetric metric32 = new TimelineMetric();
+    metric32.setId("metric2");
+    metric32.setType(TimelineMetric.Type.TIME_SERIES);
+    metric32.addValue(1425016501056L, 31);
+    metric32.addValue(1425016501084L, 74);
+    metrics3.add(metric32);
+    entity3.setMetrics(metrics3);
+    entity3.addIsRelatedToEntity("type1", "tid1_2");
+    writeEntityFile(entity3, appDir);
+
+    TimelineEntity entity4 = new TimelineEntity();
+    entity4.setId("id_4");
+    entity4.setType("app");
+    entity4.setCreatedTime(1425016502050L);
+    TimelineEvent event44 = new TimelineEvent();
+    event44.setId("event_4");
+    event44.setTimestamp(1425016502003L);
+    entity4.addEvent(event44);
+    writeEntityFile(entity4, appDir);
+
+    File appDir2 = new File(rootDir +
+        "/entities/cluster1/user1/flow1,flow/1/app2/app/");
+    TimelineEntity entity5 = new TimelineEntity();
+    entity5.setId("id_5");
+    entity5.setType("app");
+    entity5.setCreatedTime(1425016502050L);
+    writeEntityFile(entity5, appDir2);
+  }
+
+  public TimelineReader getTimelineReader() {
+    return reader;
+  }
+
+  @Test
+  public void testGetEntityDefaultView() throws Exception {
+    // If no fields are specified, entity is returned with default view i.e.
+    // only the id, type and created time.
+    TimelineEntity result = reader.getEntity(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+            "app", "id_1"),
+        new TimelineDataToRetrieve(null, null, null, null));
+    Assert.assertEquals(
+        (new TimelineEntity.Identifier("app", "id_1")).toString(),
+        result.getIdentifier().toString());
+    Assert.assertEquals((Long)1425016502000L, result.getCreatedTime());
+    Assert.assertEquals(0, result.getConfigs().size());
+    Assert.assertEquals(0, result.getMetrics().size());
+  }
+
+  @Test
+  public void testGetEntityByClusterAndApp() throws Exception {
+    // Cluster and AppId should be enough to get an entity.
+    TimelineEntity result = reader.getEntity(
+        new TimelineReaderContext("cluster1", null, null, null, "app1", "app",
+            "id_1"),
+        new TimelineDataToRetrieve(null, null, null, null));
+    Assert.assertEquals(
+        (new TimelineEntity.Identifier("app", "id_1")).toString(),
+        result.getIdentifier().toString());
+    Assert.assertEquals((Long)1425016502000L, result.getCreatedTime());
+    Assert.assertEquals(0, result.getConfigs().size());
+    Assert.assertEquals(0, result.getMetrics().size());
+  }
+
+  /** This test checks whether we can handle commas in app flow mapping csv. */
+  @Test
+  public void testAppFlowMappingCsv() throws Exception {
+    // Test getting an entity by cluster and app where flow entry
+    // in app flow mapping csv has commas.
+    TimelineEntity result = reader.getEntity(
+        new TimelineReaderContext("cluster1", null, null, null, "app2",
+            "app", "id_5"),
+        new TimelineDataToRetrieve(null, null, null, null));
+    Assert.assertEquals(
+        (new TimelineEntity.Identifier("app", "id_5")).toString(),
+        result.getIdentifier().toString());
+    Assert.assertEquals((Long)1425016502050L, result.getCreatedTime());
+  }
+
+  @Test
+  public void testGetEntityCustomFields() throws Exception {
+    // Specified fields in addition to default view will be returned.
+    TimelineEntity result = reader.getEntity(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+            "app", "id_1"),
+        new TimelineDataToRetrieve(null, null,
+            EnumSet.of(Field.INFO, Field.CONFIGS, Field.METRICS), null));
+    Assert.assertEquals(
+        (new TimelineEntity.Identifier("app", "id_1")).toString(),
+        result.getIdentifier().toString());
+    Assert.assertEquals((Long)1425016502000L, result.getCreatedTime());
+    Assert.assertEquals(3, result.getConfigs().size());
+    Assert.assertEquals(3, result.getMetrics().size());
+    Assert.assertEquals(2, result.getInfo().size());
+    // No events will be returned.
+    Assert.assertEquals(0, result.getEvents().size());
+  }
+
+  @Test
+  public void testGetEntityAllFields() throws Exception {
+    // All fields of TimelineEntity will be returned.
+    TimelineEntity result = reader.getEntity(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+            "app", "id_1"),
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null));
+    Assert.assertEquals(
+        (new TimelineEntity.Identifier("app", "id_1")).toString(),
+        result.getIdentifier().toString());
+    Assert.assertEquals((Long)1425016502000L, result.getCreatedTime());
+    Assert.assertEquals(3, result.getConfigs().size());
+    Assert.assertEquals(3, result.getMetrics().size());
+    // All fields including events will be returned.
+    Assert.assertEquals(2, result.getEvents().size());
+  }
+
+  @Test
+  public void testGetAllEntities() throws Exception {
+    Set<TimelineEntity> result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+            "app", null), new TimelineEntityFilters(),
+        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null));
+    // All 4 entities will be returned.
+    Assert.assertEquals(4, result.size());
+  }
+
+  @Test
+  public void testGetEntitiesWithLimit() throws Exception {
+    Set<TimelineEntity> result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+            "app", null),
+        new TimelineEntityFilters(2L, null, null, null, null, null, null,
+            null, null), new TimelineDataToRetrieve());
+    Assert.assertEquals(2, result.size());
+    // Needs to be rewritten once hashcode and equals for
+    // TimelineEntity is implemented.
+    // Entities with id_1 and id_4 should be returned,
+    // based on created time, descending.
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1") && !entity.getId().equals("id_4")) {
+        Assert.fail("Entity not sorted by created time");
+      }
+    }
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+            "app", null),
+        new TimelineEntityFilters(3L, null, null, null, null, null, null,
+            null, null), new TimelineDataToRetrieve());
+    // Even though 2 entities out of 4 have same created time, one entity
+    // is left out due to limit.
+    Assert.assertEquals(3, result.size());
+  }
+
+  @Test
+  public void testGetEntitiesByTimeWindows() throws Exception {
+    // Get entities based on created time start and end time range.
+    Set<TimelineEntity> result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+            "app", null),
+        new TimelineEntityFilters(null, 1425016502030L, 1425016502060L, null,
+            null, null, null, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(1, result.size());
+    // Only one entity with ID id_4 should be returned.
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_4")) {
+        Assert.fail("Incorrect filtering based on created time range");
+      }
+    }
+
+    // Get entities if only created time end is specified.
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+            "app", null),
+        new TimelineEntityFilters(null, null, 1425016502010L, null, null,
+            null, null, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(3, result.size());
+    for (TimelineEntity entity : result) {
+      if (entity.getId().equals("id_4")) {
+        Assert.fail("Incorrect filtering based on created time range");
+      }
+    }
+
+    // Get entities if only created time start is specified.
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+            "app", null),
+        new TimelineEntityFilters(null, 1425016502010L, null, null, null,
+            null, null, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(1, result.size());
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_4")) {
+        Assert.fail("Incorrect filtering based on created time range");
+      }
+    }
+  }
+
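+  // A TimelineFilterList ANDs its filters unless it is constructed with
+  // Operator.OR; compare confFilterList4 (AND, expecting no match) with
+  // confFilterList5 (OR, expecting id_2) below.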
+  @Test
+  public void testGetFilteredEntities() throws Exception {
+    // Get entities based on info filters.
+    TimelineFilterList infoFilterList = new TimelineFilterList();
+    infoFilterList.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", 3.5));
+    Set<TimelineEntity> result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+            "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, infoFilterList,
+            null, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(1, result.size());
+    // Only one entity with ID id_3 should be returned.
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_3")) {
+        Assert.fail("Incorrect filtering based on info filters");
+      }
+    }
+
+    // Get entities based on config filters.
+    TimelineFilterList confFilterList = new TimelineFilterList();
+    confFilterList.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_1", "123"));
+    confFilterList.addFilter(
+        new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_3", "abc"));
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+            "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, null,
+            confFilterList, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(1, result.size());
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_3")) {
+        Assert.fail("Incorrect filtering based on config filters");
+      }
+    }
+
+    // Get entities based on event filters.
+    TimelineFilterList eventFilters = new TimelineFilterList();
+    eventFilters.addFilter(
+        new TimelineExistsFilter(TimelineCompareOp.EQUAL, "event_2"));
+    eventFilters.addFilter(
+        new TimelineExistsFilter(TimelineCompareOp.EQUAL, "event_4"));
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+            "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, null, null,
+            null, eventFilters),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(1, result.size());
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_3")) {
+        Assert.fail("Incorrect filtering based on event filters");
+      }
+    }
+
+    // Get entities based on metric filters.
+    TimelineFilterList metricFilterList = new TimelineFilterList();
+    metricFilterList.addFilter(new TimelineCompareFilter(
+        TimelineCompareOp.GREATER_OR_EQUAL, "metric3", 0L));
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+            "app", null),
+        new TimelineEntityFilters(null, null, null, null, null, null, null,
+            metricFilterList, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(2, result.size());
+    // Two entities with IDs' id_1 and id_2 should be returned.
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) {
+        Assert.fail("Incorrect filtering based on metric filters");
+      }
+    }
+
+    // Get entities based on complex config filters.
+ TimelineFilterList list1 = new TimelineFilterList(); + list1.addFilter( + new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_1", "129")); + list1.addFilter( + new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_3", "def")); + TimelineFilterList list2 = new TimelineFilterList(); + list2.addFilter( + new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_2", "23")); + list2.addFilter( + new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_3", "abc")); + TimelineFilterList confFilterList1 = + new TimelineFilterList(Operator.OR, list1, list2); + result = reader.getEntities( + new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", + "app", null), + new TimelineEntityFilters(null, null, null, null, null, null, + confFilterList1, null, null), + new TimelineDataToRetrieve()); + Assert.assertEquals(2, result.size()); + for (TimelineEntity entity : result) { + if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) { + Assert.fail("Incorrect filtering based on config filters"); + } + } + + TimelineFilterList list3 = new TimelineFilterList(); + list3.addFilter(new TimelineKeyValueFilter( + TimelineCompareOp.NOT_EQUAL, "config_1", "123")); + list3.addFilter(new TimelineKeyValueFilter( + TimelineCompareOp.NOT_EQUAL, "config_3", "abc")); + TimelineFilterList list4 = new TimelineFilterList(); + list4.addFilter( + new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_2", "23")); + TimelineFilterList confFilterList2 = + new TimelineFilterList(Operator.OR, list3, list4); + result = reader.getEntities( + new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", + "app", null), + new TimelineEntityFilters(null, null, null, null, null, null, + confFilterList2, null, null), + new TimelineDataToRetrieve()); + Assert.assertEquals(2, result.size()); + for (TimelineEntity entity : result) { + if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) { + Assert.fail("Incorrect filtering based on config filters"); + } + } + + TimelineFilterList confFilterList3 = new TimelineFilterList(); + confFilterList3.addFilter(new TimelineKeyValueFilter( + TimelineCompareOp.NOT_EQUAL, "config_1", "127")); + confFilterList3.addFilter(new TimelineKeyValueFilter( + TimelineCompareOp.NOT_EQUAL, "config_3", "abc")); + result = reader.getEntities( + new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", + "app", null), + new TimelineEntityFilters(null, null, null, null, null, null, + confFilterList3, null, null), + new TimelineDataToRetrieve()); + Assert.assertEquals(1, result.size()); + for(TimelineEntity entity : result) { + if (!entity.getId().equals("id_2")) { + Assert.fail("Incorrect filtering based on config filters"); + } + } + + TimelineFilterList confFilterList4 = new TimelineFilterList(); + confFilterList4.addFilter(new TimelineKeyValueFilter( + TimelineCompareOp.EQUAL, "config_dummy", "dummy")); + confFilterList4.addFilter(new TimelineKeyValueFilter( + TimelineCompareOp.EQUAL, "config_3", "def")); + result = reader.getEntities( + new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", + "app", null), + new TimelineEntityFilters(null, null, null, null, null, null, + confFilterList4, null, null), + new TimelineDataToRetrieve()); + Assert.assertEquals(0, result.size()); + + TimelineFilterList confFilterList5 = new TimelineFilterList(Operator.OR); + confFilterList5.addFilter(new TimelineKeyValueFilter( + TimelineCompareOp.EQUAL, "config_dummy", "dummy")); + confFilterList5.addFilter(new TimelineKeyValueFilter( + 
TimelineCompareOp.EQUAL, "config_3", "def")); + result = reader.getEntities( + new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", + "app", null), + new TimelineEntityFilters(null, null, null, null, null, null, + confFilterList5, null, null), + new TimelineDataToRetrieve()); + Assert.assertEquals(1, result.size()); + for (TimelineEntity entity : result) { + if (!entity.getId().equals("id_2")) { + Assert.fail("Incorrect filtering based on config filters"); + } + } + + // Get entities based on complex metric filters. + TimelineFilterList list6 = new TimelineFilterList(); + list6.addFilter(new TimelineCompareFilter( + TimelineCompareOp.GREATER_THAN, "metric1", 200)); + list6.addFilter(new TimelineCompareFilter( + TimelineCompareOp.EQUAL, "metric3", 23)); + TimelineFilterList list7 = new TimelineFilterList(); + list7.addFilter(new TimelineCompareFilter( + TimelineCompareOp.GREATER_OR_EQUAL, "metric2", 74)); + TimelineFilterList metricFilterList1 = + new TimelineFilterList(Operator.OR, list6, list7); + result = reader.getEntities( + new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", + "app", null), + new TimelineEntityFilters(null, null, null, null, null, null, null, + metricFilterList1, null), + new TimelineDataToRetrieve()); + Assert.assertEquals(2, result.size()); + // Two entities with IDs' id_2 and id_3 should be returned. + for (TimelineEntity entity : result) { + if (!entity.getId().equals("id_2") && !entity.getId().equals("id_3")) { + Assert.fail("Incorrect filtering based on metric filters"); + } + } + + TimelineFilterList metricFilterList2 = new TimelineFilterList(); + metricFilterList2.addFilter(new TimelineCompareFilter( + TimelineCompareOp.LESS_THAN, "metric2", 70)); + metricFilterList2.addFilter(new TimelineCompareFilter( + TimelineCompareOp.LESS_OR_EQUAL, "metric3", 23)); + result = reader.getEntities( + new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", + "app", null), + new TimelineEntityFilters(null, null, null, null, null, null, null, + metricFilterList2, null), + new TimelineDataToRetrieve()); + Assert.assertEquals(1, result.size()); + for (TimelineEntity entity : result) { + if (!entity.getId().equals("id_1")) { + Assert.fail("Incorrect filtering based on metric filters"); + } + } + + TimelineFilterList metricFilterList3 = new TimelineFilterList(); + metricFilterList3.addFilter(new TimelineCompareFilter( + TimelineCompareOp.LESS_THAN, "dummy_metric", 30)); + metricFilterList3.addFilter(new TimelineCompareFilter( + TimelineCompareOp.LESS_OR_EQUAL, "metric3", 23)); + result = reader.getEntities( + new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", + "app", null), + new TimelineEntityFilters(null, null, null, null, null, null, null, + metricFilterList3, null), + new TimelineDataToRetrieve()); + Assert.assertEquals(0, result.size()); + + TimelineFilterList metricFilterList4 = new TimelineFilterList(Operator.OR); + metricFilterList4.addFilter(new TimelineCompareFilter( + TimelineCompareOp.LESS_THAN, "dummy_metric", 30)); + metricFilterList4.addFilter(new TimelineCompareFilter( + TimelineCompareOp.LESS_OR_EQUAL, "metric3", 23)); + result = reader.getEntities( + new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", + "app", null), + new TimelineEntityFilters(null, null, null, null, null, null, null, + metricFilterList4, null), + new TimelineDataToRetrieve()); + Assert.assertEquals(2, result.size()); + for (TimelineEntity entity : result) { + if (!entity.getId().equals("id_1") && 
!entity.getId().equals("id_2")) { + Assert.fail("Incorrect filtering based on metric filters"); + } + } + + TimelineFilterList metricFilterList5 = + new TimelineFilterList(new TimelineCompareFilter( + TimelineCompareOp.NOT_EQUAL, "metric2", 74)); + result = reader.getEntities( + new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", + "app", null), + new TimelineEntityFilters(null, null, null, null, null, null, null, + metricFilterList5, null), + new TimelineDataToRetrieve()); + Assert.assertEquals(2, result.size()); + for (TimelineEntity entity : result) { + if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) { + Assert.fail("Incorrect filtering based on metric filters"); + } + } + + TimelineFilterList infoFilterList1 = new TimelineFilterList(); + infoFilterList1.addFilter( + new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", 3.5)); + infoFilterList1.addFilter( + new TimelineKeyValueFilter(TimelineCompareOp.NOT_EQUAL, "info4", 20)); + result = reader.getEntities( + new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", + "app", null), + new TimelineEntityFilters(null, null, null, null, null, infoFilterList1, + null, null, null), + new TimelineDataToRetrieve()); + Assert.assertEquals(0, result.size()); + + TimelineFilterList infoFilterList2 = new TimelineFilterList(Operator.OR); + infoFilterList2.addFilter( + new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", 3.5)); + infoFilterList2.addFilter( + new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info1", "val1")); + result = reader.getEntities( + new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", + "app", null), + new TimelineEntityFilters(null, null, null, null, null, infoFilterList2, + null, null, null), + new TimelineDataToRetrieve()); + Assert.assertEquals(2, result.size()); + for (TimelineEntity entity : result) { + if (!entity.getId().equals("id_1") && !entity.getId().equals("id_3")) { + Assert.fail("Incorrect filtering based on info filters"); + } + } + + TimelineFilterList infoFilterList3 = new TimelineFilterList(); + infoFilterList3.addFilter( + new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "dummy_info", 1)); + infoFilterList3.addFilter( + new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", "val5")); + result = reader.getEntities( + new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", + "app", null), + new TimelineEntityFilters(null, null, null, null, null, infoFilterList3, + null, null, null), + new TimelineDataToRetrieve()); + Assert.assertEquals(0, result.size()); + + TimelineFilterList infoFilterList4 = new TimelineFilterList(Operator.OR); + infoFilterList4.addFilter( + new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "dummy_info", 1)); + infoFilterList4.addFilter( + new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", "val5")); + result = reader.getEntities( + new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", + "app", null), + new TimelineEntityFilters(null, null, null, null, null, infoFilterList4, + null, null, null), + new TimelineDataToRetrieve()); + Assert.assertEquals(1, result.size()); + for (TimelineEntity entity : result) { + if (!entity.getId().equals("id_1")) { + Assert.fail("Incorrect filtering based on info filters"); + } + } + } + + @Test + public void testGetEntitiesByRelations() throws Exception { + // Get entities based on relatesTo. 
+    TimelineFilterList relatesTo = new TimelineFilterList(Operator.OR);
+    Set<Object> relatesToIds =
+        new HashSet<Object>(Arrays.asList((Object)"flow1"));
+    relatesTo.addFilter(new TimelineKeyValuesFilter(
+        TimelineCompareOp.EQUAL, "flow", relatesToIds));
+    Set<TimelineEntity> result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, relatesTo, null, null,
+        null, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(1, result.size());
+    // Only one entity with ID id_1 should be returned.
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1")) {
+        Assert.fail("Incorrect filtering based on relatesTo");
+      }
+    }
+
+    // Get entities based on isRelatedTo.
+    TimelineFilterList isRelatedTo = new TimelineFilterList(Operator.OR);
+    Set<Object> isRelatedToIds =
+        new HashSet<Object>(Arrays.asList((Object)"tid1_2"));
+    isRelatedTo.addFilter(new TimelineKeyValuesFilter(
+        TimelineCompareOp.EQUAL, "type1", isRelatedToIds));
+    result = reader.getEntities(
+        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+        "app", null),
+        new TimelineEntityFilters(null, null, null, null, isRelatedTo, null,
+        null, null, null),
+        new TimelineDataToRetrieve());
+    Assert.assertEquals(2, result.size());
+    // Two entities with ids id_1 and id_3 should be returned.
+    for (TimelineEntity entity : result) {
+      if (!entity.getId().equals("id_1") && !entity.getId().equals("id_3")) {
+        Assert.fail("Incorrect filtering based on isRelatedTo");
+      }
+    }
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImplTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImplTest.java
new file mode 100644
index 0000000..6b57cb4
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImplTest.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.junit.Test;
+
+public class FileSystemTimelineWriterImplTest {
+
+  /**
+   * Unit test for the PoC file system timeline writer (YARN-3264).
+   * @throws Exception if entities cannot be written or read back.
+   */
+  @Test
+  public void testWriteEntityToFile() throws Exception {
+    TimelineEntities te = new TimelineEntities();
+    TimelineEntity entity = new TimelineEntity();
+    String id = "hello";
+    String type = "world";
+    entity.setId(id);
+    entity.setType(type);
+    entity.setCreatedTime(1425016501000L);
+    te.addEntity(entity);
+
+    TimelineMetric metric = new TimelineMetric();
+    String metricId = "CPU";
+    metric.setId(metricId);
+    metric.setType(TimelineMetric.Type.SINGLE_VALUE);
+    metric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
+    metric.addValue(1425016501000L, 1234567L);
+
+    TimelineEntity entity2 = new TimelineEntity();
+    String id2 = "metric";
+    String type2 = "app";
+    entity2.setId(id2);
+    entity2.setType(type2);
+    entity2.setCreatedTime(1425016503000L);
+    entity2.addMetric(metric);
+    te.addEntity(entity2);
+
+    Map<String, TimelineMetric> aggregatedMetrics =
+        new HashMap<String, TimelineMetric>();
+    aggregatedMetrics.put(metricId, metric);
+
+    TestFileSystemTimelineWriterImpl fsi = null;
+    try {
+      fsi = new TestFileSystemTimelineWriterImpl();
+      fsi.init(new YarnConfiguration());
+      fsi.start();
+      fsi.write("cluster_id", "user_id", "flow_name", "flow_version",
+          12345678L, "app_id", te);
+
+      String fileName = fsi.getOutputRoot() +
+          "/entities/cluster_id/user_id/flow_name/flow_version/12345678/app_id/" +
+          type + "/" + id +
+          TestFileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+      Path path = Paths.get(fileName);
+      File f = new File(fileName);
+      assertTrue(f.exists() && !f.isDirectory());
+      List<String> data = Files.readAllLines(path, StandardCharsets.UTF_8);
+      // ensure there's only one entity + 1 new line
+      assertTrue("data size is:" + data.size(), data.size() == 2);
+      String d = data.get(0);
+      // confirm the contents are the same as what was written
+      assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity));
+
+      // verify aggregated metrics
+      String fileName2 = fsi.getOutputRoot() +
+          "/entities/cluster_id/user_id/flow_name/flow_version/12345678/app_id/" +
+          type2 + "/" + id2 +
+          TestFileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+      Path path2 = Paths.get(fileName2);
+      File file = new File(fileName2);
+      assertTrue(file.exists() && !file.isDirectory());
+      List<String> data2 = Files.readAllLines(path2, StandardCharsets.UTF_8);
+      // ensure there's only one entity + 1 new line
+      assertTrue("data size is:" + data2.size(), data2.size() == 2);
+      String metricToString = data2.get(0);
+      // confirm the contents are the same as what was written
+      assertEquals(metricToString,
+          TimelineUtils.dumpTimelineRecordtoJSON(entity2));
+
+      // delete the directory
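+      // FileUtils.deleteDirectory removes the output root recursively, so
+      // the assertion below doubles as a check that the entity file written
+      // above is gone along with its parent directories.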
+ File outputDir = new File(fsi.getOutputRoot()); + FileUtils.deleteDirectory(outputDir); + assertTrue(!(f.exists())); + } finally { + if (fsi != null) { + fsi.close(); + FileUtils.deleteDirectory(new File(fsi.getOutputRoot())); + } + } + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java index 2af7817..c210a3b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java @@ -18,22 +18,28 @@ package org.apache.hadoop.yarn.server.timelineservice.storage; -import java.io.BufferedWriter; +import java.io.BufferedReader; import java.io.File; -import java.io.FileWriter; +import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; -import java.io.PrintWriter; -import java.util.Arrays; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.util.Comparator; import java.util.EnumSet; -import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; +import java.util.TreeMap; import org.apache.commons.csv.CSVFormat; -import org.apache.commons.csv.CSVPrinter; -import org.apache.commons.io.FileUtils; +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; @@ -41,764 +47,359 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; -import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; -import org.apache.hadoop.yarn.util.timeline.TimelineUtils; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; +import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; +import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; -public class TestFileSystemTimelineReaderImpl { +import com.google.common.annotations.VisibleForTesting; - private static final String rootDir = - FileSystemTimelineReaderImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT; - FileSystemTimelineReaderImpl reader; - - @BeforeClass - public static void setup() throws Exception { - loadEntityData(); - // Create app flow mapping file. - CSVFormat format = - CSVFormat.DEFAULT.withHeader("APP", "USER", "FLOW", "FLOWRUN"); - String appFlowMappingFile = rootDir + "/entities/cluster1/" + - FileSystemTimelineReaderImpl.APP_FLOW_MAPPING_FILE; - try (PrintWriter out = - new PrintWriter(new BufferedWriter( - new FileWriter(appFlowMappingFile, true))); - CSVPrinter printer = new CSVPrinter(out, format)){ - printer.printRecord("app1", "user1", "flow1", 1); - printer.printRecord("app2","user1","flow1,flow",1); - printer.close(); - } - (new File(rootDir)).deleteOnExit(); - } - - @AfterClass - public static void tearDown() throws Exception { - FileUtils.deleteDirectory(new File(rootDir)); - } - - @Before - public void init() throws Exception { - reader = new FileSystemTimelineReaderImpl(); - Configuration conf = new YarnConfiguration(); - conf.set(FileSystemTimelineReaderImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT, - rootDir); - reader.init(conf); - } - - private static void writeEntityFile(TimelineEntity entity, File dir) - throws Exception { - if (!dir.exists()) { - if (!dir.mkdirs()) { - throw new IOException("Could not create directories for " + dir); - } - } - String fileName = dir.getAbsolutePath() + "/" + entity.getId() + ".thist"; - try (PrintWriter out = - new PrintWriter(new BufferedWriter(new FileWriter(fileName, true)))){ - out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity)); - out.write("\n"); - out.close(); - } - } +/** + * File System based implementation for TimelineReader. 
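+ * Entities are read back from per-entity files named
+ * {@code <entityId>.thist}, one JSON-serialized TimelineEntity per line,
+ * laid out under
+ * {@code <root>/entities/<cluster>/<user>/<flow>/<run>/<app>/<type>/};
+ * when only a cluster and app id are supplied, the flow context is
+ * resolved from the per-cluster app_flow_mapping.csv file.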
+ */ +public class TestFileSystemTimelineReaderImpl extends AbstractService + implements TimelineReader { - private static void loadEntityData() throws Exception { - File appDir = new File(rootDir + - "/entities/cluster1/user1/flow1/1/app1/app/"); - TimelineEntity entity11 = new TimelineEntity(); - entity11.setId("id_1"); - entity11.setType("app"); - entity11.setCreatedTime(1425016502000L); - Map info1 = new HashMap(); - info1.put("info1", "val1"); - info1.put("info2", "val5"); - entity11.addInfo(info1); - TimelineEvent event = new TimelineEvent(); - event.setId("event_1"); - event.setTimestamp(1425016502003L); - entity11.addEvent(event); - Set metrics = new HashSet(); - TimelineMetric metric1 = new TimelineMetric(); - metric1.setId("metric1"); - metric1.setType(TimelineMetric.Type.SINGLE_VALUE); - metric1.addValue(1425016502006L, 113); - metrics.add(metric1); - TimelineMetric metric2 = new TimelineMetric(); - metric2.setId("metric2"); - metric2.setType(TimelineMetric.Type.TIME_SERIES); - metric2.addValue(1425016502016L, 34); - metrics.add(metric2); - entity11.setMetrics(metrics); - Map configs = new HashMap(); - configs.put("config_1", "127"); - entity11.setConfigs(configs); - entity11.addRelatesToEntity("flow", "flow1"); - entity11.addIsRelatedToEntity("type1", "tid1_1"); - writeEntityFile(entity11, appDir); - TimelineEntity entity12 = new TimelineEntity(); - entity12.setId("id_1"); - entity12.setType("app"); - configs.clear(); - configs.put("config_2", "23"); - configs.put("config_3", "abc"); - entity12.addConfigs(configs); - metrics.clear(); - TimelineMetric metric12 = new TimelineMetric(); - metric12.setId("metric2"); - metric12.setType(TimelineMetric.Type.TIME_SERIES); - metric12.addValue(1425016502032L, 48); - metric12.addValue(1425016502054L, 51); - metrics.add(metric12); - TimelineMetric metric3 = new TimelineMetric(); - metric3.setId("metric3"); - metric3.setType(TimelineMetric.Type.SINGLE_VALUE); - metric3.addValue(1425016502060L, 23L); - metrics.add(metric3); - entity12.setMetrics(metrics); - entity12.addIsRelatedToEntity("type1", "tid1_2"); - entity12.addIsRelatedToEntity("type2", "tid2_1`"); - TimelineEvent event15 = new TimelineEvent(); - event15.setId("event_5"); - event15.setTimestamp(1425016502017L); - entity12.addEvent(event15); - writeEntityFile(entity12, appDir); + private static final Log LOG = + LogFactory.getLog(TestFileSystemTimelineReaderImpl.class); - TimelineEntity entity2 = new TimelineEntity(); - entity2.setId("id_2"); - entity2.setType("app"); - entity2.setCreatedTime(1425016501050L); - Map info2 = new HashMap(); - info1.put("info2", 4); - entity2.addInfo(info2); - Map configs2 = new HashMap(); - configs2.put("config_1", "129"); - configs2.put("config_3", "def"); - entity2.setConfigs(configs2); - TimelineEvent event2 = new TimelineEvent(); - event2.setId("event_2"); - event2.setTimestamp(1425016501003L); - entity2.addEvent(event2); - Set metrics2 = new HashSet(); - TimelineMetric metric21 = new TimelineMetric(); - metric21.setId("metric1"); - metric21.setType(TimelineMetric.Type.SINGLE_VALUE); - metric21.addValue(1425016501006L, 300); - metrics2.add(metric21); - TimelineMetric metric22 = new TimelineMetric(); - metric22.setId("metric2"); - metric22.setType(TimelineMetric.Type.TIME_SERIES); - metric22.addValue(1425016501056L, 31); - metric22.addValue(1425016501084L, 70); - metrics2.add(metric22); - TimelineMetric metric23 = new TimelineMetric(); - metric23.setId("metric3"); - metric23.setType(TimelineMetric.Type.SINGLE_VALUE); - 
metric23.addValue(1425016502060L, 23L); - metrics2.add(metric23); - entity2.setMetrics(metrics2); - entity2.addRelatesToEntity("flow", "flow2"); - writeEntityFile(entity2, appDir); + private String rootPath; + private static final String ENTITIES_DIR = "entities"; - TimelineEntity entity3 = new TimelineEntity(); - entity3.setId("id_3"); - entity3.setType("app"); - entity3.setCreatedTime(1425016501050L); - Map info3 = new HashMap(); - info3.put("info2", 3.5); - info3.put("info4", 20); - entity3.addInfo(info3); - Map configs3 = new HashMap(); - configs3.put("config_1", "123"); - configs3.put("config_3", "abc"); - entity3.setConfigs(configs3); - TimelineEvent event3 = new TimelineEvent(); - event3.setId("event_2"); - event3.setTimestamp(1425016501003L); - entity3.addEvent(event3); - TimelineEvent event4 = new TimelineEvent(); - event4.setId("event_4"); - event4.setTimestamp(1425016502006L); - entity3.addEvent(event4); - Set metrics3 = new HashSet(); - TimelineMetric metric31 = new TimelineMetric(); - metric31.setId("metric1"); - metric31.setType(TimelineMetric.Type.SINGLE_VALUE); - metric31.addValue(1425016501006L, 124); - metrics3.add(metric31); - TimelineMetric metric32 = new TimelineMetric(); - metric32.setId("metric2"); - metric32.setType(TimelineMetric.Type.TIME_SERIES); - metric32.addValue(1425016501056L, 31); - metric32.addValue(1425016501084L, 74); - metrics3.add(metric32); - entity3.setMetrics(metrics3); - entity3.addIsRelatedToEntity("type1", "tid1_2"); - writeEntityFile(entity3, appDir); + /** Default extension for output files. */ + private static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist"; - TimelineEntity entity4 = new TimelineEntity(); - entity4.setId("id_4"); - entity4.setType("app"); - entity4.setCreatedTime(1425016502050L); - TimelineEvent event44 = new TimelineEvent(); - event44.setId("event_4"); - event44.setTimestamp(1425016502003L); - entity4.addEvent(event44); - writeEntityFile(entity4, appDir); + @VisibleForTesting + /** Default extension for output files. */ + static final String APP_FLOW_MAPPING_FILE = "app_flow_mapping.csv"; - File appDir2 = new File(rootDir + - "/entities/cluster1/user1/flow1,flow/1/app2/app/"); - TimelineEntity entity5 = new TimelineEntity(); - entity5.setId("id_5"); - entity5.setType("app"); - entity5.setCreatedTime(1425016502050L); - writeEntityFile(entity5, appDir2); - } + @VisibleForTesting + /** Config param for timeline service file system storage root. */ + static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT = + YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir"; - public TimelineReader getTimelineReader() { - return reader; - } + @VisibleForTesting + /** Default value for storage location on local disk. */ + static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT = + "/tmp/timeline_service_data"; - @Test - public void testGetEntityDefaultView() throws Exception { - // If no fields are specified, entity is returned with default view i.e. - // only the id, type and created time. 
- TimelineEntity result = reader.getEntity( - new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", - "app", "id_1"), - new TimelineDataToRetrieve(null, null, null, null)); - Assert.assertEquals( - (new TimelineEntity.Identifier("app", "id_1")).toString(), - result.getIdentifier().toString()); - Assert.assertEquals((Long)1425016502000L, result.getCreatedTime()); - Assert.assertEquals(0, result.getConfigs().size()); - Assert.assertEquals(0, result.getMetrics().size()); - } + private final CSVFormat csvFormat = + CSVFormat.DEFAULT.withHeader("APP", "USER", "FLOW", "FLOWRUN"); - @Test - public void testGetEntityByClusterAndApp() throws Exception { - // Cluster and AppId should be enough to get an entity. - TimelineEntity result = reader.getEntity( - new TimelineReaderContext("cluster1", null, null, null, "app1", "app", - "id_1"), - new TimelineDataToRetrieve(null, null, null, null)); - Assert.assertEquals( - (new TimelineEntity.Identifier("app", "id_1")).toString(), - result.getIdentifier().toString()); - Assert.assertEquals((Long)1425016502000L, result.getCreatedTime()); - Assert.assertEquals(0, result.getConfigs().size()); - Assert.assertEquals(0, result.getMetrics().size()); + public TestFileSystemTimelineReaderImpl() { + super(TestFileSystemTimelineReaderImpl.class.getName()); } - /** This test checks whether we can handle commas in app flow mapping csv */ - @Test - public void testAppFlowMappingCsv() throws Exception { - // Test getting an entity by cluster and app where flow entry - // in app flow mapping csv has commas. - TimelineEntity result = reader.getEntity( - new TimelineReaderContext("cluster1", null, null, null, "app2", - "app", "id_5"), - new TimelineDataToRetrieve(null, null, null, null)); - Assert.assertEquals( - (new TimelineEntity.Identifier("app", "id_5")).toString(), - result.getIdentifier().toString()); - Assert.assertEquals((Long)1425016502050L, result.getCreatedTime()); + @VisibleForTesting + String getRootPath() { + return rootPath; } - @Test - public void testGetEntityCustomFields() throws Exception { - // Specified fields in addition to default view will be returned. - TimelineEntity result = reader.getEntity( - new TimelineReaderContext("cluster1","user1", "flow1", 1L, "app1", - "app", "id_1"), - new TimelineDataToRetrieve(null, null, - EnumSet.of(Field.INFO, Field.CONFIGS, Field.METRICS), null)); - Assert.assertEquals( - (new TimelineEntity.Identifier("app", "id_1")).toString(), - result.getIdentifier().toString()); - Assert.assertEquals((Long)1425016502000L, result.getCreatedTime()); - Assert.assertEquals(3, result.getConfigs().size()); - Assert.assertEquals(3, result.getMetrics().size()); - Assert.assertEquals(2, result.getInfo().size()); - // No events will be returned - Assert.assertEquals(0, result.getEvents().size()); - } + private static ObjectMapper mapper; - @Test - public void testGetEntityAllFields() throws Exception { - // All fields of TimelineEntity will be returned. - TimelineEntity result = reader.getEntity( - new TimelineReaderContext("cluster1","user1", "flow1", 1L, "app1", - "app", "id_1"), - new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); - Assert.assertEquals( - (new TimelineEntity.Identifier("app", "id_1")).toString(), - result.getIdentifier().toString()); - Assert.assertEquals((Long)1425016502000L, result.getCreatedTime()); - Assert.assertEquals(3, result.getConfigs().size()); - Assert.assertEquals(3, result.getMetrics().size()); - // All fields including events will be returned. 
- Assert.assertEquals(2, result.getEvents().size()); + static { + mapper = new ObjectMapper(); + YarnJacksonJaxbJsonProvider.configObjectMapper(mapper); } - @Test - public void testGetAllEntities() throws Exception { - Set result = reader.getEntities( - new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", - "app", null), new TimelineEntityFilters(), - new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null)); - // All 4 entities will be returned - Assert.assertEquals(4, result.size()); + /** + * Deserialize a POJO object from a JSON string. + * + * @param Describes the type of class to be returned. + * @param clazz class to be deserialized. + * @param jsonString JSON string to deserialize. + * @return An object based on class type. Used typically for + * TimelineEntity object. + * @throws IOException if the underlying input source has problems during + * parsing. + * @throws JsonMappingException if parser has problems parsing content. + * @throws JsonGenerationException if there is a problem in JSON writing. + */ + public static T getTimelineRecordFromJSON( + String jsonString, Class clazz) + throws JsonGenerationException, JsonMappingException, IOException { + return mapper.readValue(jsonString, clazz); } - @Test - public void testGetEntitiesWithLimit() throws Exception { - Set result = reader.getEntities( - new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", - "app", null), - new TimelineEntityFilters(2L, null, null, null, null, null, null, - null, null), new TimelineDataToRetrieve()); - Assert.assertEquals(2, result.size()); - // Needs to be rewritten once hashcode and equals for - // TimelineEntity is implemented - // Entities with id_1 and id_4 should be returned, - // based on created time, descending. - for (TimelineEntity entity : result) { - if (!entity.getId().equals("id_1") && !entity.getId().equals("id_4")) { - Assert.fail("Entity not sorted by created time"); + private static void fillFields(TimelineEntity finalEntity, + TimelineEntity real, EnumSet fields) { + if (fields.contains(Field.ALL)) { + fields = EnumSet.allOf(Field.class); + } + for (Field field : fields) { + switch(field) { + case CONFIGS: + finalEntity.setConfigs(real.getConfigs()); + break; + case METRICS: + finalEntity.setMetrics(real.getMetrics()); + break; + case INFO: + finalEntity.setInfo(real.getInfo()); + break; + case IS_RELATED_TO: + finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities()); + break; + case RELATES_TO: + finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities()); + break; + case EVENTS: + finalEntity.setEvents(real.getEvents()); + break; + default: + continue; } } - result = reader.getEntities( - new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", - "app", null), - new TimelineEntityFilters(3L, null, null, null, null, null, null, - null, null), new TimelineDataToRetrieve()); - // Even though 2 entities out of 4 have same created time, one entity - // is left out due to limit - Assert.assertEquals(3, result.size()); } - @Test - public void testGetEntitiesByTimeWindows() throws Exception { - // Get entities based on created time start and end time range. - Set result = reader.getEntities( - new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", - "app", null), - new TimelineEntityFilters(null, 1425016502030L, 1425016502060L, null, - null, null, null, null, null), - new TimelineDataToRetrieve()); - Assert.assertEquals(1, result.size()); - // Only one entity with ID id_4 should be returned. 
- for (TimelineEntity entity : result) { - if (!entity.getId().equals("id_4")) { - Assert.fail("Incorrect filtering based on created time range"); - } + private String getFlowRunPath(String userId, String clusterId, + String flowName, Long flowRunId, String appId) throws IOException { + if (userId != null && flowName != null && flowRunId != null) { + return userId + "/" + flowName + "/" + flowRunId; } - - // Get entities if only created time end is specified. - result = reader.getEntities( - new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", - "app", null), - new TimelineEntityFilters(null, null, 1425016502010L, null, null, - null, null, null, null), - new TimelineDataToRetrieve()); - Assert.assertEquals(3, result.size()); - for (TimelineEntity entity : result) { - if (entity.getId().equals("id_4")) { - Assert.fail("Incorrect filtering based on created time range"); - } + if (clusterId == null || appId == null) { + throw new IOException("Unable to get flow info"); } - - // Get entities if only created time start is specified. - result = reader.getEntities( - new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", - "app", null), - new TimelineEntityFilters(null, 1425016502010L, null, null, null, - null, null, null, null), - new TimelineDataToRetrieve()); - Assert.assertEquals(1, result.size()); - for (TimelineEntity entity : result) { - if (!entity.getId().equals("id_4")) { - Assert.fail("Incorrect filtering based on created time range"); + String appFlowMappingFile = rootPath + "/" + ENTITIES_DIR + "/" + + clusterId + "/" + APP_FLOW_MAPPING_FILE; + try (BufferedReader reader = + new BufferedReader(new InputStreamReader( + new FileInputStream( + appFlowMappingFile), Charset.forName("UTF-8"))); + CSVParser parser = new CSVParser(reader, csvFormat)) { + for (CSVRecord record : parser.getRecords()) { + if (record.size() < 4) { + continue; + } + String applicationId = record.get("APP"); + if (applicationId != null && !applicationId.trim().isEmpty() && + !applicationId.trim().equals(appId)) { + continue; + } + return record.get(1).trim() + "/" + record.get(2).trim() + "/" + + record.get(3).trim(); } + parser.close(); } + throw new IOException("Unable to get flow info"); } - @Test - public void testGetFilteredEntities() throws Exception { - // Get entities based on info filters. - TimelineFilterList infoFilterList = new TimelineFilterList(); - infoFilterList.addFilter( - new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", 3.5)); - Set result = reader.getEntities( - new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", - "app", null), - new TimelineEntityFilters(null, null, null, null, null, infoFilterList, - null, null, null), - new TimelineDataToRetrieve()); - Assert.assertEquals(1, result.size()); - // Only one entity with ID id_3 should be returned. - for (TimelineEntity entity : result) { - if (!entity.getId().equals("id_3")) { - Assert.fail("Incorrect filtering based on info filters"); - } + private static TimelineEntity createEntityToBeReturned(TimelineEntity entity, + EnumSet fieldsToRetrieve) { + TimelineEntity entityToBeReturned = new TimelineEntity(); + entityToBeReturned.setIdentifier(entity.getIdentifier()); + entityToBeReturned.setCreatedTime(entity.getCreatedTime()); + if (fieldsToRetrieve != null) { + fillFields(entityToBeReturned, entity, fieldsToRetrieve); } + return entityToBeReturned; + } - // Get entities based on config filters. 
- TimelineFilterList confFilterList = new TimelineFilterList(); - confFilterList.addFilter( - new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_1", "123")); - confFilterList.addFilter( - new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_3", "abc")); - result = reader.getEntities( - new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", - "app", null), - new TimelineEntityFilters(null, null, null, null, null, null, - confFilterList, null, null), - new TimelineDataToRetrieve()); - Assert.assertEquals(1, result.size()); - for (TimelineEntity entity : result) { - if (!entity.getId().equals("id_3")) { - Assert.fail("Incorrect filtering based on config filters"); - } - } + private static boolean isTimeInRange(Long time, Long timeBegin, + Long timeEnd) { + return (time >= timeBegin) && (time <= timeEnd); + } - // Get entities based on event filters. - TimelineFilterList eventFilters = new TimelineFilterList(); - eventFilters.addFilter( - new TimelineExistsFilter(TimelineCompareOp.EQUAL,"event_2")); - eventFilters.addFilter( - new TimelineExistsFilter(TimelineCompareOp.EQUAL,"event_4")); - result = reader.getEntities( - new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", - "app", null), - new TimelineEntityFilters(null, null, null, null, null, null, null, - null, eventFilters), - new TimelineDataToRetrieve()); - Assert.assertEquals(1, result.size()); - for (TimelineEntity entity : result) { - if (!entity.getId().equals("id_3")) { - Assert.fail("Incorrect filtering based on event filters"); - } + private static void mergeEntities(TimelineEntity entity1, + TimelineEntity entity2) { + // Ideally created time wont change except in the case of issue from client. + if (entity2.getCreatedTime() != null && entity2.getCreatedTime() > 0) { + entity1.setCreatedTime(entity2.getCreatedTime()); } - - // Get entities based on metric filters. - TimelineFilterList metricFilterList = new TimelineFilterList(); - metricFilterList.addFilter(new TimelineCompareFilter( - TimelineCompareOp.GREATER_OR_EQUAL, "metric3", 0L)); - result = reader.getEntities( - new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", - "app", null), - new TimelineEntityFilters(null, null, null, null, null, null, null, - metricFilterList, null), - new TimelineDataToRetrieve()); - Assert.assertEquals(2, result.size()); - // Two entities with IDs' id_1 and id_2 should be returned. - for (TimelineEntity entity : result) { - if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) { - Assert.fail("Incorrect filtering based on metric filters"); - } + for (Entry configEntry : entity2.getConfigs().entrySet()) { + entity1.addConfig(configEntry.getKey(), configEntry.getValue()); } - - // Get entities based on complex config filters. 
- TimelineFilterList list1 = new TimelineFilterList(); - list1.addFilter( - new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_1", "129")); - list1.addFilter( - new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_3", "def")); - TimelineFilterList list2 = new TimelineFilterList(); - list2.addFilter( - new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_2", "23")); - list2.addFilter( - new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_3", "abc")); - TimelineFilterList confFilterList1 = - new TimelineFilterList(Operator.OR, list1, list2); - result = reader.getEntities( - new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", - "app", null), - new TimelineEntityFilters(null, null, null, null, null, null, - confFilterList1, null, null), - new TimelineDataToRetrieve()); - Assert.assertEquals(2, result.size()); - for (TimelineEntity entity : result) { - if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) { - Assert.fail("Incorrect filtering based on config filters"); - } + for (Entry infoEntry : entity2.getInfo().entrySet()) { + entity1.addInfo(infoEntry.getKey(), infoEntry.getValue()); } - - TimelineFilterList list3 = new TimelineFilterList(); - list3.addFilter(new TimelineKeyValueFilter( - TimelineCompareOp.NOT_EQUAL, "config_1", "123")); - list3.addFilter(new TimelineKeyValueFilter( - TimelineCompareOp.NOT_EQUAL, "config_3", "abc")); - TimelineFilterList list4 = new TimelineFilterList(); - list4.addFilter( - new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_2", "23")); - TimelineFilterList confFilterList2 = - new TimelineFilterList(Operator.OR, list3, list4); - result = reader.getEntities( - new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", - "app", null), - new TimelineEntityFilters(null, null, null, null, null, null, - confFilterList2, null, null), - new TimelineDataToRetrieve()); - Assert.assertEquals(2, result.size()); - for (TimelineEntity entity : result) { - if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) { - Assert.fail("Incorrect filtering based on config filters"); + for (Entry> isRelatedToEntry : + entity2.getIsRelatedToEntities().entrySet()) { + String type = isRelatedToEntry.getKey(); + for (String entityId : isRelatedToEntry.getValue()) { + entity1.addIsRelatedToEntity(type, entityId); } } - - TimelineFilterList confFilterList3 = new TimelineFilterList(); - confFilterList3.addFilter(new TimelineKeyValueFilter( - TimelineCompareOp.NOT_EQUAL, "config_1", "127")); - confFilterList3.addFilter(new TimelineKeyValueFilter( - TimelineCompareOp.NOT_EQUAL, "config_3", "abc")); - result = reader.getEntities( - new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", - "app", null), - new TimelineEntityFilters(null, null, null, null, null, null, - confFilterList3, null, null), - new TimelineDataToRetrieve()); - Assert.assertEquals(1, result.size()); - for(TimelineEntity entity : result) { - if (!entity.getId().equals("id_2")) { - Assert.fail("Incorrect filtering based on config filters"); + for (Entry> relatesToEntry : + entity2.getRelatesToEntities().entrySet()) { + String type = relatesToEntry.getKey(); + for (String entityId : relatesToEntry.getValue()) { + entity1.addRelatesToEntity(type, entityId); } } - - TimelineFilterList confFilterList4 = new TimelineFilterList(); - confFilterList4.addFilter(new TimelineKeyValueFilter( - TimelineCompareOp.EQUAL, "config_dummy", "dummy")); - confFilterList4.addFilter(new TimelineKeyValueFilter( - TimelineCompareOp.EQUAL, 
"config_3", "def")); - result = reader.getEntities( - new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", - "app", null), - new TimelineEntityFilters(null, null, null, null, null, null, - confFilterList4, null, null), - new TimelineDataToRetrieve()); - Assert.assertEquals(0, result.size()); - - TimelineFilterList confFilterList5 = new TimelineFilterList(Operator.OR); - confFilterList5.addFilter(new TimelineKeyValueFilter( - TimelineCompareOp.EQUAL, "config_dummy", "dummy")); - confFilterList5.addFilter(new TimelineKeyValueFilter( - TimelineCompareOp.EQUAL, "config_3", "def")); - result = reader.getEntities( - new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", - "app", null), - new TimelineEntityFilters(null, null, null, null, null, null, - confFilterList5, null, null), - new TimelineDataToRetrieve()); - Assert.assertEquals(1, result.size()); - for (TimelineEntity entity : result) { - if (!entity.getId().equals("id_2")) { - Assert.fail("Incorrect filtering based on config filters"); - } + for (TimelineEvent event : entity2.getEvents()) { + entity1.addEvent(event); } - - // Get entities based on complex metric filters. - TimelineFilterList list6 = new TimelineFilterList(); - list6.addFilter(new TimelineCompareFilter( - TimelineCompareOp.GREATER_THAN, "metric1", 200)); - list6.addFilter(new TimelineCompareFilter( - TimelineCompareOp.EQUAL, "metric3", 23)); - TimelineFilterList list7 = new TimelineFilterList(); - list7.addFilter(new TimelineCompareFilter( - TimelineCompareOp.GREATER_OR_EQUAL, "metric2", 74)); - TimelineFilterList metricFilterList1 = - new TimelineFilterList(Operator.OR, list6, list7); - result = reader.getEntities( - new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", - "app", null), - new TimelineEntityFilters(null, null, null, null, null, null, null, - metricFilterList1, null), - new TimelineDataToRetrieve()); - Assert.assertEquals(2, result.size()); - // Two entities with IDs' id_2 and id_3 should be returned. 
- for (TimelineEntity entity : result) { - if (!entity.getId().equals("id_2") && !entity.getId().equals("id_3")) { - Assert.fail("Incorrect filtering based on metric filters"); + for (TimelineMetric metric2 : entity2.getMetrics()) { + boolean found = false; + for (TimelineMetric metric1 : entity1.getMetrics()) { + if (metric1.getId().equals(metric2.getId())) { + metric1.addValues(metric2.getValues()); + found = true; + break; + } } - } - - TimelineFilterList metricFilterList2 = new TimelineFilterList(); - metricFilterList2.addFilter(new TimelineCompareFilter( - TimelineCompareOp.LESS_THAN, "metric2", 70)); - metricFilterList2.addFilter(new TimelineCompareFilter( - TimelineCompareOp.LESS_OR_EQUAL, "metric3", 23)); - result = reader.getEntities( - new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", - "app", null), - new TimelineEntityFilters(null, null, null, null, null, null, null, - metricFilterList2, null), - new TimelineDataToRetrieve()); - Assert.assertEquals(1, result.size()); - for (TimelineEntity entity : result) { - if (!entity.getId().equals("id_1")) { - Assert.fail("Incorrect filtering based on metric filters"); + if (!found) { + entity1.addMetric(metric2); } } + } - TimelineFilterList metricFilterList3 = new TimelineFilterList(); - metricFilterList3.addFilter(new TimelineCompareFilter( - TimelineCompareOp.LESS_THAN, "dummy_metric", 30)); - metricFilterList3.addFilter(new TimelineCompareFilter( - TimelineCompareOp.LESS_OR_EQUAL, "metric3", 23)); - result = reader.getEntities( - new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", - "app", null), - new TimelineEntityFilters(null, null, null, null, null, null, null, - metricFilterList3, null), - new TimelineDataToRetrieve()); - Assert.assertEquals(0, result.size()); - - TimelineFilterList metricFilterList4 = new TimelineFilterList(Operator.OR); - metricFilterList4.addFilter(new TimelineCompareFilter( - TimelineCompareOp.LESS_THAN, "dummy_metric", 30)); - metricFilterList4.addFilter(new TimelineCompareFilter( - TimelineCompareOp.LESS_OR_EQUAL, "metric3", 23)); - result = reader.getEntities( - new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", - "app", null), - new TimelineEntityFilters(null, null, null, null, null, null, null, - metricFilterList4, null), - new TimelineDataToRetrieve()); - Assert.assertEquals(2, result.size()); - for (TimelineEntity entity : result) { - if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) { - Assert.fail("Incorrect filtering based on metric filters"); + private static TimelineEntity readEntityFromFile(BufferedReader reader) + throws IOException { + TimelineEntity entity = + getTimelineRecordFromJSON(reader.readLine(), TimelineEntity.class); + String entityStr = ""; + while ((entityStr = reader.readLine()) != null) { + if (entityStr.trim().isEmpty()) { + continue; } - } - - TimelineFilterList metricFilterList5 = - new TimelineFilterList(new TimelineCompareFilter( - TimelineCompareOp.NOT_EQUAL, "metric2", 74)); - result = reader.getEntities( - new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", - "app", null), - new TimelineEntityFilters(null, null, null, null, null, null, null, - metricFilterList5, null), - new TimelineDataToRetrieve()); - Assert.assertEquals(2, result.size()); - for (TimelineEntity entity : result) { - if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) { - Assert.fail("Incorrect filtering based on metric filters"); + TimelineEntity anotherEntity = + getTimelineRecordFromJSON(entityStr, 
TimelineEntity.class); + if (!entity.getId().equals(anotherEntity.getId()) || + !entity.getType().equals(anotherEntity.getType())) { + continue; } + mergeEntities(entity, anotherEntity); } + return entity; + } - TimelineFilterList infoFilterList1 = new TimelineFilterList(); - infoFilterList1.addFilter( - new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", 3.5)); - infoFilterList1.addFilter( - new TimelineKeyValueFilter(TimelineCompareOp.NOT_EQUAL, "info4", 20)); - result = reader.getEntities( - new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", - "app", null), - new TimelineEntityFilters(null, null, null, null, null, infoFilterList1, - null, null, null), - new TimelineDataToRetrieve()); - Assert.assertEquals(0, result.size()); - - TimelineFilterList infoFilterList2 = new TimelineFilterList(Operator.OR); - infoFilterList2.addFilter( - new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", 3.5)); - infoFilterList2.addFilter( - new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info1", "val1")); - result = reader.getEntities( - new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", - "app", null), - new TimelineEntityFilters(null, null, null, null, null, infoFilterList2, - null, null, null), - new TimelineDataToRetrieve()); - Assert.assertEquals(2, result.size()); - for (TimelineEntity entity : result) { - if (!entity.getId().equals("id_1") && !entity.getId().equals("id_3")) { - Assert.fail("Incorrect filtering based on info filters"); + private Set getEntities(File dir, String entityType, + TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve) + throws IOException { + // First sort the selected entities based on created/start time. + Map> sortedEntities = + new TreeMap<>( + new Comparator() { + @Override + public int compare(Long l1, Long l2) { + return l2.compareTo(l1); + } + } + ); + for (File entityFile : dir.listFiles()) { + if (!entityFile.getName().contains(TIMELINE_SERVICE_STORAGE_EXTENSION)) { + continue; } - } - - TimelineFilterList infoFilterList3 = new TimelineFilterList(); - infoFilterList3.addFilter( - new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "dummy_info", 1)); - infoFilterList3.addFilter( - new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", "val5")); - result = reader.getEntities( - new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", - "app", null), - new TimelineEntityFilters(null, null, null, null, null, infoFilterList3, - null, null, null), - new TimelineDataToRetrieve()); - Assert.assertEquals(0, result.size()); - - TimelineFilterList infoFilterList4 = new TimelineFilterList(Operator.OR); - infoFilterList4.addFilter( - new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "dummy_info", 1)); - infoFilterList4.addFilter( - new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", "val5")); - result = reader.getEntities( - new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", - "app", null), - new TimelineEntityFilters(null, null, null, null, null, infoFilterList4, - null, null, null), - new TimelineDataToRetrieve()); - Assert.assertEquals(1, result.size()); - for (TimelineEntity entity : result) { - if (!entity.getId().equals("id_1")) { - Assert.fail("Incorrect filtering based on info filters"); + try (BufferedReader reader = + new BufferedReader( + new InputStreamReader( + new FileInputStream( + entityFile), Charset.forName("UTF-8")))) { + TimelineEntity entity = readEntityFromFile(reader); + if (!entity.getType().equals(entityType)) { + continue; + } + if 
(!isTimeInRange(entity.getCreatedTime(), + filters.getCreatedTimeBegin(), filters.getCreatedTimeEnd())) { + continue; + } + if (filters.getRelatesTo() != null && + !filters.getRelatesTo().getFilterList().isEmpty() && + !TimelineStorageUtils.matchRelatesTo(entity, + filters.getRelatesTo())) { + continue; + } + if (filters.getIsRelatedTo() != null && + !filters.getIsRelatedTo().getFilterList().isEmpty() && + !TimelineStorageUtils.matchIsRelatedTo(entity, + filters.getIsRelatedTo())) { + continue; + } + if (filters.getInfoFilters() != null && + !filters.getInfoFilters().getFilterList().isEmpty() && + !TimelineStorageUtils.matchInfoFilters(entity, + filters.getInfoFilters())) { + continue; + } + if (filters.getConfigFilters() != null && + !filters.getConfigFilters().getFilterList().isEmpty() && + !TimelineStorageUtils.matchConfigFilters(entity, + filters.getConfigFilters())) { + continue; + } + if (filters.getMetricFilters() != null && + !filters.getMetricFilters().getFilterList().isEmpty() && + !TimelineStorageUtils.matchMetricFilters(entity, + filters.getMetricFilters())) { + continue; + } + if (filters.getEventFilters() != null && + !filters.getEventFilters().getFilterList().isEmpty() && + !TimelineStorageUtils.matchEventFilters(entity, + filters.getEventFilters())) { + continue; + } + TimelineEntity entityToBeReturned = createEntityToBeReturned( + entity, dataToRetrieve.getFieldsToRetrieve()); + Set entitiesCreatedAtSameTime = + sortedEntities.get(entityToBeReturned.getCreatedTime()); + if (entitiesCreatedAtSameTime == null) { + entitiesCreatedAtSameTime = new HashSet(); + } + entitiesCreatedAtSameTime.add(entityToBeReturned); + sortedEntities.put( + entityToBeReturned.getCreatedTime(), entitiesCreatedAtSameTime); } } - } - @Test - public void testGetEntitiesByRelations() throws Exception { - // Get entities based on relatesTo. - TimelineFilterList relatesTo = new TimelineFilterList(Operator.OR); - Set relatesToIds = - new HashSet(Arrays.asList((Object)"flow1")); - relatesTo.addFilter(new TimelineKeyValuesFilter( - TimelineCompareOp.EQUAL, "flow", relatesToIds)); - Set result = reader.getEntities( - new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", - "app", null), - new TimelineEntityFilters(null, null, null, relatesTo, null, null, - null, null, null), - new TimelineDataToRetrieve()); - Assert.assertEquals(1, result.size()); - // Only one entity with ID id_1 should be returned. - for (TimelineEntity entity : result) { - if (!entity.getId().equals("id_1")) { - Assert.fail("Incorrect filtering based on relatesTo"); + Set entities = new HashSet(); + long entitiesAdded = 0; + for (Set entitySet : sortedEntities.values()) { + for (TimelineEntity entity : entitySet) { + entities.add(entity); + ++entitiesAdded; + if (entitiesAdded >= filters.getLimit()) { + return entities; + } } } + return entities; + } - // Get entities based on isRelatedTo. - TimelineFilterList isRelatedTo = new TimelineFilterList(Operator.OR); - Set isRelatedToIds = - new HashSet(Arrays.asList((Object)"tid1_2")); - isRelatedTo.addFilter(new TimelineKeyValuesFilter( - TimelineCompareOp.EQUAL, "type1", isRelatedToIds)); - result = reader.getEntities( - new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1", - "app", null), - new TimelineEntityFilters(null, null, null, null, isRelatedTo, null, - null, null, null), - new TimelineDataToRetrieve()); - Assert.assertEquals(2, result.size()); - // Two entities with IDs' id_1 and id_3 should be returned. 
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    rootPath = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+        DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public TimelineEntity getEntity(TimelineReaderContext context,
+      TimelineDataToRetrieve dataToRetrieve) throws IOException {
+    String flowRunPath = getFlowRunPath(context.getUserId(),
+        context.getClusterId(), context.getFlowName(), context.getFlowRunId(),
+        context.getAppId());
+    File dir = new File(new File(rootPath, ENTITIES_DIR),
+        context.getClusterId() + "/" + flowRunPath + "/" + context.getAppId() +
+        "/" + context.getEntityType());
+    File entityFile = new File(
+        dir, context.getEntityId() + TIMELINE_SERVICE_STORAGE_EXTENSION);
+    try (BufferedReader reader =
+        new BufferedReader(new InputStreamReader(
+            new FileInputStream(entityFile), Charset.forName("UTF-8")))) {
+      TimelineEntity entity = readEntityFromFile(reader);
+      return createEntityToBeReturned(
+          entity, dataToRetrieve.getFieldsToRetrieve());
+    } catch (FileNotFoundException e) {
+      LOG.info("Cannot find entity {id:" + context.getEntityId() + " , type:" +
+          context.getEntityType() + "}. Will send HTTP 404 in response.");
+      return null;
     }
   }
-}
+
+  @Override
+  public Set<TimelineEntity> getEntities(TimelineReaderContext context,
+      TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
+      throws IOException {
+    String flowRunPath = getFlowRunPath(context.getUserId(),
+        context.getClusterId(), context.getFlowName(), context.getFlowRunId(),
+        context.getAppId());
+    File dir =
+        new File(new File(rootPath, ENTITIES_DIR),
+            context.getClusterId() + "/" + flowRunPath + "/" +
+                context.getAppId() + "/" + context.getEntityType());
+    return getEntities(dir, context.getEntityType(), filters, dataToRetrieve);
+  }
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
index 2f79daa..98ae2b8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
@@ -17,112 +17,140 @@
  */
 package org.apache.hadoop.yarn.server.timelineservice.storage;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import java.io.BufferedWriter;
 import java.io.File;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse.TimelineWriteError;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
-import org.junit.Test;
-
-public class TestFileSystemTimelineWriterImpl {
-
-  /**
-   * Unit test for PoC YARN 3264
-   * @throws Exception
-   */
-  @Test
-  public void testWriteEntityToFile() throws Exception {
-    TimelineEntities te = new TimelineEntities();
-    TimelineEntity entity = new TimelineEntity();
-    String id = "hello";
-    String type = "world";
-    entity.setId(id);
-    entity.setType(type);
-    entity.setCreatedTime(1425016501000L);
-    te.addEntity(entity);
-
-    TimelineMetric metric = new TimelineMetric();
-    String metricId = "CPU";
-    metric.setId(metricId);
-    metric.setType(TimelineMetric.Type.SINGLE_VALUE);
-    metric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
-    metric.addValue(1425016501000L, 1234567L);
-
-    TimelineEntity entity2 = new TimelineEntity();
-    String id2 = "metric";
-    String type2 = "app";
-    entity2.setId(id2);
-    entity2.setType(type2);
-    entity2.setCreatedTime(1425016503000L);
-    entity2.addMetric(metric);
-    te.addEntity(entity2);
-
-    Map<String, TimelineMetric> aggregatedMetrics =
-        new HashMap<String, TimelineMetric>();
-    aggregatedMetrics.put(metricId, metric);
-
-    FileSystemTimelineWriterImpl fsi = null;
+
+/**
+ * This implements a local file based backend for storing application timeline
+ * information.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TestFileSystemTimelineWriterImpl extends AbstractService
+    implements TimelineWriter {
+
+  private String outputRoot;
+
+  /** Config param for timeline service storage tmp root for FILE YARN-3264. */
+  public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT
+      = YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir";
+
+  /** default value for storage location on local disk. */
+  public static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
+      = "/tmp/timeline_service_data";
+
+  public static final String ENTITIES_DIR = "entities";
+
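+  // Illustrative on-disk layout produced by write()/mkdirs() below:
+  //   <root>/entities/<cluster>/<user>/<flow>/<flowVersion>/<flowRun>/
+  //       <app>/<entityType>/<entityId>.thist
+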
+  /** Default extension for output files. */
+  public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";
+
+  TestFileSystemTimelineWriterImpl() {
+    super((TestFileSystemTimelineWriterImpl.class.getName()));
+  }
+
+  @Override
+  public TimelineWriteResponse write(String clusterId, String userId,
+      String flowName, String flowVersion, long flowRunId, String appId,
+      TimelineEntities entities) throws IOException {
+    TimelineWriteResponse response = new TimelineWriteResponse();
+    for (TimelineEntity entity : entities.getEntities()) {
+      write(clusterId, userId, flowName, flowVersion, flowRunId, appId, entity,
+          response);
+    }
+    return response;
+  }
+
+  private synchronized void write(String clusterId, String userId,
+      String flowName, String flowVersion, long flowRun, String appId,
+      TimelineEntity entity, TimelineWriteResponse response)
+      throws IOException {
+    PrintWriter out = null;
     try {
-      fsi = new FileSystemTimelineWriterImpl();
-      fsi.init(new YarnConfiguration());
-      fsi.start();
-      fsi.write("cluster_id", "user_id", "flow_name", "flow_version", 12345678L,
-          "app_id", te);
-
-      String fileName = fsi.getOutputRoot() +
-          "/entities/cluster_id/user_id/flow_name/flow_version/12345678/app_id/" +
-          type + "/" + id +
-          FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
-      Path path = Paths.get(fileName);
-      File f = new File(fileName);
-      assertTrue(f.exists() && !f.isDirectory());
-      List<String> data = Files.readAllLines(path, StandardCharsets.UTF_8);
-      // ensure there's only one entity + 1 new line
-      assertTrue("data size is:" + data.size(), data.size() == 2);
-      String d = data.get(0);
-      // confirm the contents same as what was written
-      assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity));
-
-      // verify aggregated metrics
-      String fileName2 = fsi.getOutputRoot() +
-          "/entities/cluster_id/user_id/flow_name/flow_version/12345678/app_id/"
-          + type2 + "/" + id2 +
-          FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
-      Path path2 = Paths.get(fileName2);
-      File file = new File(fileName2);
-      assertTrue(file.exists() && !file.isDirectory());
-      List<String> data2 = Files.readAllLines(path2, StandardCharsets.UTF_8);
-      // ensure there's only one entity + 1 new line
-      assertTrue("data size is:" + data.size(), data2.size() == 2);
-      String metricToString = data2.get(0);
-      // confirm the contents same as what was written
-      assertEquals(metricToString,
-          TimelineUtils.dumpTimelineRecordtoJSON(entity2));
-
-      // delete the directory
-      File outputDir = new File(fsi.getOutputRoot());
-      FileUtils.deleteDirectory(outputDir);
-      assertTrue(!(f.exists()));
+      String dir = mkdirs(outputRoot, ENTITIES_DIR, clusterId, userId,
+          escape(flowName), escape(flowVersion), String.valueOf(flowRun), appId,
+          entity.getType());
+      String fileName = dir + entity.getId() +
+          TIMELINE_SERVICE_STORAGE_EXTENSION;
+      out =
+          new PrintWriter(new BufferedWriter(new OutputStreamWriter(
+              new FileOutputStream(fileName, true), "UTF-8")));
+      out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity));
+      out.write("\n");
+    } catch (IOException ioe) {
+      TimelineWriteError error = new TimelineWriteError();
+      error.setEntityId(entity.getId());
+      error.setEntityType(entity.getType());
+      /*
+       * TODO: set an appropriate error code after PoC could possibly be:
+       * error.setErrorCode(TimelineWriteError.IO_EXCEPTION);
+       */
+      response.addError(error);
     } finally {
-      if (fsi != null) {
-        fsi.close();
-        FileUtils.deleteDirectory(new File(fsi.getOutputRoot()));
+      if (out != null) {
+        out.close();
       }
     }
   }
 
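+  // Aggregation is not supported by this file system based PoC writer, so
+  // the hook below is a stub that returns null.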
+  @Override
+  public TimelineWriteResponse aggregate(TimelineEntity data,
+      TimelineAggregationTrack track) throws IOException {
+    return null;
+
+  }
+
+  public String getOutputRoot() {
+    return outputRoot;
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+        DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT);
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    mkdirs(outputRoot, ENTITIES_DIR);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    // no op
+  }
+
+  private static String mkdirs(String... dirStrs) throws IOException {
+    StringBuilder path = new StringBuilder();
+    for (String dirStr : dirStrs) {
+      path.append(dirStr).append('/');
+      File dir = new File(path.toString());
+      if (!dir.exists()) {
+        if (!dir.mkdirs()) {
+          throw new IOException("Could not create directories for " + dir);
+        }
+      }
+    }
+    return path.toString();
+  }
+
+  // specifically escape the separator character
+  private static String escape(String str) {
+    return str.replace(File.separatorChar, '_');
+  }
 }