diff --git hadoop-hdfs-project/hadoop-hdfs-client/pom.xml hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
index 33c2ed9..992d1b5 100644
--- hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
+++ hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
@@ -45,6 +45,13 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>maven-surefire-plugin</artifactId>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <configuration>
+ true
+ </configuration>
+ </plugin>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelinePutResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelinePutResponse.java
index a56d4d4..abe106f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelinePutResponse.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelinePutResponse.java
@@ -129,6 +129,12 @@ public void setErrors(List<TimelinePutError> errors) {
*/
public static final int FORBIDDEN_RELATION = 6;
+ /**
+ * Error code returned if the entity start time is before the eviction
+ * period of old data.
+ */
+ public static final int EXPIRED_ENTITY = 7;
+
private String entityId;
private String entityType;
private int errorCode;
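
A store that enforces the retention window can report this code back to the client on a rejected put. A minimal sketch, assuming the usual TimelinePutError setters and TimelinePutResponse.addError() (entityId, entityType and response are hypothetical caller variables):

    TimelinePutResponse.TimelinePutError error =
        new TimelinePutResponse.TimelinePutError();
    error.setEntityId(entityId);
    error.setEntityType(entityType);
    error.setErrorCode(TimelinePutResponse.TimelinePutError.EXPIRED_ENTITY);
    response.addError(error);
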
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 70b87f3..3bf25ed 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1431,6 +1431,18 @@ private static void addDeprecatedKeys() {
public static final long DEFAULT_TIMELINE_SERVICE_TTL_MS =
1000 * 60 * 60 * 24 * 7;
+ /** Timeline service rolling period. Valid values are daily, half_daily,
+ * quarter_daily, and hourly. */
+ public static final String TIMELINE_SERVICE_ROLLING_PERIOD =
+ TIMELINE_SERVICE_PREFIX + "rolling-period";
+
+ /** Roll a new database each hour. */
+ public static final String DEFAULT_TIMELINE_SERVICE_ROLLING_PERIOD =
+ "hourly";
+
+ /** Implementation specific configuration prefix for Timeline Service
+ * leveldb.
+ */
public static final String TIMELINE_SERVICE_LEVELDB_PREFIX =
TIMELINE_SERVICE_PREFIX + "leveldb-timeline-store.";
@@ -1438,13 +1450,36 @@ private static void addDeprecatedKeys() {
public static final String TIMELINE_SERVICE_LEVELDB_PATH =
TIMELINE_SERVICE_LEVELDB_PREFIX + "path";
- /** Timeline service leveldb read cache (uncompressed blocks) */
+ /** Timeline service leveldb read cache (uncompressed blocks). This is
+ * per rolling instance, so it should be tuned if using the rolling leveldb
+ * timeline store. */
public static final String TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE =
TIMELINE_SERVICE_LEVELDB_PREFIX + "read-cache-size";
+ /** Default leveldb read cache size if no configuration is specified. */
public static final long DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE =
100 * 1024 * 1024;
+ /** Timeline service leveldb write buffer size. */
+ public static final String TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE =
+ TIMELINE_SERVICE_LEVELDB_PREFIX + "write-buffer-size";
+
+ /** Default leveldb write buffer size if no configuration is specified. This
+ * is per rolling instance, so it should be tuned if using the rolling
+ * leveldb timeline store. */
+ public static final int DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE =
+ 16 * 1024 * 1024;
+
+ /** Timeline service leveldb write batch size. This value can be tuned down
+ * to reduce lock time for ttl eviction. */
+ public static final String
+ TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE =
+ TIMELINE_SERVICE_LEVELDB_PREFIX + "write-batch-size";
+
+ /** Default leveldb write batch size if no configuration is specified. */
+ public static final int
+ DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE = 10000;
+
/** Timeline service leveldb start time read cache (number of entities) */
public static final String
TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE =
@@ -1468,6 +1503,16 @@ private static void addDeprecatedKeys() {
public static final long DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS =
1000 * 60 * 5;
+ /** Timeline service leveldb number of concurrent open files. Tune this
+ * configuration to stay within system limits. This is per rolling instance,
+ * so it should be tuned if using the rolling leveldb timeline store. */
+ public static final String TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES =
+ TIMELINE_SERVICE_LEVELDB_PREFIX + "max-open-files";
+
+ /** Default leveldb max open files if no configuration is specified. */
+ public static final int DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES =
+ 1000;
+
/** The Kerberos principal for the timeline server.*/
public static final String TIMELINE_SERVICE_PRINCIPAL =
TIMELINE_SERVICE_PREFIX + "principal";
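
These keys resolve under the existing yarn.timeline-service. prefix and can be set programmatically as well as in yarn-site.xml. A hedged sketch using only the constants defined above (the values are illustrative, not recommendations):

    YarnConfiguration conf = new YarnConfiguration();
    // roll a new leveldb instance per day instead of the hourly default
    conf.set(YarnConfiguration.TIMELINE_SERVICE_ROLLING_PERIOD, "daily");
    // smaller batches shorten the time the lock is held during ttl eviction
    conf.setInt(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE, 5000);
    // read cache and open files are per rolling instance, so scale them to the
    // number of instances expected to be open at once
    conf.setLong(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
        64L * 1024 * 1024);
    conf.setInt(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES, 500);
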
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
index 287a45a..d60e21c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
@@ -180,6 +180,11 @@
<artifactId>bcprov-jdk16</artifactId>
<scope>test</scope>
+ <dependency>
+ <groupId>de.ruedigermoeller</groupId>
+ <artifactId>fst</artifactId>
+ <version>2.24</version>
+ </dependency>
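
The fst dependency provides the FSTConfiguration serializer that the RollingLevelDBTimelineStore below uses for entity values. A rough sketch of the round trip it supports, mirroring the calls made in that class (otherInfoValue is a hypothetical variable):

    FSTConfiguration fst = FSTConfiguration.createDefaultConfiguration();
    fst.setShareReferences(false);
    // serialize a value for storage in leveldb
    byte[] bytes = fst.asByteArray(otherInfoValue);
    // deserialize when the entity is read back
    Object restored = fst.asObject(bytes);
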
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDB.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDB.java
new file mode 100644
index 0000000..0397682
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDB.java
@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.lang.time.FastDateFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.Options;
+import org.iq80.leveldb.WriteBatch;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Contains the logic to look up a leveldb by timestamp so that multiple
+ * smaller databases can roll according to the configured period and be
+ * evicted efficiently via operating system directory removal.
+ */
+class RollingLevelDB {
+
+ /** Logger for this class. */
+ private static final Log LOG = LogFactory.getLog(RollingLevelDB.class);
+ /** Factory to open and create new leveldb instances. */
+ private static JniDBFactory factory = new JniDBFactory();
+ /** Thread safe date formatter. */
+ private FastDateFormat fdf;
+ /** Date parser. */
+ private SimpleDateFormat sdf;
+ /** Calendar to calculate the current and next rolling period. */
+ private GregorianCalendar cal = new GregorianCalendar(
+ TimeZone.getTimeZone("GMT"));
+ /** Collection of all active rolling leveldb instances. */
+ private final TreeMap<Long, DB> rollingdbs;
+ /** Collection of all rolling leveldb instances to evict. */
+ private final TreeMap<Long, DB> rollingdbsToEvict;
+ /** Name of this rolling level db. */
+ private final String name;
+ /** Calculated timestamp of when to roll a new leveldb instance. */
+ private volatile long nextRollingCheckMillis = 0;
+ /** File system instance to find and create new leveldb instances. */
+ private FileSystem lfs = null;
+ /** Directory to store rolling leveldb instances. */
+ private Path rollingDBPath;
+ /** Configuration for this object. */
+ private Configuration conf;
+ /** Rolling period. */
+ private RollingPeriod rollingPeriod;
+ /**
+ * Rolling leveldb instances are evicted when their endtime is earlier than
+ * the current time minus the time to live value.
+ */
+ private long ttl;
+ /** Whether time to live is enabled. */
+ private boolean ttlEnabled;
+
+ /** Encapsulates the rolling period to date format lookup. */
+ enum RollingPeriod {
+ DAILY {
+ @Override
+ public String dateFormat() {
+ return "yyyy-MM-dd";
+ }
+ },
+ HALF_DAILY {
+ @Override
+ public String dateFormat() {
+ return "yyyy-MM-dd-HH";
+ }
+ },
+ QUARTER_DAILY {
+ @Override
+ public String dateFormat() {
+ return "yyyy-MM-dd-HH";
+ }
+ },
+ HOURLY {
+ @Override
+ public String dateFormat() {
+ return "yyyy-MM-dd-HH";
+ }
+ },
+ MINUTELY {
+ @Override
+ public String dateFormat() {
+ return "yyyy-MM-dd-HH-mm";
+ }
+ };
+ public abstract String dateFormat();
+ }
+
+ /**
+ * Convenience class for associating a write batch with its rolling leveldb
+ * instance.
+ */
+ public static class RollingWriteBatch {
+ /** Leveldb object. */
+ private final DB db;
+ /** Write batch for the db object. */
+ private final WriteBatch writeBatch;
+
+ public RollingWriteBatch(final DB db, final WriteBatch writeBatch) {
+ this.db = db;
+ this.writeBatch = writeBatch;
+ }
+
+ public DB getDB() {
+ return db;
+ }
+
+ public WriteBatch getWriteBatch() {
+ return writeBatch;
+ }
+
+ public void write() {
+ db.write(writeBatch);
+ }
+
+ public void close() {
+ IOUtils.cleanup(LOG, writeBatch);
+ }
+ }
+
+ RollingLevelDB(String name) {
+ this.name = name;
+ this.rollingdbs = new TreeMap<Long, DB>();
+ this.rollingdbsToEvict = new TreeMap<Long, DB>();
+ }
+
+ protected String getName() {
+ return name;
+ }
+
+ protected long currentTimeMillis() {
+ return System.currentTimeMillis();
+ }
+
+ public long getNextRollingTimeMillis() {
+ return nextRollingCheckMillis;
+ }
+
+ public long getTimeToLive() {
+ return ttl;
+ }
+
+ public boolean getTimeToLiveEnabled() {
+ return ttlEnabled;
+ }
+
+ protected void setNextRollingTimeMillis(final long timestamp) {
+ this.nextRollingCheckMillis = timestamp;
+ LOG.info("Next rolling time for " + getName() + " is "
+ + fdf.format(nextRollingCheckMillis));
+ }
+
+ public void init(final Configuration config) throws Exception {
+ LOG.info("Initializing RollingLevelDB for " + getName());
+ this.conf = config;
+ this.ttl = conf.getLong(YarnConfiguration.TIMELINE_SERVICE_TTL_MS,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_TTL_MS);
+ this.ttlEnabled = conf.getBoolean(
+ YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, true);
+ this.rollingDBPath = new Path(
+ conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH),
+ RollingLevelDBTimelineStore.FILENAME);
+ initFileSystem();
+ initRollingPeriod();
+ initHistoricalDBs();
+ }
+
+ protected void initFileSystem() throws IOException {
+ lfs = FileSystem.getLocal(conf);
+ boolean success = lfs.mkdirs(rollingDBPath,
+ RollingLevelDBTimelineStore.LEVELDB_DIR_UMASK);
+ if (!success) {
+ throw new IOException("Failed to create leveldb root directory "
+ + rollingDBPath);
+ }
+ }
+
+ protected synchronized void initRollingPeriod() {
+ final String lcRollingPeriod = conf.get(
+ YarnConfiguration.TIMELINE_SERVICE_ROLLING_PERIOD,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ROLLING_PERIOD);
+ this.rollingPeriod = RollingPeriod.valueOf(lcRollingPeriod
+ .toUpperCase(Locale.ENGLISH));
+ fdf = FastDateFormat.getInstance(rollingPeriod.dateFormat(),
+ TimeZone.getTimeZone("GMT"));
+ sdf = new SimpleDateFormat(rollingPeriod.dateFormat());
+ sdf.setTimeZone(fdf.getTimeZone());
+ }
+
+ protected synchronized void initHistoricalDBs() throws IOException {
+ Path rollingDBGlobPath = new Path(rollingDBPath, getName() + ".*");
+ FileStatus[] statuses = lfs.globStatus(rollingDBGlobPath);
+ for (FileStatus status : statuses) {
+ String dbName = FilenameUtils.getExtension(status.getPath().toString());
+ try {
+ Long dbStartTime = sdf.parse(dbName).getTime();
+ initRollingLevelDB(dbStartTime, status.getPath());
+ } catch (ParseException pe) {
+ LOG.warn("Failed to initialize rolling leveldb " + dbName + " for "
+ + getName());
+ }
+ }
+ }
+
+ private void initRollingLevelDB(Long dbStartTime,
+ Path rollingInstanceDBPath) {
+ if (rollingdbs.containsKey(dbStartTime)) {
+ return;
+ }
+ Options options = new Options();
+ options.createIfMissing(true);
+ options.cacheSize(conf.getLong(
+ YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE));
+ options.maxOpenFiles(conf.getInt(
+ YarnConfiguration.TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES));
+ options.writeBufferSize(conf.getInt(
+ YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE));
+ LOG.info("Initializing rolling leveldb instance :" + rollingInstanceDBPath
+ + " for start time: " + dbStartTime);
+ DB db = null;
+ try {
+ db = factory.open(
+ new File(rollingInstanceDBPath.toUri().getPath()), options);
+ rollingdbs.put(dbStartTime, db);
+ String dbName = fdf.format(dbStartTime);
+ LOG.info("Added rolling leveldb instance " + dbName + " to " + getName());
+ } catch (IOException ioe) {
+ LOG.warn("Failed to open rolling leveldb instance :"
+ + new File(rollingInstanceDBPath.toUri().getPath()), ioe);
+ }
+ }
+
+ synchronized DB getPreviousDB(DB db) {
+ Iterator<DB> iterator = rollingdbs.values().iterator();
+ DB prev = null;
+ while (iterator.hasNext()) {
+ DB cur = iterator.next();
+ if (cur == db) {
+ break;
+ }
+ prev = cur;
+ }
+ return prev;
+ }
+
+ synchronized long getStartTimeFor(DB db) {
+ long startTime = -1;
+ for (Map.Entry<Long, DB> entry : rollingdbs.entrySet()) {
+ if (entry.getValue() == db) {
+ startTime = entry.getKey();
+ }
+ }
+ return startTime;
+ }
+
+ public synchronized DB getDBForStartTime(long startTime) {
+ // make sure we sanitize this input
+ startTime = Math.min(startTime, currentTimeMillis());
+
+ if (startTime >= getNextRollingTimeMillis()) {
+ roll(startTime);
+ }
+ Entry<Long, DB> entry = rollingdbs.floorEntry(startTime);
+ if (entry == null) {
+ return null;
+ }
+ return entry.getValue();
+ }
+
+ private void roll(long startTime) {
+ LOG.info("Rolling new DB instance for " + getName());
+ long currentStartTime = computeCurrentCheckMillis(startTime);
+ setNextRollingTimeMillis(computeNextCheckMillis(currentStartTime));
+ String currentRollingDBInstance = fdf.format(currentStartTime);
+ String currentRollingDBName = getName() + "." + currentRollingDBInstance;
+ Path currentRollingDBPath = new Path(rollingDBPath, currentRollingDBName);
+ if (getTimeToLiveEnabled()) {
+ scheduleOldDBsForEviction();
+ }
+ initRollingLevelDB(currentStartTime, currentRollingDBPath);
+ }
+
+ private synchronized void scheduleOldDBsForEviction() {
+ // keep at least time to live amount of data
+ long evictionThreshold = computeCurrentCheckMillis(currentTimeMillis()
+ - getTimeToLive());
+
+ LOG.info("Scheduling " + getName() + " DBs older than "
+ + fdf.format(evictionThreshold) + " for eviction");
+ Iterator<Entry<Long, DB>> iterator = rollingdbs.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Entry<Long, DB> entry = iterator.next();
+ // parse this in gmt time
+ if (entry.getKey() < evictionThreshold) {
+ LOG.info("Scheduling " + getName() + " eviction for "
+ + fdf.format(entry.getKey()));
+ iterator.remove();
+ rollingdbsToEvict.put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+
+ public synchronized void evictOldDBs() {
+ LOG.info("Evicting " + getName() + " DBs scheduled for eviction");
+ Iterator<Entry<Long, DB>> iterator = rollingdbsToEvict.entrySet()
+ .iterator();
+ while (iterator.hasNext()) {
+ Entry<Long, DB> entry = iterator.next();
+ IOUtils.cleanup(LOG, entry.getValue());
+ String dbName = fdf.format(entry.getKey());
+ Path path = new Path(rollingDBPath, getName() + "." + dbName);
+ try {
+ LOG.info("Removing old db directory contents in " + path);
+ lfs.delete(path, true);
+ } catch (IOException ioe) {
+ LOG.warn("Failed to evict old db " + path, ioe);
+ }
+ iterator.remove();
+ }
+ }
+
+ public void stop() throws Exception {
+ for (DB db : rollingdbs.values()) {
+ IOUtils.cleanup(LOG, db);
+ }
+ IOUtils.cleanup(LOG, lfs);
+ }
+
+ private long computeNextCheckMillis(long now) {
+ return computeCheckMillis(now, true);
+ }
+
+ public long computeCurrentCheckMillis(long now) {
+ return computeCheckMillis(now, false);
+ }
+
+ private synchronized long computeCheckMillis(long now, boolean next) {
+ // needs to be called synchronously due to shared Calendar
+ cal.setTimeInMillis(now);
+ cal.set(Calendar.SECOND, 0);
+ cal.set(Calendar.MILLISECOND, 0);
+
+ if (rollingPeriod == RollingPeriod.DAILY) {
+ cal.set(Calendar.HOUR_OF_DAY, 0);
+ cal.set(Calendar.MINUTE, 0);
+ if (next) {
+ cal.add(Calendar.DATE, 1);
+ }
+ } else if (rollingPeriod == RollingPeriod.HALF_DAILY) {
+ // round down to 12 hour interval
+ int hour = (cal.get(Calendar.HOUR) / 12) * 12;
+ cal.set(Calendar.HOUR, hour);
+ cal.set(Calendar.MINUTE, 0);
+ if (next) {
+ cal.add(Calendar.HOUR_OF_DAY, 12);
+ }
+ } else if (rollingPeriod == RollingPeriod.QUARTER_DAILY) {
+ // round down to 6 hour interval
+ int hour = (cal.get(Calendar.HOUR) / 6) * 6;
+ cal.set(Calendar.HOUR, hour);
+ cal.set(Calendar.MINUTE, 0);
+ if (next) {
+ cal.add(Calendar.HOUR_OF_DAY, 6);
+ }
+ } else if (rollingPeriod == RollingPeriod.HOURLY) {
+ cal.set(Calendar.MINUTE, 0);
+ if (next) {
+ cal.add(Calendar.HOUR_OF_DAY, 1);
+ }
+ } else if (rollingPeriod == RollingPeriod.MINUTELY) {
+ // round down to 5 minute interval
+ int minute = (cal.get(Calendar.MINUTE) / 5) * 5;
+ cal.set(Calendar.MINUTE, minute);
+ if (next) {
+ cal.add(Calendar.MINUTE, 5);
+ }
+ }
+ return cal.getTimeInMillis();
+ }
+}
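
Taken together, the class gives callers a per-period DB handle keyed by entity start time. A minimal usage sketch, assuming conf already carries the leveldb path and rolling period settings (the real caller is RollingLevelDBTimelineStore below):

    RollingLevelDB entityDb = new RollingLevelDB("entity-ldb");
    entityDb.init(conf);
    // returns the instance covering this start time, rolling a new one if needed
    DB db = entityDb.getDBForStartTime(entityStartTime);
    if (db != null) {
      WriteBatch batch = db.createWriteBatch();
      // ... stage puts against the batch ...
      db.write(batch);
    }
    // drops any instances previously scheduled for eviction (older than the ttl)
    entityDb.evictOldDBs();
    entityDb.stop();

With the default hourly period, for example, a start time of 14:37 GMT maps to the instance stamped 14:00 and the next roll is scheduled for 15:00.
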
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java
new file mode 100644
index 0000000..e98ac5a
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java
@@ -0,0 +1,1780 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.apache.commons.collections.map.LRUMap;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timeline.*;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.timeline.RollingLevelDB.RollingWriteBatch;
+import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
+import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyBuilder;
+import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyParser;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.iq80.leveldb.*;
+import org.nustaq.serialization.FSTConfiguration;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.Map.Entry;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong;
+import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.writeReverseOrderedLong;
+import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT_DOMAIN_ID;
+import static org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.prefixMatches;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_TTL_MS;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_TTL_MS;
+import static org.fusesource.leveldbjni.JniDBFactory.bytes;
+
+/**
+ *
+ * An implementation of an application timeline store backed by leveldb.
+ *
+ *
+ *
+ * There are three sections of the db, the start time section, the entity
+ * section, and the indexed entity section.
+ *
+ *
+ *
+ * The start time section is used to retrieve the unique start time for a given
+ * entity. Its values each contain a start time while its keys are of the form:
+ *
+ *
+ *
+ * START_TIME_LOOKUP_PREFIX + entity type + entity id
+ *
+ *
+ *
+ * The entity section is ordered by entity type, then entity start time
+ * descending, then entity ID. There are four sub-sections of the entity
+ * section: events, primary filters, related entities, and other info. The event
+ * entries have event info serialized into their values. The other info entries
+ * have values corresponding to the values of the other info name/value map for
+ * the entry (note the names are contained in the key). All other entries have
+ * empty values. The key structure is as follows:
+ *
+ *
+ *
+ * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id
+ *
+ * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ * DOMAIN_ID_COLUMN
+ *
+ * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ * EVENTS_COLUMN + reveventtimestamp + eventtype
+ *
+ * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ * PRIMARY_FILTERS_COLUMN + name + value
+ *
+ * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ * OTHER_INFO_COLUMN + name
+ *
+ * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ * RELATED_ENTITIES_COLUMN + relatedentity type + relatedentity id
+ *
+ *
+ *
+ * The indexed entity section contains a primary filter name and primary filter
+ * value as the prefix. Within a given name/value, entire entity entries are
+ * stored in the same format as described in the entity section above (below,
+ * "key" represents any one of the possible entity entry keys described above).
+ *
+ *
+ *
+ * INDEXED_ENTRY_PREFIX + primaryfilter name + primaryfilter value +
+ * key
+ *
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class RollingLevelDBTimelineStore extends AbstractService implements
+ TimelineStore {
+ private static final Log LOG = LogFactory
+ .getLog(RollingLevelDBTimelineStore.class);
+ private static FSTConfiguration fstConf =
+ FSTConfiguration.createDefaultConfiguration();
+
+ static {
+ fstConf.setShareReferences(false);
+ }
+
+ @Private
+ @VisibleForTesting
+ static final String FILENAME = "leveldb-timeline-store";
+ static final String DOMAIN = "domain-ldb";
+ static final String ENTITY = "entity-ldb";
+ static final String INDEX = "indexes-ldb";
+ static final String STARTTIME = "starttime-ldb";
+ static final String OWNER = "owner-ldb";
+
+ private static final byte[] DOMAIN_ID_COLUMN = "d".getBytes(UTF_8);
+ private static final byte[] EVENTS_COLUMN = "e".getBytes(UTF_8);
+ private static final byte[] PRIMARY_FILTERS_COLUMN = "f".getBytes(UTF_8);
+ private static final byte[] OTHER_INFO_COLUMN = "i".getBytes(UTF_8);
+ private static final byte[] RELATED_ENTITIES_COLUMN = "r".getBytes(UTF_8);
+
+ private static final byte[] DESCRIPTION_COLUMN = "d".getBytes(UTF_8);
+ private static final byte[] OWNER_COLUMN = "o".getBytes(UTF_8);
+ private static final byte[] READER_COLUMN = "r".getBytes(UTF_8);
+ private static final byte[] WRITER_COLUMN = "w".getBytes(UTF_8);
+ private static final byte[] TIMESTAMP_COLUMN = "t".getBytes(UTF_8);
+
+ private static final byte[] EMPTY_BYTES = new byte[0];
+
+ private static final String TIMELINE_STORE_VERSION_KEY =
+ "timeline-store-version";
+
+ private static final Version CURRENT_VERSION_INFO = Version.newInstance(1, 0);
+
+ private static long writeBatchSize = 10000;
+
+ @Private
+ @VisibleForTesting
+ static final FsPermission LEVELDB_DIR_UMASK = FsPermission
+ .createImmutable((short) 0700);
+
+ private Map<EntityIdentifier, Long> startTimeWriteCache;
+ private Map<EntityIdentifier, Long> startTimeReadCache;
+
+ private DB domaindb;
+ private RollingLevelDB entitydb;
+ private RollingLevelDB indexdb;
+ private DB starttimedb;
+ private DB ownerdb;
+
+ private Thread deletionThread;
+
+ public RollingLevelDBTimelineStore() {
+ super(RollingLevelDBTimelineStore.class.getName());
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected void serviceInit(Configuration conf) throws Exception {
+ Preconditions
+ .checkArgument(conf.getLong(TIMELINE_SERVICE_TTL_MS,
+ DEFAULT_TIMELINE_SERVICE_TTL_MS) > 0,
+ "%s property value should be greater than zero",
+ TIMELINE_SERVICE_TTL_MS);
+ Preconditions.checkArgument(conf.getLong(
+ TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS,
+ DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS) > 0,
+ "%s property value should be greater than zero",
+ TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS);
+ Preconditions.checkArgument(conf.getLong(
+ TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
+ DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE) >= 0,
+ "%s property value should be greater than or equal to zero",
+ TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE);
+ Preconditions.checkArgument(conf.getLong(
+ TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
+ DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE) > 0,
+ " %s property value should be greater than zero",
+ TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE);
+ Preconditions.checkArgument(conf.getLong(
+ TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
+ DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE) > 0,
+ "%s property value should be greater than zero",
+ TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE);
+ Preconditions.checkArgument(conf.getLong(
+ TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES,
+ DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES) > 0,
+ "%s property value should be greater than zero",
+ TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES);
+ Preconditions.checkArgument(conf.getLong(
+ TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE,
+ DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE) > 0,
+ "%s property value should be greater than zero",
+ TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE);
+
+ Options options = new Options();
+ options.createIfMissing(true);
+ options.cacheSize(conf.getLong(
+ TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
+ DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE));
+ JniDBFactory factory = new JniDBFactory();
+ Path dbPath = new Path(
+ conf.get(TIMELINE_SERVICE_LEVELDB_PATH), FILENAME);
+ Path domainDBPath = new Path(dbPath, DOMAIN);
+ Path starttimeDBPath = new Path(dbPath, STARTTIME);
+ Path ownerDBPath = new Path(dbPath, OWNER);
+ FileSystem localFS = null;
+ try {
+ localFS = FileSystem.getLocal(conf);
+ if (!localFS.exists(dbPath)) {
+ if (!localFS.mkdirs(dbPath)) {
+ throw new IOException("Couldn't create directory for leveldb "
+ + "timeline store " + dbPath);
+ }
+ localFS.setPermission(dbPath, LEVELDB_DIR_UMASK);
+ }
+ if (!localFS.exists(domainDBPath)) {
+ if (!localFS.mkdirs(domainDBPath)) {
+ throw new IOException("Couldn't create directory for leveldb "
+ + "timeline store " + domainDBPath);
+ }
+ localFS.setPermission(domainDBPath, LEVELDB_DIR_UMASK);
+ }
+ if (!localFS.exists(starttimeDBPath)) {
+ if (!localFS.mkdirs(starttimeDBPath)) {
+ throw new IOException("Couldn't create directory for leveldb "
+ + "timeline store " + starttimeDBPath);
+ }
+ localFS.setPermission(starttimeDBPath, LEVELDB_DIR_UMASK);
+ }
+ if (!localFS.exists(ownerDBPath)) {
+ if (!localFS.mkdirs(ownerDBPath)) {
+ throw new IOException("Couldn't create directory for leveldb "
+ + "timeline store " + ownerDBPath);
+ }
+ localFS.setPermission(ownerDBPath, LEVELDB_DIR_UMASK);
+ }
+ } finally {
+ IOUtils.cleanup(LOG, localFS);
+ }
+ options.maxOpenFiles(conf.getInt(
+ TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES,
+ DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES));
+ options.writeBufferSize(conf.getInt(
+ TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE,
+ DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE));
+ LOG.info("Using leveldb path " + dbPath);
+ domaindb = factory.open(new File(domainDBPath.toString()), options);
+ entitydb = new RollingLevelDB(ENTITY);
+ entitydb.init(conf);
+ indexdb = new RollingLevelDB(INDEX);
+ indexdb.init(conf);
+ starttimedb = factory.open(new File(starttimeDBPath.toString()), options);
+ ownerdb = factory.open(new File(ownerDBPath.toString()), options);
+ checkVersion();
+ startTimeWriteCache = Collections.synchronizedMap(new LRUMap(
+ getStartTimeWriteCacheSize(conf)));
+ startTimeReadCache = Collections.synchronizedMap(new LRUMap(
+ getStartTimeReadCacheSize(conf)));
+
+ if (conf.getBoolean(TIMELINE_SERVICE_TTL_ENABLE, true)) {
+ deletionThread = new EntityDeletionThread(conf);
+ deletionThread.start();
+ }
+
+ writeBatchSize = conf.getInt(
+ TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE,
+ DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE);
+
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ if (deletionThread != null) {
+ deletionThread.interrupt();
+ LOG.info("Waiting for deletion thread to complete its current action");
+ try {
+ deletionThread.join();
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while waiting for deletion thread to complete,"
+ + " closing db now", e);
+ }
+ }
+ IOUtils.cleanup(LOG, domaindb);
+ IOUtils.cleanup(LOG, starttimedb);
+ IOUtils.cleanup(LOG, ownerdb);
+ entitydb.stop();
+ indexdb.stop();
+ super.serviceStop();
+ }
+
+ private class EntityDeletionThread extends Thread {
+ private final long ttl;
+ private final long ttlInterval;
+
+ public EntityDeletionThread(Configuration conf) {
+ ttl = conf.getLong(TIMELINE_SERVICE_TTL_MS,
+ DEFAULT_TIMELINE_SERVICE_TTL_MS);
+ ttlInterval = conf.getLong(
+ TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS,
+ DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS);
+ LOG.info("Starting deletion thread with ttl " + ttl + " and cycle "
+ + "interval " + ttlInterval);
+ }
+
+ @Override
+ public void run() {
+ Thread.currentThread().setName("Leveldb Timeline Store Retention");
+ while (true) {
+ long timestamp = System.currentTimeMillis() - ttl;
+ try {
+ discardOldEntities(timestamp);
+ Thread.sleep(ttlInterval);
+ } catch (IOException e) {
+ LOG.error(e);
+ } catch (InterruptedException e) {
+ LOG.info("Deletion thread received interrupt, exiting");
+ break;
+ }
+ }
+ }
+ }
+
+ @Override
+ public TimelineEntity getEntity(String entityId, String entityType,
+ EnumSet<Field> fields) throws IOException {
+ Long revStartTime = getStartTimeLong(entityId, entityType);
+ if (revStartTime == null) {
+ return null;
+ }
+ byte[] prefix = KeyBuilder.newInstance().add(entityType)
+ .add(writeReverseOrderedLong(revStartTime)).add(entityId)
+ .getBytesForLookup();
+
+ DBIterator iterator = null;
+ try {
+ DB db = entitydb.getDBForStartTime(revStartTime);
+ if (db == null) {
+ return null;
+ }
+ iterator = db.iterator();
+ iterator.seek(prefix);
+
+ return getEntity(entityId, entityType, revStartTime, fields, iterator,
+ prefix, prefix.length);
+ } finally {
+ IOUtils.cleanup(LOG, iterator);
+ }
+ }
+
+ /**
+ * Read entity from a db iterator. If no information is found in the specified
+ * fields for this entity, return null.
+ */
+ private static TimelineEntity getEntity(String entityId, String entityType,
+ Long startTime, EnumSet<Field> fields, DBIterator iterator,
+ byte[] prefix, int prefixlen) throws IOException {
+ if (fields == null) {
+ fields = EnumSet.allOf(Field.class);
+ }
+
+ TimelineEntity entity = new TimelineEntity();
+ boolean events = false;
+ boolean lastEvent = false;
+ if (fields.contains(Field.EVENTS)) {
+ events = true;
+ } else if (fields.contains(Field.LAST_EVENT_ONLY)) {
+ lastEvent = true;
+ } else {
+ entity.setEvents(null);
+ }
+ boolean relatedEntities = false;
+ if (fields.contains(Field.RELATED_ENTITIES)) {
+ relatedEntities = true;
+ } else {
+ entity.setRelatedEntities(null);
+ }
+ boolean primaryFilters = false;
+ if (fields.contains(Field.PRIMARY_FILTERS)) {
+ primaryFilters = true;
+ } else {
+ entity.setPrimaryFilters(null);
+ }
+ boolean otherInfo = false;
+ if (fields.contains(Field.OTHER_INFO)) {
+ otherInfo = true;
+ } else {
+ entity.setOtherInfo(null);
+ }
+
+ // iterate through the entity's entry, parsing information if it is part
+ // of a requested field
+ for (; iterator.hasNext(); iterator.next()) {
+ byte[] key = iterator.peekNext().getKey();
+ if (!prefixMatches(prefix, prefixlen, key)) {
+ break;
+ }
+ if (key.length == prefixlen) {
+ continue;
+ }
+ if (key[prefixlen] == PRIMARY_FILTERS_COLUMN[0]) {
+ if (primaryFilters) {
+ addPrimaryFilter(entity, key, prefixlen
+ + PRIMARY_FILTERS_COLUMN.length);
+ }
+ } else if (key[prefixlen] == OTHER_INFO_COLUMN[0]) {
+ if (otherInfo) {
+ entity.addOtherInfo(
+ parseRemainingKey(key, prefixlen + OTHER_INFO_COLUMN.length),
+ fstConf.asObject(iterator.peekNext().getValue()));
+ }
+ } else if (key[prefixlen] == RELATED_ENTITIES_COLUMN[0]) {
+ if (relatedEntities) {
+ addRelatedEntity(entity, key, prefixlen
+ + RELATED_ENTITIES_COLUMN.length);
+ }
+ } else if (key[prefixlen] == EVENTS_COLUMN[0]) {
+ if (events || (lastEvent && entity.getEvents().size() == 0)) {
+ TimelineEvent event = getEntityEvent(null, key, prefixlen
+ + EVENTS_COLUMN.length, iterator.peekNext().getValue());
+ if (event != null) {
+ entity.addEvent(event);
+ }
+ }
+ } else if (key[prefixlen] == DOMAIN_ID_COLUMN[0]) {
+ byte[] v = iterator.peekNext().getValue();
+ String domainId = new String(v, UTF_8);
+ entity.setDomainId(domainId);
+ } else {
+ LOG.warn(String.format("Found unexpected column for entity %s of "
+ + "type %s (0x%02x)", entityId, entityType, key[prefixlen]));
+ }
+ }
+
+ entity.setEntityId(entityId);
+ entity.setEntityType(entityType);
+ entity.setStartTime(startTime);
+
+ return entity;
+ }
+
+ @Override
+ public TimelineEvents getEntityTimelines(String entityType,
+ SortedSet<String> entityIds, Long limit, Long windowStart,
+ Long windowEnd, Set<String> eventType) throws IOException {
+ TimelineEvents events = new TimelineEvents();
+ if (entityIds == null || entityIds.isEmpty()) {
+ return events;
+ }
+ // create a lexicographically-ordered map from start time to entities
+ Map<byte[], List<EntityIdentifier>> startTimeMap =
+ new TreeMap<byte[], List<EntityIdentifier>>(
+ new Comparator<byte[]>() {
+ @Override
+ public int compare(byte[] o1, byte[] o2) {
+ return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0,
+ o2.length);
+ }
+ });
+ DBIterator iterator = null;
+ try {
+ // look up start times for the specified entities
+ // skip entities with no start time
+ for (String entityId : entityIds) {
+ byte[] startTime = getStartTime(entityId, entityType);
+ if (startTime != null) {
+ List<EntityIdentifier> entities = startTimeMap.get(startTime);
+ if (entities == null) {
+ entities = new ArrayList<EntityIdentifier>();
+ startTimeMap.put(startTime, entities);
+ }
+ entities.add(new EntityIdentifier(entityId, entityType));
+ }
+ }
+ for (Entry<byte[], List<EntityIdentifier>> entry : startTimeMap
+ .entrySet()) {
+ // look up the events matching the given parameters (limit,
+ // start time, end time, event types) for entities whose start times
+ // were found and add the entities to the return list
+ byte[] revStartTime = entry.getKey();
+ for (EntityIdentifier entityIdentifier : entry.getValue()) {
+ EventsOfOneEntity entity = new EventsOfOneEntity();
+ entity.setEntityId(entityIdentifier.getId());
+ entity.setEntityType(entityType);
+ events.addEvent(entity);
+ KeyBuilder kb = KeyBuilder.newInstance().add(entityType)
+ .add(revStartTime).add(entityIdentifier.getId())
+ .add(EVENTS_COLUMN);
+ byte[] prefix = kb.getBytesForLookup();
+ if (windowEnd == null) {
+ windowEnd = Long.MAX_VALUE;
+ }
+ byte[] revts = writeReverseOrderedLong(windowEnd);
+ kb.add(revts);
+ byte[] first = kb.getBytesForLookup();
+ byte[] last = null;
+ if (windowStart != null) {
+ last = KeyBuilder.newInstance().add(prefix)
+ .add(writeReverseOrderedLong(windowStart)).getBytesForLookup();
+ }
+ if (limit == null) {
+ limit = DEFAULT_LIMIT;
+ }
+ DB db = entitydb.getDBForStartTime(readReverseOrderedLong(
+ revStartTime, 0));
+ if (db == null) {
+ continue;
+ }
+ iterator = db.iterator();
+ for (iterator.seek(first); entity.getEvents().size() < limit
+ && iterator.hasNext(); iterator.next()) {
+ byte[] key = iterator.peekNext().getKey();
+ if (!prefixMatches(prefix, prefix.length, key)
+ || (last != null && WritableComparator.compareBytes(key, 0,
+ key.length, last, 0, last.length) > 0)) {
+ break;
+ }
+ TimelineEvent event = getEntityEvent(eventType, key, prefix.length,
+ iterator.peekNext().getValue());
+ if (event != null) {
+ entity.addEvent(event);
+ }
+ }
+ }
+ }
+ } finally {
+ IOUtils.cleanup(LOG, iterator);
+ }
+ return events;
+ }
+
+ @Override
+ public TimelineEntities getEntities(String entityType, Long limit,
+ Long windowStart, Long windowEnd, String fromId, Long fromTs,
+ NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
+ EnumSet<Field> fields, CheckAcl checkAcl) throws IOException {
+ if (primaryFilter == null) {
+ // if no primary filter is specified, prefix the lookup with
+ // ENTITY_ENTRY_PREFIX
+ return getEntityByTime(EMPTY_BYTES, entityType, limit, windowStart,
+ windowEnd, fromId, fromTs, secondaryFilters, fields, checkAcl, false);
+ } else {
+ // if a primary filter is specified, prefix the lookup with
+ // INDEXED_ENTRY_PREFIX + primaryFilterName + primaryFilterValue +
+ // ENTITY_ENTRY_PREFIX
+ byte[] base = KeyBuilder.newInstance().add(primaryFilter.getName())
+ .add(fstConf.asByteArray(primaryFilter.getValue()), true)
+ .getBytesForLookup();
+ return getEntityByTime(base, entityType, limit, windowStart, windowEnd,
+ fromId, fromTs, secondaryFilters, fields, checkAcl, true);
+ }
+ }
+
+ /**
+ * Retrieves a list of entities satisfying given parameters.
+ *
+ * @param base
+ * A byte array prefix for the lookup
+ * @param entityType
+ * The type of the entity
+ * @param limit
+ * A limit on the number of entities to return
+ * @param starttime
+ * The earliest entity start time to retrieve (exclusive)
+ * @param endtime
+ * The latest entity start time to retrieve (inclusive)
+ * @param fromId
+ * Retrieve entities starting with this entity
+ * @param fromTs
+ * Ignore entities with insert timestamp later than this ts
+ * @param secondaryFilters
+ * Filter pairs that the entities should match
+ * @param fields
+ * The set of fields to retrieve
+ * @param usingPrimaryFilter
+ * true if this query is using a primary filter
+ * @return A list of entities
+ * @throws IOException
+ */
+ private TimelineEntities getEntityByTime(byte[] base, String entityType,
+ Long limit, Long starttime, Long endtime, String fromId, Long fromTs,
+ Collection<NameValuePair> secondaryFilters, EnumSet<Field> fields,
+ CheckAcl checkAcl, boolean usingPrimaryFilter) throws IOException {
+ DBIterator iterator = null;
+ try {
+ KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType);
+ // only db keys matching the prefix (base + entity type) will be parsed
+ byte[] prefix = kb.getBytesForLookup();
+ if (endtime == null) {
+ // if end time is null, place no restriction on end time
+ endtime = Long.MAX_VALUE;
+ }
+
+ // Sanitize the fields parameter
+ if (fields == null) {
+ fields = EnumSet.allOf(Field.class);
+ }
+
+ // construct a first key that will be seeked to using end time or fromId
+ long firstStartTime = Long.MAX_VALUE;
+ byte[] first = null;
+ if (fromId != null) {
+ Long fromIdStartTime = getStartTimeLong(fromId, entityType);
+ if (fromIdStartTime == null) {
+ // no start time for provided id, so return empty entities
+ return new TimelineEntities();
+ }
+ if (fromIdStartTime <= endtime) {
+ // if provided id's start time falls before the end of the window,
+ // use it to construct the seek key
+ firstStartTime = fromIdStartTime;
+ first = kb.add(writeReverseOrderedLong(fromIdStartTime)).add(fromId)
+ .getBytesForLookup();
+ }
+ }
+ // if seek key wasn't constructed using fromId, construct it using end ts
+ if (first == null) {
+ firstStartTime = endtime;
+ first = kb.add(writeReverseOrderedLong(endtime)).getBytesForLookup();
+ }
+ byte[] last = null;
+ if (starttime != null) {
+ // if start time is not null, set a last key that will not be
+ // iterated past
+ last = KeyBuilder.newInstance().add(base).add(entityType)
+ .add(writeReverseOrderedLong(starttime)).getBytesForLookup();
+ }
+ if (limit == null) {
+ // if limit is not specified, use the default
+ limit = DEFAULT_LIMIT;
+ }
+
+ TimelineEntities entities = new TimelineEntities();
+ RollingLevelDB rollingdb = null;
+ if (usingPrimaryFilter) {
+ rollingdb = indexdb;
+ } else {
+ rollingdb = entitydb;
+ }
+
+ DB db = rollingdb.getDBForStartTime(firstStartTime);
+ while (entities.getEntities().size() < limit && db != null) {
+ iterator = db.iterator();
+ iterator.seek(first);
+
+ // iterate until one of the following conditions is met: limit is
+ // reached, there are no more keys, the key prefix no longer matches,
+ // or a start time has been specified and reached/exceeded
+ while (entities.getEntities().size() < limit && iterator.hasNext()) {
+ byte[] key = iterator.peekNext().getKey();
+ if (!prefixMatches(prefix, prefix.length, key)
+ || (last != null && WritableComparator.compareBytes(key, 0,
+ key.length, last, 0, last.length) > 0)) {
+ break;
+ }
+ // read the start time and entity id from the current key
+ KeyParser kp = new KeyParser(key, prefix.length);
+ Long startTime = kp.getNextLong();
+ String entityId = kp.getNextString();
+
+ if (fromTs != null) {
+ long insertTime = readReverseOrderedLong(iterator.peekNext()
+ .getValue(), 0);
+ if (insertTime > fromTs) {
+ byte[] firstKey = key;
+ while (iterator.hasNext()) {
+ key = iterator.peekNext().getKey();
+ iterator.next();
+ if (!prefixMatches(firstKey, kp.getOffset(), key)) {
+ break;
+ }
+ }
+ continue;
+ }
+ }
+ // Even if other info and primary filter fields are not included, we
+ // still need to load them to match secondary filters when they are
+ // non-empty
+ EnumSet<Field> queryFields = EnumSet.copyOf(fields);
+ boolean addPrimaryFilters = false;
+ boolean addOtherInfo = false;
+ if (secondaryFilters != null && secondaryFilters.size() > 0) {
+ if (!queryFields.contains(Field.PRIMARY_FILTERS)) {
+ queryFields.add(Field.PRIMARY_FILTERS);
+ addPrimaryFilters = true;
+ }
+ if (!queryFields.contains(Field.OTHER_INFO)) {
+ queryFields.add(Field.OTHER_INFO);
+ addOtherInfo = true;
+ }
+ }
+
+ // parse the entity that owns this key, iterating over all keys for
+ // the entity
+ TimelineEntity entity = null;
+ if (usingPrimaryFilter) {
+ entity = getEntity(entityId, entityType, queryFields);
+ iterator.next();
+ } else {
+ entity = getEntity(entityId, entityType, startTime, queryFields,
+ iterator, key, kp.getOffset());
+ }
+ // determine if the retrieved entity matches the provided secondary
+ // filters, and if so add it to the list of entities to return
+ boolean filterPassed = true;
+ if (secondaryFilters != null) {
+ for (NameValuePair filter : secondaryFilters) {
+ Object v = entity.getOtherInfo().get(filter.getName());
+ if (v == null) {
+ Set