diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
index bee9618..67a65fa 100644
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/pom.xml
@@ -98,6 +98,12 @@
      <type>test-jar</type>
    </dependency>
    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
      <groupId>org.hsqldb</groupId>
      <artifactId>hsqldb</artifactId>
      <scope>test</scope>
diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
index fde9e64..6fdf0db 100644
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
@@ -51,8 +51,8 @@
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
-import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
-import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TestFileSystemTimelineReaderImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TestFileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -242,7 +242,7 @@ public void testMRNewTimelineServiceEventHandling() throws Exception {
}
// Cleanup test file
String testRoot =
- FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT;
+ TestFileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT;
File testRootFolder = new File(testRoot);
if(testRootFolder.isDirectory()) {
FileUtils.deleteDirectory(testRootFolder);
@@ -254,7 +254,7 @@ public void testMRNewTimelineServiceEventHandling() throws Exception {
private void checkNewTimelineEvent(ApplicationId appId,
ApplicationReport appReport) throws IOException {
String tmpRoot =
- FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
+ TestFileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
+ "/entities/";
File tmpRootFolder = new File(tmpRoot);
@@ -276,7 +276,7 @@ private void checkNewTimelineEvent(ApplicationId appId,
// check for job event file
String jobEventFileName = appId.toString().replaceAll("application", "job")
- + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+ + TestFileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
String jobEventFilePath = outputDirJob + jobEventFileName;
File jobEventFile = new File(jobEventFilePath);
@@ -299,7 +299,7 @@ private void checkNewTimelineEvent(ApplicationId appId,
// check for job event file
String appEventFileName = appId.toString()
- + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+ + TestFileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
String appEventFilePath = outputAppDir + appEventFileName;
File appEventFile = new File(appEventFilePath);
@@ -318,7 +318,7 @@ private void checkNewTimelineEvent(ApplicationId appId,
taskFolder.isDirectory());
String taskEventFileName = appId.toString().replaceAll("application", "task")
- + "_m_000000" + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+ + "_m_000000" + TestFileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
String taskEventFilePath = outputDirTask + taskEventFileName;
File taskEventFile = new File(taskEventFilePath);
@@ -336,7 +336,7 @@ private void checkNewTimelineEvent(ApplicationId appId,
String taskAttemptEventFileName = appId.toString().replaceAll(
"application", "attempt") + "_m_000000_0" +
- FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+ TestFileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
String taskAttemptEventFilePath = outputDirTaskAttempt +
taskAttemptEventFileName;
@@ -370,7 +370,7 @@ private void verifyEntity(File entityFile, String eventId,
while ((strLine = reader.readLine()) != null) {
if (strLine.trim().length() > 0) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
- FileSystemTimelineReaderImpl.getTimelineRecordFromJSON(
+ TestFileSystemTimelineReaderImpl.getTimelineRecordFromJSON(
strLine.trim(),
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.class);
if (eventId == null) {
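Only the helper class names change in this test; the event-file naming scheme the assertions rely on is untouched. As a worked illustration (hypothetical application id) of the names derived above via replaceAll(...) plus TIMELINE_SERVICE_STORAGE_EXTENSION:

    application_1425016501000_0001                (appId)
    job_1425016501000_0001.thist                  (job event file)
    task_1425016501000_0001_m_000000.thist        (task event file)
    attempt_1425016501000_0001_m_000000_0.thist   (task attempt event file)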
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index 22c16e3..acd3e19 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -86,7 +86,7 @@
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
-import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TestFileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
@@ -487,7 +487,7 @@ private void checkTimelineV2(boolean haveDomain, ApplicationId appId,
LOG.info("Started checkTimelineV2 ");
// For PoC check in /tmp/timeline_service_data YARN-3264
String tmpRoot =
- FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
+ TestFileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
+ "/entities/";
File tmpRootFolder = new File(tmpRoot);
@@ -510,7 +510,7 @@ private void checkTimelineV2(boolean haveDomain, ApplicationId appId,
String appTimestampFileName =
"appattempt_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+ "_000001"
- + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+ + TestFileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
verifyEntityTypeFileExists(basePath, "DS_APP_ATTEMPT",
appTimestampFileName);
@@ -525,7 +525,7 @@ private void checkTimelineV2(boolean haveDomain, ApplicationId appId,
String containerMetricsTimestampFileName =
"container_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+ "_01_000001"
- + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+ + TestFileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
File containerEntityFile = verifyEntityTypeFileExists(basePath,
TimelineEntityType.YARN_CONTAINER.toString(),
containerMetricsTimestampFileName);
@@ -556,7 +556,7 @@ private void checkTimelineV2(boolean haveDomain, ApplicationId appId,
// Verify RM posting Application life cycle Events are getting published
String appMetricsTimestampFileName =
"application_" + appId.getClusterTimestamp() + "_000" + appId.getId()
- + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+ + TestFileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
File appEntityFile =
verifyEntityTypeFileExists(basePath,
TimelineEntityType.YARN_APPLICATION.toString(),
@@ -589,7 +589,7 @@ private void checkTimelineV2(boolean haveDomain, ApplicationId appId,
String appAttemptMetricsTimestampFileName =
"appattempt_" + appId.getClusterTimestamp() + "_000" + appId.getId()
+ "_000001"
- + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+ + TestFileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
File appAttemptEntityFile =
verifyEntityTypeFileExists(basePath,
TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(),
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
index f25fb48..2a39d46 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
@@ -158,6 +158,12 @@
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
index 13c67f8..b03f258 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
@@ -64,8 +64,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
-import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
-import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TestFileSystemTimelineReaderImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TestFileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -75,7 +75,7 @@
public class TestSystemMetricsPublisherForV2 {
/**
- * is the folder where the FileSystemTimelineWriterImpl writes the entities
+ * is the folder where the TestFileSystemTimelineWriterImpl writes the entities
*/
protected static File testRootDir = new File("target",
TestSystemMetricsPublisherForV2.class.getName() + "-localDir")
@@ -143,7 +143,7 @@ private static Configuration getTimelineV2Conf() {
conf.setBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_EVENTS_ENABLED,
true);
try {
- conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+ conf.set(TestFileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
testRootDir.getCanonicalPath());
} catch (IOException e) {
e.printStackTrace();
@@ -203,7 +203,7 @@ public void testPublishApplicationMetrics() throws Exception {
// file name is .thist
String timelineServiceFileName =
appId.toString()
- + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+ + TestFileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
File appFile = new File(outputDirApp, timelineServiceFileName);
Assert.assertTrue(appFile.exists());
verifyEntity(appFile, 3, ApplicationMetricsConstants.CREATED_EVENT_TYPE);
@@ -237,7 +237,7 @@ public void testPublishAppAttemptMetrics() throws Exception {
// file name is .thist
String timelineServiceFileName =
appAttemptId.toString()
- + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+ + TestFileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
File appFile = new File(outputDirApp, timelineServiceFileName);
Assert.assertTrue(appFile.exists());
verifyEntity(appFile,2, AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
@@ -268,7 +268,7 @@ public void testPublishContainerMetrics() throws Exception {
// file name is .thist
String timelineServiceFileName =
containerId.toString()
- + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+ + TestFileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
File appFile = new File(outputDirApp, timelineServiceFileName);
Assert.assertTrue(appFile.exists());
verifyEntity(appFile, 2,
@@ -294,7 +294,7 @@ private static void verifyEntity(File entityFile, long expectedEvents,
reader = new BufferedReader(new FileReader(entityFile));
while ((strLine = reader.readLine()) != null) {
if (strLine.trim().length() > 0) {
- TimelineEntity entity = FileSystemTimelineReaderImpl.
+ TimelineEntity entity = TestFileSystemTimelineReaderImpl.
getTimelineRecordFromJSON(strLine.trim(), TimelineEntity.class);
for (TimelineEvent event : entity.getEvents()) {
if (event.getId().equals(eventForCreatedTime)) {
@@ -315,7 +315,7 @@ private static void verifyEntity(File entityFile, long expectedEvents,
private String getTimelineEntityDir(RMApp app) {
String outputDirApp =
testRootDir.getAbsolutePath() + "/"
- + FileSystemTimelineWriterImpl.ENTITIES_DIR + "/"
+ + TestFileSystemTimelineWriterImpl.ENTITIES_DIR + "/"
+ YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/"
+ app.getUser() + "/"
+ app.getName() + "/"
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
index a8f88e5..9758320 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
@@ -36,7 +36,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
import com.google.common.annotations.VisibleForTesting;
@@ -61,7 +61,7 @@
public void serviceInit(Configuration conf) throws Exception {
writer = ReflectionUtils.newInstance(conf.getClass(
YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
- FileSystemTimelineWriterImpl.class,
+ HBaseTimelineWriterImpl.class,
TimelineWriter.class), conf);
writer.init(conf);
// create a single dedicated thread for flushing the writer on a periodic
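This hunk flips the collector's default backend: with yarn.timeline-service.writer.class unset, entities now go to HBaseTimelineWriterImpl rather than the local file writer. A minimal sketch of how a test could opt back into file-based writes, assuming the relocated TestFileSystemTimelineWriterImpl from the timelineservice test jar is on the classpath:

    Configuration conf = new YarnConfiguration();
    // Assumption: TestFileSystemTimelineWriterImpl is the relocated test-jar class.
    conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
        TestFileSystemTimelineWriterImpl.class, TimelineWriter.class);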
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java
index 97725e6..110d1dc 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java
@@ -41,7 +41,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
@@ -81,7 +81,7 @@ protected void serviceInit(Configuration conf) throws Exception {
private TimelineReader createTimelineReaderStore(Configuration conf) {
TimelineReader readerStore = ReflectionUtils.newInstance(conf.getClass(
YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
- FileSystemTimelineReaderImpl.class, TimelineReader.class), conf);
+ HBaseTimelineReaderImpl.class, TimelineReader.class), conf);
LOG.info("Using store " + readerStore.getClass().getName());
readerStore.init(conf);
return readerStore;
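The reader server gets the matching default, HBaseTimelineReaderImpl. The equivalent test-side override, under the same assumption about the relocated reader class:

    Configuration conf = new YarnConfiguration();
    // Assumption: TestFileSystemTimelineReaderImpl is the relocated test-jar class.
    conf.setClass(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
        TestFileSystemTimelineReaderImpl.class, TimelineReader.class);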
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
deleted file mode 100644
index 00aa686..0000000
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.nio.charset.Charset;
-import java.util.Comparator;
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-
-import org.apache.commons.csv.CSVFormat;
-import org.apache.commons.csv.CSVParser;
-import org.apache.commons.csv.CSVRecord;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
-import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
-import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
-import org.codehaus.jackson.JsonGenerationException;
-import org.codehaus.jackson.map.JsonMappingException;
-import org.codehaus.jackson.map.ObjectMapper;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * File System based implementation for TimelineReader.
- */
-public class FileSystemTimelineReaderImpl extends AbstractService
- implements TimelineReader {
-
- private static final Log LOG =
- LogFactory.getLog(FileSystemTimelineReaderImpl.class);
-
- private String rootPath;
- private static final String ENTITIES_DIR = "entities";
-
- /** Default extension for output files. */
- private static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";
-
- @VisibleForTesting
- /** Default extension for output files. */
- static final String APP_FLOW_MAPPING_FILE = "app_flow_mapping.csv";
-
- @VisibleForTesting
- /** Config param for timeline service file system storage root. */
- static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT =
- YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir";
-
- @VisibleForTesting
- /** Default value for storage location on local disk. */
- static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT =
- "/tmp/timeline_service_data";
-
- private final CSVFormat csvFormat =
- CSVFormat.DEFAULT.withHeader("APP", "USER", "FLOW", "FLOWRUN");
-
- public FileSystemTimelineReaderImpl() {
- super(FileSystemTimelineReaderImpl.class.getName());
- }
-
- @VisibleForTesting
- String getRootPath() {
- return rootPath;
- }
-
- private static ObjectMapper mapper;
-
- static {
- mapper = new ObjectMapper();
- YarnJacksonJaxbJsonProvider.configObjectMapper(mapper);
- }
-
- /**
- * Deserialize a POJO object from a JSON string.
- *
- * @param <T> Describes the type of class to be returned.
- * @param clazz class to be deserialized.
- * @param jsonString JSON string to deserialize.
- * @return An object based on class type. Used typically for
- * TimelineEntity object.
- * @throws IOException if the underlying input source has problems during
- * parsing.
- * @throws JsonMappingException if parser has problems parsing content.
- * @throws JsonGenerationException if there is a problem in JSON writing.
- */
-  public static <T> T getTimelineRecordFromJSON(
-      String jsonString, Class<T> clazz)
- throws JsonGenerationException, JsonMappingException, IOException {
- return mapper.readValue(jsonString, clazz);
- }
-
- private static void fillFields(TimelineEntity finalEntity,
-      TimelineEntity real, EnumSet<Field> fields) {
- if (fields.contains(Field.ALL)) {
- fields = EnumSet.allOf(Field.class);
- }
- for (Field field : fields) {
- switch(field) {
- case CONFIGS:
- finalEntity.setConfigs(real.getConfigs());
- break;
- case METRICS:
- finalEntity.setMetrics(real.getMetrics());
- break;
- case INFO:
- finalEntity.setInfo(real.getInfo());
- break;
- case IS_RELATED_TO:
- finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
- break;
- case RELATES_TO:
- finalEntity.setIsRelatedToEntities(real.getIsRelatedToEntities());
- break;
- case EVENTS:
- finalEntity.setEvents(real.getEvents());
- break;
- default:
- continue;
- }
- }
- }
-
- private String getFlowRunPath(String userId, String clusterId,
- String flowName, Long flowRunId, String appId) throws IOException {
- if (userId != null && flowName != null && flowRunId != null) {
- return userId + "/" + flowName + "/" + flowRunId;
- }
- if (clusterId == null || appId == null) {
- throw new IOException("Unable to get flow info");
- }
- String appFlowMappingFile = rootPath + "/" + ENTITIES_DIR + "/" +
- clusterId + "/" + APP_FLOW_MAPPING_FILE;
- try (BufferedReader reader =
- new BufferedReader(new InputStreamReader(
- new FileInputStream(
- appFlowMappingFile), Charset.forName("UTF-8")));
- CSVParser parser = new CSVParser(reader, csvFormat)) {
- for (CSVRecord record : parser.getRecords()) {
- if (record.size() < 4) {
- continue;
- }
- String applicationId = record.get("APP");
- if (applicationId != null && !applicationId.trim().isEmpty() &&
- !applicationId.trim().equals(appId)) {
- continue;
- }
- return record.get(1).trim() + "/" + record.get(2).trim() + "/" +
- record.get(3).trim();
- }
- parser.close();
- }
- throw new IOException("Unable to get flow info");
- }
-
- private static TimelineEntity createEntityToBeReturned(TimelineEntity entity,
-      EnumSet<Field> fieldsToRetrieve) {
- TimelineEntity entityToBeReturned = new TimelineEntity();
- entityToBeReturned.setIdentifier(entity.getIdentifier());
- entityToBeReturned.setCreatedTime(entity.getCreatedTime());
- if (fieldsToRetrieve != null) {
- fillFields(entityToBeReturned, entity, fieldsToRetrieve);
- }
- return entityToBeReturned;
- }
-
- private static boolean isTimeInRange(Long time, Long timeBegin,
- Long timeEnd) {
- return (time >= timeBegin) && (time <= timeEnd);
- }
-
- private static void mergeEntities(TimelineEntity entity1,
- TimelineEntity entity2) {
- // Ideally created time wont change except in the case of issue from client.
- if (entity2.getCreatedTime() != null && entity2.getCreatedTime() > 0) {
- entity1.setCreatedTime(entity2.getCreatedTime());
- }
-    for (Entry<String, String> configEntry : entity2.getConfigs().entrySet()) {
- entity1.addConfig(configEntry.getKey(), configEntry.getValue());
- }
-    for (Entry<String, Object> infoEntry : entity2.getInfo().entrySet()) {
- entity1.addInfo(infoEntry.getKey(), infoEntry.getValue());
- }
-    for (Entry<String, Set<String>> isRelatedToEntry :
- entity2.getIsRelatedToEntities().entrySet()) {
- String type = isRelatedToEntry.getKey();
- for (String entityId : isRelatedToEntry.getValue()) {
- entity1.addIsRelatedToEntity(type, entityId);
- }
- }
-    for (Entry<String, Set<String>> relatesToEntry :
- entity2.getRelatesToEntities().entrySet()) {
- String type = relatesToEntry.getKey();
- for (String entityId : relatesToEntry.getValue()) {
- entity1.addRelatesToEntity(type, entityId);
- }
- }
- for (TimelineEvent event : entity2.getEvents()) {
- entity1.addEvent(event);
- }
- for (TimelineMetric metric2 : entity2.getMetrics()) {
- boolean found = false;
- for (TimelineMetric metric1 : entity1.getMetrics()) {
- if (metric1.getId().equals(metric2.getId())) {
- metric1.addValues(metric2.getValues());
- found = true;
- break;
- }
- }
- if (!found) {
- entity1.addMetric(metric2);
- }
- }
- }
-
- private static TimelineEntity readEntityFromFile(BufferedReader reader)
- throws IOException {
- TimelineEntity entity =
- getTimelineRecordFromJSON(reader.readLine(), TimelineEntity.class);
- String entityStr = "";
- while ((entityStr = reader.readLine()) != null) {
- if (entityStr.trim().isEmpty()) {
- continue;
- }
- TimelineEntity anotherEntity =
- getTimelineRecordFromJSON(entityStr, TimelineEntity.class);
- if (!entity.getId().equals(anotherEntity.getId()) ||
- !entity.getType().equals(anotherEntity.getType())) {
- continue;
- }
- mergeEntities(entity, anotherEntity);
- }
- return entity;
- }
-
-  private Set<TimelineEntity> getEntities(File dir, String entityType,
- TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
- throws IOException {
- // First sort the selected entities based on created/start time.
-    Map<Long, Set<TimelineEntity>> sortedEntities =
-        new TreeMap<>(
-            new Comparator<Long>() {
- @Override
- public int compare(Long l1, Long l2) {
- return l2.compareTo(l1);
- }
- }
- );
- for (File entityFile : dir.listFiles()) {
- if (!entityFile.getName().contains(TIMELINE_SERVICE_STORAGE_EXTENSION)) {
- continue;
- }
- try (BufferedReader reader =
- new BufferedReader(
- new InputStreamReader(
- new FileInputStream(
- entityFile), Charset.forName("UTF-8")))) {
- TimelineEntity entity = readEntityFromFile(reader);
- if (!entity.getType().equals(entityType)) {
- continue;
- }
- if (!isTimeInRange(entity.getCreatedTime(),
- filters.getCreatedTimeBegin(), filters.getCreatedTimeEnd())) {
- continue;
- }
- if (filters.getRelatesTo() != null &&
- !filters.getRelatesTo().getFilterList().isEmpty() &&
- !TimelineStorageUtils.matchRelatesTo(entity,
- filters.getRelatesTo())) {
- continue;
- }
- if (filters.getIsRelatedTo() != null &&
- !filters.getIsRelatedTo().getFilterList().isEmpty() &&
- !TimelineStorageUtils.matchIsRelatedTo(entity,
- filters.getIsRelatedTo())) {
- continue;
- }
- if (filters.getInfoFilters() != null &&
- !filters.getInfoFilters().getFilterList().isEmpty() &&
- !TimelineStorageUtils.matchInfoFilters(entity,
- filters.getInfoFilters())) {
- continue;
- }
- if (filters.getConfigFilters() != null &&
- !filters.getConfigFilters().getFilterList().isEmpty() &&
- !TimelineStorageUtils.matchConfigFilters(entity,
- filters.getConfigFilters())) {
- continue;
- }
- if (filters.getMetricFilters() != null &&
- !filters.getMetricFilters().getFilterList().isEmpty() &&
- !TimelineStorageUtils.matchMetricFilters(entity,
- filters.getMetricFilters())) {
- continue;
- }
- if (filters.getEventFilters() != null &&
- !filters.getEventFilters().getFilterList().isEmpty() &&
- !TimelineStorageUtils.matchEventFilters(entity,
- filters.getEventFilters())) {
- continue;
- }
- TimelineEntity entityToBeReturned = createEntityToBeReturned(
- entity, dataToRetrieve.getFieldsToRetrieve());
-        Set<TimelineEntity> entitiesCreatedAtSameTime =
- sortedEntities.get(entityToBeReturned.getCreatedTime());
- if (entitiesCreatedAtSameTime == null) {
-          entitiesCreatedAtSameTime = new HashSet<>();
- }
- entitiesCreatedAtSameTime.add(entityToBeReturned);
- sortedEntities.put(
- entityToBeReturned.getCreatedTime(), entitiesCreatedAtSameTime);
- }
- }
-
-    Set<TimelineEntity> entities = new HashSet<>();
- long entitiesAdded = 0;
- for (Set entitySet : sortedEntities.values()) {
- for (TimelineEntity entity : entitySet) {
- entities.add(entity);
- ++entitiesAdded;
- if (entitiesAdded >= filters.getLimit()) {
- return entities;
- }
- }
- }
- return entities;
- }
-
- @Override
- public void serviceInit(Configuration conf) throws Exception {
- rootPath = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
- DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT);
- super.serviceInit(conf);
- }
-
- @Override
- public TimelineEntity getEntity(TimelineReaderContext context,
- TimelineDataToRetrieve dataToRetrieve) throws IOException {
- String flowRunPath = getFlowRunPath(context.getUserId(),
- context.getClusterId(), context.getFlowName(), context.getFlowRunId(),
- context.getAppId());
- File dir = new File(new File(rootPath, ENTITIES_DIR),
- context.getClusterId() + "/" + flowRunPath + "/" + context.getAppId() +
- "/" + context.getEntityType());
- File entityFile = new File(
- dir, context.getEntityId() + TIMELINE_SERVICE_STORAGE_EXTENSION);
- try (BufferedReader reader =
- new BufferedReader(new InputStreamReader(
- new FileInputStream(entityFile), Charset.forName("UTF-8")))) {
- TimelineEntity entity = readEntityFromFile(reader);
- return createEntityToBeReturned(
- entity, dataToRetrieve.getFieldsToRetrieve());
- } catch (FileNotFoundException e) {
- LOG.info("Cannot find entity {id:" + context.getEntityId() + " , type:" +
- context.getEntityType() + "}. Will send HTTP 404 in response.");
- return null;
- }
- }
-
- @Override
-  public Set<TimelineEntity> getEntities(TimelineReaderContext context,
- TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
- throws IOException {
- String flowRunPath = getFlowRunPath(context.getUserId(),
- context.getClusterId(), context.getFlowName(), context.getFlowRunId(),
- context.getAppId());
- File dir =
- new File(new File(rootPath, ENTITIES_DIR),
- context.getClusterId() + "/" + flowRunPath + "/" +
- context.getAppId() + "/" + context.getEntityType());
- return getEntities(dir, context.getEntityType(), filters, dataToRetrieve);
- }
-}
\ No newline at end of file
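Before this removal from src/main, a single-entity lookup against this reader took the shape below (the same call appears in the new FileSystemTimelineReaderImplTest further down, whose fixture these values come from; assume reader is an initialized instance). Because the companion writer appends one JSON line per write, getEntity(...) returns the union of all lines that readEntityFromFile(...) merged for the requested id and type:

    TimelineEntity e = reader.getEntity(
        new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
            "app", "id_1"),
        new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null));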
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
deleted file mode 100644
index 74a03ac..0000000
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse.TimelineWriteError;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
-
-/**
- * This implements a local file based backend for storing application timeline
- * information.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class FileSystemTimelineWriterImpl extends AbstractService
- implements TimelineWriter {
-
- private String outputRoot;
-
- /** Config param for timeline service storage tmp root for FILE YARN-3264. */
- public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT
- = YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir";
-
- /** default value for storage location on local disk. */
- public static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
- = "/tmp/timeline_service_data";
-
- public static final String ENTITIES_DIR = "entities";
-
- /** Default extension for output files. */
- public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";
-
- FileSystemTimelineWriterImpl() {
- super((FileSystemTimelineWriterImpl.class.getName()));
- }
-
- @Override
- public TimelineWriteResponse write(String clusterId, String userId,
- String flowName, String flowVersion, long flowRunId, String appId,
- TimelineEntities entities) throws IOException {
- TimelineWriteResponse response = new TimelineWriteResponse();
- for (TimelineEntity entity : entities.getEntities()) {
- write(clusterId, userId, flowName, flowVersion, flowRunId, appId, entity,
- response);
- }
- return response;
- }
-
- private synchronized void write(String clusterId, String userId,
- String flowName, String flowVersion, long flowRun, String appId,
- TimelineEntity entity, TimelineWriteResponse response)
- throws IOException {
- PrintWriter out = null;
- try {
- String dir = mkdirs(outputRoot, ENTITIES_DIR, clusterId, userId,
- escape(flowName), escape(flowVersion), String.valueOf(flowRun), appId,
- entity.getType());
- String fileName = dir + entity.getId() +
- TIMELINE_SERVICE_STORAGE_EXTENSION;
- out =
- new PrintWriter(new BufferedWriter(new OutputStreamWriter(
- new FileOutputStream(fileName, true), "UTF-8")));
- out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity));
- out.write("\n");
- } catch (IOException ioe) {
- TimelineWriteError error = new TimelineWriteError();
- error.setEntityId(entity.getId());
- error.setEntityType(entity.getType());
- /*
- * TODO: set an appropriate error code after PoC could possibly be:
- * error.setErrorCode(TimelineWriteError.IO_EXCEPTION);
- */
- response.addError(error);
- } finally {
- if (out != null) {
- out.close();
- }
- }
- }
-
- @Override
- public TimelineWriteResponse aggregate(TimelineEntity data,
- TimelineAggregationTrack track) throws IOException {
- return null;
-
- }
-
- public String getOutputRoot() {
- return outputRoot;
- }
-
- @Override
- public void serviceInit(Configuration conf) throws Exception {
- outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
- DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT);
- }
-
- @Override
- public void serviceStart() throws Exception {
- mkdirs(outputRoot, ENTITIES_DIR);
- }
-
- @Override
- public void flush() throws IOException {
- // no op
- }
-
- private static String mkdirs(String... dirStrs) throws IOException {
- StringBuilder path = new StringBuilder();
- for (String dirStr : dirStrs) {
- path.append(dirStr).append('/');
- File dir = new File(path.toString());
- if (!dir.exists()) {
- if (!dir.mkdirs()) {
- throw new IOException("Could not create directories for " + dir);
- }
- }
- }
- return path.toString();
- }
-
- // specifically escape the separator character
- private static String escape(String str) {
- return str.replace(File.separatorChar, '_');
- }
-}
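For reference while reading the updated test assertions elsewhere in this patch: the mkdirs(...) call above fixes the on-disk layout that the relocated writer keeps, namely

    <root-dir>/entities/<clusterId>/<userId>/<flowName>/<flowVersion>/<flowRunId>/<appId>/<entityType>/<entityId>.thist

with flow name and version passed through escape(...) so path separators cannot break the directory structure.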
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java
index 0bddf1b..41df895 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java
@@ -37,7 +37,7 @@
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TestFileSystemTimelineReaderImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImplTest;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -60,12 +60,12 @@
@BeforeClass
public static void setup() throws Exception {
- TestFileSystemTimelineReaderImpl.setup();
+ FileSystemTimelineReaderImplTest.setup();
}
@AfterClass
public static void tearDown() throws Exception {
- TestFileSystemTimelineReaderImpl.tearDown();
+ FileSystemTimelineReaderImplTest.tearDown();
}
@Before
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImplTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImplTest.java
new file mode 100644
index 0000000..273cf88
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImplTest.java
@@ -0,0 +1,804 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
+import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class FileSystemTimelineReaderImplTest {
+
+ private static final String rootDir =
+ TestFileSystemTimelineReaderImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT;
+ TestFileSystemTimelineReaderImpl reader;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ loadEntityData();
+ // Create app flow mapping file.
+ CSVFormat format =
+ CSVFormat.DEFAULT.withHeader("APP", "USER", "FLOW", "FLOWRUN");
+ String appFlowMappingFile = rootDir + "/entities/cluster1/" +
+ TestFileSystemTimelineReaderImpl.APP_FLOW_MAPPING_FILE;
+    try (PrintWriter out =
+        new PrintWriter(new BufferedWriter(
+            new FileWriter(appFlowMappingFile, true)));
+        CSVPrinter printer = new CSVPrinter(out, format)) {
+      printer.printRecord("app1", "user1", "flow1", 1);
+      printer.printRecord("app2", "user1", "flow1,flow", 1);
+    }
+ (new File(rootDir)).deleteOnExit();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ FileUtils.deleteDirectory(new File(rootDir));
+ }
+
+ @Before
+ public void init() throws Exception {
+ reader = new TestFileSystemTimelineReaderImpl();
+ Configuration conf = new YarnConfiguration();
+ conf.set(TestFileSystemTimelineReaderImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+ rootDir);
+ reader.init(conf);
+ }
+
+ private static void writeEntityFile(TimelineEntity entity, File dir)
+ throws Exception {
+ if (!dir.exists()) {
+ if (!dir.mkdirs()) {
+ throw new IOException("Could not create directories for " + dir);
+ }
+ }
+ String fileName = dir.getAbsolutePath() + "/" + entity.getId() + ".thist";
+    try (PrintWriter out =
+        new PrintWriter(new BufferedWriter(new FileWriter(fileName, true)))) {
+      out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity));
+      out.write("\n");
+    }
+ }
+
+ private static void loadEntityData() throws Exception {
+ File appDir = new File(rootDir +
+ "/entities/cluster1/user1/flow1/1/app1/app/");
+ TimelineEntity entity11 = new TimelineEntity();
+ entity11.setId("id_1");
+ entity11.setType("app");
+ entity11.setCreatedTime(1425016502000L);
+    Map<String, Object> info1 = new HashMap<>();
+ info1.put("info1", "val1");
+ info1.put("info2", "val5");
+ entity11.addInfo(info1);
+ TimelineEvent event = new TimelineEvent();
+ event.setId("event_1");
+ event.setTimestamp(1425016502003L);
+ entity11.addEvent(event);
+    Set<TimelineMetric> metrics = new HashSet<>();
+ TimelineMetric metric1 = new TimelineMetric();
+ metric1.setId("metric1");
+ metric1.setType(TimelineMetric.Type.SINGLE_VALUE);
+ metric1.addValue(1425016502006L, 113);
+ metrics.add(metric1);
+ TimelineMetric metric2 = new TimelineMetric();
+ metric2.setId("metric2");
+ metric2.setType(TimelineMetric.Type.TIME_SERIES);
+ metric2.addValue(1425016502016L, 34);
+ metrics.add(metric2);
+ entity11.setMetrics(metrics);
+    Map<String, String> configs = new HashMap<>();
+ configs.put("config_1", "127");
+ entity11.setConfigs(configs);
+ entity11.addRelatesToEntity("flow", "flow1");
+ entity11.addIsRelatedToEntity("type1", "tid1_1");
+ writeEntityFile(entity11, appDir);
+ TimelineEntity entity12 = new TimelineEntity();
+ entity12.setId("id_1");
+ entity12.setType("app");
+ configs.clear();
+ configs.put("config_2", "23");
+ configs.put("config_3", "abc");
+ entity12.addConfigs(configs);
+ metrics.clear();
+ TimelineMetric metric12 = new TimelineMetric();
+ metric12.setId("metric2");
+ metric12.setType(TimelineMetric.Type.TIME_SERIES);
+ metric12.addValue(1425016502032L, 48);
+ metric12.addValue(1425016502054L, 51);
+ metrics.add(metric12);
+ TimelineMetric metric3 = new TimelineMetric();
+ metric3.setId("metric3");
+ metric3.setType(TimelineMetric.Type.SINGLE_VALUE);
+ metric3.addValue(1425016502060L, 23L);
+ metrics.add(metric3);
+ entity12.setMetrics(metrics);
+ entity12.addIsRelatedToEntity("type1", "tid1_2");
+ entity12.addIsRelatedToEntity("type2", "tid2_1`");
+ TimelineEvent event15 = new TimelineEvent();
+ event15.setId("event_5");
+ event15.setTimestamp(1425016502017L);
+ entity12.addEvent(event15);
+ writeEntityFile(entity12, appDir);
+
+ TimelineEntity entity2 = new TimelineEntity();
+ entity2.setId("id_2");
+ entity2.setType("app");
+ entity2.setCreatedTime(1425016501050L);
+    Map<String, Object> info2 = new HashMap<>();
+    info2.put("info2", 4);
+ entity2.addInfo(info2);
+    Map<String, String> configs2 = new HashMap<>();
+ configs2.put("config_1", "129");
+ configs2.put("config_3", "def");
+ entity2.setConfigs(configs2);
+ TimelineEvent event2 = new TimelineEvent();
+ event2.setId("event_2");
+ event2.setTimestamp(1425016501003L);
+ entity2.addEvent(event2);
+    Set<TimelineMetric> metrics2 = new HashSet<>();
+ TimelineMetric metric21 = new TimelineMetric();
+ metric21.setId("metric1");
+ metric21.setType(TimelineMetric.Type.SINGLE_VALUE);
+ metric21.addValue(1425016501006L, 300);
+ metrics2.add(metric21);
+ TimelineMetric metric22 = new TimelineMetric();
+ metric22.setId("metric2");
+ metric22.setType(TimelineMetric.Type.TIME_SERIES);
+ metric22.addValue(1425016501056L, 31);
+ metric22.addValue(1425016501084L, 70);
+ metrics2.add(metric22);
+ TimelineMetric metric23 = new TimelineMetric();
+ metric23.setId("metric3");
+ metric23.setType(TimelineMetric.Type.SINGLE_VALUE);
+ metric23.addValue(1425016502060L, 23L);
+ metrics2.add(metric23);
+ entity2.setMetrics(metrics2);
+ entity2.addRelatesToEntity("flow", "flow2");
+ writeEntityFile(entity2, appDir);
+
+ TimelineEntity entity3 = new TimelineEntity();
+ entity3.setId("id_3");
+ entity3.setType("app");
+ entity3.setCreatedTime(1425016501050L);
+    Map<String, Object> info3 = new HashMap<>();
+ info3.put("info2", 3.5);
+ info3.put("info4", 20);
+ entity3.addInfo(info3);
+    Map<String, String> configs3 = new HashMap<>();
+ configs3.put("config_1", "123");
+ configs3.put("config_3", "abc");
+ entity3.setConfigs(configs3);
+ TimelineEvent event3 = new TimelineEvent();
+ event3.setId("event_2");
+ event3.setTimestamp(1425016501003L);
+ entity3.addEvent(event3);
+ TimelineEvent event4 = new TimelineEvent();
+ event4.setId("event_4");
+ event4.setTimestamp(1425016502006L);
+ entity3.addEvent(event4);
+    Set<TimelineMetric> metrics3 = new HashSet<>();
+ TimelineMetric metric31 = new TimelineMetric();
+ metric31.setId("metric1");
+ metric31.setType(TimelineMetric.Type.SINGLE_VALUE);
+ metric31.addValue(1425016501006L, 124);
+ metrics3.add(metric31);
+ TimelineMetric metric32 = new TimelineMetric();
+ metric32.setId("metric2");
+ metric32.setType(TimelineMetric.Type.TIME_SERIES);
+ metric32.addValue(1425016501056L, 31);
+ metric32.addValue(1425016501084L, 74);
+ metrics3.add(metric32);
+ entity3.setMetrics(metrics3);
+ entity3.addIsRelatedToEntity("type1", "tid1_2");
+ writeEntityFile(entity3, appDir);
+
+ TimelineEntity entity4 = new TimelineEntity();
+ entity4.setId("id_4");
+ entity4.setType("app");
+ entity4.setCreatedTime(1425016502050L);
+ TimelineEvent event44 = new TimelineEvent();
+ event44.setId("event_4");
+ event44.setTimestamp(1425016502003L);
+ entity4.addEvent(event44);
+ writeEntityFile(entity4, appDir);
+
+ File appDir2 = new File(rootDir +
+ "/entities/cluster1/user1/flow1,flow/1/app2/app/");
+ TimelineEntity entity5 = new TimelineEntity();
+ entity5.setId("id_5");
+ entity5.setType("app");
+ entity5.setCreatedTime(1425016502050L);
+ writeEntityFile(entity5, appDir2);
+ }
+
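+  // Fixture recap: flow1/1/app1 holds four entities (id_1 is written twice
+  // and must be merged on read; id_2, id_3 and id_4 once each), while app2
+  // sits under the comma-containing flow "flow1,flow" so that id_5 exercises
+  // CSV quoting in the app flow mapping file.
+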
+ public TimelineReader getTimelineReader() {
+ return reader;
+ }
+
+ @Test
+ public void testGetEntityDefaultView() throws Exception {
+ // If no fields are specified, entity is returned with default view i.e.
+ // only the id, type and created time.
+ TimelineEntity result = reader.getEntity(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", "id_1"),
+ new TimelineDataToRetrieve(null, null, null, null));
+ Assert.assertEquals(
+ (new TimelineEntity.Identifier("app", "id_1")).toString(),
+ result.getIdentifier().toString());
+ Assert.assertEquals((Long)1425016502000L, result.getCreatedTime());
+ Assert.assertEquals(0, result.getConfigs().size());
+ Assert.assertEquals(0, result.getMetrics().size());
+ }
+
+ @Test
+ public void testGetEntityByClusterAndApp() throws Exception {
+ // Cluster and AppId should be enough to get an entity.
+ TimelineEntity result = reader.getEntity(
+ new TimelineReaderContext("cluster1", null, null, null, "app1", "app",
+ "id_1"),
+ new TimelineDataToRetrieve(null, null, null, null));
+ Assert.assertEquals(
+ (new TimelineEntity.Identifier("app", "id_1")).toString(),
+ result.getIdentifier().toString());
+ Assert.assertEquals((Long)1425016502000L, result.getCreatedTime());
+ Assert.assertEquals(0, result.getConfigs().size());
+ Assert.assertEquals(0, result.getMetrics().size());
+ }
+
+  /** This test checks whether we can handle commas in the app flow mapping CSV. */
+ @Test
+ public void testAppFlowMappingCsv() throws Exception {
+ // Test getting an entity by cluster and app where flow entry
+ // in app flow mapping csv has commas.
+ TimelineEntity result = reader.getEntity(
+ new TimelineReaderContext("cluster1", null, null, null, "app2",
+ "app", "id_5"),
+ new TimelineDataToRetrieve(null, null, null, null));
+ Assert.assertEquals(
+ (new TimelineEntity.Identifier("app", "id_5")).toString(),
+ result.getIdentifier().toString());
+ Assert.assertEquals((Long)1425016502050L, result.getCreatedTime());
+ }
+
+ @Test
+ public void testGetEntityCustomFields() throws Exception {
+ // Specified fields in addition to default view will be returned.
+ TimelineEntity result = reader.getEntity(
+ new TimelineReaderContext("cluster1","user1", "flow1", 1L, "app1",
+ "app", "id_1"),
+ new TimelineDataToRetrieve(null, null,
+ EnumSet.of(Field.INFO, Field.CONFIGS, Field.METRICS), null));
+ Assert.assertEquals(
+ (new TimelineEntity.Identifier("app", "id_1")).toString(),
+ result.getIdentifier().toString());
+ Assert.assertEquals((Long)1425016502000L, result.getCreatedTime());
+ Assert.assertEquals(3, result.getConfigs().size());
+ Assert.assertEquals(3, result.getMetrics().size());
+ Assert.assertEquals(2, result.getInfo().size());
+ // No events will be returned
+ Assert.assertEquals(0, result.getEvents().size());
+ }
+
+ @Test
+ public void testGetEntityAllFields() throws Exception {
+ // All fields of TimelineEntity will be returned.
+ TimelineEntity result = reader.getEntity(
+ new TimelineReaderContext("cluster1","user1", "flow1", 1L, "app1",
+ "app", "id_1"),
+ new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null));
+ Assert.assertEquals(
+ (new TimelineEntity.Identifier("app", "id_1")).toString(),
+ result.getIdentifier().toString());
+ Assert.assertEquals((Long)1425016502000L, result.getCreatedTime());
+ Assert.assertEquals(3, result.getConfigs().size());
+ Assert.assertEquals(3, result.getMetrics().size());
+ // All fields including events will be returned.
+ Assert.assertEquals(2, result.getEvents().size());
+ }
+
+ @Test
+ public void testGetAllEntities() throws Exception {
+    Set<TimelineEntity> result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null), new TimelineEntityFilters(),
+ new TimelineDataToRetrieve(null, null, EnumSet.of(Field.ALL), null));
+ // All 4 entities will be returned
+ Assert.assertEquals(4, result.size());
+ }
+
+ @Test
+ public void testGetEntitiesWithLimit() throws Exception {
+    Set<TimelineEntity> result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(2L, null, null, null, null, null, null,
+ null, null), new TimelineDataToRetrieve());
+ Assert.assertEquals(2, result.size());
+ // Needs to be rewritten once hashcode and equals for
+ // TimelineEntity is implemented
+ // Entities with id_1 and id_4 should be returned,
+ // based on created time, descending.
+ for (TimelineEntity entity : result) {
+ if (!entity.getId().equals("id_1") && !entity.getId().equals("id_4")) {
+ Assert.fail("Entity not sorted by created time");
+ }
+ }
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(3L, null, null, null, null, null, null,
+ null, null), new TimelineDataToRetrieve());
+ // Even though 2 entities out of 4 have same created time, one entity
+ // is left out due to limit
+ Assert.assertEquals(3, result.size());
+ }
+
+ @Test
+ public void testGetEntitiesByTimeWindows() throws Exception {
+ // Get entities based on created time start and end time range.
+    Set<TimelineEntity> result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, 1425016502030L, 1425016502060L, null,
+ null, null, null, null, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(1, result.size());
+ // Only one entity with ID id_4 should be returned.
+ for (TimelineEntity entity : result) {
+ if (!entity.getId().equals("id_4")) {
+ Assert.fail("Incorrect filtering based on created time range");
+ }
+ }
+
+ // Get entities if only created time end is specified.
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, 1425016502010L, null, null,
+ null, null, null, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(3, result.size());
+ for (TimelineEntity entity : result) {
+ if (entity.getId().equals("id_4")) {
+ Assert.fail("Incorrect filtering based on created time range");
+ }
+ }
+
+ // Get entities if only created time start is specified.
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, 1425016502010L, null, null, null,
+ null, null, null, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(1, result.size());
+ for (TimelineEntity entity : result) {
+ if (!entity.getId().equals("id_4")) {
+ Assert.fail("Incorrect filtering based on created time range");
+ }
+ }
+ }
+
+ @Test
+ public void testGetFilteredEntities() throws Exception {
+ // Get entities based on info filters.
+ TimelineFilterList infoFilterList = new TimelineFilterList();
+ infoFilterList.addFilter(
+ new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", 3.5));
+ Set<TimelineEntity> result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, null, null, null, infoFilterList,
+ null, null, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(1, result.size());
+ // Only one entity with ID id_3 should be returned.
+ for (TimelineEntity entity : result) {
+ if (!entity.getId().equals("id_3")) {
+ Assert.fail("Incorrect filtering based on info filters");
+ }
+ }
+
+ // Get entities based on config filters.
+ TimelineFilterList confFilterList = new TimelineFilterList();
+ confFilterList.addFilter(
+ new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_1", "123"));
+ confFilterList.addFilter(
+ new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_3", "abc"));
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, null, null, null, null,
+ confFilterList, null, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(1, result.size());
+ for (TimelineEntity entity : result) {
+ if (!entity.getId().equals("id_3")) {
+ Assert.fail("Incorrect filtering based on config filters");
+ }
+ }
+
+ // Get entities based on event filters.
+ TimelineFilterList eventFilters = new TimelineFilterList();
+ eventFilters.addFilter(
+ new TimelineExistsFilter(TimelineCompareOp.EQUAL,"event_2"));
+ eventFilters.addFilter(
+ new TimelineExistsFilter(TimelineCompareOp.EQUAL,"event_4"));
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, null, null, null, null, null,
+ null, eventFilters),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(1, result.size());
+ for (TimelineEntity entity : result) {
+ if (!entity.getId().equals("id_3")) {
+ Assert.fail("Incorrect filtering based on event filters");
+ }
+ }
+
+ // Get entities based on metric filters.
+ TimelineFilterList metricFilterList = new TimelineFilterList();
+ metricFilterList.addFilter(new TimelineCompareFilter(
+ TimelineCompareOp.GREATER_OR_EQUAL, "metric3", 0L));
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, null, null, null, null, null,
+ metricFilterList, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(2, result.size());
+ // Two entities with IDs id_1 and id_2 should be returned.
+ for (TimelineEntity entity : result) {
+ if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) {
+ Assert.fail("Incorrect filtering based on metric filters");
+ }
+ }
+
+ // Get entities based on complex config filters.
+ TimelineFilterList list1 = new TimelineFilterList();
+ list1.addFilter(
+ new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_1", "129"));
+ list1.addFilter(
+ new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_3", "def"));
+ TimelineFilterList list2 = new TimelineFilterList();
+ list2.addFilter(
+ new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_2", "23"));
+ list2.addFilter(
+ new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_3", "abc"));
+ TimelineFilterList confFilterList1 =
+ new TimelineFilterList(Operator.OR, list1, list2);
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, null, null, null, null,
+ confFilterList1, null, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(2, result.size());
+ for (TimelineEntity entity : result) {
+ if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) {
+ Assert.fail("Incorrect filtering based on config filters");
+ }
+ }
+
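+ // (config_1 != "123" AND config_3 != "abc") OR config_2 == "23";
+ // entities id_1 and id_2 should be returned.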
+ TimelineFilterList list3 = new TimelineFilterList();
+ list3.addFilter(new TimelineKeyValueFilter(
+ TimelineCompareOp.NOT_EQUAL, "config_1", "123"));
+ list3.addFilter(new TimelineKeyValueFilter(
+ TimelineCompareOp.NOT_EQUAL, "config_3", "abc"));
+ TimelineFilterList list4 = new TimelineFilterList();
+ list4.addFilter(
+ new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "config_2", "23"));
+ TimelineFilterList confFilterList2 =
+ new TimelineFilterList(Operator.OR, list3, list4);
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, null, null, null, null,
+ confFilterList2, null, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(2, result.size());
+ for (TimelineEntity entity : result) {
+ if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) {
+ Assert.fail("Incorrect filtering based on config filters");
+ }
+ }
+
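+ // AND of two NOT_EQUAL config filters; only id_2 should satisfy both.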
+ TimelineFilterList confFilterList3 = new TimelineFilterList();
+ confFilterList3.addFilter(new TimelineKeyValueFilter(
+ TimelineCompareOp.NOT_EQUAL, "config_1", "127"));
+ confFilterList3.addFilter(new TimelineKeyValueFilter(
+ TimelineCompareOp.NOT_EQUAL, "config_3", "abc"));
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, null, null, null, null,
+ confFilterList3, null, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(1, result.size());
+ for (TimelineEntity entity : result) {
+ if (!entity.getId().equals("id_2")) {
+ Assert.fail("Incorrect filtering based on config filters");
+ }
+ }
+
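+ // ANDing a filter on a non-existent config key should match nothing.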
+ TimelineFilterList confFilterList4 = new TimelineFilterList();
+ confFilterList4.addFilter(new TimelineKeyValueFilter(
+ TimelineCompareOp.EQUAL, "config_dummy", "dummy"));
+ confFilterList4.addFilter(new TimelineKeyValueFilter(
+ TimelineCompareOp.EQUAL, "config_3", "def"));
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, null, null, null, null,
+ confFilterList4, null, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(0, result.size());
+
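+ // With OR, id_2 should still match via config_3 == "def" despite the
+ // non-existent config key.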
+ TimelineFilterList confFilterList5 = new TimelineFilterList(Operator.OR);
+ confFilterList5.addFilter(new TimelineKeyValueFilter(
+ TimelineCompareOp.EQUAL, "config_dummy", "dummy"));
+ confFilterList5.addFilter(new TimelineKeyValueFilter(
+ TimelineCompareOp.EQUAL, "config_3", "def"));
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, null, null, null, null,
+ confFilterList5, null, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(1, result.size());
+ for (TimelineEntity entity : result) {
+ if (!entity.getId().equals("id_2")) {
+ Assert.fail("Incorrect filtering based on config filters");
+ }
+ }
+
+ // Get entities based on complex metric filters.
+ TimelineFilterList list6 = new TimelineFilterList();
+ list6.addFilter(new TimelineCompareFilter(
+ TimelineCompareOp.GREATER_THAN, "metric1", 200));
+ list6.addFilter(new TimelineCompareFilter(
+ TimelineCompareOp.EQUAL, "metric3", 23));
+ TimelineFilterList list7 = new TimelineFilterList();
+ list7.addFilter(new TimelineCompareFilter(
+ TimelineCompareOp.GREATER_OR_EQUAL, "metric2", 74));
+ TimelineFilterList metricFilterList1 =
+ new TimelineFilterList(Operator.OR, list6, list7);
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, null, null, null, null, null,
+ metricFilterList1, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(2, result.size());
+ // Two entities with IDs id_2 and id_3 should be returned.
+ for (TimelineEntity entity : result) {
+ if (!entity.getId().equals("id_2") && !entity.getId().equals("id_3")) {
+ Assert.fail("Incorrect filtering based on metric filters");
+ }
+ }
+
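+ // AND of metric2 < 70 and metric3 <= 23; only id_1 should satisfy both.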
+ TimelineFilterList metricFilterList2 = new TimelineFilterList();
+ metricFilterList2.addFilter(new TimelineCompareFilter(
+ TimelineCompareOp.LESS_THAN, "metric2", 70));
+ metricFilterList2.addFilter(new TimelineCompareFilter(
+ TimelineCompareOp.LESS_OR_EQUAL, "metric3", 23));
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, null, null, null, null, null,
+ metricFilterList2, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(1, result.size());
+ for (TimelineEntity entity : result) {
+ if (!entity.getId().equals("id_1")) {
+ Assert.fail("Incorrect filtering based on metric filters");
+ }
+ }
+
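+ // ANDing a filter on a non-existent metric should match nothing.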
+ TimelineFilterList metricFilterList3 = new TimelineFilterList();
+ metricFilterList3.addFilter(new TimelineCompareFilter(
+ TimelineCompareOp.LESS_THAN, "dummy_metric", 30));
+ metricFilterList3.addFilter(new TimelineCompareFilter(
+ TimelineCompareOp.LESS_OR_EQUAL, "metric3", 23));
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, null, null, null, null, null,
+ metricFilterList3, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(0, result.size());
+
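+ // With OR, entities matching metric3 <= 23 should still be returned
+ // despite the non-existent metric.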
+ TimelineFilterList metricFilterList4 = new TimelineFilterList(Operator.OR);
+ metricFilterList4.addFilter(new TimelineCompareFilter(
+ TimelineCompareOp.LESS_THAN, "dummy_metric", 30));
+ metricFilterList4.addFilter(new TimelineCompareFilter(
+ TimelineCompareOp.LESS_OR_EQUAL, "metric3", 23));
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, null, null, null, null, null,
+ metricFilterList4, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(2, result.size());
+ for (TimelineEntity entity : result) {
+ if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) {
+ Assert.fail("Incorrect filtering based on metric filters");
+ }
+ }
+
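+ // A single NOT_EQUAL filter: entities whose metric2 is not 74.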
+ TimelineFilterList metricFilterList5 =
+ new TimelineFilterList(new TimelineCompareFilter(
+ TimelineCompareOp.NOT_EQUAL, "metric2", 74));
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, null, null, null, null, null,
+ metricFilterList5, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(2, result.size());
+ for (TimelineEntity entity : result) {
+ if (!entity.getId().equals("id_1") && !entity.getId().equals("id_2")) {
+ Assert.fail("Incorrect filtering based on metric filters");
+ }
+ }
+
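+ // AND of EQUAL and NOT_EQUAL info filters; no entity satisfies both.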
+ TimelineFilterList infoFilterList1 = new TimelineFilterList();
+ infoFilterList1.addFilter(
+ new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", 3.5));
+ infoFilterList1.addFilter(
+ new TimelineKeyValueFilter(TimelineCompareOp.NOT_EQUAL, "info4", 20));
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, null, null, null, infoFilterList1,
+ null, null, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(0, result.size());
+
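+ // OR of info filters; id_1 and id_3 should each match one condition.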
+ TimelineFilterList infoFilterList2 = new TimelineFilterList(Operator.OR);
+ infoFilterList2.addFilter(
+ new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", 3.5));
+ infoFilterList2.addFilter(
+ new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info1", "val1"));
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, null, null, null, infoFilterList2,
+ null, null, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(2, result.size());
+ for (TimelineEntity entity : result) {
+ if (!entity.getId().equals("id_1") && !entity.getId().equals("id_3")) {
+ Assert.fail("Incorrect filtering based on info filters");
+ }
+ }
+
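+ // ANDing a filter on a non-existent info key should match nothing.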
+ TimelineFilterList infoFilterList3 = new TimelineFilterList();
+ infoFilterList3.addFilter(
+ new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "dummy_info", 1));
+ infoFilterList3.addFilter(
+ new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", "val5"));
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, null, null, null, infoFilterList3,
+ null, null, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(0, result.size());
+
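+ // With OR, id_1 should still match via info2 == "val5" despite the
+ // non-existent info key.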
+ TimelineFilterList infoFilterList4 = new TimelineFilterList(Operator.OR);
+ infoFilterList4.addFilter(
+ new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "dummy_info", 1));
+ infoFilterList4.addFilter(
+ new TimelineKeyValueFilter(TimelineCompareOp.EQUAL, "info2", "val5"));
+ result = reader.getEntities(
+ new TimelineReaderContext("cluster1", "user1", "flow1", 1L, "app1",
+ "app", null),
+ new TimelineEntityFilters(null, null, null, null, null, infoFilterList4,
+ null, null, null),
+ new TimelineDataToRetrieve());
+ Assert.assertEquals(1, result.size());
+ for (TimelineEntity entity : result) {
+ if (!entity.getId().equals("id_1")) {
+ Assert.fail("Incorrect filtering based on info filters");
+ }
+ }
+ }
+
+ @Test
+ public void testGetEntitiesByRelations() throws Exception {
+ // Get entities based on relatesTo.
+ TimelineFilterList relatesTo = new TimelineFilterList(Operator.OR);
+ Set