diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelinePutResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelinePutResponse.java index a56d4d4..abe106f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelinePutResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline/TimelinePutResponse.java @@ -129,6 +129,12 @@ public void setErrors(List errors) { */ public static final int FORBIDDEN_RELATION = 6; + /** + * Error code returned if the entity start time is before the eviction + * period of old data. + */ + public static final int EXPIRED_ENTITY = 7; + private String entityId; private String entityType; private int errorCode; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 790a4dd..9780ae5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1431,6 +1431,18 @@ private static void addDeprecatedKeys() { public static final long DEFAULT_TIMELINE_SERVICE_TTL_MS = 1000 * 60 * 60 * 24 * 7; + /** Timeline service rolling period. Valid values are daily, half_daily, + * quarter_daily, and hourly. */ + public static final String TIMELINE_SERVICE_ROLLING_PERIOD = + TIMELINE_SERVICE_PREFIX + "rolling-period"; + + /** Roll a new database each hour. */ + public static final String DEFAULT_TIMELINE_SERVICE_ROLLING_PERIOD = + "hourly"; + + /** Implementation specific configuration prefix for Timeline Service + * leveldb. + */ public static final String TIMELINE_SERVICE_LEVELDB_PREFIX = TIMELINE_SERVICE_PREFIX + "leveldb-timeline-store."; @@ -1438,13 +1450,36 @@ private static void addDeprecatedKeys() { public static final String TIMELINE_SERVICE_LEVELDB_PATH = TIMELINE_SERVICE_LEVELDB_PREFIX + "path"; - /** Timeline service leveldb read cache (uncompressed blocks) */ + /** Timeline service leveldb read cache (uncompressed blocks). This is + * per rolling instance so should be tuned if using rolling leveldb + * timeline store */ public static final String TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE = TIMELINE_SERVICE_LEVELDB_PREFIX + "read-cache-size"; + /** Default leveldb read cache size if no configuration is specified. */ public static final long DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE = 100 * 1024 * 1024; + /** Timeline service leveldb write buffer size. */ + public static final String TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE = + TIMELINE_SERVICE_LEVELDB_PREFIX + "write-buffer-size"; + + /** Default leveldb write buffer size if no configuration is specified. This + * is per rolling instance so should be tuned if using rolling leveldb + * timeline store. */ + public static final int DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE = + 16 * 1024 * 1024; + + /** Timeline service leveldb write batch size. This value can be tuned down + * to reduce lock time for ttl eviction. */ + public static final String + TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE = + TIMELINE_SERVICE_LEVELDB_PREFIX + "write-batch-size"; + + /** Default leveldb write batch size is no configuration is specified */ + public static final int + DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE = 10000; + /** Timeline service leveldb start time read cache (number of entities) */ public static final String TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE = @@ -1468,6 +1503,16 @@ private static void addDeprecatedKeys() { public static final long DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS = 1000 * 60 * 5; + /** Timeline service leveldb number of concurrent open files. Tuned this + * configuration to stay within system limits. This is per rolling instance + * so should be tuned if using rolling leveldb timeline store. */ + public static final String TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES = + TIMELINE_SERVICE_LEVELDB_PREFIX + "max-open-files"; + + /** Default leveldb max open files if no configuration is specified. */ + public static final int DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES = + 1000; + /** The Kerberos principal for the timeline server.*/ public static final String TIMELINE_SERVICE_PRINCIPAL = TIMELINE_SERVICE_PREFIX + "principal"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml index f183cae..84eee39 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml @@ -180,6 +180,11 @@ bcprov-jdk16 test + + de.ruedigermoeller + fst + 2.24 + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDB.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDB.java index e69de29..6d10671 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDB.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDB.java @@ -0,0 +1,420 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timeline; + +import java.io.File; +import java.io.IOException; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.GregorianCalendar; +import java.util.Iterator; +import java.util.Locale; +import java.util.Map; +import java.util.TimeZone; +import java.util.TreeMap; +import java.util.Map.Entry; + +import org.apache.commons.io.FilenameUtils; +import org.apache.commons.lang.time.FastDateFormat; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.fusesource.leveldbjni.JniDBFactory; +import org.iq80.leveldb.DB; +import org.iq80.leveldb.Options; +import org.iq80.leveldb.WriteBatch; + +/** + * Contains the logic to lookup a leveldb by timestamp so that multiple smaller + * databases can roll according to the configured period and evicted efficiently + * via operating system directory removal. + */ +class RollingLevelDB { + + /** Logger for this class. */ + private static final Log LOG = LogFactory.getLog(RollingLevelDB.class); + /** Factory to open and create new leveldb instances. */ + private static JniDBFactory factory = new JniDBFactory(); + /** Thread safe date formatter. */ + private FastDateFormat fdf; + /** Date parser. */ + private SimpleDateFormat sdf; + /** Calendar to calculate the current and next rolling period. */ + private GregorianCalendar cal = new GregorianCalendar( + TimeZone.getTimeZone("GMT")); + /** Collection of all active rolling leveldb instances. */ + private final TreeMap rollingdbs; + /** Collection of all rolling leveldb instances to evict. */ + private final TreeMap rollingdbsToEvict; + /** Name of this rolling level db. */ + private final String name; + /** Calculated timestamp of when to roll a new leveldb instance. */ + private volatile long nextRollingCheckMillis = 0; + /** File system instance to find and create new leveldb instances. */ + private FileSystem lfs = null; + /** Directory to store rolling leveldb instances. */ + private Path rollingDBPath; + /** Configuration for this object. */ + private Configuration conf; + /** Rolling period. */ + private RollingPeriod rollingPeriod; + /** + * Rolling leveldb instances are evicted when their endtime is earlier than + * the current time minus the time to live value. + */ + private long ttl; + /** Whether time to live is enabled. */ + private boolean ttlEnabled; + + /** Encapsulates the rolling period to date format lookup. */ + enum RollingPeriod { + DAILY { + @Override + public String dateFormat() { + return "yyyy-MM-dd"; + } + }, + HALF_DAILY { + @Override + public String dateFormat() { + return "yyyy-MM-dd-HH"; + } + }, + QUARTER_DAILY { + @Override + public String dateFormat() { + return "yyyy-MM-dd-HH"; + } + }, + HOURLY { + @Override + public String dateFormat() { + return "yyyy-MM-dd-HH"; + } + }, + MINUTELY { + @Override + public String dateFormat() { + return "yyyy-MM-dd-HH-mm"; + } + }; + public abstract String dateFormat(); + } + + /** + * Convenience class for associating a write batch with its rolling leveldb + * instance. + */ + public static class RollingWriteBatch { + /** Leveldb object. */ + private final DB db; + /** Write batch for the db object. */ + private final WriteBatch writeBatch; + + public RollingWriteBatch(final DB db, final WriteBatch writeBatch) { + this.db = db; + this.writeBatch = writeBatch; + } + + public DB getDB() { + return db; + } + + public WriteBatch getWriteBatch() { + return writeBatch; + } + + public void write() { + db.write(writeBatch); + } + + public void close() { + IOUtils.cleanup(LOG, writeBatch); + } + } + + RollingLevelDB(String name) { + this.name = name; + this.rollingdbs = new TreeMap(); + this.rollingdbsToEvict = new TreeMap(); + } + + protected String getName() { + return name; + } + + protected long currentTimeMillis() { + return System.currentTimeMillis(); + } + + public long getNextRollingTimeMillis() { + return nextRollingCheckMillis; + } + + public long getTimeToLive() { + return ttl; + } + + public boolean getTimeToLiveEnabled() { + return ttlEnabled; + } + + protected void setNextRollingTimeMillis(final long timestamp) { + this.nextRollingCheckMillis = timestamp; + LOG.info("Next rolling time for " + getName() + " is " + + fdf.format(nextRollingCheckMillis)); + } + + public void init(final Configuration config) throws Exception { + LOG.info("Initializing RollingLevelDB for " + getName()); + this.conf = config; + this.ttl = conf.getLong(YarnConfiguration.TIMELINE_SERVICE_TTL_MS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_TTL_MS); + this.ttlEnabled = conf.getBoolean( + YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, true); + this.rollingDBPath = new Path( + conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH), + RollingLevelDBTimelineStore.FILENAME); + initFileSystem(); + initRollingPeriod(); + initHistoricalDBs(); + } + + protected void initFileSystem() throws IOException { + lfs = FileSystem.getLocal(conf); + boolean success = lfs.mkdirs(rollingDBPath, + RollingLevelDBTimelineStore.LEVELDB_DIR_UMASK); + if (!success) { + throw new IOException("Failed to create leveldb root directory " + + rollingDBPath); + } + } + + protected synchronized void initRollingPeriod() { + final String lcRollingPeriod = conf.get( + YarnConfiguration.TIMELINE_SERVICE_ROLLING_PERIOD, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ROLLING_PERIOD); + this.rollingPeriod = RollingPeriod.valueOf(lcRollingPeriod + .toUpperCase(Locale.ENGLISH)); + fdf = FastDateFormat.getInstance(rollingPeriod.dateFormat(), + TimeZone.getTimeZone("GMT")); + sdf = new SimpleDateFormat(rollingPeriod.dateFormat()); + sdf.setTimeZone(fdf.getTimeZone()); + } + + protected synchronized void initHistoricalDBs() throws IOException { + Path rollingDBGlobPath = new Path(rollingDBPath, getName() + ".*"); + FileStatus[] statuses = lfs.globStatus(rollingDBGlobPath); + for (FileStatus status : statuses) { + String dbName = FilenameUtils.getExtension(status.getPath().toString()); + try { + Long dbStartTime = sdf.parse(dbName).getTime(); + initRollingLevelDB(dbStartTime, status.getPath()); + } catch (ParseException pe) { + LOG.warn("Failed to initialize rolling leveldb " + dbName + " for " + + getName()); + } + } + } + + private void initRollingLevelDB(Long dbStartTime, + Path rollingInstanceDBPath) { + if (rollingdbs.containsKey(dbStartTime)) { + return; + } + Options options = new Options(); + options.createIfMissing(true); + options.cacheSize(conf.getLong( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE)); + options.maxOpenFiles(conf.getInt( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES)); + options.writeBufferSize(conf.getInt( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE)); + LOG.info("Initializing rolling leveldb instance :" + rollingInstanceDBPath + + " for start time: " + dbStartTime); + DB db = null; + try { + db = factory.open( + new File(rollingInstanceDBPath.toUri().getPath()), options); + rollingdbs.put(dbStartTime, db); + String dbName = fdf.format(dbStartTime); + LOG.info("Added rolling leveldb instance " + dbName + " to " + getName()); + } catch (IOException ioe) { + LOG.warn("Failed to open rolling leveldb instance :" + + new File(rollingInstanceDBPath.toUri().getPath()), ioe); + } + } + + synchronized DB getPreviousDB(DB db) { + Iterator iterator = rollingdbs.values().iterator(); + DB prev = null; + while (iterator.hasNext()) { + DB cur = iterator.next(); + if (cur == db) { + break; + } + prev = cur; + } + return prev; + } + + synchronized long getStartTimeFor(DB db) { + long startTime = -1; + for (Map.Entry entry : rollingdbs.entrySet()) { + if (entry.getValue() == db) { + startTime = entry.getKey(); + } + } + return startTime; + } + + public synchronized DB getDBForStartTime(long startTime) { + // make sure we sanitize this input + startTime = Math.min(startTime, currentTimeMillis()); + + if (startTime >= getNextRollingTimeMillis()) { + roll(startTime); + } + Entry entry = rollingdbs.floorEntry(startTime); + if (entry == null) { + return null; + } + return entry.getValue(); + } + + private void roll(long startTime) { + LOG.info("Rolling new DB instance for " + getName()); + long currentStartTime = computeCurrentCheckMillis(startTime); + setNextRollingTimeMillis(computeNextCheckMillis(currentStartTime)); + String currentRollingDBInstance = fdf.format(currentStartTime); + String currentRollingDBName = getName() + "." + currentRollingDBInstance; + Path currentRollingDBPath = new Path(rollingDBPath, currentRollingDBName); + if (getTimeToLiveEnabled()) { + scheduleOldDBsForEviction(); + } + initRollingLevelDB(currentStartTime, currentRollingDBPath); + } + + private synchronized void scheduleOldDBsForEviction() { + // keep at least time to live amount of data + long evictionThreshold = computeCurrentCheckMillis(currentTimeMillis() + - getTimeToLive()); + + LOG.info("Scheduling " + getName() + " DBs older than " + + fdf.format(evictionThreshold) + " for eviction"); + Iterator> iterator = rollingdbs.entrySet().iterator(); + while (iterator.hasNext()) { + Entry entry = iterator.next(); + // parse this in gmt time + if (entry.getKey() < evictionThreshold) { + LOG.info("Scheduling " + getName() + " eviction for " + + fdf.format(entry.getKey())); + iterator.remove(); + rollingdbsToEvict.put(entry.getKey(), entry.getValue()); + } + } + } + + public synchronized void evictOldDBs() { + LOG.info("Evicting " + getName() + " DBs scheduled for eviction"); + Iterator> iterator = rollingdbsToEvict.entrySet() + .iterator(); + while (iterator.hasNext()) { + Entry entry = iterator.next(); + IOUtils.cleanup(LOG, entry.getValue()); + String dbName = fdf.format(entry.getKey()); + Path path = new Path(rollingDBPath, getName() + "." + dbName); + try { + LOG.info("Removing old db directory contents in " + path); + lfs.delete(path, true); + } catch (IOException ioe) { + LOG.warn("Failed to evict old db " + path, ioe); + } + iterator.remove(); + } + } + + public void stop() throws Exception { + for (DB db : rollingdbs.values()) { + IOUtils.cleanup(LOG, db); + } + IOUtils.cleanup(LOG, lfs); + } + + private long computeNextCheckMillis(long now) { + return computeCheckMillis(now, true); + } + + public long computeCurrentCheckMillis(long now) { + return computeCheckMillis(now, false); + } + + private synchronized long computeCheckMillis(long now, boolean next) { + // needs to be called synchronously due to shared Calendar + cal.setTimeInMillis(now); + cal.set(Calendar.SECOND, 0); + cal.set(Calendar.MILLISECOND, 0); + + if (rollingPeriod == RollingPeriod.DAILY) { + cal.set(Calendar.HOUR_OF_DAY, 0); + cal.set(Calendar.MINUTE, 0); + if (next) { + cal.add(Calendar.DATE, 1); + } + } else if (rollingPeriod == RollingPeriod.HALF_DAILY) { + // round down to 12 hour interval + int hour = (cal.get(Calendar.HOUR) / 12) * 12; + cal.set(Calendar.HOUR, hour); + cal.set(Calendar.MINUTE, 0); + if (next) { + cal.add(Calendar.HOUR_OF_DAY, 12); + } + } else if (rollingPeriod == RollingPeriod.QUARTER_DAILY) { + // round down to 6 hour interval + int hour = (cal.get(Calendar.HOUR) / 6) * 6; + cal.set(Calendar.HOUR, hour); + cal.set(Calendar.MINUTE, 0); + if (next) { + cal.add(Calendar.HOUR_OF_DAY, 6); + } + } else if (rollingPeriod == RollingPeriod.HOURLY) { + cal.set(Calendar.MINUTE, 0); + if (next) { + cal.add(Calendar.HOUR_OF_DAY, 1); + } + } else if (rollingPeriod == RollingPeriod.MINUTELY) { + // round down to 5 minute interval + int minute = (cal.get(Calendar.MINUTE) / 5) * 5; + cal.set(Calendar.MINUTE, minute); + if (next) { + cal.add(Calendar.MINUTE, 5); + } + } + return cal.getTimeInMillis(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java index e69de29..8b6a51b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java @@ -0,0 +1,1807 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timeline; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeMap; + +import org.apache.commons.collections.map.LRUMap; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; +import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.apache.hadoop.yarn.server.timeline.RollingLevelDB.RollingWriteBatch; +import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl; +import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyBuilder; +import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyParser; + +import org.fusesource.leveldbjni.JniDBFactory; +import org.iq80.leveldb.DB; +import org.iq80.leveldb.DBException; +import org.iq80.leveldb.DBIterator; +import org.iq80.leveldb.Options; +import org.iq80.leveldb.ReadOptions; +import org.iq80.leveldb.WriteBatch; +import org.nustaq.serialization.FSTConfiguration; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong; +import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.writeReverseOrderedLong; +import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT_DOMAIN_ID; +import static org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.prefixMatches; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_TTL_MS; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_TTL_MS; + +import static org.fusesource.leveldbjni.JniDBFactory.bytes; + +/** + *

+ * An implementation of an application timeline store backed by leveldb. + *

+ * + *

+ * There are three sections of the db, the start time section, the entity + * section, and the indexed entity section. + *

+ * + *

+ * The start time section is used to retrieve the unique start time for a given + * entity. Its values each contain a start time while its keys are of the form: + *

+ * + *
+ *   START_TIME_LOOKUP_PREFIX + entity type + entity id
+ * 
+ * + *

+ * The entity section is ordered by entity type, then entity start time + * descending, then entity ID. There are four sub-sections of the entity + * section: events, primary filters, related entities, and other info. The event + * entries have event info serialized into their values. The other info entries + * have values corresponding to the values of the other info name/value map for + * the entry (note the names are contained in the key). All other entries have + * empty values. The key structure is as follows: + *

+ * + *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     DOMAIN_ID_COLUMN
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     EVENTS_COLUMN + reveventtimestamp + eventtype
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     PRIMARY_FILTERS_COLUMN + name + value
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     OTHER_INFO_COLUMN + name
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     RELATED_ENTITIES_COLUMN + relatedentity type + relatedentity id
+ * 
+ * + *

+ * The indexed entity section contains a primary filter name and primary filter + * value as the prefix. Within a given name/value, entire entity entries are + * stored in the same format as described in the entity section above (below, + * "key" represents any one of the possible entity entry keys described above). + *

+ * + *
+ *   INDEXED_ENTRY_PREFIX + primaryfilter name + primaryfilter value +
+ *     key
+ * 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class RollingLevelDBTimelineStore extends AbstractService implements + TimelineStore { + private static final Log LOG = LogFactory + .getLog(RollingLevelDBTimelineStore.class); + private static FSTConfiguration fstConf = + FSTConfiguration.createDefaultConfiguration(); + + static { + fstConf.setShareReferences(false); + } + + @Private + @VisibleForTesting + static final String FILENAME = "leveldb-timeline-store"; + static final String DOMAIN = "domain-ldb"; + static final String ENTITY = "entity-ldb"; + static final String INDEX = "indexes-ldb"; + static final String STARTTIME = "starttime-ldb"; + static final String OWNER = "owner-ldb"; + + private static final byte[] DOMAIN_ID_COLUMN = "d".getBytes(UTF_8); + private static final byte[] EVENTS_COLUMN = "e".getBytes(UTF_8); + private static final byte[] PRIMARY_FILTERS_COLUMN = "f".getBytes(UTF_8); + private static final byte[] OTHER_INFO_COLUMN = "i".getBytes(UTF_8); + private static final byte[] RELATED_ENTITIES_COLUMN = "r".getBytes(UTF_8); + + private static final byte[] DESCRIPTION_COLUMN = "d".getBytes(UTF_8); + private static final byte[] OWNER_COLUMN = "o".getBytes(UTF_8); + private static final byte[] READER_COLUMN = "r".getBytes(UTF_8); + private static final byte[] WRITER_COLUMN = "w".getBytes(UTF_8); + private static final byte[] TIMESTAMP_COLUMN = "t".getBytes(UTF_8); + + private static final byte[] EMPTY_BYTES = new byte[0]; + + private static final String TIMELINE_STORE_VERSION_KEY = + "timeline-store-version"; + + private static final Version CURRENT_VERSION_INFO = Version.newInstance(1, 0); + + private static long writeBatchSize = 10000; + + @Private + @VisibleForTesting + static final FsPermission LEVELDB_DIR_UMASK = FsPermission + .createImmutable((short) 0700); + + private Map startTimeWriteCache; + private Map startTimeReadCache; + + private DB domaindb; + private RollingLevelDB entitydb; + private RollingLevelDB indexdb; + private DB starttimedb; + private DB ownerdb; + + private Thread deletionThread; + + public RollingLevelDBTimelineStore() { + super(RollingLevelDBTimelineStore.class.getName()); + } + + @Override + @SuppressWarnings("unchecked") + protected void serviceInit(Configuration conf) throws Exception { + Preconditions + .checkArgument(conf.getLong(TIMELINE_SERVICE_TTL_MS, + DEFAULT_TIMELINE_SERVICE_TTL_MS) > 0, + "%s property value should be greater than zero", + TIMELINE_SERVICE_TTL_MS); + Preconditions.checkArgument(conf.getLong( + TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS, + DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS) > 0, + "%s property value should be greater than zero", + TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS); + Preconditions.checkArgument(conf.getLong( + TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE, + DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE) >= 0, + "%s property value should be greater than or equal to zero", + TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE); + Preconditions.checkArgument(conf.getLong( + TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE, + DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE) > 0, + " %s property value should be greater than zero", + TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE); + Preconditions.checkArgument(conf.getLong( + TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE, + DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE) > 0, + "%s property value should be greater than zero", + TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE); + Preconditions.checkArgument(conf.getLong( + TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES, + DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES) > 0, + "%s property value should be greater than zero", + TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES); + Preconditions.checkArgument(conf.getLong( + TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE, + DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE) > 0, + "%s property value should be greater than zero", + TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE); + + Options options = new Options(); + options.createIfMissing(true); + options.cacheSize(conf.getLong( + TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE, + DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE)); + JniDBFactory factory = new JniDBFactory(); + Path dbPath = new Path( + conf.get(TIMELINE_SERVICE_LEVELDB_PATH), FILENAME); + Path domainDBPath = new Path(dbPath, DOMAIN); + Path starttimeDBPath = new Path(dbPath, STARTTIME); + Path ownerDBPath = new Path(dbPath, OWNER); + FileSystem localFS = null; + try { + localFS = FileSystem.getLocal(conf); + if (!localFS.exists(dbPath)) { + if (!localFS.mkdirs(dbPath)) { + throw new IOException("Couldn't create directory for leveldb " + + "timeline store " + dbPath); + } + localFS.setPermission(dbPath, LEVELDB_DIR_UMASK); + } + if (!localFS.exists(domainDBPath)) { + if (!localFS.mkdirs(domainDBPath)) { + throw new IOException("Couldn't create directory for leveldb " + + "timeline store " + domainDBPath); + } + localFS.setPermission(domainDBPath, LEVELDB_DIR_UMASK); + } + if (!localFS.exists(starttimeDBPath)) { + if (!localFS.mkdirs(starttimeDBPath)) { + throw new IOException("Couldn't create directory for leveldb " + + "timeline store " + starttimeDBPath); + } + localFS.setPermission(starttimeDBPath, LEVELDB_DIR_UMASK); + } + if (!localFS.exists(ownerDBPath)) { + if (!localFS.mkdirs(ownerDBPath)) { + throw new IOException("Couldn't create directory for leveldb " + + "timeline store " + ownerDBPath); + } + localFS.setPermission(ownerDBPath, LEVELDB_DIR_UMASK); + } + } finally { + IOUtils.cleanup(LOG, localFS); + } + options.maxOpenFiles(conf.getInt( + TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES, + DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES)); + options.writeBufferSize(conf.getInt( + TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE, + DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE)); + LOG.info("Using leveldb path " + dbPath); + domaindb = factory.open(new File(domainDBPath.toString()), options); + entitydb = new RollingLevelDB(ENTITY); + entitydb.init(conf); + indexdb = new RollingLevelDB(INDEX); + indexdb.init(conf); + starttimedb = factory.open(new File(starttimeDBPath.toString()), options); + ownerdb = factory.open(new File(ownerDBPath.toString()), options); + checkVersion(); + startTimeWriteCache = Collections.synchronizedMap(new LRUMap( + getStartTimeWriteCacheSize(conf))); + startTimeReadCache = Collections.synchronizedMap(new LRUMap( + getStartTimeReadCacheSize(conf))); + + writeBatchSize = conf.getInt( + TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE, + DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE); + + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + if (getConfig().getBoolean(TIMELINE_SERVICE_TTL_ENABLE, true)) { + deletionThread = new EntityDeletionThread(getConfig()); + deletionThread.start(); + } + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + if (deletionThread != null) { + deletionThread.interrupt(); + LOG.info("Waiting for deletion thread to complete its current action"); + try { + deletionThread.join(); + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for deletion thread to complete," + + " closing db now", e); + } + } + IOUtils.cleanup(LOG, domaindb); + IOUtils.cleanup(LOG, starttimedb); + IOUtils.cleanup(LOG, ownerdb); + entitydb.stop(); + indexdb.stop(); + super.serviceStop(); + } + + private class EntityDeletionThread extends Thread { + private final long ttl; + private final long ttlInterval; + + public EntityDeletionThread(Configuration conf) { + ttl = conf.getLong(TIMELINE_SERVICE_TTL_MS, + DEFAULT_TIMELINE_SERVICE_TTL_MS); + ttlInterval = conf.getLong( + TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS, + DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS); + LOG.info("Starting deletion thread with ttl " + ttl + " and cycle " + + "interval " + ttlInterval); + } + + @Override + public void run() { + Thread.currentThread().setName("Leveldb Timeline Store Retention"); + while (true) { + long timestamp = System.currentTimeMillis() - ttl; + try { + discardOldEntities(timestamp); + Thread.sleep(ttlInterval); + } catch (IOException e) { + LOG.error(e); + } catch (InterruptedException e) { + LOG.info("Deletion thread received interrupt, exiting"); + break; + } + } + } + } + + @Override + public TimelineEntity getEntity(String entityId, String entityType, + EnumSet fields) throws IOException { + Long revStartTime = getStartTimeLong(entityId, entityType); + if (revStartTime == null) { + return null; + } + byte[] prefix = KeyBuilder.newInstance().add(entityType) + .add(writeReverseOrderedLong(revStartTime)).add(entityId) + .getBytesForLookup(); + + DBIterator iterator = null; + try { + DB db = entitydb.getDBForStartTime(revStartTime); + if (db == null) { + return null; + } + iterator = db.iterator(); + iterator.seek(prefix); + + return getEntity(entityId, entityType, revStartTime, fields, iterator, + prefix, prefix.length); + } finally { + IOUtils.cleanup(LOG, iterator); + } + } + + /** + * Read entity from a db iterator. If no information is found in the specified + * fields for this entity, return null. + */ + private static TimelineEntity getEntity(String entityId, String entityType, + Long startTime, EnumSet fields, DBIterator iterator, + byte[] prefix, int prefixlen) throws IOException { + if (fields == null) { + fields = EnumSet.allOf(Field.class); + } + + TimelineEntity entity = new TimelineEntity(); + boolean events = false; + boolean lastEvent = false; + if (fields.contains(Field.EVENTS)) { + events = true; + } else if (fields.contains(Field.LAST_EVENT_ONLY)) { + lastEvent = true; + } else { + entity.setEvents(null); + } + boolean relatedEntities = false; + if (fields.contains(Field.RELATED_ENTITIES)) { + relatedEntities = true; + } else { + entity.setRelatedEntities(null); + } + boolean primaryFilters = false; + if (fields.contains(Field.PRIMARY_FILTERS)) { + primaryFilters = true; + } else { + entity.setPrimaryFilters(null); + } + boolean otherInfo = false; + if (fields.contains(Field.OTHER_INFO)) { + otherInfo = true; + } else { + entity.setOtherInfo(null); + } + + // iterate through the entity's entry, parsing information if it is part + // of a requested field + for (; iterator.hasNext(); iterator.next()) { + byte[] key = iterator.peekNext().getKey(); + if (!prefixMatches(prefix, prefixlen, key)) { + break; + } + if (key.length == prefixlen) { + continue; + } + if (key[prefixlen] == PRIMARY_FILTERS_COLUMN[0]) { + if (primaryFilters) { + addPrimaryFilter(entity, key, prefixlen + + PRIMARY_FILTERS_COLUMN.length); + } + } else if (key[prefixlen] == OTHER_INFO_COLUMN[0]) { + if (otherInfo) { + entity.addOtherInfo( + parseRemainingKey(key, prefixlen + OTHER_INFO_COLUMN.length), + fstConf.asObject(iterator.peekNext().getValue())); + } + } else if (key[prefixlen] == RELATED_ENTITIES_COLUMN[0]) { + if (relatedEntities) { + addRelatedEntity(entity, key, prefixlen + + RELATED_ENTITIES_COLUMN.length); + } + } else if (key[prefixlen] == EVENTS_COLUMN[0]) { + if (events || (lastEvent && entity.getEvents().size() == 0)) { + TimelineEvent event = getEntityEvent(null, key, prefixlen + + EVENTS_COLUMN.length, iterator.peekNext().getValue()); + if (event != null) { + entity.addEvent(event); + } + } + } else if (key[prefixlen] == DOMAIN_ID_COLUMN[0]) { + byte[] v = iterator.peekNext().getValue(); + String domainId = new String(v, UTF_8); + entity.setDomainId(domainId); + } else { + LOG.warn(String.format("Found unexpected column for entity %s of " + + "type %s (0x%02x)", entityId, entityType, key[prefixlen])); + } + } + + entity.setEntityId(entityId); + entity.setEntityType(entityType); + entity.setStartTime(startTime); + + return entity; + } + + @Override + public TimelineEvents getEntityTimelines(String entityType, + SortedSet entityIds, Long limit, Long windowStart, + Long windowEnd, Set eventType) throws IOException { + TimelineEvents events = new TimelineEvents(); + if (entityIds == null || entityIds.isEmpty()) { + return events; + } + // create a lexicographically-ordered map from start time to entities + Map> startTimeMap = + new TreeMap>( + new Comparator() { + @Override + public int compare(byte[] o1, byte[] o2) { + return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0, + o2.length); + } + }); + DBIterator iterator = null; + try { + // look up start times for the specified entities + // skip entities with no start time + for (String entityId : entityIds) { + byte[] startTime = getStartTime(entityId, entityType); + if (startTime != null) { + List entities = startTimeMap.get(startTime); + if (entities == null) { + entities = new ArrayList(); + startTimeMap.put(startTime, entities); + } + entities.add(new EntityIdentifier(entityId, entityType)); + } + } + for (Entry> entry : startTimeMap + .entrySet()) { + // look up the events matching the given parameters (limit, + // start time, end time, event types) for entities whose start times + // were found and add the entities to the return list + byte[] revStartTime = entry.getKey(); + for (EntityIdentifier entityIdentifier : entry.getValue()) { + EventsOfOneEntity entity = new EventsOfOneEntity(); + entity.setEntityId(entityIdentifier.getId()); + entity.setEntityType(entityType); + events.addEvent(entity); + KeyBuilder kb = KeyBuilder.newInstance().add(entityType) + .add(revStartTime).add(entityIdentifier.getId()) + .add(EVENTS_COLUMN); + byte[] prefix = kb.getBytesForLookup(); + if (windowEnd == null) { + windowEnd = Long.MAX_VALUE; + } + byte[] revts = writeReverseOrderedLong(windowEnd); + kb.add(revts); + byte[] first = kb.getBytesForLookup(); + byte[] last = null; + if (windowStart != null) { + last = KeyBuilder.newInstance().add(prefix) + .add(writeReverseOrderedLong(windowStart)).getBytesForLookup(); + } + if (limit == null) { + limit = DEFAULT_LIMIT; + } + DB db = entitydb.getDBForStartTime(readReverseOrderedLong( + revStartTime, 0)); + if (db == null) { + continue; + } + iterator = db.iterator(); + for (iterator.seek(first); entity.getEvents().size() < limit + && iterator.hasNext(); iterator.next()) { + byte[] key = iterator.peekNext().getKey(); + if (!prefixMatches(prefix, prefix.length, key) + || (last != null && WritableComparator.compareBytes(key, 0, + key.length, last, 0, last.length) > 0)) { + break; + } + TimelineEvent event = getEntityEvent(eventType, key, prefix.length, + iterator.peekNext().getValue()); + if (event != null) { + entity.addEvent(event); + } + } + } + } + } finally { + IOUtils.cleanup(LOG, iterator); + } + return events; + } + + @Override + public TimelineEntities getEntities(String entityType, Long limit, + Long windowStart, Long windowEnd, String fromId, Long fromTs, + NameValuePair primaryFilter, Collection secondaryFilters, + EnumSet fields, CheckAcl checkAcl) throws IOException { + if (primaryFilter == null) { + // if no primary filter is specified, prefix the lookup with + // ENTITY_ENTRY_PREFIX + return getEntityByTime(EMPTY_BYTES, entityType, limit, windowStart, + windowEnd, fromId, fromTs, secondaryFilters, fields, checkAcl, false); + } else { + // if a primary filter is specified, prefix the lookup with + // INDEXED_ENTRY_PREFIX + primaryFilterName + primaryFilterValue + + // ENTITY_ENTRY_PREFIX + byte[] base = KeyBuilder.newInstance().add(primaryFilter.getName()) + .add(fstConf.asByteArray(primaryFilter.getValue()), true) + .getBytesForLookup(); + return getEntityByTime(base, entityType, limit, windowStart, windowEnd, + fromId, fromTs, secondaryFilters, fields, checkAcl, true); + } + } + + /** + * Retrieves a list of entities satisfying given parameters. + * + * @param base + * A byte array prefix for the lookup + * @param entityType + * The type of the entity + * @param limit + * A limit on the number of entities to return + * @param starttime + * The earliest entity start time to retrieve (exclusive) + * @param endtime + * The latest entity start time to retrieve (inclusive) + * @param fromId + * Retrieve entities starting with this entity + * @param fromTs + * Ignore entities with insert timestamp later than this ts + * @param secondaryFilters + * Filter pairs that the entities should match + * @param fields + * The set of fields to retrieve + * @param usingPrimaryFilter + * true if this query is using a primary filter + * @return A list of entities + * @throws IOException + */ + private TimelineEntities getEntityByTime(byte[] base, String entityType, + Long limit, Long starttime, Long endtime, String fromId, Long fromTs, + Collection secondaryFilters, EnumSet fields, + CheckAcl checkAcl, boolean usingPrimaryFilter) throws IOException { + DBIterator iterator = null; + try { + KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType); + // only db keys matching the prefix (base + entity type) will be parsed + byte[] prefix = kb.getBytesForLookup(); + if (endtime == null) { + // if end time is null, place no restriction on end time + endtime = Long.MAX_VALUE; + } + + // Sanitize the fields parameter + if (fields == null) { + fields = EnumSet.allOf(Field.class); + } + + // construct a first key that will be seeked to using end time or fromId + long firstStartTime = Long.MAX_VALUE; + byte[] first = null; + if (fromId != null) { + Long fromIdStartTime = getStartTimeLong(fromId, entityType); + if (fromIdStartTime == null) { + // no start time for provided id, so return empty entities + return new TimelineEntities(); + } + if (fromIdStartTime <= endtime) { + // if provided id's start time falls before the end of the window, + // use it to construct the seek key + firstStartTime = fromIdStartTime; + first = kb.add(writeReverseOrderedLong(fromIdStartTime)).add(fromId) + .getBytesForLookup(); + } + } + // if seek key wasn't constructed using fromId, construct it using end ts + if (first == null) { + firstStartTime = endtime; + first = kb.add(writeReverseOrderedLong(endtime)).getBytesForLookup(); + } + byte[] last = null; + if (starttime != null) { + // if start time is not null, set a last key that will not be + // iterated past + last = KeyBuilder.newInstance().add(base).add(entityType) + .add(writeReverseOrderedLong(starttime)).getBytesForLookup(); + } + if (limit == null) { + // if limit is not specified, use the default + limit = DEFAULT_LIMIT; + } + + TimelineEntities entities = new TimelineEntities(); + RollingLevelDB rollingdb = null; + if (usingPrimaryFilter) { + rollingdb = indexdb; + } else { + rollingdb = entitydb; + } + + DB db = rollingdb.getDBForStartTime(firstStartTime); + while (entities.getEntities().size() < limit && db != null) { + iterator = db.iterator(); + iterator.seek(first); + + // iterate until one of the following conditions is met: limit is + // reached, there are no more keys, the key prefix no longer matches, + // or a start time has been specified and reached/exceeded + while (entities.getEntities().size() < limit && iterator.hasNext()) { + byte[] key = iterator.peekNext().getKey(); + if (!prefixMatches(prefix, prefix.length, key) + || (last != null && WritableComparator.compareBytes(key, 0, + key.length, last, 0, last.length) > 0)) { + break; + } + // read the start time and entity id from the current key + KeyParser kp = new KeyParser(key, prefix.length); + Long startTime = kp.getNextLong(); + String entityId = kp.getNextString(); + + if (fromTs != null) { + long insertTime = readReverseOrderedLong(iterator.peekNext() + .getValue(), 0); + if (insertTime > fromTs) { + byte[] firstKey = key; + while (iterator.hasNext()) { + key = iterator.peekNext().getKey(); + iterator.next(); + if (!prefixMatches(firstKey, kp.getOffset(), key)) { + break; + } + } + continue; + } + } + // Even if other info and primary filter fields are not included, we + // still need to load them to match secondary filters when they are + // non-empty + EnumSet queryFields = EnumSet.copyOf(fields); + boolean addPrimaryFilters = false; + boolean addOtherInfo = false; + if (secondaryFilters != null && secondaryFilters.size() > 0) { + if (!queryFields.contains(Field.PRIMARY_FILTERS)) { + queryFields.add(Field.PRIMARY_FILTERS); + addPrimaryFilters = true; + } + if (!queryFields.contains(Field.OTHER_INFO)) { + queryFields.add(Field.OTHER_INFO); + addOtherInfo = true; + } + } + + // parse the entity that owns this key, iterating over all keys for + // the entity + TimelineEntity entity = null; + if (usingPrimaryFilter) { + entity = getEntity(entityId, entityType, queryFields); + iterator.next(); + } else { + entity = getEntity(entityId, entityType, startTime, queryFields, + iterator, key, kp.getOffset()); + } + // determine if the retrieved entity matches the provided secondary + // filters, and if so add it to the list of entities to return + boolean filterPassed = true; + if (secondaryFilters != null) { + for (NameValuePair filter : secondaryFilters) { + Object v = entity.getOtherInfo().get(filter.getName()); + if (v == null) { + Set vs = entity.getPrimaryFilters() + .get(filter.getName()); + if (vs == null || !vs.contains(filter.getValue())) { + filterPassed = false; + break; + } + } else if (!v.equals(filter.getValue())) { + filterPassed = false; + break; + } + } + } + if (filterPassed) { + if (entity.getDomainId() == null) { + entity.setDomainId(DEFAULT_DOMAIN_ID); + } + if (checkAcl == null || checkAcl.check(entity)) { + // Remove primary filter and other info if they are added for + // matching secondary filters + if (addPrimaryFilters) { + entity.setPrimaryFilters(null); + } + if (addOtherInfo) { + entity.setOtherInfo(null); + } + entities.addEntity(entity); + } + } + } + db = rollingdb.getPreviousDB(db); + } + return entities; + } finally { + IOUtils.cleanup(LOG, iterator); + } + } + + /** + * Put a single entity. If there is an error, add a TimelinePutError to the + * given response. + * + * @param entityUpdates + * a map containing all the scheduled writes for this put to the + * entity db + * @param indexUpdates + * a map containing all the scheduled writes for this put to the + * index db + */ + private long putEntities(TreeMap entityUpdates, + TreeMap indexUpdates, TimelineEntity entity, + TimelinePutResponse response) { + + long putCount = 0; + List relatedEntitiesWithoutStartTimes = + new ArrayList(); + byte[] revStartTime = null; + Map> primaryFilters = null; + try { + List events = entity.getEvents(); + // look up the start time for the entity + Long startTime = getAndSetStartTime(entity.getEntityId(), + entity.getEntityType(), entity.getStartTime(), events); + if (startTime == null) { + // if no start time is found, add an error and return + TimelinePutError error = new TimelinePutError(); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); + error.setErrorCode(TimelinePutError.NO_START_TIME); + response.addError(error); + return putCount; + } + + // Must have a domain + if (StringUtils.isEmpty(entity.getDomainId())) { + TimelinePutError error = new TimelinePutError(); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); + error.setErrorCode(TimelinePutError.NO_DOMAIN); + response.addError(error); + return putCount; + } + + revStartTime = writeReverseOrderedLong(startTime); + long roundedStartTime = entitydb.computeCurrentCheckMillis(startTime); + RollingWriteBatch rollingWriteBatch = entityUpdates.get(roundedStartTime); + if (rollingWriteBatch == null) { + DB db = entitydb.getDBForStartTime(startTime); + if (db != null) { + WriteBatch writeBatch = db.createWriteBatch(); + rollingWriteBatch = new RollingWriteBatch(db, writeBatch); + entityUpdates.put(roundedStartTime, rollingWriteBatch); + } + } + if (rollingWriteBatch == null) { + // if no start time is found, add an error and return + TimelinePutError error = new TimelinePutError(); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); + error.setErrorCode(TimelinePutError.EXPIRED_ENTITY); + response.addError(error); + return putCount; + } + WriteBatch writeBatch = rollingWriteBatch.getWriteBatch(); + + // Save off the getBytes conversion to avoid unnecessary cost + byte[] entityIdBytes = entity.getEntityId().getBytes(UTF_8); + byte[] entityTypeBytes = entity.getEntityType().getBytes(UTF_8); + byte[] domainIdBytes = entity.getDomainId().getBytes(UTF_8); + + // write entity marker + byte[] markerKey = KeyBuilder.newInstance(3).add(entityTypeBytes, true) + .add(revStartTime).add(entityIdBytes, true).getBytesForLookup(); + writeBatch.put(markerKey, EMPTY_BYTES); + ++putCount; + + // write domain id entry + byte[] domainkey = KeyBuilder.newInstance(4).add(entityTypeBytes, true) + .add(revStartTime).add(entityIdBytes, true).add(DOMAIN_ID_COLUMN) + .getBytes(); + writeBatch.put(domainkey, domainIdBytes); + ++putCount; + + // write event entries + if (events != null) { + for (TimelineEvent event : events) { + byte[] revts = writeReverseOrderedLong(event.getTimestamp()); + byte[] key = KeyBuilder.newInstance().add(entityTypeBytes, true) + .add(revStartTime).add(entityIdBytes, true).add(EVENTS_COLUMN) + .add(revts).add(event.getEventType().getBytes(UTF_8)).getBytes(); + byte[] value = fstConf.asByteArray(event.getEventInfo()); + writeBatch.put(key, value); + ++putCount; + } + } + + // write primary filter entries + primaryFilters = entity.getPrimaryFilters(); + if (primaryFilters != null) { + for (Entry> primaryFilter : primaryFilters + .entrySet()) { + for (Object primaryFilterValue : primaryFilter.getValue()) { + byte[] key = KeyBuilder.newInstance(6).add(entityTypeBytes, true) + .add(revStartTime).add(entityIdBytes, true) + .add(PRIMARY_FILTERS_COLUMN).add(primaryFilter.getKey()) + .add(fstConf.asByteArray(primaryFilterValue)).getBytes(); + writeBatch.put(key, EMPTY_BYTES); + ++putCount; + } + } + } + + // write other info entries + Map otherInfo = entity.getOtherInfo(); + if (otherInfo != null) { + for (Entry info : otherInfo.entrySet()) { + byte[] key = KeyBuilder.newInstance(5).add(entityTypeBytes, true) + .add(revStartTime).add(entityIdBytes, true) + .add(OTHER_INFO_COLUMN).add(info.getKey()).getBytes(); + byte[] value = fstConf.asByteArray(info.getValue()); + writeBatch.put(key, value); + ++putCount; + } + } + + // write related entity entries + Map> relatedEntities = entity.getRelatedEntities(); + if (relatedEntities != null) { + for (Entry> relatedEntityList : relatedEntities + .entrySet()) { + String relatedEntityType = relatedEntityList.getKey(); + for (String relatedEntityId : relatedEntityList.getValue()) { + // look up start time of related entity + Long relatedStartTimeLong = getStartTimeLong(relatedEntityId, + relatedEntityType); + // delay writing the related entity if no start time is found + if (relatedStartTimeLong == null) { + relatedEntitiesWithoutStartTimes.add(new EntityIdentifier( + relatedEntityId, relatedEntityType)); + continue; + } + + byte[] relatedEntityStartTime = + writeReverseOrderedLong(relatedStartTimeLong); + long relatedRoundedStartTime = entitydb + .computeCurrentCheckMillis(relatedStartTimeLong); + RollingWriteBatch relatedRollingWriteBatch = entityUpdates + .get(relatedRoundedStartTime); + if (relatedRollingWriteBatch == null) { + DB db = entitydb.getDBForStartTime(relatedStartTimeLong); + if (db != null) { + WriteBatch relatedWriteBatch = db.createWriteBatch(); + relatedRollingWriteBatch = new RollingWriteBatch(db, + relatedWriteBatch); + entityUpdates.put(relatedRoundedStartTime, + relatedRollingWriteBatch); + } + } + if (relatedRollingWriteBatch == null) { + // if no start time is found, add an error and return + TimelinePutError error = new TimelinePutError(); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); + error.setErrorCode(TimelinePutError.EXPIRED_ENTITY); + response.addError(error); + continue; + } + // This is the existing entity + byte[] relatedDomainIdBytes = relatedRollingWriteBatch.getDB().get( + createDomainIdKey(relatedEntityId, relatedEntityType, + relatedEntityStartTime)); + // The timeline data created by the server before 2.6 won't have + // the domain field. We assume this timeline data is in the + // default timeline domain. + String domainId = null; + if (relatedDomainIdBytes == null) { + domainId = TimelineDataManager.DEFAULT_DOMAIN_ID; + } else { + domainId = new String(relatedDomainIdBytes, UTF_8); + } + if (!domainId.equals(entity.getDomainId())) { + // in this case the entity will be put, but the relation will be + // ignored + TimelinePutError error = new TimelinePutError(); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); + error.setErrorCode(TimelinePutError.FORBIDDEN_RELATION); + response.addError(error); + continue; + } + // write "forward" entry (related entity -> entity) + byte[] key = createRelatedEntityKey(relatedEntityId, + relatedEntityType, relatedEntityStartTime, + entity.getEntityId(), entity.getEntityType()); + WriteBatch relatedWriteBatch = relatedRollingWriteBatch + .getWriteBatch(); + relatedWriteBatch.put(key, EMPTY_BYTES); + ++putCount; + } + } + } + + // write index entities + RollingWriteBatch indexRollingWriteBatch = indexUpdates + .get(roundedStartTime); + if (indexRollingWriteBatch == null) { + DB db = indexdb.getDBForStartTime(startTime); + if (db != null) { + WriteBatch indexWriteBatch = db.createWriteBatch(); + indexRollingWriteBatch = new RollingWriteBatch(db, indexWriteBatch); + indexUpdates.put(roundedStartTime, indexRollingWriteBatch); + } + } + if (indexRollingWriteBatch == null) { + // if no start time is found, add an error and return + TimelinePutError error = new TimelinePutError(); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); + error.setErrorCode(TimelinePutError.EXPIRED_ENTITY); + response.addError(error); + return putCount; + } + WriteBatch indexWriteBatch = indexRollingWriteBatch.getWriteBatch(); + putCount += writePrimaryFilterEntries(indexWriteBatch, primaryFilters, + markerKey, EMPTY_BYTES); + } catch (IOException e) { + LOG.error("Error putting entity " + entity.getEntityId() + " of type " + + entity.getEntityType(), e); + TimelinePutError error = new TimelinePutError(); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); + error.setErrorCode(TimelinePutError.IO_EXCEPTION); + response.addError(error); + } + + for (EntityIdentifier relatedEntity : relatedEntitiesWithoutStartTimes) { + try { + Long relatedEntityStartAndInsertTime = getAndSetStartTime( + relatedEntity.getId(), relatedEntity.getType(), + readReverseOrderedLong(revStartTime, 0), null); + if (relatedEntityStartAndInsertTime == null) { + throw new IOException("Error setting start time for related entity"); + } + long relatedStartTimeLong = relatedEntityStartAndInsertTime; + long relatedRoundedStartTime = entitydb + .computeCurrentCheckMillis(relatedStartTimeLong); + RollingWriteBatch relatedRollingWriteBatch = entityUpdates + .get(relatedRoundedStartTime); + if (relatedRollingWriteBatch == null) { + DB db = entitydb.getDBForStartTime(relatedStartTimeLong); + if (db != null) { + WriteBatch relatedWriteBatch = db.createWriteBatch(); + relatedRollingWriteBatch = new RollingWriteBatch(db, + relatedWriteBatch); + entityUpdates + .put(relatedRoundedStartTime, relatedRollingWriteBatch); + } + } + if (relatedRollingWriteBatch == null) { + // if no start time is found, add an error and return + TimelinePutError error = new TimelinePutError(); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); + error.setErrorCode(TimelinePutError.EXPIRED_ENTITY); + response.addError(error); + continue; + } + WriteBatch relatedWriteBatch = relatedRollingWriteBatch.getWriteBatch(); + byte[] relatedEntityStartTime = + writeReverseOrderedLong(relatedEntityStartAndInsertTime); + // This is the new entity, the domain should be the same + byte[] key = createDomainIdKey(relatedEntity.getId(), + relatedEntity.getType(), relatedEntityStartTime); + relatedWriteBatch.put(key, entity.getDomainId().getBytes(UTF_8)); + ++putCount; + relatedWriteBatch.put( + createRelatedEntityKey(relatedEntity.getId(), + relatedEntity.getType(), relatedEntityStartTime, + entity.getEntityId(), entity.getEntityType()), EMPTY_BYTES); + ++putCount; + relatedWriteBatch.put( + createEntityMarkerKey(relatedEntity.getId(), + relatedEntity.getType(), relatedEntityStartTime), EMPTY_BYTES); + ++putCount; + } catch (IOException e) { + LOG.error( + "Error putting related entity " + relatedEntity.getId() + + " of type " + relatedEntity.getType() + " for entity " + + entity.getEntityId() + " of type " + entity.getEntityType(), + e); + TimelinePutError error = new TimelinePutError(); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); + error.setErrorCode(TimelinePutError.IO_EXCEPTION); + response.addError(error); + } + } + + return putCount; + } + + /** + * For a given key / value pair that has been written to the db, write + * additional entries to the db for each primary filter. + */ + private static long writePrimaryFilterEntries(WriteBatch writeBatch, + Map> primaryFilters, byte[] key, byte[] value) + throws IOException { + long putCount = 0; + if (primaryFilters != null) { + for (Entry> pf : primaryFilters.entrySet()) { + for (Object pfval : pf.getValue()) { + writeBatch.put(addPrimaryFilterToKey(pf.getKey(), pfval, key), value); + ++putCount; + } + } + } + return putCount; + } + + @Override + public TimelinePutResponse put(TimelineEntities entities) { + if (LOG.isDebugEnabled()) { + LOG.debug("Starting put"); + } + TimelinePutResponse response = new TimelinePutResponse(); + TreeMap entityUpdates = + new TreeMap(); + TreeMap indexUpdates = + new TreeMap(); + + long entityCount = 0; + long indexCount = 0; + + try { + + for (TimelineEntity entity : entities.getEntities()) { + entityCount += putEntities(entityUpdates, indexUpdates, entity, + response); + } + + for (RollingWriteBatch entityUpdate : entityUpdates.values()) { + entityUpdate.write(); + } + + for (RollingWriteBatch indexUpdate : indexUpdates.values()) { + indexUpdate.write(); + } + + } finally { + + for (RollingWriteBatch entityRollingWriteBatch : entityUpdates.values()) { + entityRollingWriteBatch.close(); + } + for (RollingWriteBatch indexRollingWriteBatch : indexUpdates.values()) { + indexRollingWriteBatch.close(); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Put " + entityCount + " new leveldb entity entries and " + + indexCount + " new leveldb index entries from " + + entities.getEntities().size() + " timeline entities"); + } + return response; + } + + /** + * Get the unique start time for a given entity as a byte array that sorts the + * timestamps in reverse order (see + * {@link GenericObjectMapper#writeReverseOrderedLong(long)}). + * + * @param entityId + * The id of the entity + * @param entityType + * The type of the entity + * @return A byte array, null if not found + * @throws IOException + */ + private byte[] getStartTime(String entityId, String entityType) + throws IOException { + Long l = getStartTimeLong(entityId, entityType); + return l == null ? null : writeReverseOrderedLong(l); + } + + /** + * Get the unique start time for a given entity as a Long. + * + * @param entityId + * The id of the entity + * @param entityType + * The type of the entity + * @return A Long, null if not found + * @throws IOException + */ + private Long getStartTimeLong(String entityId, String entityType) + throws IOException { + EntityIdentifier entity = new EntityIdentifier(entityId, entityType); + // start time is not provided, so try to look it up + if (startTimeReadCache.containsKey(entity)) { + // found the start time in the cache + return startTimeReadCache.get(entity); + } else { + // try to look up the start time in the db + byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType()); + byte[] v = starttimedb.get(b); + if (v == null) { + // did not find the start time in the db + return null; + } else { + // found the start time in the db + Long l = readReverseOrderedLong(v, 0); + startTimeReadCache.put(entity, l); + return l; + } + } + } + + /** + * Get the unique start time for a given entity as a byte array that sorts the + * timestamps in reverse order (see + * {@link GenericObjectMapper#writeReverseOrderedLong(long)}). If the start + * time doesn't exist, set it based on the information provided. Should only + * be called when a lock has been obtained on the entity. + * + * @param entityId + * The id of the entity + * @param entityType + * The type of the entity + * @param startTime + * The start time of the entity, or null + * @param events + * A list of events for the entity, or null + * @return A StartAndInsertTime + * @throws IOException + */ + private Long getAndSetStartTime(String entityId, String entityType, + Long startTime, List events) throws IOException { + EntityIdentifier entity = new EntityIdentifier(entityId, entityType); + Long time = startTimeWriteCache.get(entity); + if (time != null) { + // return the value in the cache + return time; + } + if (startTime == null && events != null) { + // calculate best guess start time based on lowest event time + startTime = Long.MAX_VALUE; + for (TimelineEvent e : events) { + if (e.getTimestamp() < startTime) { + startTime = e.getTimestamp(); + } + } + } + // check the provided start time matches the db + return checkStartTimeInDb(entity, startTime); + } + + /** + * Checks db for start time and returns it if it exists. If it doesn't exist, + * writes the suggested start time (if it is not null). This is only called + * when the start time is not found in the cache, so it adds it back into the + * cache if it is found. Should only be called when a lock has been obtained + * on the entity. + */ + private Long checkStartTimeInDb(EntityIdentifier entity, + Long suggestedStartTime) throws IOException { + Long startAndInsertTime = null; + // create lookup key for start time + byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType()); + // retrieve value for key + byte[] v = starttimedb.get(b); + if (v == null) { + // start time doesn't exist in db + if (suggestedStartTime == null) { + return null; + } + startAndInsertTime = suggestedStartTime; + + // write suggested start time + starttimedb.put(b, writeReverseOrderedLong(suggestedStartTime)); + } else { + // found start time in db, so ignore suggested start time + startAndInsertTime = readReverseOrderedLong(v, 0); + } + startTimeWriteCache.put(entity, startAndInsertTime); + startTimeReadCache.put(entity, startAndInsertTime); + return startAndInsertTime; + } + + /** + * Creates a key for looking up the start time of a given entity, of the form + * START_TIME_LOOKUP_PREFIX + entity type + entity id. + */ + private static byte[] createStartTimeLookupKey(String entityId, + String entityType) throws IOException { + return KeyBuilder.newInstance().add(entityType).add(entityId).getBytes(); + } + + /** + * Creates an entity marker, serializing ENTITY_ENTRY_PREFIX + entity type + + * revstarttime + entity id. + */ + private static byte[] createEntityMarkerKey(String entityId, + String entityType, byte[] revStartTime) throws IOException { + return KeyBuilder.newInstance().add(entityType).add(revStartTime) + .add(entityId).getBytesForLookup(); + } + + /** + * Creates an index entry for the given key of the form INDEXED_ENTRY_PREFIX + + * primaryfiltername + primaryfiltervalue + key. + */ + private static byte[] addPrimaryFilterToKey(String primaryFilterName, + Object primaryFilterValue, byte[] key) throws IOException { + return KeyBuilder.newInstance().add(primaryFilterName) + .add(fstConf.asByteArray(primaryFilterValue), true).add(key).getBytes(); + } + + /** + * Creates an event object from the given key, offset, and value. If the event + * type is not contained in the specified set of event types, returns null. + */ + private static TimelineEvent getEntityEvent(Set eventTypes, + byte[] key, int offset, byte[] value) throws IOException { + KeyParser kp = new KeyParser(key, offset); + long ts = kp.getNextLong(); + String tstype = kp.getNextString(); + if (eventTypes == null || eventTypes.contains(tstype)) { + TimelineEvent event = new TimelineEvent(); + event.setTimestamp(ts); + event.setEventType(tstype); + Object o = fstConf.asObject(value); + if (o == null) { + event.setEventInfo(null); + } else if (o instanceof Map) { + @SuppressWarnings("unchecked") + Map m = (Map) o; + event.setEventInfo(m); + } else { + throw new IOException("Couldn't deserialize event info map"); + } + return event; + } + return null; + } + + /** + * Parses the primary filter from the given key at the given offset and adds + * it to the given entity. + */ + private static void addPrimaryFilter(TimelineEntity entity, byte[] key, + int offset) throws IOException { + KeyParser kp = new KeyParser(key, offset); + String name = kp.getNextString(); + byte[] bytes = kp.getRemainingBytes(); + Object value = fstConf.asObject(bytes); + entity.addPrimaryFilter(name, value); + } + + /** + * Creates a string representation of the byte array from the given offset to + * the end of the array (for parsing other info keys). + */ + private static String parseRemainingKey(byte[] b, int offset) { + return new String(b, offset, b.length - offset, UTF_8); + } + + /** + * Creates a related entity key, serializing ENTITY_ENTRY_PREFIX + entity type + * + revstarttime + entity id + RELATED_ENTITIES_COLUMN + relatedentity type + + * relatedentity id. + */ + private static byte[] createRelatedEntityKey(String entityId, + String entityType, byte[] revStartTime, String relatedEntityId, + String relatedEntityType) throws IOException { + return KeyBuilder.newInstance().add(entityType).add(revStartTime) + .add(entityId).add(RELATED_ENTITIES_COLUMN).add(relatedEntityType) + .add(relatedEntityId).getBytes(); + } + + /** + * Parses the related entity from the given key at the given offset and adds + * it to the given entity. + */ + private static void addRelatedEntity(TimelineEntity entity, byte[] key, + int offset) throws IOException { + KeyParser kp = new KeyParser(key, offset); + String type = kp.getNextString(); + String id = kp.getNextString(); + entity.addRelatedEntity(type, id); + } + + /** + * Creates a domain id key, serializing ENTITY_ENTRY_PREFIX + entity type + + * revstarttime + entity id + DOMAIN_ID_COLUMN. + */ + private static byte[] createDomainIdKey(String entityId, String entityType, + byte[] revStartTime) throws IOException { + return KeyBuilder.newInstance().add(entityType).add(revStartTime) + .add(entityId).add(DOMAIN_ID_COLUMN).getBytes(); + } + + /** + * Clears the cache to test reloading start times from leveldb (only for + * testing). + */ + @VisibleForTesting + void clearStartTimeCache() { + startTimeWriteCache.clear(); + startTimeReadCache.clear(); + } + + @VisibleForTesting + static int getStartTimeReadCacheSize(Configuration conf) { + return conf + .getInt( + TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE, + DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE); + } + + @VisibleForTesting + static int getStartTimeWriteCacheSize(Configuration conf) { + return conf + .getInt( + TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE, + DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE); + } + + @VisibleForTesting + long evictOldStartTimes(long minStartTime) throws IOException { + LOG.info("Searching for start times to evict earlier than " + minStartTime); + + long batchSize = 0; + long totalCount = 0; + long startTimesCount = 0; + + WriteBatch writeBatch = null; + DBIterator iterator = null; + + try { + writeBatch = starttimedb.createWriteBatch(); + ReadOptions readOptions = new ReadOptions(); + readOptions.fillCache(false); + iterator = starttimedb.iterator(readOptions); + // seek to the first start time entry + iterator.seekToFirst(); + + // evaluate each start time entry to see if it needs to be evicted or not + while (iterator.hasNext()) { + Map.Entry current = iterator.next(); + byte[] entityKey = current.getKey(); + byte[] entityValue = current.getValue(); + long startTime = readReverseOrderedLong(entityValue, 0); + if (startTime < minStartTime) { + ++batchSize; + ++startTimesCount; + writeBatch.delete(entityKey); + + // a large delete will hold the lock for too long + if (batchSize >= writeBatchSize) { + if (LOG.isDebugEnabled()) { + LOG.debug("Preparing to delete a batch of " + batchSize + + " old start times"); + } + starttimedb.write(writeBatch); + if (LOG.isDebugEnabled()) { + LOG.debug("Deleted batch of " + batchSize + + ". Total start times deleted so far this cycle: " + + startTimesCount); + } + IOUtils.cleanup(LOG, writeBatch); + writeBatch = starttimedb.createWriteBatch(); + batchSize = 0; + } + } + ++totalCount; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Preparing to delete a batch of " + batchSize + + " old start times"); + } + starttimedb.write(writeBatch); + if (LOG.isDebugEnabled()) { + LOG.debug("Deleted batch of " + batchSize + + ". Total start times deleted so far this cycle: " + + startTimesCount); + } + LOG.info("Deleted " + startTimesCount + "/" + totalCount + + " start time entities earlier than " + minStartTime); + } finally { + IOUtils.cleanup(LOG, writeBatch); + IOUtils.cleanup(LOG, iterator); + } + return startTimesCount; + } + + /** + * Discards entities with start timestamp less than or equal to the given + * timestamp. + */ + @VisibleForTesting + void discardOldEntities(long timestamp) throws IOException, + InterruptedException { + long totalCount = 0; + long t1 = System.currentTimeMillis(); + try { + totalCount += evictOldStartTimes(timestamp); + indexdb.evictOldDBs(); + entitydb.evictOldDBs(); + } finally { + long t2 = System.currentTimeMillis(); + LOG.info("Discarded " + totalCount + " entities for timestamp " + + timestamp + " and earlier in " + (t2 - t1) / 1000.0 + " seconds"); + } + } + + Version loadVersion() throws IOException { + byte[] data = starttimedb.get(bytes(TIMELINE_STORE_VERSION_KEY)); + // if version is not stored previously, treat it as 1.0. + if (data == null || data.length == 0) { + return Version.newInstance(1, 0); + } + Version version = new VersionPBImpl(VersionProto.parseFrom(data)); + return version; + } + + // Only used for test + @VisibleForTesting + void storeVersion(Version state) throws IOException { + dbStoreVersion(state); + } + + private void dbStoreVersion(Version state) throws IOException { + String key = TIMELINE_STORE_VERSION_KEY; + byte[] data = ((VersionPBImpl) state).getProto().toByteArray(); + try { + starttimedb.put(bytes(key), data); + } catch (DBException e) { + throw new IOException(e); + } + } + + Version getCurrentVersion() { + return CURRENT_VERSION_INFO; + } + + /** + * 1) Versioning timeline store: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, + * 2.0 etc. 2) Any incompatible change of TS-store is a major upgrade, and any + * compatible change of TS-store is a minor upgrade. 3) Within a minor + * upgrade, say 1.1 to 1.2: overwrite the version info and proceed as normal. + * 4) Within a major upgrade, say 1.2 to 2.0: throw exception and indicate + * user to use a separate upgrade tool to upgrade timeline store or remove + * incompatible old state. + */ + private void checkVersion() throws IOException { + Version loadedVersion = loadVersion(); + LOG.info("Loaded timeline store version info " + loadedVersion); + if (loadedVersion.equals(getCurrentVersion())) { + return; + } + if (loadedVersion.isCompatibleTo(getCurrentVersion())) { + LOG.info("Storing timeline store version info " + getCurrentVersion()); + dbStoreVersion(CURRENT_VERSION_INFO); + } else { + String incompatibleMessage = "Incompatible version for timeline store: " + + "expecting version " + getCurrentVersion() + + ", but loading version " + loadedVersion; + LOG.fatal(incompatibleMessage); + throw new IOException(incompatibleMessage); + } + } + + // TODO: make data retention work with the domain data as well + @Override + public void put(TimelineDomain domain) throws IOException { + WriteBatch domainWriteBatch = null; + WriteBatch ownerWriteBatch = null; + try { + domainWriteBatch = domaindb.createWriteBatch(); + ownerWriteBatch = ownerdb.createWriteBatch(); + if (domain.getId() == null || domain.getId().length() == 0) { + throw new IllegalArgumentException("Domain doesn't have an ID"); + } + if (domain.getOwner() == null || domain.getOwner().length() == 0) { + throw new IllegalArgumentException("Domain doesn't have an owner."); + } + + // Write description + byte[] domainEntryKey = createDomainEntryKey(domain.getId(), + DESCRIPTION_COLUMN); + byte[] ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(), + domain.getId(), DESCRIPTION_COLUMN); + if (domain.getDescription() != null) { + domainWriteBatch.put(domainEntryKey, + domain.getDescription().getBytes(UTF_8)); + ownerWriteBatch.put(ownerLookupEntryKey, domain.getDescription() + .getBytes(UTF_8)); + } else { + domainWriteBatch.put(domainEntryKey, EMPTY_BYTES); + ownerWriteBatch.put(ownerLookupEntryKey, EMPTY_BYTES); + } + + // Write owner + domainEntryKey = createDomainEntryKey(domain.getId(), OWNER_COLUMN); + ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(), + domain.getId(), OWNER_COLUMN); + // Null check for owner is done before + domainWriteBatch.put(domainEntryKey, domain.getOwner().getBytes(UTF_8)); + ownerWriteBatch.put(ownerLookupEntryKey, domain.getOwner() + .getBytes(UTF_8)); + + // Write readers + domainEntryKey = createDomainEntryKey(domain.getId(), READER_COLUMN); + ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(), + domain.getId(), READER_COLUMN); + if (domain.getReaders() != null && domain.getReaders().length() > 0) { + domainWriteBatch.put(domainEntryKey, domain.getReaders() + .getBytes(UTF_8)); + ownerWriteBatch.put(ownerLookupEntryKey, + domain.getReaders().getBytes(UTF_8)); + } else { + domainWriteBatch.put(domainEntryKey, EMPTY_BYTES); + ownerWriteBatch.put(ownerLookupEntryKey, EMPTY_BYTES); + } + + // Write writers + domainEntryKey = createDomainEntryKey(domain.getId(), WRITER_COLUMN); + ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(), + domain.getId(), WRITER_COLUMN); + if (domain.getWriters() != null && domain.getWriters().length() > 0) { + domainWriteBatch.put(domainEntryKey, domain.getWriters() + .getBytes(UTF_8)); + ownerWriteBatch.put(ownerLookupEntryKey, + domain.getWriters().getBytes(UTF_8)); + } else { + domainWriteBatch.put(domainEntryKey, EMPTY_BYTES); + ownerWriteBatch.put(ownerLookupEntryKey, EMPTY_BYTES); + } + + // Write creation time and modification time + // We put both timestamps together because they are always retrieved + // together, and store them in the same way as we did for the entity's + // start time and insert time. + domainEntryKey = createDomainEntryKey(domain.getId(), TIMESTAMP_COLUMN); + ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(), + domain.getId(), TIMESTAMP_COLUMN); + long currentTimestamp = System.currentTimeMillis(); + byte[] timestamps = domaindb.get(domainEntryKey); + if (timestamps == null) { + timestamps = new byte[16]; + writeReverseOrderedLong(currentTimestamp, timestamps, 0); + writeReverseOrderedLong(currentTimestamp, timestamps, 8); + } else { + writeReverseOrderedLong(currentTimestamp, timestamps, 8); + } + domainWriteBatch.put(domainEntryKey, timestamps); + ownerWriteBatch.put(ownerLookupEntryKey, timestamps); + domaindb.write(domainWriteBatch); + ownerdb.write(ownerWriteBatch); + } finally { + IOUtils.cleanup(LOG, domainWriteBatch); + IOUtils.cleanup(LOG, ownerWriteBatch); + } + } + + /** + * Creates a domain entity key with column name suffix, of the form + * DOMAIN_ENTRY_PREFIX + domain id + column name. + */ + private static byte[] createDomainEntryKey(String domainId, byte[] columnName) + throws IOException { + return KeyBuilder.newInstance().add(domainId).add(columnName).getBytes(); + } + + /** + * Creates an owner lookup key with column name suffix, of the form + * OWNER_LOOKUP_PREFIX + owner + domain id + column name. + */ + private static byte[] createOwnerLookupKey(String owner, String domainId, + byte[] columnName) throws IOException { + return KeyBuilder.newInstance().add(owner).add(domainId).add(columnName) + .getBytes(); + } + + @Override + public TimelineDomain getDomain(String domainId) throws IOException { + DBIterator iterator = null; + try { + byte[] prefix = KeyBuilder.newInstance().add(domainId) + .getBytesForLookup(); + iterator = domaindb.iterator(); + iterator.seek(prefix); + return getTimelineDomain(iterator, domainId, prefix); + } finally { + IOUtils.cleanup(LOG, iterator); + } + } + + @Override + public TimelineDomains getDomains(String owner) throws IOException { + DBIterator iterator = null; + try { + byte[] prefix = KeyBuilder.newInstance().add(owner).getBytesForLookup(); + List domains = new ArrayList(); + for (iterator = ownerdb.iterator(), iterator.seek(prefix); iterator + .hasNext();) { + byte[] key = iterator.peekNext().getKey(); + if (!prefixMatches(prefix, prefix.length, key)) { + break; + } + // Iterator to parse the rows of an individual domain + KeyParser kp = new KeyParser(key, prefix.length); + String domainId = kp.getNextString(); + byte[] prefixExt = KeyBuilder.newInstance().add(owner).add(domainId) + .getBytesForLookup(); + TimelineDomain domainToReturn = getTimelineDomain(iterator, domainId, + prefixExt); + if (domainToReturn != null) { + domains.add(domainToReturn); + } + } + // Sort the domains to return + Collections.sort(domains, new Comparator() { + @Override + public int compare(TimelineDomain domain1, TimelineDomain domain2) { + int result = domain2.getCreatedTime().compareTo( + domain1.getCreatedTime()); + if (result == 0) { + return domain2.getModifiedTime().compareTo( + domain1.getModifiedTime()); + } else { + return result; + } + } + }); + TimelineDomains domainsToReturn = new TimelineDomains(); + domainsToReturn.addDomains(domains); + return domainsToReturn; + } finally { + IOUtils.cleanup(LOG, iterator); + } + } + + private static TimelineDomain getTimelineDomain(DBIterator iterator, + String domainId, byte[] prefix) throws IOException { + // Iterate over all the rows whose key starts with prefix to retrieve the + // domain information. + TimelineDomain domain = new TimelineDomain(); + domain.setId(domainId); + boolean noRows = true; + for (; iterator.hasNext(); iterator.next()) { + byte[] key = iterator.peekNext().getKey(); + if (!prefixMatches(prefix, prefix.length, key)) { + break; + } + if (noRows) { + noRows = false; + } + byte[] value = iterator.peekNext().getValue(); + if (value != null && value.length > 0) { + if (key[prefix.length] == DESCRIPTION_COLUMN[0]) { + domain.setDescription(new String(value, UTF_8)); + } else if (key[prefix.length] == OWNER_COLUMN[0]) { + domain.setOwner(new String(value, UTF_8)); + } else if (key[prefix.length] == READER_COLUMN[0]) { + domain.setReaders(new String(value, UTF_8)); + } else if (key[prefix.length] == WRITER_COLUMN[0]) { + domain.setWriters(new String(value, UTF_8)); + } else if (key[prefix.length] == TIMESTAMP_COLUMN[0]) { + domain.setCreatedTime(readReverseOrderedLong(value, 0)); + domain.setModifiedTime(readReverseOrderedLong(value, 8)); + } else { + LOG.error("Unrecognized domain column: " + key[prefix.length]); + } + } + } + if (noRows) { + return null; + } else { + return domain; + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java index 8c6b83a..86aae77 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java @@ -18,8 +18,6 @@ package org.apache.hadoop.yarn.server.timeline; -import static org.apache.hadoop.yarn.util.StringHelper.CSV_JOINER; - import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -43,7 +41,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field; import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager; -import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import com.google.common.annotations.VisibleForTesting; @@ -51,7 +48,7 @@ * The class wrap over the timeline store and the ACLs manager. It does some non * trivial manipulation of the timeline data before putting or after getting it * from the timeline store, and checks the user's access to it. - * + * */ public class TimelineDataManager extends AbstractService { @@ -119,7 +116,7 @@ public boolean check(TimelineEntity entity) throws IOException { * Get the timeline entities that the given user have access to. The meaning * of each argument has been documented with * {@link TimelineReader#getEntities}. - * + * * @see TimelineReader#getEntities */ public TimelineEntities getEntities( @@ -156,7 +153,7 @@ public TimelineEntities getEntities( * Get the single timeline entity that the given user has access to. The * meaning of each argument has been documented with * {@link TimelineReader#getEntity}. - * + * * @see TimelineReader#getEntity */ public TimelineEntity getEntity( @@ -182,7 +179,7 @@ public TimelineEntity getEntity( * Get the events whose entities the given user has access to. The meaning of * each argument has been documented with * {@link TimelineReader#getEntityTimelines}. - * + * * @see TimelineReader#getEntityTimelines */ public TimelineEvents getEvents( @@ -218,7 +215,7 @@ public TimelineEvents getEvents( eventsItr.remove(); } } catch (Exception e) { - LOG.error("Error when verifying access for user " + callerUGI + LOG.warn("Error when verifying access for user " + callerUGI + " on the events of the timeline entity " + new EntityIdentifier(eventsOfOneEntity.getEntityId(), eventsOfOneEntity.getEntityType()), e); @@ -242,13 +239,10 @@ public TimelinePutResponse postEntities( if (entities == null) { return new TimelinePutResponse(); } - List entityIDs = new ArrayList(); TimelineEntities entitiesToPut = new TimelineEntities(); List errors = new ArrayList(); for (TimelineEntity entity : entities.getEntities()) { - EntityIdentifier entityID = - new EntityIdentifier(entity.getEntityId(), entity.getEntityType()); // if the domain id is not specified, the entity will be put into // the default domain @@ -261,44 +255,42 @@ public TimelinePutResponse postEntities( TimelineEntity existingEntity = null; try { existingEntity = - store.getEntity(entityID.getId(), entityID.getType(), + store.getEntity(entity.getEntityId(), entity.getEntityType(), EnumSet.of(Field.PRIMARY_FILTERS)); if (existingEntity != null) { addDefaultDomainIdIfAbsent(existingEntity); if (!existingEntity.getDomainId().equals(entity.getDomainId())) { throw new YarnException("The domain of the timeline entity " - + entityID + " is not allowed to be changed."); + + "{ id: " + entity.getEntityId() + ", type: " + + entity.getEntityType() + " } is not allowed to be changed from " + + existingEntity.getDomainId() + " to " + entity.getDomainId()); } } if (!timelineACLsManager.checkAccess( callerUGI, ApplicationAccessType.MODIFY_APP, entity)) { throw new YarnException(callerUGI - + " is not allowed to put the timeline entity " + entityID - + " into the domain " + entity.getDomainId() + "."); + + " is not allowed to put the timeline entity " + + "{ id: " + entity.getEntityId() + ", type: " + + entity.getEntityType() + " } into the domain " + + entity.getDomainId() + "."); } } catch (Exception e) { // Skip the entity which already exists and was put by others - LOG.error("Skip the timeline entity: " + entityID, e); + LOG.warn("Skip the timeline entity: { id: " + entity.getEntityId() + + ", type: "+ entity.getEntityType() + " }", e); TimelinePutResponse.TimelinePutError error = new TimelinePutResponse.TimelinePutError(); - error.setEntityId(entityID.getId()); - error.setEntityType(entityID.getType()); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); error.setErrorCode( TimelinePutResponse.TimelinePutError.ACCESS_DENIED); errors.add(error); continue; } - entityIDs.add(entityID); entitiesToPut.addEntity(entity); - if (LOG.isDebugEnabled()) { - LOG.debug("Storing the entity " + entityID + ", JSON-style content: " - + TimelineUtils.dumpTimelineRecordtoJSON(entity)); - } - } - if (LOG.isDebugEnabled()) { - LOG.debug("Storing entities: " + CSV_JOINER.join(entityIDs)); } + TimelinePutResponse response = store.put(entitiesToPut); // add the errors of timeline system filter key conflict response.addErrors(errors); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/util/LeveldbUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/util/LeveldbUtils.java index fc6cc7d..5638581 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/util/LeveldbUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/util/LeveldbUtils.java @@ -21,15 +21,16 @@ import org.apache.hadoop.io.WritableComparator; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.nio.charset.Charset; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong; public class LeveldbUtils { + /** A string builder utility for building timeline server leveldb keys. */ public static class KeyBuilder { + /** Maximum subkeys that can be added to construct a key. */ private static final int MAX_NUMBER_OF_KEY_ELEMENTS = 10; private byte[][] b; private boolean[] useSeparator; @@ -47,8 +48,15 @@ public static KeyBuilder newInstance() { return new KeyBuilder(MAX_NUMBER_OF_KEY_ELEMENTS); } + /** Instantiate a new key build with the given maximum subkes. + * @param size maximum subkeys that can be added to this key builder + * @return a newly constructed key builder */ + public static KeyBuilder newInstance(final int size) { + return new KeyBuilder(size); + } + public KeyBuilder add(String s) { - return add(s.getBytes(Charset.forName("UTF-8")), true); + return add(s.getBytes(UTF_8), true); } public KeyBuilder add(byte[] t) { @@ -66,26 +74,37 @@ public KeyBuilder add(byte[] t, boolean sep) { return this; } - public byte[] getBytes() throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(length); + /** Builds a byte array without the final string delimiter. */ + public byte[] getBytes() { + // check the last valid entry to see the final length + int bytesLength = length; + if (useSeparator[index - 1]) { + bytesLength = length - 1; + } + byte[] bytes = new byte[bytesLength]; + int curPos = 0; for (int i = 0; i < index; i++) { - baos.write(b[i]); + System.arraycopy(b[i], 0, bytes, curPos, b[i].length); + curPos += b[i].length; if (i < index - 1 && useSeparator[i]) { - baos.write(0x0); + bytes[curPos++] = 0x0; } } - return baos.toByteArray(); + return bytes; } - public byte[] getBytesForLookup() throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(length); + /** Builds a byte array including the final string delimiter. */ + public byte[] getBytesForLookup() { + byte[] bytes = new byte[length]; + int curPos = 0; for (int i = 0; i < index; i++) { - baos.write(b[i]); + System.arraycopy(b[i], 0, bytes, curPos, b[i].length); + curPos += b[i].length; if (useSeparator[i]) { - baos.write(0x0); + bytes[curPos++] = 0x0; } } - return baos.toByteArray(); + return bytes; } } @@ -93,11 +112,12 @@ public KeyBuilder add(byte[] t, boolean sep) { private final byte[] b; private int offset; - public KeyParser(byte[] b, int offset) { + public KeyParser(final byte[] b, final int offset) { this.b = b; this.offset = offset; } + /** Returns a string from the offset until the next string delimiter. */ public String getNextString() throws IOException { if (offset >= b.length) { throw new IOException( @@ -107,23 +127,42 @@ public String getNextString() throws IOException { while (offset + i < b.length && b[offset + i] != 0x0) { i++; } - String s = new String(b, offset, i, Charset.forName("UTF-8")); + String s = new String(b, offset, i, UTF_8); offset = offset + i + 1; return s; } + /** Moves current position until after the next end of string marker. */ + public void skipNextString() throws IOException { + if (offset >= b.length) { + throw new IOException("tried to read nonexistent string from byte array"); + } + while (offset < b.length && b[offset] != 0x0) { + ++offset; + } + ++offset; + } + + /** Read the next 8 bytes in the byte buffer as a long. */ public long getNextLong() throws IOException { if (offset + 8 >= b.length) { throw new IOException("byte array ran out when trying to read long"); } - long l = readReverseOrderedLong(b, offset); + long value = readReverseOrderedLong(b, offset); offset += 8; - return l; + return value; } public int getOffset() { return offset; } + + /** Returns a copy of the remaining bytes. */ + public byte[] getRemainingBytes() { + byte[] bytes = new byte[b.length - offset]; + System.arraycopy(b, offset, bytes, 0, b.length - offset); + return bytes; + } } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java index c5c0f93..121e9f3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java @@ -43,8 +43,6 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.records.Version; -import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore; -import org.apache.hadoop.yarn.server.timeline.NameValuePair; import org.apache.hadoop.yarn.server.utils.LeveldbIterator; import org.iq80.leveldb.DBException; import org.junit.After; @@ -155,7 +153,7 @@ private boolean deleteNextEntity(String entityType, byte[] ts) return ((LeveldbTimelineStore)store).deleteNextEntity(entityType, ts, iterator, pfIterator, false); } catch(DBException e) { - throw new IOException(e); + throw new IOException(e); } finally { IOUtils.cleanup(null, iterator, pfIterator); } @@ -179,12 +177,12 @@ public void testDeleteEntities() throws IOException, InterruptedException { assertEquals(1, getEntities("type_2").size()); assertEquals(false, deleteNextEntity(entityType1, - writeReverseOrderedLong(60l))); + writeReverseOrderedLong(60L))); assertEquals(3, getEntities("type_1").size()); assertEquals(1, getEntities("type_2").size()); assertEquals(true, deleteNextEntity(entityType1, - writeReverseOrderedLong(123l))); + writeReverseOrderedLong(123L))); List entities = getEntities("type_2"); assertEquals(1, entities.size()); verifyEntityInfo(entityId2, entityType2, events2, Collections.singletonMap( @@ -198,12 +196,12 @@ public void testDeleteEntities() throws IOException, InterruptedException { verifyEntityInfo(entityId6, entityType1, EMPTY_EVENTS, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(1), domainId2); - ((LeveldbTimelineStore)store).discardOldEntities(-123l); + ((LeveldbTimelineStore)store).discardOldEntities(0L); assertEquals(2, getEntities("type_1").size()); assertEquals(0, getEntities("type_2").size()); assertEquals(6, ((LeveldbTimelineStore)store).getEntityTypes().size()); - ((LeveldbTimelineStore)store).discardOldEntities(123l); + ((LeveldbTimelineStore)store).discardOldEntities(123L); assertEquals(0, getEntities("type_1").size()); assertEquals(0, getEntities("type_2").size()); assertEquals(0, ((LeveldbTimelineStore)store).getEntityTypes().size()); @@ -240,11 +238,11 @@ public void testDeleteEntitiesPrimaryFilters() verifyEntityInfo(entityId6, entityType1, EMPTY_EVENTS, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(2), domainId2); - ((LeveldbTimelineStore)store).discardOldEntities(-123l); + ((LeveldbTimelineStore)store).discardOldEntities(-123L); assertEquals(1, getEntitiesWithPrimaryFilter("type_1", pfPair).size()); assertEquals(3, getEntitiesWithPrimaryFilter("type_1", userFilter).size()); - ((LeveldbTimelineStore)store).discardOldEntities(123l); + ((LeveldbTimelineStore)store).discardOldEntities(123L); assertEquals(0, getEntities("type_1").size()); assertEquals(0, getEntities("type_2").size()); assertEquals(0, ((LeveldbTimelineStore)store).getEntityTypes().size()); @@ -261,7 +259,7 @@ public void testFromTsWithDeletion() assertEquals(1, getEntitiesFromTs("type_2", l).size()); assertEquals(3, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter, l).size()); - ((LeveldbTimelineStore)store).discardOldEntities(123l); + ((LeveldbTimelineStore)store).discardOldEntities(123L); assertEquals(0, getEntitiesFromTs("type_1", l).size()); assertEquals(0, getEntitiesFromTs("type_2", l).size()); assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter, @@ -279,7 +277,7 @@ public void testFromTsWithDeletion() assertEquals(1, getEntities("type_2").size()); assertEquals(3, getEntitiesWithPrimaryFilter("type_1", userFilter).size()); } - + @Test public void testCheckVersion() throws IOException { LeveldbTimelineStore dbStore = (LeveldbTimelineStore) store; @@ -299,16 +297,15 @@ public void testCheckVersion() throws IOException { Assert.assertEquals(defaultVersion, dbStore.loadVersion()); // incompatible version - Version incompatibleVersion = - Version.newInstance(defaultVersion.getMajorVersion() + 1, - defaultVersion.getMinorVersion()); + Version incompatibleVersion = Version.newInstance( + defaultVersion.getMajorVersion() + 1, defaultVersion.getMinorVersion()); dbStore.storeVersion(incompatibleVersion); try { restartTimelineStore(); Assert.fail("Incompatible version, should expect fail here."); } catch (ServiceStateException e) { - Assert.assertTrue("Exception message mismatch", - e.getMessage().contains("Incompatible version for timeline store")); + Assert.assertTrue("Exception message mismatch", + e.getMessage().contains("Incompatible version for timeline store")); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDB.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDB.java index e69de29..d2d0860 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDB.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDB.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timeline; + +import java.io.File; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.iq80.leveldb.DB; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** Test class for verification of RollingLevelDB. */ +public class TestRollingLevelDB { + private Configuration conf = new YarnConfiguration(); + private FileSystem lfs; + private MyRollingLevelDB rollingLevelDB; + + /** RollingLevelDB for testing that has a setting current time. */ + public static class MyRollingLevelDB extends RollingLevelDB { + private long currentTimeMillis; + + MyRollingLevelDB() { + super("Test"); + this.currentTimeMillis = System.currentTimeMillis(); + } + + @Override + protected long currentTimeMillis() { + return currentTimeMillis; + } + + public void setCurrentTimeMillis(long time) { + this.currentTimeMillis = time; + } + }; + + @Before + public void setup() throws Exception { + lfs = FileSystem.getLocal(conf); + File fsPath = new File("target", this.getClass().getSimpleName() + + "-tmpDir").getAbsoluteFile(); + conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH, + fsPath.getAbsolutePath()); + lfs.delete(new Path(fsPath.getAbsolutePath()), true); + rollingLevelDB = new MyRollingLevelDB(); + } + + @Test + public void testInsertAfterRollPeriodRollsDB() throws Exception { + + rollingLevelDB.init(conf); + long now = rollingLevelDB.currentTimeMillis(); + DB db = rollingLevelDB.getDBForStartTime(now); + long startTime = rollingLevelDB.getStartTimeFor(db); + Assert.assertEquals("Received level db for incorrect start time", + rollingLevelDB.computeCurrentCheckMillis(now), + startTime); + now = rollingLevelDB.getNextRollingTimeMillis(); + rollingLevelDB.setCurrentTimeMillis(now); + db = rollingLevelDB.getDBForStartTime(now); + startTime = rollingLevelDB.getStartTimeFor(db); + Assert.assertEquals("Received level db for incorrect start time", + rollingLevelDB.computeCurrentCheckMillis(now), + startTime); + } + + @Test + public void testInsertForPreviousPeriodAfterRollPeriodRollsDB() + throws Exception { + + rollingLevelDB.init(conf); + long now = rollingLevelDB.currentTimeMillis(); + now = rollingLevelDB.computeCurrentCheckMillis(now); + rollingLevelDB.setCurrentTimeMillis(now); + DB db = rollingLevelDB.getDBForStartTime(now - 1); + long startTime = rollingLevelDB.getStartTimeFor(db); + Assert.assertEquals("Received level db for incorrect start time", + rollingLevelDB.computeCurrentCheckMillis(now - 1), + startTime); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDBTimelineStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDBTimelineStore.java index e69de29..956e9e9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDBTimelineStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDBTimelineStore.java @@ -0,0 +1,427 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timeline; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.File; +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.service.ServiceStateException; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.records.Version; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mortbay.log.Log; + +/** Test class to verify RollingLevelDBTimelineStore. */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class TestRollingLevelDBTimelineStore extends TimelineStoreTestUtils { + private FileContext fsContext; + private File fsPath; + private Configuration config = new YarnConfiguration(); + + @Before + public void setup() throws Exception { + fsContext = FileContext.getLocalFSFileContext(); + fsPath = new File("target", this.getClass().getSimpleName() + + "-tmpDir").getAbsoluteFile(); + fsContext.delete(new Path(fsPath.getAbsolutePath()), true); + config.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH, + fsPath.getAbsolutePath()); + config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false); + store = new RollingLevelDBTimelineStore(); + store.init(config); + store.start(); + loadTestEntityData(); + loadVerificationEntityData(); + loadTestDomainData(); + } + + @After + public void tearDown() throws Exception { + store.stop(); + fsContext.delete(new Path(fsPath.getAbsolutePath()), true); + } + + @Test + public void testRootDirPermission() throws IOException { + FileSystem fs = FileSystem.getLocal(new YarnConfiguration()); + FileStatus file = fs.getFileStatus(new Path(fsPath.getAbsolutePath(), + RollingLevelDBTimelineStore.FILENAME)); + assertNotNull(file); + assertEquals(RollingLevelDBTimelineStore.LEVELDB_DIR_UMASK, + file.getPermission()); + } + + @Test + public void testGetSingleEntity() throws IOException { + super.testGetSingleEntity(); + ((RollingLevelDBTimelineStore)store).clearStartTimeCache(); + super.testGetSingleEntity(); + loadTestEntityData(); + } + + @Test + public void testGetEntities() throws IOException { + super.testGetEntities(); + } + + @Test + public void testGetEntitiesWithFromId() throws IOException { + super.testGetEntitiesWithFromId(); + } + + @Test + public void testGetEntitiesWithFromTs() throws IOException { + // feature not supported + } + + @Test + public void testGetEntitiesWithPrimaryFilters() throws IOException { + super.testGetEntitiesWithPrimaryFilters(); + } + + @Test + public void testGetEntitiesWithSecondaryFilters() throws IOException { + super.testGetEntitiesWithSecondaryFilters(); + } + + @Test + public void testGetEvents() throws IOException { + super.testGetEvents(); + } + + @Test + public void testCacheSizes() { + Configuration conf = new Configuration(); + assertEquals(10000, + RollingLevelDBTimelineStore.getStartTimeReadCacheSize(conf)); + assertEquals(10000, + RollingLevelDBTimelineStore.getStartTimeWriteCacheSize(conf)); + conf.setInt( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE, + 10001); + assertEquals(10001, + RollingLevelDBTimelineStore.getStartTimeReadCacheSize(conf)); + conf = new Configuration(); + conf.setInt( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE, + 10002); + assertEquals(10002, + RollingLevelDBTimelineStore.getStartTimeWriteCacheSize(conf)); + } + + @Test + public void testCheckVersion() throws IOException { + RollingLevelDBTimelineStore dbStore = (RollingLevelDBTimelineStore) store; + // default version + Version defaultVersion = dbStore.getCurrentVersion(); + Assert.assertEquals(defaultVersion, dbStore.loadVersion()); + + // compatible version + Version compatibleVersion = + Version.newInstance(defaultVersion.getMajorVersion(), + defaultVersion.getMinorVersion() + 2); + dbStore.storeVersion(compatibleVersion); + Assert.assertEquals(compatibleVersion, dbStore.loadVersion()); + restartTimelineStore(); + dbStore = (RollingLevelDBTimelineStore) store; + // overwrite the compatible version + Assert.assertEquals(defaultVersion, dbStore.loadVersion()); + + // incompatible version + Version incompatibleVersion = + Version.newInstance(defaultVersion.getMajorVersion() + 1, + defaultVersion.getMinorVersion()); + dbStore.storeVersion(incompatibleVersion); + try { + restartTimelineStore(); + Assert.fail("Incompatible version, should expect fail here."); + } catch (ServiceStateException e) { + Assert.assertTrue("Exception message mismatch", + e.getMessage().contains("Incompatible version for timeline store")); + } + } + + @Test + public void testValidateConfig() throws IOException { + Configuration copyConfig = new YarnConfiguration(config); + try { + Configuration newConfig = new YarnConfiguration(copyConfig); + newConfig.setLong(YarnConfiguration.TIMELINE_SERVICE_TTL_MS, 0); + config = newConfig; + restartTimelineStore(); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains( + YarnConfiguration.TIMELINE_SERVICE_TTL_MS)); + } + try { + Configuration newConfig = new YarnConfiguration(copyConfig); + newConfig.setLong( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS, 0); + config = newConfig; + restartTimelineStore(); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS)); + } + try { + Configuration newConfig = new YarnConfiguration(copyConfig); + newConfig.setLong( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE, -1); + config = newConfig; + restartTimelineStore(); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE)); + } + try { + Configuration newConfig = new YarnConfiguration(copyConfig); + newConfig.setLong( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE, + 0); + config = newConfig; + restartTimelineStore(); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains( + YarnConfiguration + .TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE)); + } + try { + Configuration newConfig = new YarnConfiguration(copyConfig); + newConfig.setLong( + YarnConfiguration + .TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE, + 0); + config = newConfig; + restartTimelineStore(); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains( + YarnConfiguration + .TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE)); + } + config = copyConfig; + restartTimelineStore(); + } + + private void restartTimelineStore() throws IOException { + // need to close so leveldb releases database lock + if (store != null) { + store.close(); + } + store = new RollingLevelDBTimelineStore(); + store.init(config); + store.start(); + } + + @Test + public void testGetDomain() throws IOException { + super.testGetDomain(); + } + + @Test + public void testGetDomains() throws IOException { + super.testGetDomains(); + } + + @Test + public void testRelatingToNonExistingEntity() throws IOException { + TimelineEntity entityToStore = new TimelineEntity(); + entityToStore.setEntityType("TEST_ENTITY_TYPE_1"); + entityToStore.setEntityId("TEST_ENTITY_ID_1"); + entityToStore.setDomainId(TimelineDataManager.DEFAULT_DOMAIN_ID); + entityToStore.addRelatedEntity("TEST_ENTITY_TYPE_2", "TEST_ENTITY_ID_2"); + TimelineEntities entities = new TimelineEntities(); + entities.addEntity(entityToStore); + store.put(entities); + TimelineEntity entityToGet = + store.getEntity("TEST_ENTITY_ID_2", "TEST_ENTITY_TYPE_2", null); + Assert.assertNotNull(entityToGet); + Assert.assertEquals("DEFAULT", entityToGet.getDomainId()); + Assert.assertEquals("TEST_ENTITY_TYPE_1", + entityToGet.getRelatedEntities().keySet().iterator().next()); + Assert.assertEquals("TEST_ENTITY_ID_1", + entityToGet.getRelatedEntities().values().iterator().next() + .iterator().next()); + } + + @Test + public void testRelatingToEntityInSamePut() throws IOException { + TimelineEntity entityToRelate = new TimelineEntity(); + entityToRelate.setEntityType("TEST_ENTITY_TYPE_2"); + entityToRelate.setEntityId("TEST_ENTITY_ID_2"); + entityToRelate.setDomainId("TEST_DOMAIN"); + TimelineEntity entityToStore = new TimelineEntity(); + entityToStore.setEntityType("TEST_ENTITY_TYPE_1"); + entityToStore.setEntityId("TEST_ENTITY_ID_1"); + entityToStore.setDomainId("TEST_DOMAIN"); + entityToStore.addRelatedEntity("TEST_ENTITY_TYPE_2", "TEST_ENTITY_ID_2"); + TimelineEntities entities = new TimelineEntities(); + entities.addEntity(entityToStore); + entities.addEntity(entityToRelate); + store.put(entities); + TimelineEntity entityToGet = + store.getEntity("TEST_ENTITY_ID_2", "TEST_ENTITY_TYPE_2", null); + Assert.assertNotNull(entityToGet); + Assert.assertEquals("TEST_DOMAIN", entityToGet.getDomainId()); + Assert.assertEquals("TEST_ENTITY_TYPE_1", + entityToGet.getRelatedEntities().keySet().iterator().next()); + Assert.assertEquals("TEST_ENTITY_ID_1", + entityToGet.getRelatedEntities().values().iterator().next() + .iterator().next()); + } + + @Test + public void testRelatingToOldEntityWithoutDomainId() throws IOException { + // New entity is put in the default domain + TimelineEntity entityToStore = new TimelineEntity(); + entityToStore.setEntityType("NEW_ENTITY_TYPE_1"); + entityToStore.setEntityId("NEW_ENTITY_ID_1"); + entityToStore.setDomainId(TimelineDataManager.DEFAULT_DOMAIN_ID); + entityToStore.addRelatedEntity("OLD_ENTITY_TYPE_1", "OLD_ENTITY_ID_1"); + TimelineEntities entities = new TimelineEntities(); + entities.addEntity(entityToStore); + store.put(entities); + + TimelineEntity entityToGet = + store.getEntity("OLD_ENTITY_ID_1", "OLD_ENTITY_TYPE_1", null); + Assert.assertNotNull(entityToGet); + Assert.assertEquals("DEFAULT", entityToGet.getDomainId()); + Assert.assertEquals("NEW_ENTITY_TYPE_1", + entityToGet.getRelatedEntities().keySet().iterator().next()); + Assert.assertEquals("NEW_ENTITY_ID_1", + entityToGet.getRelatedEntities().values().iterator().next() + .iterator().next()); + + // New entity is not put in the default domain + entityToStore = new TimelineEntity(); + entityToStore.setEntityType("NEW_ENTITY_TYPE_2"); + entityToStore.setEntityId("NEW_ENTITY_ID_2"); + entityToStore.setDomainId("NON_DEFAULT"); + entityToStore.addRelatedEntity("OLD_ENTITY_TYPE_1", "OLD_ENTITY_ID_1"); + entities = new TimelineEntities(); + entities.addEntity(entityToStore); + TimelinePutResponse response = store.put(entities); + Assert.assertEquals(1, response.getErrors().size()); + Assert.assertEquals(TimelinePutError.FORBIDDEN_RELATION, + response.getErrors().get(0).getErrorCode()); + entityToGet = + store.getEntity("OLD_ENTITY_ID_1", "OLD_ENTITY_TYPE_1", null); + Assert.assertNotNull(entityToGet); + Assert.assertEquals("DEFAULT", entityToGet.getDomainId()); + // Still have one related entity + Assert.assertEquals(1, entityToGet.getRelatedEntities().keySet().size()); + Assert.assertEquals(1, entityToGet.getRelatedEntities().values() + .iterator().next().size()); + } + + public void testStorePerformance() throws IOException { + TimelineEntity entityToStorePrep = new TimelineEntity(); + entityToStorePrep.setEntityType("TEST_ENTITY_TYPE_PREP"); + entityToStorePrep.setEntityId("TEST_ENTITY_ID_PREP"); + entityToStorePrep.setDomainId("TEST_DOMAIN"); + entityToStorePrep.addRelatedEntity("TEST_ENTITY_TYPE_2", + "TEST_ENTITY_ID_2"); + entityToStorePrep.setStartTime(0L); + + TimelineEntities entitiesPrep = new TimelineEntities(); + entitiesPrep.addEntity(entityToStorePrep); + store.put(entitiesPrep); + + long start = System.currentTimeMillis(); + int num = 1000000; + + Log.info("Start test for " + num); + + final String tezTaskAttemptId = "TEZ_TA"; + final String tezEntityId = "attempt_1429158534256_0001_1_00_000000_"; + final String tezTaskId = "TEZ_T"; + final String tezDomainId = "Tez_ATS_application_1429158534256_0001"; + + TimelineEntity entityToStore = new TimelineEntity(); + TimelineEvent startEvt = new TimelineEvent(); + entityToStore.setEntityType(tezTaskAttemptId); + + startEvt.setEventType("TASK_ATTEMPT_STARTED"); + startEvt.setTimestamp(0); + entityToStore.addEvent(startEvt); + entityToStore.setDomainId(tezDomainId); + + entityToStore.addPrimaryFilter("status", "SUCCEEDED"); + entityToStore.addPrimaryFilter("applicationId", + "application_1429158534256_0001"); + entityToStore.addPrimaryFilter("TEZ_VERTEX_ID", + "vertex_1429158534256_0001_1_00"); + entityToStore.addPrimaryFilter("TEZ_DAG_ID", "dag_1429158534256_0001_1"); + entityToStore.addPrimaryFilter("TEZ_TASK_ID", + "task_1429158534256_0001_1_00_000000"); + + entityToStore.setStartTime(0L); + entityToStore.addOtherInfo("startTime", 0); + entityToStore.addOtherInfo("inProgressLogsURL", + "localhost:8042/inProgressLogsURL"); + entityToStore.addOtherInfo("completedLogsURL", ""); + entityToStore.addOtherInfo("nodeId", "localhost:54450"); + entityToStore.addOtherInfo("nodeHttpAddress", "localhost:8042"); + entityToStore.addOtherInfo("containerId", + "container_1429158534256_0001_01_000002"); + entityToStore.addOtherInfo("status", "RUNNING"); + entityToStore.addRelatedEntity(tezTaskId, "TEZ_TASK_ID_1"); + + TimelineEntities entities = new TimelineEntities(); + entities.addEntity(entityToStore); + + for (int i = 0; i < num; ++i) { + entityToStore.setEntityId(tezEntityId + i); + store.put(entities); + } + + long duration = System.currentTimeMillis() - start; + Log.info("Duration for " + num + ": " + duration); + } + + public static void main(String[] args) throws Exception { + TestRollingLevelDBTimelineStore store = + new TestRollingLevelDBTimelineStore(); + store.setup(); + store.testStorePerformance(); + store.tearDown(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreTestUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreTestUtils.java index 6ac5a35..71e298c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreTestUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreTestUtils.java @@ -70,7 +70,7 @@ protected String entityId6; protected String entityId7; protected String entityType7; - + protected Map> primaryFilters; protected Map secondaryFilters; protected Map allFilters; @@ -105,7 +105,7 @@ protected void loadTestEntityData() throws IOException { Set l1 = new HashSet(); l1.add("username"); Set l2 = new HashSet(); - l2.add((long)Integer.MAX_VALUE); + l2.add(Integer.MAX_VALUE); Set l3 = new HashSet(); l3.add("123abc"); Set l4 = new HashSet(); @@ -115,7 +115,7 @@ protected void loadTestEntityData() throws IOException { primaryFilters.put("other", l3); primaryFilters.put("long", l4); Map secondaryFilters = new HashMap(); - secondaryFilters.put("startTime", 123456l); + secondaryFilters.put("startTime", 123456); secondaryFilters.put("status", "RUNNING"); Map otherInfo1 = new HashMap(); otherInfo1.put("info1", "val1"); @@ -139,7 +139,7 @@ protected void loadTestEntityData() throws IOException { relatedEntities.put(entityType2, Collections.singleton(entityId2)); TimelineEvent ev3 = createEvent(789l, "launch_event", null); - TimelineEvent ev4 = createEvent(-123l, "init_event", null); + TimelineEvent ev4 = createEvent(0l, "init_event", null); List events = new ArrayList(); events.add(ev3); events.add(ev4); @@ -302,7 +302,7 @@ protected void loadVerificationEntityData() throws Exception { relEntityMap2.put(entityType4, Collections.singleton(entityId4)); ev3 = createEvent(789l, "launch_event", null); - ev4 = createEvent(-123l, "init_event", null); + ev4 = createEvent(0l, "init_event", null); events2 = new ArrayList(); events2.add(ev3); events2.add(ev4); @@ -384,7 +384,7 @@ public void testGetSingleEntity() throws IOException { entityType1, EnumSet.allOf(Field.class)), domainId1); verifyEntityInfo(entityId2, entityType2, events2, relEntityMap, - EMPTY_PRIMARY_FILTERS, EMPTY_MAP, -123l, store.getEntity(entityId2, + EMPTY_PRIMARY_FILTERS, EMPTY_MAP, 0l, store.getEntity(entityId2, entityType2, EnumSet.allOf(Field.class)), domainId1); verifyEntityInfo(entityId4, entityType4, EMPTY_EVENTS, EMPTY_REL_ENTITIES,