diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/LeveldbApplicationTimelineStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/LeveldbApplicationTimelineStore.java index c2e93ca..b6b0647 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/LeveldbApplicationTimelineStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/LeveldbApplicationTimelineStore.java @@ -33,6 +33,7 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeMap; +import java.util.concurrent.locks.ReentrantLock; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.collections.map.LRUMap; @@ -89,9 +90,20 @@ private static final int START_TIME_CACHE_SIZE = 10000; @SuppressWarnings("unchecked") - private final Map startTimeCache = + private final Map startTimeWriteCache = Collections.synchronizedMap(new LRUMap(START_TIME_CACHE_SIZE)); + @SuppressWarnings("unchecked") + private final Map startTimeReadCache = + Collections.synchronizedMap(new LRUMap(START_TIME_CACHE_SIZE)); + + /** + * Locking is required for {@link #getAndSetStartTime} calls during + * concurrent writes. + */ + private final LockMap writeLocks = + new LockMap(); + private DB db; public LeveldbApplicationTimelineStore() { @@ -120,6 +132,27 @@ protected void serviceStop() throws Exception { super.serviceStop(); } + private static class LockMap { + + private Map locks = + new HashMap(); + + synchronized ReentrantLock getLock(K key) { + ReentrantLock lock = locks.get(key); + if (lock == null) { + lock = new ReentrantLock(); + locks.put(key, lock); + } + return lock; + } + + synchronized void returnLock(K key, ReentrantLock lock) { + if (lock.getHoldCount() == 0) { + locks.remove(key); + } + } + } + private static class KeyBuilder { private static final int MAX_NUMBER_OF_KEY_ELEMENTS = 10; private byte[][] b; @@ -216,7 +249,7 @@ public ATSEntity getEntity(String entity, String entityType, EnumSet fields) throws IOException { DBIterator iterator = null; try { - byte[] revStartTime = getStartTime(entity, entityType, null, null, null); + byte[] revStartTime = getStartTime(entity, entityType); if (revStartTime == null) return null; byte[] prefix = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX) @@ -343,7 +376,7 @@ public int compare(byte[] o1, byte[] o2) { // look up start times for the specified entities // skip entities with no start time for (String entity : entityIds) { - byte[] startTime = getStartTime(entity, entityType, null, null, null); + byte[] startTime = getStartTime(entity, entityType); if (startTime != null) { List entities = startTimeMap.get(startTime); if (entities == null) { @@ -534,7 +567,7 @@ private void put(ATSEntity atsEntity, ATSPutErrors response) { writeBatch = db.createWriteBatch(); List events = atsEntity.getEvents(); // look up the start time for the entity - byte[] revStartTime = getStartTime(atsEntity.getEntityId(), + byte[] revStartTime = getAndSetStartTime(atsEntity.getEntityId(), atsEntity.getEntityType(), atsEntity.getStartTime(), events, writeBatch); if (revStartTime == null) { @@ -571,7 +604,7 @@ private void put(ATSEntity atsEntity, ATSPutErrors response) { String relatedEntityType = relatedEntityList.getKey(); for (String relatedEntityId : relatedEntityList.getValue()) { // look up start time of related entity - byte[] relatedEntityStartTime = getStartTime(relatedEntityId, + byte[] relatedEntityStartTime = getAndSetStartTime(relatedEntityId, relatedEntityType, null, null, writeBatch); if (relatedEntityStartTime == null) { // if start time is not found, set start time of the related @@ -580,7 +613,7 @@ private void put(ATSEntity atsEntity, ATSPutErrors response) { relatedEntityStartTime = revStartTime; writeBatch.put(createStartTimeLookupKey(relatedEntityId, relatedEntityType), relatedEntityStartTime); - startTimeCache.put(new EntityIdentifier(relatedEntityId, + startTimeWriteCache.put(new EntityIdentifier(relatedEntityId, relatedEntityType), revStartTimeLong); } // write reverse entry (related entity -> entity) @@ -660,6 +693,39 @@ public ATSPutErrors put(ATSEntities atsEntities) { * * @param entityId The id of the entity * @param entityType The type of the entity + * @return A byte array + * @throws IOException + */ + private byte[] getStartTime(String entityId, String entityType) + throws IOException { + EntityIdentifier entity = new EntityIdentifier(entityId, entityType); + // start time is not provided, so try to look it up + if (startTimeReadCache.containsKey(entity)) { + // found the start time in the cache + return writeReverseOrderedLong(startTimeReadCache.get(entity)); + } else { + // try to look up the start time in the db + byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType()); + byte[] v = db.get(b); + if (v == null) { + // did not find the start time in the db + return null; + } else { + // found the start time in the db + startTimeReadCache.put(entity, readReverseOrderedLong(v, 0)); + return v; + } + } + } + + /** + * Get the unique start time for a given entity as a byte array that sorts + * the timestamps in reverse order (see {@link + * GenericObjectMapper#writeReverseOrderedLong(long)}). If the start time + * doesn't exist, set it based on the information provided. + * + * @param entityId The id of the entity + * @param entityType The type of the entity * @param startTime The start time of the entity, or null * @param events A list of events for the entity, or null * @param writeBatch A leveldb write batch, if the method is called by a @@ -667,62 +733,85 @@ public ATSPutErrors put(ATSEntities atsEntities) { * @return A byte array * @throws IOException */ - private byte[] getStartTime(String entityId, String entityType, + private byte[] getAndSetStartTime(String entityId, String entityType, Long startTime, List events, WriteBatch writeBatch) throws IOException { EntityIdentifier entity = new EntityIdentifier(entityId, entityType); - if (startTime == null) { - // start time is not provided, so try to look it up - if (startTimeCache.containsKey(entity)) { - // found the start time in the cache - startTime = startTimeCache.get(entity); + ReentrantLock lock = writeLocks.getLock(entity); + lock.lock(); + try { + if (startTime == null) { + // start time is not provided, so try to look it up + if (startTimeWriteCache.containsKey(entity)) { + // found the start time in the cache + startTime = startTimeWriteCache.get(entity); + return writeReverseOrderedLong(startTime); + } else { + if (events != null) { + // prepare a start time from events in case it is needed + Long min = Long.MAX_VALUE; + for (ATSEvent e : events) { + if (min > e.getTimestamp()) { + min = e.getTimestamp(); + } + } + startTime = min; + } + return checkStartTimeInDb(entity, startTime, writeBatch); + } } else { - // try to look up the start time in the db - byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType()); - byte[] v = db.get(b); - if (v == null) { - // did not find the start time in the db - // if this is a put, try to set it from the provided events - if (events == null || writeBatch == null) { - // no events, or not a put, so return null - return null; + // start time is provided + if (startTimeWriteCache.containsKey(entity)) { + // check the provided start time matches the cache + if (!startTime.equals(startTimeWriteCache.get(entity))) { + // the start time is already in the cache, + // and it is different from the provided start time, + // so use the one from the cache + startTime = startTimeWriteCache.get(entity); } - Long min = Long.MAX_VALUE; - for (ATSEvent e : events) - if (min > e.getTimestamp()) - min = e.getTimestamp(); - startTime = min; - // selected start time as minimum timestamp of provided events - // write start time to db and cache - writeBatch.put(b, writeReverseOrderedLong(startTime)); - startTimeCache.put(entity, startTime); + return writeReverseOrderedLong(startTime); } else { - // found the start time in the db - startTime = readReverseOrderedLong(v, 0); - if (writeBatch != null) { - // if this is a put, re-add the start time to the cache - startTimeCache.put(entity, startTime); - } + // check the provided start time matches the db + return checkStartTimeInDb(entity, startTime, writeBatch); } } - } else { - // start time is provided - // TODO: verify start time in db as well as cache? - if (startTimeCache.containsKey(entity)) { - // if the start time is already in the cache, - // and it is different from the provided start time, - // use the one from the cache - if (!startTime.equals(startTimeCache.get(entity))) - startTime = startTimeCache.get(entity); - } else if (writeBatch != null) { - // if this is a put, write the provided start time to the db and the - // cache - byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType()); - writeBatch.put(b, writeReverseOrderedLong(startTime)); - startTimeCache.put(entity, startTime); + } finally { + lock.unlock(); + writeLocks.returnLock(entity, lock); + } + } + + /** + * Checks db for start time and returns it if it exists. If it doesn't + * exist, writes the suggested start time (if it is not null). This is + * only called when the start time is not found in the cache, + * so it adds it back into the cache if it is found. + */ + private byte[] checkStartTimeInDb(EntityIdentifier entity, + Long suggestedStartTime, WriteBatch writeBatch) throws IOException { + // create lookup key for start time + byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType()); + // retrieve value for key + byte[] v = db.get(b); + byte[] revStartTime; + if (v == null) { + // start time doesn't exist in db + if (suggestedStartTime == null) { + return null; } + // write suggested start time + revStartTime = writeReverseOrderedLong(suggestedStartTime); + writeBatch.put(b, revStartTime); + } else { + // found start time in db, so ignore suggested start time + suggestedStartTime = readReverseOrderedLong(v, 0); + revStartTime = v; } - return writeReverseOrderedLong(startTime); + // keep both read/write cache to reduce the synchronization across read and + // write operations + startTimeWriteCache.put(entity, suggestedStartTime); + startTimeReadCache.put(entity, suggestedStartTime); + return revStartTime; } /** @@ -849,6 +938,7 @@ private static void addRelatedEntity(ATSEntity atsEntity, byte[] key, */ @VisibleForTesting void clearStartTimeCache() { - startTimeCache.clear(); + startTimeWriteCache.clear(); + startTimeReadCache.clear(); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/TestLeveldbApplicationTimelineStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/TestLeveldbApplicationTimelineStore.java index b868049..45ee02b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/TestLeveldbApplicationTimelineStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/apptimeline/TestLeveldbApplicationTimelineStore.java @@ -25,17 +25,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntities; -import org.apache.hadoop.yarn.api.records.apptimeline.ATSEntity; -import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors; -import org.apache.hadoop.yarn.api.records.apptimeline.ATSPutErrors.ATSPutError; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.junit.After; import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.assertEquals; - @InterfaceAudience.Private @InterfaceStability.Unstable public class TestLeveldbApplicationTimelineStore @@ -70,6 +64,7 @@ public void testGetSingleEntity() throws IOException { super.testGetSingleEntity(); ((LeveldbApplicationTimelineStore)store).clearStartTimeCache(); super.testGetSingleEntity(); + loadTestData(); } @Test