diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelinePutResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelinePutResponse.java index a56d4d4..7ef642e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelinePutResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelinePutResponse.java @@ -129,6 +129,11 @@ public void setErrors(List errors) { */ public static final int FORBIDDEN_RELATION = 6; + /** + * Error code returned if the entity start time is before the eviction period of old data + */ + public static final int EXPIRED_ENTITY = 7; + private String entityId; private String entityType; private int errorCode; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 81bcb9b..3d0ef3a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1365,6 +1365,13 @@ private static void addDeprecatedKeys() { public static final long DEFAULT_TIMELINE_SERVICE_TTL_MS = 1000 * 60 * 60 * 24 * 7; + /** Timeline service rolling period {daily, hourly} */ + public static final String TIMELINE_SERVICE_ROLLING_PERIOD = + TIMELINE_SERVICE_PREFIX + "rolling-period"; + + public static final String DEFAULT_TIMELINE_SERVICE_ROLLING_PERIOD = + "hourly"; + public static final String TIMELINE_SERVICE_LEVELDB_PREFIX = TIMELINE_SERVICE_PREFIX + "leveldb-timeline-store."; @@ -1379,6 +1386,21 @@ private static void addDeprecatedKeys() { public static final long DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE = 100 * 1024 * 1024; + /** Timeline service leveldb write buffer size */ + public static final String TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE = + TIMELINE_SERVICE_LEVELDB_PREFIX + "write-buffer-size"; + + public static final int DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE = + 128 * 1024 * 1024; + + /** Timeline service leveldb write batch size */ + public static final String + TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE = + TIMELINE_SERVICE_LEVELDB_PREFIX + "write-batch-size"; + + public static final int + DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE = 10000; + /** Timeline service leveldb start time read cache (number of entities) */ public static final String TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE = @@ -1402,6 +1424,13 @@ private static void addDeprecatedKeys() { public static final long DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS = 1000 * 60 * 5; + /** Timeline service leveldb number of concurrent open files */ + public static final String TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES = + TIMELINE_SERVICE_LEVELDB_PREFIX + "max-open-files"; + + public static final int DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES = + 1000; + /** The Kerberos principal for the timeline server.*/ public static final String TIMELINE_SERVICE_PRINCIPAL = TIMELINE_SERVICE_PREFIX + "principal"; diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDB.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDB.java index e69de29..52e4a6a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDB.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDB.java @@ -0,0 +1,388 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timeline; + +import java.io.File; +import java.io.IOException; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.GregorianCalendar; +import java.util.Iterator; +import java.util.Locale; +import java.util.Map; +import java.util.TimeZone; +import java.util.TreeMap; +import java.util.Map.Entry; + +import org.apache.commons.io.FilenameUtils; +import org.apache.commons.lang.time.FastDateFormat; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.fusesource.leveldbjni.JniDBFactory; +import org.iq80.leveldb.DB; +import org.iq80.leveldb.Options; +import org.iq80.leveldb.WriteBatch; + +import com.google.common.annotations.VisibleForTesting; + +class RollingLevelDB { + + private static final Log LOG = LogFactory.getLog(RollingLevelDB.class); + private static JniDBFactory factory = new JniDBFactory(); + // Thread safe date formatter + private static FastDateFormat fdf; + private SimpleDateFormat sdf; + + private GregorianCalendar cal = new GregorianCalendar(TimeZone.getTimeZone("GMT")); + private final TreeMap rollingdbs; + private final TreeMap rollingdbsToEvict; + private final String name; + private volatile long nextRollingCheckMillis = 0; + private FileSystem lfs = null; + private Path rollingDBPath; + private Configuration conf; + private RollingPeriod rollingPeriod; + private long ttl; + private boolean ttlEnabled; + + enum RollingPeriod { + DAILY { + @Override + public String dateFormat() { + return "yyyy-MM-dd"; + } + }, + HALF_DAILY { + @Override + public String dateFormat() { + return "yyyy-MM-dd-HH"; 
+ } + }, + QUARTER_DAILY { + @Override + public String dateFormat() { + return "yyyy-MM-dd-HH"; + } + }, + HOURLY { + @Override + public String dateFormat() { + return "yyyy-MM-dd-HH"; + } + }, + MINUTELY { + @Override + public String dateFormat() { + return "yyyy-MM-dd-HH-mm"; + } + }; + public abstract String dateFormat(); + } + + public static class RollingWriteBatch { + private final DB db; + private final WriteBatch writeBatch; + + public RollingWriteBatch(DB db, WriteBatch writeBatch) { + this.db = db; + this.writeBatch = writeBatch; + } + + public DB getDB() { + return db; + } + + public WriteBatch getWriteBatch() { + return writeBatch; + } + + public void write() { + db.write(writeBatch); + } + + public void close() { + IOUtils.cleanup(LOG, writeBatch); + } + } + + RollingLevelDB(String name) { + this.name = name; + this.rollingdbs = new TreeMap(); + this.rollingdbsToEvict = new TreeMap(); + } + + protected String getName() { + return name; + } + + protected long currentTimeMillis() { + return System.currentTimeMillis(); + } + + public long getNextRollingTimeMillis() { + return nextRollingCheckMillis; + } + + public long getTimeToLive() { + return ttl; + } + + public boolean getTimeToLiveEnabled() { + return ttlEnabled; + } + + protected void setNextRollingTimeMillis(long nextRollingCheckMillis) { + this.nextRollingCheckMillis = nextRollingCheckMillis; + LOG.info("Next rolling time for " + getName() + " is " + fdf.format(nextRollingCheckMillis)); + } + + public void init(Configuration conf) throws Exception { + LOG.info("Initializing RollingLevelDB for " + getName()); + this.conf = conf; + this.ttl = conf.getLong(YarnConfiguration.TIMELINE_SERVICE_TTL_MS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_TTL_MS); + this.ttlEnabled = conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, true); + this.rollingDBPath = new Path(conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH), + RollingLevelDBTimelineStore.FILENAME); + initRollingPeriod(); + initFileSystem(); + initHistoricalDBs(); + } + + protected void initFileSystem() throws IOException { + lfs = FileSystem.getLocal(conf); + boolean success = lfs.mkdirs(rollingDBPath, RollingLevelDBTimelineStore.LEVELDB_DIR_UMASK); + if (!success) { + throw new IOException("Failed to create leveldb root directory " + rollingDBPath); + } + } + + protected void initRollingPeriod() { + this.rollingPeriod = RollingPeriod.valueOf(conf.get(YarnConfiguration.TIMELINE_SERVICE_ROLLING_PERIOD, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ROLLING_PERIOD).toUpperCase(Locale.ENGLISH)); + fdf = FastDateFormat.getInstance(rollingPeriod.dateFormat(), TimeZone.getTimeZone("GMT")); + sdf = new SimpleDateFormat(rollingPeriod.dateFormat()); + sdf.setTimeZone(fdf.getTimeZone()); + } + + protected void initHistoricalDBs() throws IOException { + Path rollingDBGlobPath = new Path(rollingDBPath, getName() + ".*"); + FileStatus[] statuses = lfs.globStatus(rollingDBGlobPath); + for (FileStatus status : statuses) { + String dbName = FilenameUtils.getExtension(status.getPath().toString()); + try { + Long dbStartTime = sdf.parse(dbName).getTime(); + initRollingLevelDB(dbStartTime, status.getPath()); + } catch (ParseException pe) { + LOG.warn("Failed to initialize rolling leveldb " + dbName + " for " + getName()); + } + } + } + + private void initRollingLevelDB(Long dbStartTime, Path rollingDBPath) { + if (rollingdbs.containsKey(dbStartTime)) { + return; + } + Options options = new Options(); + options.createIfMissing(true); + options.cacheSize(conf.getLong( + 
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE)); + options.maxOpenFiles(conf.getInt( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES)); + options.writeBufferSize(conf.getInt( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE)); + LOG.info("Initializing rolling leveldb instance :" + rollingDBPath + " for start time: " + dbStartTime); + DB db = null; + try { + db = factory.open(new File(rollingDBPath.toUri().getPath()), options); + } catch (IOException ioe) { + LOG.warn("Failed to open rolling leveldb instance :" + new File(rollingDBPath.toUri().getPath()), ioe); + } + rollingdbs.put(dbStartTime, db); + String dbName = fdf.format(dbStartTime); + LOG.info("Added rolling leveldb instance " + dbName + " to " + getName()); + } + + synchronized DB getPreviousDB(DB db) { + Iterator iterator = rollingdbs.values().iterator(); + DB prev = null; + while (iterator.hasNext()) { + DB cur = iterator.next(); + if (cur == db) { + break; + } + prev = cur; + } + return prev; + } + + @VisibleForTesting + @Private + public synchronized long getStartTimeFor(DB db) { + long startTime = -1; + for (Map.Entry entry : rollingdbs.entrySet()) { + if (entry.getValue() == db) { + startTime = entry.getKey(); + } + } + return startTime; + } + + public synchronized DB getDBForStartTime(long startTime) { + // make sure we sanitize this input + startTime = Math.min(startTime, currentTimeMillis()); + + if (startTime >= getNextRollingTimeMillis()) { + roll(startTime); + } + Entry entry = rollingdbs.floorEntry(startTime); + if (entry == null) { + return null; + } + return entry.getValue(); + } + + private void roll(long startTime) { + LOG.info("Rolling new DB instance for "+ getName()); + long currentStartTime = computeCurrentCheckMillis(startTime); + setNextRollingTimeMillis(computeNextCheckMillis(currentStartTime)); + String currentRollingDBInstance = fdf.format(currentStartTime); + String currentRollingDBName = getName() + "." + currentRollingDBInstance; + Path currentRollingDBPath = new Path(rollingDBPath, currentRollingDBName); + if (getTimeToLiveEnabled()) { + scheduleOldDBsForEviction(); + } + initRollingLevelDB(currentStartTime, currentRollingDBPath); + } + + private synchronized void scheduleOldDBsForEviction() { + // keep at least time to live amount of data + long evictionThreshold = computeCurrentCheckMillis(currentTimeMillis() - getTimeToLive()); + + LOG.info("Scheduling " + getName() + " DBs older than " + fdf.format(evictionThreshold) + " for eviction"); + Iterator> iterator = rollingdbs.entrySet().iterator(); + while (iterator.hasNext()) { + Entry entry = iterator.next(); + // parse this in gmt time + if (entry.getKey() < evictionThreshold) { + LOG.info("Scheduling " + getName() + " eviction for " + fdf.format(entry.getKey())); + iterator.remove(); + rollingdbsToEvict.put(entry.getKey(), entry.getValue()); + } + } + } + + public synchronized void evictOldDBs() { + LOG.info("Evicting " + getName() + " DBs scheduled for eviction"); + Iterator> iterator = rollingdbsToEvict.entrySet().iterator(); + while (iterator.hasNext()) { + Entry entry = iterator.next(); + IOUtils.cleanup(LOG, entry.getValue()); + String dbName = fdf.format(entry.getKey()); + Path path = new Path(rollingDBPath, getName() + "." 
+ dbName); + try { + LOG.info("Removing old db directory contents in " + path); + lfs.delete(path, true); + } catch (IOException ioe) { + LOG.warn("Failed to evict old db " + path, ioe); + } + iterator.remove(); + } + } + + public void stop() throws Exception { + for (DB db : rollingdbs.values()) { + IOUtils.cleanup(LOG, db); + } + IOUtils.cleanup(LOG, lfs); + } + + private long computeNextCheckMillis(long now) { + return computeCheckMillis(now, true); + } + + public long computeCurrentCheckMillis(long now) { + return computeCheckMillis(now, false); + } + + private synchronized long computeCheckMillis(long now, boolean next) { + // needs to be called synchronously due to shared Calendar + cal.setTimeInMillis(now); + cal.set(Calendar.SECOND, 0); + cal.set(Calendar.MILLISECOND, 0); + + switch (rollingPeriod) { + case DAILY: { + cal.set(Calendar.HOUR_OF_DAY, 0); + cal.set(Calendar.MINUTE, 0); + if (next) { + cal.add(Calendar.DATE, 1); + } + break; + } + case HALF_DAILY: { + // round down to 12 hour interval + int hour = (cal.get(Calendar.HOUR) / 12) * 12; + cal.set(Calendar.HOUR, hour); + cal.set(Calendar.MINUTE, 0); + if (next) { + cal.add(Calendar.HOUR_OF_DAY, 12); + } + break; + } + case QUARTER_DAILY: { + // round down to 12 hour interval + int hour = (cal.get(Calendar.HOUR) / 6) * 6; + cal.set(Calendar.HOUR, hour); + cal.set(Calendar.MINUTE, 0); + if (next) { + cal.add(Calendar.HOUR_OF_DAY, 6); + } + break; + } + case HOURLY: { + cal.set(Calendar.MINUTE, 0); + if (next) { + cal.add(Calendar.HOUR_OF_DAY, 1); + } + break; + } + case MINUTELY: { + // round down to 5 minute interval + int minute = (cal.get(Calendar.MINUTE) / 5) * 5; + cal.set(Calendar.MINUTE, minute); + if (next) { + cal.add(Calendar.MINUTE, 5); + } + } + } + return cal.getTimeInMillis(); + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java index e69de29..9eff5b0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java @@ -0,0 +1,1744 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.timeline; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import org.apache.commons.collections.map.LRUMap; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timeline.*; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.hadoop.yarn.server.timeline.RollingLevelDB.RollingWriteBatch; +import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl; +import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyBuilder; +import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyParser; +import org.fusesource.leveldbjni.JniDBFactory; +import org.iq80.leveldb.*; + +import java.io.File; +import java.io.IOException; +import java.util.*; +import java.util.Map.Entry; + +import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong; +import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.writeReverseOrderedLong; +import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT_DOMAIN_ID; +import static org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.prefixMatches; +import static org.fusesource.leveldbjni.JniDBFactory.bytes; + +/** + *

<p>An implementation of an application timeline store backed by leveldb.</p>
+ *
+ * <p>There are three sections of the db, the start time section,
+ * the entity section, and the indexed entity section.</p>
+ *
+ * <p>The start time section is used to retrieve the unique start time for
+ * a given entity. Its values each contain a start time while its keys are of
+ * the form:</p>
+ * <pre>
+ *   START_TIME_LOOKUP_PREFIX + entity type + entity id</pre>
+ *
+ * <p>The entity section is ordered by entity type, then entity start time
+ * descending, then entity ID. There are four sub-sections of the entity
+ * section: events, primary filters, related entities,
+ * and other info. The event entries have event info serialized into their
+ * values. The other info entries have values corresponding to the values of
+ * the other info name/value map for the entry (note the names are contained
+ * in the key). All other entries have empty values. The key structure is as
+ * follows:</p>
+ * <pre>
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     DOMAIN_ID_COLUMN
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     EVENTS_COLUMN + reveventtimestamp + eventtype
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     PRIMARY_FILTERS_COLUMN + name + value
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     OTHER_INFO_COLUMN + name
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     RELATED_ENTITIES_COLUMN + relatedentity type + relatedentity id
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN + relatedentity type +
+ *     relatedentity id</pre>
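+ *
+ * <p>For illustration only (the entity below is hypothetical and not part of
+ * this patch): given ENTITY_ENTRY_PREFIX = "e", DOMAIN_ID_COLUMN = "d" and
+ * EVENTS_COLUMN = "e", an entity of type "YARN_APPLICATION" with id
+ * "application_1_0001" and start time t would be laid out roughly as below,
+ * where reverse() denotes GenericObjectMapper#writeReverseOrderedLong:</p>
+ * <pre>
+ *   e + YARN_APPLICATION + reverse(t) + application_1_0001
+ *   e + YARN_APPLICATION + reverse(t) + application_1_0001 + d
+ *   e + YARN_APPLICATION + reverse(t) + application_1_0001 + e +
+ *     reverse(eventtimestamp) + eventtype</pre>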
+ *
+ * <p>The indexed entity section contains a primary filter name and primary
+ * filter value as the prefix. Within a given name/value, entire entity
+ * entries are stored in the same format as described in the entity section
+ * above (below, "key" represents any one of the possible entity entry keys
+ * described above).</p>
+ * <pre>
+ *   INDEXED_ENTRY_PREFIX + primaryfilter name + primaryfilter value +
+ *     key</pre>
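+ *
+ * <p>A minimal usage sketch. The configuration constants come from this
+ * patch; the chosen values and the local path are illustrative only, not
+ * recommendations:</p>
+ * <pre>
+ *   YarnConfiguration conf = new YarnConfiguration();
+ *   // directory under which the rolling leveldb instances are created
+ *   conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH, "/tmp/timeline");
+ *   // roll a new entity/index leveldb instance daily instead of hourly
+ *   conf.set(YarnConfiguration.TIMELINE_SERVICE_ROLLING_PERIOD, "daily");
+ *   // per-instance leveldb tuning knobs added by this patch
+ *   conf.setInt(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES, 1000);
+ *   conf.setInt(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE,
+ *       128 * 1024 * 1024);
+ *   conf.setInt(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE, 10000);
+ *   RollingLevelDBTimelineStore store = new RollingLevelDBTimelineStore();
+ *   store.init(conf);   // AbstractService lifecycle
+ *   store.start();</pre>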
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class RollingLevelDBTimelineStore extends AbstractService + implements TimelineStore { + private static final Log LOG = LogFactory + .getLog(RollingLevelDBTimelineStore.class); + + @Private + @VisibleForTesting + static final String FILENAME = "leveldb-timeline-store"; + static final String DOMAIN = "domain-ldb"; + static final String ENTITY = "entity-ldb"; + static final String INDEX = "indexes-ldb"; + static final String STARTTIME = "starttime-ldb"; + static final String OWNER = "owner-ldb"; + + private static final byte[] DOMAIN_ENTRY_PREFIX = "d".getBytes(); + private static final byte[] ENTITY_ENTRY_PREFIX = "e".getBytes(); + private static final byte[] INDEXED_ENTRY_PREFIX = "i".getBytes(); + private static final byte[] START_TIME_LOOKUP_PREFIX = "k".getBytes(); + private static final byte[] OWNER_LOOKUP_PREFIX = "o".getBytes(); + + private static final byte[] DOMAIN_ID_COLUMN = "d".getBytes(); + private static final byte[] EVENTS_COLUMN = "e".getBytes(); + private static final byte[] PRIMARY_FILTERS_COLUMN = "f".getBytes(); + private static final byte[] OTHER_INFO_COLUMN = "i".getBytes(); + private static final byte[] RELATED_ENTITIES_COLUMN = "r".getBytes(); + private static final byte[] INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN = + "z".getBytes(); + + + private static final byte[] DESCRIPTION_COLUMN = "d".getBytes(); + private static final byte[] OWNER_COLUMN = "o".getBytes(); + private static final byte[] READER_COLUMN = "r".getBytes(); + private static final byte[] WRITER_COLUMN = "w".getBytes(); + private static final byte[] TIMESTAMP_COLUMN = "t".getBytes(); + + private static final byte[] EMPTY_BYTES = new byte[0]; + + private static final String TIMELINE_STORE_VERSION_KEY = "timeline-store-version"; + + private static final Version CURRENT_VERSION_INFO = Version + .newInstance(1, 0); + + private static long writeBatchSize = 10000; + + @Private + @VisibleForTesting + static final FsPermission LEVELDB_DIR_UMASK = FsPermission + .createImmutable((short) 0700); + + private Map startTimeWriteCache; + private Map startTimeReadCache; + + private DB domaindb; + private RollingLevelDB entitydb; + private RollingLevelDB indexdb; + private DB starttimedb; + private DB ownerdb; + + private Thread deletionThread; + + public RollingLevelDBTimelineStore() { + super(RollingLevelDBTimelineStore.class.getName()); + } + + @Override + @SuppressWarnings("unchecked") + protected void serviceInit(Configuration conf) throws Exception { + Preconditions.checkArgument(conf.getLong( + YarnConfiguration.TIMELINE_SERVICE_TTL_MS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_TTL_MS) > 0, + "%s property value should be greater than zero", + YarnConfiguration.TIMELINE_SERVICE_TTL_MS); + Preconditions.checkArgument(conf.getLong( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS) > 0, + "%s property value should be greater than zero", + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS); + Preconditions.checkArgument(conf.getLong( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE) >= 0, + "%s property value should be greater than or equal to zero", + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE); + Preconditions.checkArgument(conf.getLong( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE, + 
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE) > 0, + " %s property value should be greater than zero", + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE); + Preconditions.checkArgument(conf.getLong( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE) > 0, + "%s property value should be greater than zero", + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE); + Preconditions.checkArgument(conf.getLong( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES) > 0, + "%s property value should be greater than zero", + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES); + Preconditions.checkArgument(conf.getLong( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE) > 0, + "%s property value should be greater than zero", + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE); + + Options options = new Options(); + options.createIfMissing(true); + options.cacheSize(conf.getLong( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE)); + JniDBFactory factory = new JniDBFactory(); + Path dbPath = new Path( + conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH), FILENAME); + Path domainDBPath = new Path(dbPath, DOMAIN); + Path starttimeDBPath = new Path(dbPath, STARTTIME); + Path ownerDBPath = new Path(dbPath, OWNER); + FileSystem localFS = null; + try { + localFS = FileSystem.getLocal(conf); + if (!localFS.exists(dbPath)) { + if (!localFS.mkdirs(dbPath)) { + throw new IOException("Couldn't create directory for leveldb " + + "timeline store " + dbPath); + } + localFS.setPermission(dbPath, LEVELDB_DIR_UMASK); + } + if (!localFS.exists(domainDBPath)) { + if (!localFS.mkdirs(domainDBPath)) { + throw new IOException("Couldn't create directory for leveldb " + + "timeline store " + domainDBPath); + } + localFS.setPermission(domainDBPath, LEVELDB_DIR_UMASK); + } + if (!localFS.exists(starttimeDBPath)) { + if (!localFS.mkdirs(starttimeDBPath)) { + throw new IOException("Couldn't create directory for leveldb " + + "timeline store " + starttimeDBPath); + } + localFS.setPermission(starttimeDBPath, LEVELDB_DIR_UMASK); + } + if (!localFS.exists(ownerDBPath)) { + if (!localFS.mkdirs(ownerDBPath)) { + throw new IOException("Couldn't create directory for leveldb " + + "timeline store " + ownerDBPath); + } + localFS.setPermission(ownerDBPath, LEVELDB_DIR_UMASK); + } + } finally { + IOUtils.cleanup(LOG, localFS); + } + options.maxOpenFiles(conf.getInt( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES)); + options.writeBufferSize(conf.getInt( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE)); + LOG.info("Using leveldb path " + dbPath); + domaindb = factory.open(new File(domainDBPath.toString()), options); + entitydb = new RollingLevelDB(ENTITY); + entitydb.init(conf); + indexdb = new RollingLevelDB(INDEX); + indexdb.init(conf); + starttimedb = factory.open(new File(starttimeDBPath.toString()), options); + ownerdb = factory.open(new File(ownerDBPath.toString()), options); + checkVersion(); + startTimeWriteCache = + 
Collections.synchronizedMap(new LRUMap(getStartTimeWriteCacheSize( + conf))); + startTimeReadCache = + Collections.synchronizedMap(new LRUMap(getStartTimeReadCacheSize( + conf))); + + if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, true)) { + deletionThread = new EntityDeletionThread(conf); + deletionThread.start(); + } + + writeBatchSize = conf.getInt( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE); + + super.serviceInit(conf); + } + + @Override + protected void serviceStop() throws Exception { + if (deletionThread != null) { + deletionThread.interrupt(); + LOG.info("Waiting for deletion thread to complete its current action"); + try { + deletionThread.join(); + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for deletion thread to complete," + + " closing db now", e); + } + } + IOUtils.cleanup(LOG, domaindb); + IOUtils.cleanup(LOG, starttimedb); + IOUtils.cleanup(LOG, ownerdb); + entitydb.stop(); + indexdb.stop(); + super.serviceStop(); + } + + private static class StartAndInsertTime { + final long startTime; + final long insertTime; + + public StartAndInsertTime(long startTime, long insertTime) { + this.startTime = startTime; + this.insertTime = insertTime; + } + } + + private class EntityDeletionThread extends Thread { + private final long ttl; + private final long ttlInterval; + + public EntityDeletionThread(Configuration conf) { + ttl = conf.getLong(YarnConfiguration.TIMELINE_SERVICE_TTL_MS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_TTL_MS); + ttlInterval = conf.getLong( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS); + LOG.info("Starting deletion thread with ttl " + ttl + " and cycle " + + "interval " + ttlInterval); + } + + @Override + public void run() { + Thread.currentThread().setName("Leveldb Timeline Store Retention"); + while (true) { + long timestamp = System.currentTimeMillis() - ttl; + try { + discardOldEntities(timestamp); + Thread.sleep(ttlInterval); + } catch (IOException e) { + LOG.error(e); + } catch (InterruptedException e) { + LOG.info("Deletion thread received interrupt, exiting"); + break; + } + } + } + } + + @Override + public TimelineEntity getEntity(String entityId, String entityType, + EnumSet fields) throws IOException { + Long revStartTime = getStartTimeLong(entityId, entityType); + if (revStartTime == null) { + return null; + } + byte[] prefix = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX) + .add(entityType).add(writeReverseOrderedLong(revStartTime)) + .add(entityId).getBytesForLookup(); + + DBIterator iterator = null; + try { + DB db = entitydb.getDBForStartTime(revStartTime); + if (db == null) { + return null; + } + iterator = db.iterator(); + iterator.seek(prefix); + + return getEntity(entityId, entityType, revStartTime, fields, iterator, + prefix, prefix.length); + } finally { + IOUtils.cleanup(LOG, iterator); + } + } + + /** + * Read entity from a db iterator. If no information is found in the + * specified fields for this entity, return null. 
+ */ + private static TimelineEntity getEntity(String entityId, String entityType, + Long startTime, EnumSet fields, DBIterator iterator, + byte[] prefix, int prefixlen) throws IOException { + if (fields == null) { + fields = EnumSet.allOf(Field.class); + } + + TimelineEntity entity = new TimelineEntity(); + boolean events = false; + boolean lastEvent = false; + if (fields.contains(Field.EVENTS)) { + events = true; + } else if (fields.contains(Field.LAST_EVENT_ONLY)) { + lastEvent = true; + } else { + entity.setEvents(null); + } + boolean relatedEntities = false; + if (fields.contains(Field.RELATED_ENTITIES)) { + relatedEntities = true; + } else { + entity.setRelatedEntities(null); + } + boolean primaryFilters = false; + if (fields.contains(Field.PRIMARY_FILTERS)) { + primaryFilters = true; + } else { + entity.setPrimaryFilters(null); + } + boolean otherInfo = false; + if (fields.contains(Field.OTHER_INFO)) { + otherInfo = true; + } else { + entity.setOtherInfo(null); + } + + // iterate through the entity's entry, parsing information if it is part + // of a requested field + for (; iterator.hasNext(); iterator.next()) { + byte[] key = iterator.peekNext().getKey(); + if (!prefixMatches(prefix, prefixlen, key)) { + break; + } + if (key.length == prefixlen) { + continue; + } + if (key[prefixlen] == PRIMARY_FILTERS_COLUMN[0]) { + if (primaryFilters) { + addPrimaryFilter(entity, key, + prefixlen + PRIMARY_FILTERS_COLUMN.length); + } + } else if (key[prefixlen] == OTHER_INFO_COLUMN[0]) { + if (otherInfo) { + entity.addOtherInfo(parseRemainingKey(key, + prefixlen + OTHER_INFO_COLUMN.length), + GenericObjectMapper.read(iterator.peekNext().getValue())); + } + } else if (key[prefixlen] == RELATED_ENTITIES_COLUMN[0]) { + if (relatedEntities) { + addRelatedEntity(entity, key, + prefixlen + RELATED_ENTITIES_COLUMN.length); + } + } else if (key[prefixlen] == EVENTS_COLUMN[0]) { + if (events || (lastEvent && + entity.getEvents().size() == 0)) { + TimelineEvent event = getEntityEvent(null, key, prefixlen + + EVENTS_COLUMN.length, iterator.peekNext().getValue()); + if (event != null) { + entity.addEvent(event); + } + } + } else if (key[prefixlen] == DOMAIN_ID_COLUMN[0]) { + byte[] v = iterator.peekNext().getValue(); + String domainId = new String(v); + entity.setDomainId(domainId); + } else { + if (key[prefixlen] != + INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN[0]) { + LOG.warn(String.format("Found unexpected column for entity %s of " + + "type %s (0x%02x)", entityId, entityType, key[prefixlen])); + } + } + } + + entity.setEntityId(entityId); + entity.setEntityType(entityType); + entity.setStartTime(startTime); + + return entity; + } + + @Override + public TimelineEvents getEntityTimelines(String entityType, + SortedSet entityIds, Long limit, Long windowStart, + Long windowEnd, Set eventType) throws IOException { + TimelineEvents events = new TimelineEvents(); + if (entityIds == null || entityIds.isEmpty()) { + return events; + } + // create a lexicographically-ordered map from start time to entities + Map> startTimeMap = new TreeMap>(new Comparator() { + @Override + public int compare(byte[] o1, byte[] o2) { + return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0, + o2.length); + } + }); + DBIterator iterator = null; + try { + // look up start times for the specified entities + // skip entities with no start time + for (String entityId : entityIds) { + byte[] startTime = getStartTime(entityId, entityType); + if (startTime != null) { + List entities = startTimeMap.get(startTime); + if (entities == 
null) { + entities = new ArrayList(); + startTimeMap.put(startTime, entities); + } + entities.add(new EntityIdentifier(entityId, entityType)); + } + } + for (Entry> entry : + startTimeMap.entrySet()) { + // look up the events matching the given parameters (limit, + // start time, end time, event types) for entities whose start times + // were found and add the entities to the return list + byte[] revStartTime = entry.getKey(); + for (EntityIdentifier entityIdentifier : entry.getValue()) { + EventsOfOneEntity entity = new EventsOfOneEntity(); + entity.setEntityId(entityIdentifier.getId()); + entity.setEntityType(entityType); + events.addEvent(entity); + KeyBuilder kb = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX) + .add(entityType).add(revStartTime).add(entityIdentifier.getId()) + .add(EVENTS_COLUMN); + byte[] prefix = kb.getBytesForLookup(); + if (windowEnd == null) { + windowEnd = Long.MAX_VALUE; + } + byte[] revts = writeReverseOrderedLong(windowEnd); + kb.add(revts); + byte[] first = kb.getBytesForLookup(); + byte[] last = null; + if (windowStart != null) { + last = KeyBuilder.newInstance().add(prefix) + .add(writeReverseOrderedLong(windowStart)).getBytesForLookup(); + } + if (limit == null) { + limit = DEFAULT_LIMIT; + } + DB db = entitydb.getDBForStartTime(readReverseOrderedLong(revStartTime, 0)); + if (db == null) { + continue; + } + iterator = db.iterator(); + for (iterator.seek(first); entity.getEvents().size() < limit && + iterator.hasNext(); iterator.next()) { + byte[] key = iterator.peekNext().getKey(); + if (!prefixMatches(prefix, prefix.length, key) || (last != null && + WritableComparator.compareBytes(key, 0, key.length, last, 0, + last.length) > 0)) { + break; + } + TimelineEvent event = getEntityEvent(eventType, key, prefix.length, + iterator.peekNext().getValue()); + if (event != null) { + entity.addEvent(event); + } + } + } + } + } finally { + IOUtils.cleanup(LOG, iterator); + } + return events; + } + + @Override + public TimelineEntities getEntities(String entityType, + Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs, + NameValuePair primaryFilter, Collection secondaryFilters, + EnumSet fields, CheckAcl checkAcl) throws IOException { + if (primaryFilter == null) { + // if no primary filter is specified, prefix the lookup with + // ENTITY_ENTRY_PREFIX + return getEntityByTime(ENTITY_ENTRY_PREFIX, entityType, limit, + windowStart, windowEnd, fromId, fromTs, secondaryFilters, + fields, checkAcl, false); + } else { + // if a primary filter is specified, prefix the lookup with + // INDEXED_ENTRY_PREFIX + primaryFilterName + primaryFilterValue + + // ENTITY_ENTRY_PREFIX + byte[] base = KeyBuilder.newInstance().add(INDEXED_ENTRY_PREFIX) + .add(primaryFilter.getName()) + .add(GenericObjectMapper.write(primaryFilter.getValue()), true) + .add(ENTITY_ENTRY_PREFIX).getBytesForLookup(); + return getEntityByTime(base, entityType, limit, windowStart, windowEnd, + fromId, fromTs, secondaryFilters, fields, checkAcl, true); + } + } + + /** + * Retrieves a list of entities satisfying given parameters. 
+ * + * @param base A byte array prefix for the lookup + * @param entityType The type of the entity + * @param limit A limit on the number of entities to return + * @param starttime The earliest entity start time to retrieve (exclusive) + * @param endtime The latest entity start time to retrieve (inclusive) + * @param fromId Retrieve entities starting with this entity + * @param fromTs Ignore entities with insert timestamp later than this ts + * @param secondaryFilters Filter pairs that the entities should match + * @param fields The set of fields to retrieve + * @param usingPrimaryFilter true if this query is using a primary filter + * @return A list of entities + * @throws IOException + */ + private TimelineEntities getEntityByTime(byte[] base, + String entityType, Long limit, Long starttime, Long endtime, + String fromId, Long fromTs, Collection secondaryFilters, + EnumSet fields, CheckAcl checkAcl, boolean usingPrimaryFilter) throws IOException { + DBIterator iterator = null; + try { + KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType); + // only db keys matching the prefix (base + entity type) will be parsed + byte[] prefix = kb.getBytesForLookup(); + if (endtime == null) { + // if end time is null, place no restriction on end time + endtime = Long.MAX_VALUE; + } + // construct a first key that will be seeked to using end time or fromId + long firstStartTime = Long.MAX_VALUE; + byte[] first = null; + if (fromId != null) { + Long fromIdStartTime = getStartTimeLong(fromId, entityType); + if (fromIdStartTime == null) { + // no start time for provided id, so return empty entities + return new TimelineEntities(); + } + if (fromIdStartTime <= endtime) { + // if provided id's start time falls before the end of the window, + // use it to construct the seek key + firstStartTime = fromIdStartTime; + first = kb.add(writeReverseOrderedLong(fromIdStartTime)) + .add(fromId).getBytesForLookup(); + } + } + // if seek key wasn't constructed using fromId, construct it using end ts + if (first == null) { + firstStartTime = endtime; + first = kb.add(writeReverseOrderedLong(endtime)).getBytesForLookup(); + } + byte[] last = null; + if (starttime != null) { + // if start time is not null, set a last key that will not be + // iterated past + last = KeyBuilder.newInstance().add(base).add(entityType) + .add(writeReverseOrderedLong(starttime)).getBytesForLookup(); + } + if (limit == null) { + // if limit is not specified, use the default + limit = DEFAULT_LIMIT; + } + + TimelineEntities entities = new TimelineEntities(); + RollingLevelDB rollingdb = null; + if (usingPrimaryFilter) { + rollingdb = indexdb; + } else { + rollingdb = entitydb; + } + + DB db = rollingdb.getDBForStartTime(firstStartTime); + while (entities.getEntities().size() < limit && db != null) { + iterator = db.iterator(); + iterator.seek(first); + + // iterate until one of the following conditions is met: limit is + // reached, there are no more keys, the key prefix no longer matches, + // or a start time has been specified and reached/exceeded + while (entities.getEntities().size() < limit && iterator.hasNext()) { + byte[] key = iterator.peekNext().getKey(); + if (!prefixMatches(prefix, prefix.length, key) + || (last != null && WritableComparator.compareBytes(key, 0, + key.length, last, 0, last.length) > 0)) { + break; + } + // read the start time and entity id from the current key + KeyParser kp = new KeyParser(key, prefix.length); + Long startTime = kp.getNextLong(); + String entityId = kp.getNextString(); + + if (fromTs != 
null) { + long insertTime = readReverseOrderedLong(iterator.peekNext() + .getValue(), 0); + if (insertTime > fromTs) { + byte[] firstKey = key; + while (iterator.hasNext()) { + key = iterator.peekNext().getKey(); + iterator.next(); + if (!prefixMatches(firstKey, kp.getOffset(), key)) { + break; + } + } + continue; + } + } + + // parse the entity that owns this key, iterating over all keys for + // the entity + TimelineEntity entity = null; + if (usingPrimaryFilter) { + entity = getEntity(entityId, entityType, fields); + iterator.next(); + } else { + entity = getEntity(entityId, entityType, startTime, fields, iterator, key, + kp.getOffset()); + } + // determine if the retrieved entity matches the provided secondary + // filters, and if so add it to the list of entities to return + boolean filterPassed = true; + if (secondaryFilters != null) { + for (NameValuePair filter : secondaryFilters) { + Object v = entity.getOtherInfo().get(filter.getName()); + if (v == null) { + Set vs = entity.getPrimaryFilters().get( + filter.getName()); + if (vs == null || !vs.contains(filter.getValue())) { + filterPassed = false; + break; + } + } else if (!v.equals(filter.getValue())) { + filterPassed = false; + break; + } + } + } + if (filterPassed) { + if (entity.getDomainId() == null) { + entity.setDomainId(DEFAULT_DOMAIN_ID); + } + if (checkAcl == null || checkAcl.check(entity)) { + entities.addEntity(entity); + } + } + } + db = rollingdb.getPreviousDB(db); + } + return entities; + } finally { + IOUtils.cleanup(LOG, iterator); + } + } + + /** + * Put a single entity. If there is an error, add a TimelinePutError to the + * given response. + * @param entityUpdates a map containing all the scheduled writes for this put to the entity db + * @param indexUpdates a map containing all the scheduled writes for this put to the index db + */ + private long putEntities(TreeMap entityUpdates, TreeMap indexUpdates, TimelineEntity entity, TimelinePutResponse response) { + + long putCount = 0; + List relatedEntitiesWithoutStartTimes = + new ArrayList(); + byte[] revStartTime = null; + Map> primaryFilters = null; + try { + List events = entity.getEvents(); + // look up the start time for the entity + StartAndInsertTime startAndInsertTime = getAndSetStartTime( + entity.getEntityId(), entity.getEntityType(), + entity.getStartTime(), events); + if (startAndInsertTime == null) { + // if no start time is found, add an error and return + TimelinePutError error = new TimelinePutError(); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); + error.setErrorCode(TimelinePutError.NO_START_TIME); + response.addError(error); + return putCount; + } + revStartTime = writeReverseOrderedLong(startAndInsertTime.startTime); + long roundedStartTime = entitydb.computeCurrentCheckMillis(startAndInsertTime.startTime); + RollingWriteBatch rollingWriteBatch = entityUpdates.get(roundedStartTime); + if (rollingWriteBatch == null) { + DB db = entitydb.getDBForStartTime(startAndInsertTime.startTime); + if (db != null) { + WriteBatch writeBatch = db.createWriteBatch(); + rollingWriteBatch = new RollingWriteBatch(db, writeBatch); + entityUpdates.put(roundedStartTime, rollingWriteBatch); + } + } + if (rollingWriteBatch == null) { + // if no start time is found, add an error and return + TimelinePutError error = new TimelinePutError(); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); + error.setErrorCode(TimelinePutError.EXPIRED_ENTITY); + response.addError(error); + return 
putCount; + } + WriteBatch writeBatch = rollingWriteBatch.getWriteBatch(); + + // write entity marker + byte[] markerKey = createEntityMarkerKey(entity.getEntityId(), + entity.getEntityType(), revStartTime); + byte[] markerValue = writeReverseOrderedLong(startAndInsertTime + .insertTime); + writeBatch.put(markerKey, markerValue); + ++putCount; + + // write domain id entry + byte[] domainkey = createDomainIdKey(entity.getEntityId(), + entity.getEntityType(), revStartTime); + if (StringUtils.isEmpty(entity.getDomainId())) { + TimelinePutError error = new TimelinePutError(); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); + error.setErrorCode(TimelinePutError.NO_DOMAIN); + response.addError(error); + return putCount; + } + + writeBatch.put(domainkey, entity.getDomainId().getBytes()); + ++putCount; + + // write event entries + if (events != null) { + for (TimelineEvent event : events) { + byte[] revts = writeReverseOrderedLong(event.getTimestamp()); + byte[] key = createEntityEventKey(entity.getEntityId(), + entity.getEntityType(), revStartTime, revts, + event.getEventType()); + byte[] value = GenericObjectMapper.write(event.getEventInfo()); + writeBatch.put(key, value); + ++putCount; + } + } + + // write primary filter entries + primaryFilters = entity.getPrimaryFilters(); + if (primaryFilters != null) { + for (Entry> primaryFilter : + primaryFilters.entrySet()) { + for (Object primaryFilterValue : primaryFilter.getValue()) { + byte[] key = createPrimaryFilterKey(entity.getEntityId(), + entity.getEntityType(), revStartTime, + primaryFilter.getKey(), primaryFilterValue); + writeBatch.put(key, EMPTY_BYTES); + ++putCount; + } + } + } + + // write other info entries + Map otherInfo = entity.getOtherInfo(); + if (otherInfo != null) { + for (Entry i : otherInfo.entrySet()) { + byte[] key = createOtherInfoKey(entity.getEntityId(), + entity.getEntityType(), revStartTime, i.getKey()); + byte[] value = GenericObjectMapper.write(i.getValue()); + writeBatch.put(key, value); + ++putCount; + } + } + + // write related entity entries + Map> relatedEntities = entity.getRelatedEntities(); + if (relatedEntities != null) { + for (Entry> relatedEntityList : + relatedEntities.entrySet()) { + String relatedEntityType = relatedEntityList.getKey(); + for (String relatedEntityId : relatedEntityList.getValue()) { + // invisible "reverse" entries (entity -> related entity) + byte[] key = createReverseRelatedEntityKey(entity.getEntityId(), + entity.getEntityType(), revStartTime, relatedEntityId, + relatedEntityType); + writeBatch.put(key, EMPTY_BYTES); + ++putCount; + // look up start time of related entity + Long relatedStartTimeLong = getStartTimeLong(relatedEntityId, relatedEntityType); + // delay writing the related entity if no start time is found + if (relatedStartTimeLong == null) { + relatedEntitiesWithoutStartTimes.add(new EntityIdentifier(relatedEntityId, relatedEntityType)); + continue; + } + + byte[] relatedEntityStartTime = writeReverseOrderedLong(relatedStartTimeLong); + long relatedRoundedStartTime = entitydb.computeCurrentCheckMillis(relatedStartTimeLong); + RollingWriteBatch relatedRollingWriteBatch = entityUpdates.get(relatedRoundedStartTime); + if (relatedRollingWriteBatch == null) { + DB db = entitydb.getDBForStartTime(relatedStartTimeLong); + if (db != null) { + WriteBatch relatedWriteBatch = db.createWriteBatch(); + relatedRollingWriteBatch = new RollingWriteBatch(db, relatedWriteBatch); + entityUpdates.put(relatedRoundedStartTime, 
relatedRollingWriteBatch); + } + } + if (relatedRollingWriteBatch == null) { + // if no start time is found, add an error and return + TimelinePutError error = new TimelinePutError(); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); + error.setErrorCode(TimelinePutError.EXPIRED_ENTITY); + response.addError(error); + continue; + } + // This is the existing entity + byte[] domainIdBytes = relatedRollingWriteBatch.getDB().get(createDomainIdKey( + relatedEntityId, relatedEntityType, relatedEntityStartTime)); + // The timeline data created by the server before 2.6 won't have + // the domain field. We assume this timeline data is in the + // default timeline domain. + String domainId = null; + if (domainIdBytes == null) { + domainId = TimelineDataManager.DEFAULT_DOMAIN_ID; + } else { + domainId = new String(domainIdBytes); + } + if (!domainId.equals(entity.getDomainId())) { + // in this case the entity will be put, but the relation will be + // ignored + TimelinePutError error = new TimelinePutError(); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); + error.setErrorCode(TimelinePutError.FORBIDDEN_RELATION); + response.addError(error); + continue; + } + // write "forward" entry (related entity -> entity) + key = createRelatedEntityKey(relatedEntityId, relatedEntityType, + relatedEntityStartTime, entity.getEntityId(), + entity.getEntityType()); + WriteBatch relatedWriteBatch = relatedRollingWriteBatch.getWriteBatch(); + relatedWriteBatch.put(key, EMPTY_BYTES); + ++putCount; + } + } + } + RollingWriteBatch indexRollingWriteBatch = indexUpdates.get(roundedStartTime); + if (indexRollingWriteBatch == null) { + DB db = indexdb.getDBForStartTime(startAndInsertTime.startTime); + if (db != null) { + WriteBatch indexWriteBatch = db.createWriteBatch(); + indexRollingWriteBatch = new RollingWriteBatch(db, indexWriteBatch); + indexUpdates.put(roundedStartTime, indexRollingWriteBatch); + } + } + if (indexRollingWriteBatch == null) { + // if no start time is found, add an error and return + TimelinePutError error = new TimelinePutError(); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); + error.setErrorCode(TimelinePutError.EXPIRED_ENTITY); + response.addError(error); + return putCount; + } + WriteBatch indexWriteBatch = indexRollingWriteBatch.getWriteBatch(); + putCount += writePrimaryFilterEntries(indexWriteBatch, primaryFilters, markerKey, + markerValue); + } catch (IOException e) { + LOG.error("Error putting entity " + entity.getEntityId() + + " of type " + entity.getEntityType(), e); + TimelinePutError error = new TimelinePutError(); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); + error.setErrorCode(TimelinePutError.IO_EXCEPTION); + response.addError(error); + } + + for (EntityIdentifier relatedEntity : relatedEntitiesWithoutStartTimes) { + try { + StartAndInsertTime relatedEntityStartAndInsertTime = + getAndSetStartTime(relatedEntity.getId(), relatedEntity.getType(), + readReverseOrderedLong(revStartTime, 0), null); + if (relatedEntityStartAndInsertTime == null) { + throw new IOException("Error setting start time for related entity"); + } + long relatedStartTimeLong = relatedEntityStartAndInsertTime.startTime; + long relatedRoundedStartTime = entitydb.computeCurrentCheckMillis(relatedStartTimeLong); + RollingWriteBatch relatedRollingWriteBatch = entityUpdates.get(relatedRoundedStartTime); + if (relatedRollingWriteBatch == null) { + DB db = 
entitydb.getDBForStartTime(relatedStartTimeLong); + if (db != null) { + WriteBatch relatedWriteBatch = db.createWriteBatch(); + relatedRollingWriteBatch = new RollingWriteBatch(db, relatedWriteBatch); + entityUpdates.put(relatedRoundedStartTime, relatedRollingWriteBatch); + } + } + if (relatedRollingWriteBatch == null) { + // if no start time is found, add an error and return + TimelinePutError error = new TimelinePutError(); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); + error.setErrorCode(TimelinePutError.EXPIRED_ENTITY); + response.addError(error); + continue; + } + WriteBatch relatedWriteBatch = relatedRollingWriteBatch.getWriteBatch(); + byte[] relatedEntityStartTime = writeReverseOrderedLong( + relatedEntityStartAndInsertTime.startTime); + // This is the new entity, the domain should be the same + byte[] key = createDomainIdKey(relatedEntity.getId(), + relatedEntity.getType(), relatedEntityStartTime); + relatedWriteBatch.put(key, entity.getDomainId().getBytes()); + ++putCount; + relatedWriteBatch.put(createRelatedEntityKey(relatedEntity.getId(), + relatedEntity.getType(), relatedEntityStartTime, + entity.getEntityId(), entity.getEntityType()), EMPTY_BYTES); + ++putCount; + relatedWriteBatch.put(createEntityMarkerKey(relatedEntity.getId(), + relatedEntity.getType(), relatedEntityStartTime), + writeReverseOrderedLong(relatedEntityStartAndInsertTime + .insertTime)); + ++putCount; + } catch (IOException e) { + LOG.error("Error putting related entity " + relatedEntity.getId() + + " of type " + relatedEntity.getType() + " for entity " + + entity.getEntityId() + " of type " + entity.getEntityType(), e); + TimelinePutError error = new TimelinePutError(); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); + error.setErrorCode(TimelinePutError.IO_EXCEPTION); + response.addError(error); + } + } + + return putCount; + } + + /** + * For a given key / value pair that has been written to the db, + * write additional entries to the db for each primary filter. 
+ */ + private static long writePrimaryFilterEntries(WriteBatch writeBatch, + Map> primaryFilters, byte[] key, byte[] value) + throws IOException { + long putCount = 0; + if (primaryFilters != null) { + for (Entry> pf : primaryFilters.entrySet()) { + for (Object pfval : pf.getValue()) { + writeBatch.put(addPrimaryFilterToKey(pf.getKey(), pfval, key), value); + ++putCount; + } + } + } + return putCount; + } + + @Override + public TimelinePutResponse put(TimelineEntities entities) { + if (LOG.isDebugEnabled()) { + LOG.debug("Starting put"); + } + TimelinePutResponse response = new TimelinePutResponse(); + TreeMap entityUpdates = new TreeMap(); + TreeMap indexUpdates = new TreeMap(); + + long entityCount = 0; + long indexCount = 0; + + try { + + for (TimelineEntity entity : entities.getEntities()) { + entityCount += putEntities(entityUpdates, indexUpdates, entity, response); + } + + for (RollingWriteBatch entityUpdate : entityUpdates.values()) { + entityUpdate.write(); + } + + for (RollingWriteBatch indexUpdate : indexUpdates.values()) { + indexUpdate.write(); + } + + } finally { + + for (RollingWriteBatch entityRollingWriteBatch : entityUpdates.values()) { + entityRollingWriteBatch.close(); + } + for (RollingWriteBatch indexRollingWriteBatch : indexUpdates.values()) { + indexRollingWriteBatch.close(); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Put " + entityCount + " new leveldb entity entries and " + + indexCount + " new leveldb index entries from " + + entities.getEntities().size() + " timeline entities"); + } + return response; + } + + /** + * Get the unique start time for a given entity as a byte array that sorts + * the timestamps in reverse order (see {@link + * GenericObjectMapper#writeReverseOrderedLong(long)}). + * + * @param entityId The id of the entity + * @param entityType The type of the entity + * @return A byte array, null if not found + * @throws IOException + */ + private byte[] getStartTime(String entityId, String entityType) + throws IOException { + Long l = getStartTimeLong(entityId, entityType); + return l == null ? null : writeReverseOrderedLong(l); + } + + /** + * Get the unique start time for a given entity as a Long. + * + * @param entityId The id of the entity + * @param entityType The type of the entity + * @return A Long, null if not found + * @throws IOException + */ + private Long getStartTimeLong(String entityId, String entityType) + throws IOException { + EntityIdentifier entity = new EntityIdentifier(entityId, entityType); + // start time is not provided, so try to look it up + if (startTimeReadCache.containsKey(entity)) { + // found the start time in the cache + return startTimeReadCache.get(entity); + } else { + // try to look up the start time in the db + byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType()); + byte[] v = starttimedb.get(b); + if (v == null) { + // did not find the start time in the db + return null; + } else { + // found the start time in the db + Long l = readReverseOrderedLong(v, 0); + startTimeReadCache.put(entity, l); + return l; + } + } + } + + /** + * Get the unique start time for a given entity as a byte array that sorts + * the timestamps in reverse order (see {@link + * GenericObjectMapper#writeReverseOrderedLong(long)}). If the start time + * doesn't exist, set it based on the information provided. Should only be + * called when a lock has been obtained on the entity. 
+ * + * @param entityId The id of the entity + * @param entityType The type of the entity + * @param startTime The start time of the entity, or null + * @param events A list of events for the entity, or null + * @return A StartAndInsertTime + * @throws IOException + */ + private StartAndInsertTime getAndSetStartTime(String entityId, + String entityType, Long startTime, List events) + throws IOException { + EntityIdentifier entity = new EntityIdentifier(entityId, entityType); + StartAndInsertTime time = startTimeWriteCache.get(entity); + if (time != null) { + // return the value in the cache + return time; + } + if (startTime == null && events != null) { + // calculate best guess start time based on lowest event time + startTime = Long.MAX_VALUE; + for (TimelineEvent e : events) { + if (e.getTimestamp() < startTime) { + startTime = e.getTimestamp(); + } + } + } + // check the provided start time matches the db + return checkStartTimeInDb(entity, startTime); + } + + /** + * Checks db for start time and returns it if it exists. If it doesn't + * exist, writes the suggested start time (if it is not null). This is + * only called when the start time is not found in the cache, + * so it adds it back into the cache if it is found. Should only be called + * when a lock has been obtained on the entity. + */ + private StartAndInsertTime checkStartTimeInDb(EntityIdentifier entity, + Long suggestedStartTime) throws IOException { + StartAndInsertTime startAndInsertTime = null; + // create lookup key for start time + byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType()); + // retrieve value for key + byte[] v = starttimedb.get(b); + if (v == null) { + // start time doesn't exist in db + if (suggestedStartTime == null) { + return null; + } + startAndInsertTime = new StartAndInsertTime(suggestedStartTime, + System.currentTimeMillis()); + + // write suggested start time + v = new byte[16]; + writeReverseOrderedLong(suggestedStartTime, v, 0); + writeReverseOrderedLong(startAndInsertTime.insertTime, v, 8); + starttimedb.put(b, v); + } else { + // found start time in db, so ignore suggested start time + startAndInsertTime = new StartAndInsertTime(readReverseOrderedLong(v, 0), + readReverseOrderedLong(v, 8)); + } + startTimeWriteCache.put(entity, startAndInsertTime); + startTimeReadCache.put(entity, startAndInsertTime.startTime); + return startAndInsertTime; + } + + /** + * Creates a key for looking up the start time of a given entity, + * of the form START_TIME_LOOKUP_PREFIX + entity type + entity id. + */ + private static byte[] createStartTimeLookupKey(String entityId, + String entityType) throws IOException { + return KeyBuilder.newInstance().add(START_TIME_LOOKUP_PREFIX) + .add(entityType).add(entityId).getBytes(); + } + + /** + * Creates an entity marker, serializing ENTITY_ENTRY_PREFIX + entity type + + * revstarttime + entity id. + */ + private static byte[] createEntityMarkerKey(String entityId, + String entityType, byte[] revStartTime) throws IOException { + return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX) + .add(entityType).add(revStartTime).add(entityId).getBytesForLookup(); + } + + /** + * Creates an index entry for the given key of the form + * INDEXED_ENTRY_PREFIX + primaryfiltername + primaryfiltervalue + key. 
+ */ + private static byte[] addPrimaryFilterToKey(String primaryFilterName, + Object primaryFilterValue, byte[] key) throws IOException { + return KeyBuilder.newInstance().add(INDEXED_ENTRY_PREFIX) + .add(primaryFilterName) + .add(GenericObjectMapper.write(primaryFilterValue), true).add(key) + .getBytes(); + } + + /** + * Creates an event key, serializing ENTITY_ENTRY_PREFIX + entity type + + * revstarttime + entity id + EVENTS_COLUMN + reveventtimestamp + event type. + */ + private static byte[] createEntityEventKey(String entityId, + String entityType, byte[] revStartTime, byte[] revEventTimestamp, + String eventType) throws IOException { + return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX) + .add(entityType).add(revStartTime).add(entityId).add(EVENTS_COLUMN) + .add(revEventTimestamp).add(eventType).getBytes(); + } + + /** + * Creates an event object from the given key, offset, and value. If the + * event type is not contained in the specified set of event types, + * returns null. + */ + private static TimelineEvent getEntityEvent(Set eventTypes, + byte[] key, int offset, byte[] value) throws IOException { + KeyParser kp = new KeyParser(key, offset); + long ts = kp.getNextLong(); + String tstype = kp.getNextString(); + if (eventTypes == null || eventTypes.contains(tstype)) { + TimelineEvent event = new TimelineEvent(); + event.setTimestamp(ts); + event.setEventType(tstype); + Object o = GenericObjectMapper.read(value); + if (o == null) { + event.setEventInfo(null); + } else if (o instanceof Map) { + @SuppressWarnings("unchecked") + Map m = (Map) o; + event.setEventInfo(m); + } else { + throw new IOException("Couldn't deserialize event info map"); + } + return event; + } + return null; + } + + /** + * Creates a primary filter key, serializing ENTITY_ENTRY_PREFIX + + * entity type + revstarttime + entity id + PRIMARY_FILTERS_COLUMN + name + + * value. + */ + private static byte[] createPrimaryFilterKey(String entityId, + String entityType, byte[] revStartTime, String name, Object value) + throws IOException { + return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType) + .add(revStartTime).add(entityId).add(PRIMARY_FILTERS_COLUMN).add(name) + .add(GenericObjectMapper.write(value)).getBytes(); + } + + /** + * Parses the primary filter from the given key at the given offset and + * adds it to the given entity. + */ + private static void addPrimaryFilter(TimelineEntity entity, byte[] key, + int offset) throws IOException { + KeyParser kp = new KeyParser(key, offset); + String name = kp.getNextString(); + Object value = GenericObjectMapper.read(key, kp.getOffset()); + entity.addPrimaryFilter(name, value); + } + + /** + * Creates an other info key, serializing ENTITY_ENTRY_PREFIX + entity type + + * revstarttime + entity id + OTHER_INFO_COLUMN + name. + */ + private static byte[] createOtherInfoKey(String entityId, String entityType, + byte[] revStartTime, String name) throws IOException { + return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType) + .add(revStartTime).add(entityId).add(OTHER_INFO_COLUMN).add(name) + .getBytes(); + } + + /** + * Creates a string representation of the byte array from the given offset + * to the end of the array (for parsing other info keys). 
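+ * The remaining bytes are decoded with the platform default charset.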
+ */ + private static String parseRemainingKey(byte[] b, int offset) { + return new String(b, offset, b.length - offset); + } + + /** + * Creates a related entity key, serializing ENTITY_ENTRY_PREFIX + + * entity type + revstarttime + entity id + RELATED_ENTITIES_COLUMN + + * relatedentity type + relatedentity id. + */ + private static byte[] createRelatedEntityKey(String entityId, + String entityType, byte[] revStartTime, String relatedEntityId, + String relatedEntityType) throws IOException { + return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType) + .add(revStartTime).add(entityId).add(RELATED_ENTITIES_COLUMN) + .add(relatedEntityType).add(relatedEntityId).getBytes(); + } + + /** + * Parses the related entity from the given key at the given offset and + * adds it to the given entity. + */ + private static void addRelatedEntity(TimelineEntity entity, byte[] key, + int offset) throws IOException { + KeyParser kp = new KeyParser(key, offset); + String type = kp.getNextString(); + String id = kp.getNextString(); + entity.addRelatedEntity(type, id); + } + + /** + * Creates a reverse related entity key, serializing ENTITY_ENTRY_PREFIX + + * entity type + revstarttime + entity id + + * INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN + + * relatedentity type + relatedentity id. + */ + private static byte[] createReverseRelatedEntityKey(String entityId, + String entityType, byte[] revStartTime, String relatedEntityId, + String relatedEntityType) throws IOException { + return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType) + .add(revStartTime).add(entityId) + .add(INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN) + .add(relatedEntityType).add(relatedEntityId).getBytes(); + } + + /** + * Creates a domain id key, serializing ENTITY_ENTRY_PREFIX + + * entity type + revstarttime + entity id + DOMAIN_ID_COLUMN. + */ + private static byte[] createDomainIdKey(String entityId, + String entityType, byte[] revStartTime) throws IOException { + return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType) + .add(revStartTime).add(entityId).add(DOMAIN_ID_COLUMN).getBytes(); + } + /** + * Clears the cache to test reloading start times from leveldb (only for + * testing). + */ + @VisibleForTesting + void clearStartTimeCache() { + startTimeWriteCache.clear(); + startTimeReadCache.clear(); + } + + @VisibleForTesting + static int getStartTimeReadCacheSize(Configuration conf) { + return conf.getInt( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE, + YarnConfiguration. + DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE); + } + + @VisibleForTesting + static int getStartTimeWriteCacheSize(Configuration conf) { + return conf.getInt( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE, + YarnConfiguration. 
+ DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE); + } + + @VisibleForTesting + long evictOldStartTimes(byte[] reverseTimestamp) + throws IOException { + LOG.info("Searching for old start times to evict"); + + long minStartTime = readReverseOrderedLong(reverseTimestamp, 0); + long batchSize = 0; + long totalCount = 0; + long startTimesCount = 0; + + WriteBatch writeBatch = null; + DBIterator iterator = null; + + try { + writeBatch = starttimedb.createWriteBatch(); + ReadOptions readOptions = new ReadOptions(); + readOptions.fillCache(false); + iterator = starttimedb.iterator(readOptions); + // seek to the first start time entry + iterator.seek(START_TIME_LOOKUP_PREFIX); + + // evaluate each start time entry to see if it needs to be evicted or not + while (iterator.hasNext()) { + Map.Entry current = iterator.next(); + byte[] entityKey = current.getKey(); + if (!prefixMatches(START_TIME_LOOKUP_PREFIX, START_TIME_LOOKUP_PREFIX.length, entityKey)) { + break; + } + byte[] entityValue = current.getValue(); + long startTime = readReverseOrderedLong(entityValue, 0); + if (startTime < minStartTime) { + ++batchSize; + ++startTimesCount; + writeBatch.delete(entityKey); + + // a large delete will hold the lock for too long + if (batchSize >= writeBatchSize) { + LOG.info("Preparing to delete a batch of " + batchSize + " old start times"); + starttimedb.write(writeBatch); + LOG.info("Deleted batch of " + batchSize + ". Total start times deleted so far this cycle: " + startTimesCount); + IOUtils.cleanup(LOG, writeBatch); + writeBatch = starttimedb.createWriteBatch(); + batchSize = 0; + } + } + ++totalCount; + } + LOG.info("Preparing to delete " + startTimesCount + "/" + totalCount + " old start time entities"); + starttimedb.write(writeBatch); + LOG.info("Deleted " + startTimesCount + "/" + totalCount + " old start time entities"); + } finally { + IOUtils.cleanup(LOG, writeBatch); + IOUtils.cleanup(LOG, iterator); + } + return startTimesCount; + } + + /** + * Discards entities with start timestamp less than or equal to the given + * timestamp. + */ + @VisibleForTesting + void discardOldEntities(long timestamp) + throws IOException, InterruptedException { + byte[] reverseTimestamp = writeReverseOrderedLong(timestamp); + long totalCount = 0; + long t1 = System.currentTimeMillis(); + try { + totalCount += evictOldStartTimes(reverseTimestamp); + indexdb.evictOldDBs(); + entitydb.evictOldDBs(); + } finally { + long t2 = System.currentTimeMillis(); + LOG.info("Discarded " + totalCount + " entities for timestamp " + + timestamp + " and earlier in " + (t2 - t1) / 1000.0 + " seconds"); + } + } + + Version loadVersion() throws IOException { + byte[] data = starttimedb.get(bytes(TIMELINE_STORE_VERSION_KEY)); + // if version is not stored previously, treat it as 1.0. + if (data == null || data.length == 0) { + return Version.newInstance(1, 0); + } + Version version = + new VersionPBImpl(VersionProto.parseFrom(data)); + return version; + } + + // Only used for test + @VisibleForTesting + void storeVersion(Version state) throws IOException { + dbStoreVersion(state); + } + + private void dbStoreVersion(Version state) throws IOException { + String key = TIMELINE_STORE_VERSION_KEY; + byte[] data = + ((VersionPBImpl) state).getProto().toByteArray(); + try { + starttimedb.put(bytes(key), data); + } catch (DBException e) { + throw new IOException(e); + } + } + + Version getCurrentVersion() { + return CURRENT_VERSION_INFO; + } + + /** + * 1) Versioning timeline store: major.minor. For e.g. 
1.0, 1.1, 1.2...1.25, 2.0 etc. + * 2) Any incompatible change of TS-store is a major upgrade, and any + * compatible change of TS-store is a minor upgrade. + * 3) Within a minor upgrade, say 1.1 to 1.2: + * overwrite the version info and proceed as normal. + * 4) Within a major upgrade, say 1.2 to 2.0: + * throw exception and indicate user to use a separate upgrade tool to + * upgrade timeline store or remove incompatible old state. + */ + private void checkVersion() throws IOException { + Version loadedVersion = loadVersion(); + LOG.info("Loaded timeline store version info " + loadedVersion); + if (loadedVersion.equals(getCurrentVersion())) { + return; + } + if (loadedVersion.isCompatibleTo(getCurrentVersion())) { + LOG.info("Storing timeline store version info " + getCurrentVersion()); + dbStoreVersion(CURRENT_VERSION_INFO); + } else { + String incompatibleMessage = + "Incompatible version for timeline store: expecting version " + + getCurrentVersion() + ", but loading version " + loadedVersion; + LOG.fatal(incompatibleMessage); + throw new IOException(incompatibleMessage); + } + } + + //TODO: make data retention work with the domain data as well + @Override + public void put(TimelineDomain domain) throws IOException { + WriteBatch domainWriteBatch = null; + WriteBatch ownerWriteBatch = null; + try { + domainWriteBatch = domaindb.createWriteBatch(); + ownerWriteBatch = ownerdb.createWriteBatch(); + if (domain.getId() == null || domain.getId().length() == 0) { + throw new IllegalArgumentException("Domain doesn't have an ID"); + } + if (domain.getOwner() == null || domain.getOwner().length() == 0) { + throw new IllegalArgumentException("Domain doesn't have an owner."); + } + + // Write description + byte[] domainEntryKey = createDomainEntryKey( + domain.getId(), DESCRIPTION_COLUMN); + byte[] ownerLookupEntryKey = createOwnerLookupKey( + domain.getOwner(), domain.getId(), DESCRIPTION_COLUMN); + if (domain.getDescription() != null) { + domainWriteBatch.put(domainEntryKey, domain.getDescription().getBytes()); + ownerWriteBatch.put(ownerLookupEntryKey, domain.getDescription().getBytes()); + } else { + domainWriteBatch.put(domainEntryKey, EMPTY_BYTES); + ownerWriteBatch.put(ownerLookupEntryKey, EMPTY_BYTES); + } + + // Write owner + domainEntryKey = createDomainEntryKey(domain.getId(), OWNER_COLUMN); + ownerLookupEntryKey = createOwnerLookupKey( + domain.getOwner(), domain.getId(), OWNER_COLUMN); + // Null check for owner is done before + domainWriteBatch.put(domainEntryKey, domain.getOwner().getBytes()); + ownerWriteBatch.put(ownerLookupEntryKey, domain.getOwner().getBytes()); + + // Write readers + domainEntryKey = createDomainEntryKey(domain.getId(), READER_COLUMN); + ownerLookupEntryKey = createOwnerLookupKey( + domain.getOwner(), domain.getId(), READER_COLUMN); + if (domain.getReaders() != null && domain.getReaders().length() > 0) { + domainWriteBatch.put(domainEntryKey, domain.getReaders().getBytes()); + ownerWriteBatch.put(ownerLookupEntryKey, domain.getReaders().getBytes()); + } else { + domainWriteBatch.put(domainEntryKey, EMPTY_BYTES); + ownerWriteBatch.put(ownerLookupEntryKey, EMPTY_BYTES); + } + + // Write writers + domainEntryKey = createDomainEntryKey(domain.getId(), WRITER_COLUMN); + ownerLookupEntryKey = createOwnerLookupKey( + domain.getOwner(), domain.getId(), WRITER_COLUMN); + if (domain.getWriters() != null && domain.getWriters().length() > 0) { + domainWriteBatch.put(domainEntryKey, domain.getWriters().getBytes()); + ownerWriteBatch.put(ownerLookupEntryKey, 
domain.getWriters().getBytes()); + } else { + domainWriteBatch.put(domainEntryKey, EMPTY_BYTES); + ownerWriteBatch.put(ownerLookupEntryKey, EMPTY_BYTES); + } + + // Write creation time and modification time + // We put both timestamps together because they are always retrieved + // together, and store them in the same way as we did for the entity's + // start time and insert time. + domainEntryKey = createDomainEntryKey(domain.getId(), TIMESTAMP_COLUMN); + ownerLookupEntryKey = createOwnerLookupKey( + domain.getOwner(), domain.getId(), TIMESTAMP_COLUMN); + long currentTimestamp = System.currentTimeMillis(); + byte[] timestamps = domaindb.get(domainEntryKey); + if (timestamps == null) { + timestamps = new byte[16]; + writeReverseOrderedLong(currentTimestamp, timestamps, 0); + writeReverseOrderedLong(currentTimestamp, timestamps, 8); + } else { + writeReverseOrderedLong(currentTimestamp, timestamps, 8); + } + domainWriteBatch.put(domainEntryKey, timestamps); + ownerWriteBatch.put(ownerLookupEntryKey, timestamps); + domaindb.write(domainWriteBatch); + ownerdb.write(ownerWriteBatch); + } finally { + IOUtils.cleanup(LOG, domainWriteBatch); + IOUtils.cleanup(LOG, ownerWriteBatch); + } + } + + /** + * Creates a domain entity key with column name suffix, + * of the form DOMAIN_ENTRY_PREFIX + domain id + column name. + */ + private static byte[] createDomainEntryKey(String domainId, + byte[] columnName) throws IOException { + return KeyBuilder.newInstance().add(DOMAIN_ENTRY_PREFIX) + .add(domainId).add(columnName).getBytes(); + } + + /** + * Creates an owner lookup key with column name suffix, + * of the form OWNER_LOOKUP_PREFIX + owner + domain id + column name. + */ + private static byte[] createOwnerLookupKey( + String owner, String domainId, byte[] columnName) throws IOException { + return KeyBuilder.newInstance().add(OWNER_LOOKUP_PREFIX) + .add(owner).add(domainId).add(columnName).getBytes(); + } + + @Override + public TimelineDomain getDomain(String domainId) + throws IOException { + DBIterator iterator = null; + try { + byte[] prefix = KeyBuilder.newInstance() + .add(DOMAIN_ENTRY_PREFIX).add(domainId).getBytesForLookup(); + iterator = domaindb.iterator(); + iterator.seek(prefix); + return getTimelineDomain(iterator, domainId, prefix); + } finally { + IOUtils.cleanup(LOG, iterator); + } + } + + @Override + public TimelineDomains getDomains(String owner) + throws IOException { + DBIterator iterator = null; + try { + byte[] prefix = KeyBuilder.newInstance() + .add(OWNER_LOOKUP_PREFIX).add(owner).getBytesForLookup(); + List domains = new ArrayList(); + for (iterator = ownerdb.iterator(), iterator.seek(prefix); + iterator.hasNext();) { + byte[] key = iterator.peekNext().getKey(); + if (!prefixMatches(prefix, prefix.length, key)) { + break; + } + // Iterator to parse the rows of an individual domain + KeyParser kp = new KeyParser(key, prefix.length); + String domainId = kp.getNextString(); + byte[] prefixExt = KeyBuilder.newInstance().add(OWNER_LOOKUP_PREFIX) + .add(owner).add(domainId).getBytesForLookup(); + TimelineDomain domainToReturn = + getTimelineDomain(iterator, domainId, prefixExt); + if (domainToReturn != null) { + domains.add(domainToReturn); + } + } + // Sort the domains to return + Collections.sort(domains, new Comparator() { + @Override + public int compare( + TimelineDomain domain1, TimelineDomain domain2) { + int result = domain2.getCreatedTime().compareTo( + domain1.getCreatedTime()); + if (result == 0) { + return domain2.getModifiedTime().compareTo( + domain1.getModifiedTime()); 
+ } else { + return result; + } + } + }); + TimelineDomains domainsToReturn = new TimelineDomains(); + domainsToReturn.addDomains(domains); + return domainsToReturn; + } finally { + IOUtils.cleanup(LOG, iterator); + } + } + + private static TimelineDomain getTimelineDomain( + DBIterator iterator, String domainId, byte[] prefix) throws IOException { + // Iterate over all the rows whose key starts with prefix to retrieve the + // domain information. + TimelineDomain domain = new TimelineDomain(); + domain.setId(domainId); + boolean noRows = true; + for (; iterator.hasNext(); iterator.next()) { + byte[] key = iterator.peekNext().getKey(); + if (!prefixMatches(prefix, prefix.length, key)) { + break; + } + if (noRows) { + noRows = false; + } + byte[] value = iterator.peekNext().getValue(); + if (value != null && value.length > 0) { + if (key[prefix.length] == DESCRIPTION_COLUMN[0]) { + domain.setDescription(new String(value)); + } else if (key[prefix.length] == OWNER_COLUMN[0]) { + domain.setOwner(new String(value)); + } else if (key[prefix.length] == READER_COLUMN[0]) { + domain.setReaders(new String(value)); + } else if (key[prefix.length] == WRITER_COLUMN[0]) { + domain.setWriters(new String(value)); + } else if (key[prefix.length] == TIMESTAMP_COLUMN[0]) { + domain.setCreatedTime(readReverseOrderedLong(value, 0)); + domain.setModifiedTime(readReverseOrderedLong(value, 8)); + } else { + LOG.error("Unrecognized domain column: " + key[prefix.length]); + } + } + } + if (noRows) { + return null; + } else { + return domain; + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java index 8c6b83a..0803db8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java @@ -18,8 +18,6 @@ package org.apache.hadoop.yarn.server.timeline; -import static org.apache.hadoop.yarn.util.StringHelper.CSV_JOINER; - import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -43,7 +41,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field; import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager; -import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import com.google.common.annotations.VisibleForTesting; @@ -218,7 +215,7 @@ public TimelineEvents getEvents( eventsItr.remove(); } } catch (Exception e) { - LOG.error("Error when verifying access for user " + callerUGI + LOG.warn("Error when verifying access for user " + callerUGI + " on the events of the timeline entity " + new EntityIdentifier(eventsOfOneEntity.getEntityId(), eventsOfOneEntity.getEntityType()), e); @@ -261,44 +258,41 @@ public TimelinePutResponse postEntities( TimelineEntity existingEntity = null; try { existingEntity = - store.getEntity(entityID.getId(), entityID.getType(), + store.getEntity(entity.getEntityId(), entity.getEntityType(), EnumSet.of(Field.PRIMARY_FILTERS)); if (existingEntity != null) { 
addDefaultDomainIdIfAbsent(existingEntity); if (!existingEntity.getDomainId().equals(entity.getDomainId())) { throw new YarnException("The domain of the timeline entity " - + entityID + " is not allowed to be changed."); + + "{ id: " + entity.getEntityId() + ", type: "+ entity.getEntityType() + " }" + + " is not allowed to be changed from " + existingEntity.getDomainId() + + " to " + entity.getDomainId()); } } if (!timelineACLsManager.checkAccess( callerUGI, ApplicationAccessType.MODIFY_APP, entity)) { throw new YarnException(callerUGI - + " is not allowed to put the timeline entity " + entityID + + " is not allowed to put the timeline entity " + + "{ id: " + entity.getEntityId() + ", type: "+ entity.getEntityType() + " }" + " into the domain " + entity.getDomainId() + "."); } } catch (Exception e) { // Skip the entity which already exists and was put by others - LOG.error("Skip the timeline entity: " + entityID, e); + LOG.warn("Skip the timeline entity: " + + "{ id: " + entity.getEntityId() + ", type: "+ entity.getEntityType() + " }", e); TimelinePutResponse.TimelinePutError error = new TimelinePutResponse.TimelinePutError(); - error.setEntityId(entityID.getId()); - error.setEntityType(entityID.getType()); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); error.setErrorCode( TimelinePutResponse.TimelinePutError.ACCESS_DENIED); errors.add(error); continue; } - entityIDs.add(entityID); entitiesToPut.addEntity(entity); - if (LOG.isDebugEnabled()) { - LOG.debug("Storing the entity " + entityID + ", JSON-style content: " - + TimelineUtils.dumpTimelineRecordtoJSON(entity)); - } - } - if (LOG.isDebugEnabled()) { - LOG.debug("Storing entities: " + CSV_JOINER.join(entityIDs)); } + TimelinePutResponse response = store.put(entitiesToPut); // add the errors of timeline system filter key conflict response.addErrors(errors); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/util/LeveldbUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/util/LeveldbUtils.java index fc6cc7d..153bf34 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/util/LeveldbUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/util/LeveldbUtils.java @@ -112,6 +112,16 @@ public String getNextString() throws IOException { return s; } + public void skipNextString() throws IOException { + if (offset >= b.length) { + throw new IOException("tried to read nonexistent string from byte array"); + } + while (offset < b.length && b[offset] != 0x0) { + ++offset; + } + ++offset; + } + public long getNextLong() throws IOException { if (offset + 8 >= b.length) { throw new IOException("byte array ran out when trying to read long"); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java index c5c0f93..345d1bd 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java @@ -198,7 +198,7 @@ public void testDeleteEntities() throws IOException, InterruptedException { verifyEntityInfo(entityId6, entityType1, EMPTY_EVENTS, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(1), domainId2); - ((LeveldbTimelineStore)store).discardOldEntities(-123l); + ((LeveldbTimelineStore)store).discardOldEntities(0l); assertEquals(2, getEntities("type_1").size()); assertEquals(0, getEntities("type_2").size()); assertEquals(6, ((LeveldbTimelineStore)store).getEntityTypes().size()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDB.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDB.java index e69de29..014c808 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDB.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDB.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.timeline; + +import java.io.File; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.iq80.leveldb.DB; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestRollingLevelDB { + Configuration conf = new YarnConfiguration(); + FileSystem lfs; + MyRollingLevelDB rollingLevelDB; + + public static class MyRollingLevelDB extends RollingLevelDB { + long time; + + MyRollingLevelDB() { + super("Test"); + this.time = System.currentTimeMillis(); + } + + @Override + protected long currentTimeMillis() { + return time; + } + + public void setCurrentTimeMillis(long time) { + this.time = time; + } + }; + + @Before + public void setup() throws Exception { + lfs = FileSystem.getLocal(conf); + File fsPath = new File("target", this.getClass().getSimpleName() + + "-tmpDir").getAbsoluteFile(); + conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH, fsPath.getAbsolutePath()); + lfs.delete(new Path(fsPath.getAbsolutePath()), true); + rollingLevelDB = new MyRollingLevelDB(); + } + + @Test + public void testInsertAfterRollPeriodRollsDB() throws Exception { + + rollingLevelDB.init(conf); + long now = rollingLevelDB.currentTimeMillis(); + DB db = rollingLevelDB.getDBForStartTime(now); + long startTime = rollingLevelDB.getStartTimeFor(db); + Assert.assertEquals("Received level db for incorrect start time", + rollingLevelDB.computeCurrentCheckMillis(now), + startTime); + now = rollingLevelDB.getNextRollingTimeMillis(); + rollingLevelDB.setCurrentTimeMillis(now); + db = rollingLevelDB.getDBForStartTime(now); + startTime = rollingLevelDB.getStartTimeFor(db); + Assert.assertEquals("Received level db for incorrect start time", + rollingLevelDB.computeCurrentCheckMillis(now), + startTime); + } + + @Test + public void testInsertForPreviousPeriodAfterRollPeriodRollsDB() throws Exception { + + rollingLevelDB.init(conf); + long now = rollingLevelDB.currentTimeMillis(); + now = rollingLevelDB.computeCurrentCheckMillis(now); + rollingLevelDB.setCurrentTimeMillis(now); + DB db = rollingLevelDB.getDBForStartTime(now - 1); + long startTime = rollingLevelDB.getStartTimeFor(db); + Assert.assertEquals("Received level db for incorrect start time", + rollingLevelDB.computeCurrentCheckMillis(now - 1), + startTime); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDBTimelineStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDBTimelineStore.java index e69de29..53297a5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDBTimelineStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDBTimelineStore.java @@ -0,0 +1,325 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timeline; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.File; +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.service.ServiceStateException; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.records.Version; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class TestRollingLevelDBTimelineStore extends TimelineStoreTestUtils { + private FileContext fsContext; + private File fsPath; + private Configuration config = new YarnConfiguration(); + + @Before + public void setup() throws Exception { + fsContext = FileContext.getLocalFSFileContext(); + fsPath = new File("target", this.getClass().getSimpleName() + + "-tmpDir").getAbsoluteFile(); + fsContext.delete(new Path(fsPath.getAbsolutePath()), true); + config.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH, + fsPath.getAbsolutePath()); + config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false); + store = new RollingLevelDBTimelineStore(); + store.init(config); + store.start(); + loadTestEntityData(); + loadVerificationEntityData(); + loadTestDomainData(); + } + + @After + public void tearDown() throws Exception { + store.stop(); + fsContext.delete(new Path(fsPath.getAbsolutePath()), true); + } + + @Test + public void testRootDirPermission() throws IOException { + FileSystem fs = FileSystem.getLocal(new YarnConfiguration()); + FileStatus file = fs.getFileStatus( + new Path(fsPath.getAbsolutePath(), RollingLevelDBTimelineStore.FILENAME)); + assertNotNull(file); + assertEquals(RollingLevelDBTimelineStore.LEVELDB_DIR_UMASK, file.getPermission()); + } + + @Test + public void testGetSingleEntity() throws IOException { + super.testGetSingleEntity(); + ((RollingLevelDBTimelineStore)store).clearStartTimeCache(); + super.testGetSingleEntity(); + loadTestEntityData(); + } + + @Test + public void testGetEntities() throws IOException { + super.testGetEntities(); + } + + @Test + public void testGetEntitiesWithFromId() throws IOException { + super.testGetEntitiesWithFromId(); + } + + @Test + public void testGetEntitiesWithFromTs() throws IOException { + 
super.testGetEntitiesWithFromTs(); + } + + @Test + public void testGetEntitiesWithPrimaryFilters() throws IOException { + super.testGetEntitiesWithPrimaryFilters(); + } + + @Test + public void testGetEntitiesWithSecondaryFilters() throws IOException { + super.testGetEntitiesWithSecondaryFilters(); + } + + @Test + public void testGetEvents() throws IOException { + super.testGetEvents(); + } + + @Test + public void testCacheSizes() { + Configuration conf = new Configuration(); + assertEquals(10000, RollingLevelDBTimelineStore.getStartTimeReadCacheSize(conf)); + assertEquals(10000, RollingLevelDBTimelineStore.getStartTimeWriteCacheSize(conf)); + conf.setInt( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE, + 10001); + assertEquals(10001, RollingLevelDBTimelineStore.getStartTimeReadCacheSize(conf)); + conf = new Configuration(); + conf.setInt( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE, + 10002); + assertEquals(10002, RollingLevelDBTimelineStore.getStartTimeWriteCacheSize(conf)); + } + + @Test + public void testCheckVersion() throws IOException { + RollingLevelDBTimelineStore dbStore = (RollingLevelDBTimelineStore) store; + // default version + Version defaultVersion = dbStore.getCurrentVersion(); + Assert.assertEquals(defaultVersion, dbStore.loadVersion()); + + // compatible version + Version compatibleVersion = + Version.newInstance(defaultVersion.getMajorVersion(), + defaultVersion.getMinorVersion() + 2); + dbStore.storeVersion(compatibleVersion); + Assert.assertEquals(compatibleVersion, dbStore.loadVersion()); + restartTimelineStore(); + dbStore = (RollingLevelDBTimelineStore) store; + // overwrite the compatible version + Assert.assertEquals(defaultVersion, dbStore.loadVersion()); + + // incompatible version + Version incompatibleVersion = + Version.newInstance(defaultVersion.getMajorVersion() + 1, + defaultVersion.getMinorVersion()); + dbStore.storeVersion(incompatibleVersion); + try { + restartTimelineStore(); + Assert.fail("Incompatible version, should expect fail here."); + } catch (ServiceStateException e) { + Assert.assertTrue("Exception message mismatch", + e.getMessage().contains("Incompatible version for timeline store")); + } + } + + @Test + public void testValidateConfig() throws IOException { + Configuration copyConfig = new YarnConfiguration(config); + try { + Configuration newConfig = new YarnConfiguration(copyConfig); + newConfig.setLong(YarnConfiguration.TIMELINE_SERVICE_TTL_MS, 0); + config = newConfig; + restartTimelineStore(); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains( + YarnConfiguration.TIMELINE_SERVICE_TTL_MS)); + } + try { + Configuration newConfig = new YarnConfiguration(copyConfig); + newConfig.setLong( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS, 0); + config = newConfig; + restartTimelineStore(); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS)); + } + try { + Configuration newConfig = new YarnConfiguration(copyConfig); + newConfig.setLong( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE, -1); + config = newConfig; + restartTimelineStore(); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE)); + } + try { + Configuration newConfig = new YarnConfiguration(copyConfig); + newConfig + .setLong( + 
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE, + 0); + config = newConfig; + restartTimelineStore(); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert + .assertTrue(e + .getMessage().contains( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE)); + } + try { + Configuration newConfig = new YarnConfiguration(copyConfig); + newConfig + .setLong( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE, + 0); + config = newConfig; + restartTimelineStore(); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert + .assertTrue(e + .getMessage() + .contains( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE)); + } + config = copyConfig; + restartTimelineStore(); + } + + private void restartTimelineStore() throws IOException { + // need to close so leveldb releases database lock + if (store != null) { + store.close(); + } + store = new RollingLevelDBTimelineStore(); + store.init(config); + store.start(); + } + + @Test + public void testGetDomain() throws IOException { + super.testGetDomain(); + } + + @Test + public void testGetDomains() throws IOException { + super.testGetDomains(); + } + + @Test + public void testRelatingToNonExistingEntity() throws IOException { + TimelineEntity entityToStore = new TimelineEntity(); + entityToStore.setEntityType("TEST_ENTITY_TYPE_1"); + entityToStore.setEntityId("TEST_ENTITY_ID_1"); + entityToStore.setDomainId(TimelineDataManager.DEFAULT_DOMAIN_ID); + entityToStore.addRelatedEntity("TEST_ENTITY_TYPE_2", "TEST_ENTITY_ID_2"); + TimelineEntities entities = new TimelineEntities(); + entities.addEntity(entityToStore); + store.put(entities); + TimelineEntity entityToGet = + store.getEntity("TEST_ENTITY_ID_2", "TEST_ENTITY_TYPE_2", null); + Assert.assertNotNull(entityToGet); + Assert.assertEquals("DEFAULT", entityToGet.getDomainId()); + Assert.assertEquals("TEST_ENTITY_TYPE_1", + entityToGet.getRelatedEntities().keySet().iterator().next()); + Assert.assertEquals("TEST_ENTITY_ID_1", + entityToGet.getRelatedEntities().values().iterator().next() + .iterator().next()); + } + + @Test + public void testRelatingToOldEntityWithoutDomainId() throws IOException { + // New entity is put in the default domain + TimelineEntity entityToStore = new TimelineEntity(); + entityToStore.setEntityType("NEW_ENTITY_TYPE_1"); + entityToStore.setEntityId("NEW_ENTITY_ID_1"); + entityToStore.setDomainId(TimelineDataManager.DEFAULT_DOMAIN_ID); + entityToStore.addRelatedEntity("OLD_ENTITY_TYPE_1", "OLD_ENTITY_ID_1"); + TimelineEntities entities = new TimelineEntities(); + entities.addEntity(entityToStore); + store.put(entities); + + TimelineEntity entityToGet = + store.getEntity("OLD_ENTITY_ID_1", "OLD_ENTITY_TYPE_1", null); + Assert.assertNotNull(entityToGet); + Assert.assertEquals("DEFAULT", entityToGet.getDomainId()); + Assert.assertEquals("NEW_ENTITY_TYPE_1", + entityToGet.getRelatedEntities().keySet().iterator().next()); + Assert.assertEquals("NEW_ENTITY_ID_1", + entityToGet.getRelatedEntities().values().iterator().next() + .iterator().next()); + + // New entity is not put in the default domain + entityToStore = new TimelineEntity(); + entityToStore.setEntityType("NEW_ENTITY_TYPE_2"); + entityToStore.setEntityId("NEW_ENTITY_ID_2"); + entityToStore.setDomainId("NON_DEFAULT"); + entityToStore.addRelatedEntity("OLD_ENTITY_TYPE_1", "OLD_ENTITY_ID_1"); + entities = new TimelineEntities(); + entities.addEntity(entityToStore); + TimelinePutResponse response = store.put(entities); + 
Assert.assertEquals(1, response.getErrors().size()); + Assert.assertEquals(TimelinePutError.FORBIDDEN_RELATION, + response.getErrors().get(0).getErrorCode()); + entityToGet = + store.getEntity("OLD_ENTITY_ID_1", "OLD_ENTITY_TYPE_1", null); + Assert.assertNotNull(entityToGet); + Assert.assertEquals("DEFAULT", entityToGet.getDomainId()); + // Still have one related entity + Assert.assertEquals(1, entityToGet.getRelatedEntities().keySet().size()); + Assert.assertEquals(1, entityToGet.getRelatedEntities().values() + .iterator().next().size()); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreTestUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreTestUtils.java index da71f46..6aff085 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreTestUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreTestUtils.java @@ -138,7 +138,7 @@ protected void loadTestEntityData() throws IOException { relatedEntities.put(entityType2, Collections.singleton(entityId2)); TimelineEvent ev3 = createEvent(789l, "launch_event", null); - TimelineEvent ev4 = createEvent(-123l, "init_event", null); + TimelineEvent ev4 = createEvent(0l, "init_event", null); List events = new ArrayList(); events.add(ev3); events.add(ev4); @@ -301,7 +301,7 @@ protected void loadVerificationEntityData() throws Exception { relEntityMap2.put(entityType4, Collections.singleton(entityId4)); ev3 = createEvent(789l, "launch_event", null); - ev4 = createEvent(-123l, "init_event", null); + ev4 = createEvent(0l, "init_event", null); events2 = new ArrayList(); events2.add(ev3); events2.add(ev4); @@ -383,7 +383,7 @@ public void testGetSingleEntity() throws IOException { entityType1, EnumSet.allOf(Field.class)), domainId1); verifyEntityInfo(entityId2, entityType2, events2, relEntityMap, - EMPTY_PRIMARY_FILTERS, EMPTY_MAP, -123l, store.getEntity(entityId2, + EMPTY_PRIMARY_FILTERS, EMPTY_MAP, 0l, store.getEntity(entityId2, entityType2, EnumSet.allOf(Field.class)), domainId1); verifyEntityInfo(entityId4, entityType4, EMPTY_EVENTS, EMPTY_REL_ENTITIES,