diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
index 71605a5..cc4a16d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml
@@ -65,6 +65,17 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>com.google.inject.extensions</groupId>
       <artifactId>guice-servlet</artifactId>
@@ -159,6 +170,70 @@
     <dependency>
       <groupId>org.fusesource.leveldbjni</groupId>
       <artifactId>leveldbjni-all</artifactId>
-    </dependency>
+    </dependency>
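+    <!-- The hadoop-* exclusions below keep the transitive Hadoop dependencies
+         of the HBase 0.98.0-hadoop2 artifacts from conflicting with the
+         Hadoop classes already provided by this project. -->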
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+      <version>0.98.0-hadoop2</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-auth</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-annotations</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-testing-util</artifactId>
+      <version>0.98.0-hadoop2</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-auth</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-client</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-hdfs</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-annotations</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-minicluster</artifactId>
+        </exclusion>
+      </exclusions>
+      <scope>test</scope>
+    </dependency>
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/AbstractTimelineStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/AbstractTimelineStore.java
new file mode 100644
index 0000000..cc8aadf
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/AbstractTimelineStore.java
@@ -0,0 +1,808 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong;
+import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.writeReverseOrderedLong;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
+import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.WritableComparator;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class AbstractTimelineStore extends AbstractService implements
+ TimelineStore {
+
+ static final Log LOG = LogFactory.getLog(AbstractTimelineStore.class);
+
+ public static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
+
+ public static final byte NULL_BYTE = 0x0;
+ public static final byte ONE_BYTE = 0x1;
+
+ private static final String TABLE_NAME_PREFIX = "timeline.";
+
+ static final String START_TIME_TABLE = TABLE_NAME_PREFIX +
+ "starttime";
+ static final String ENTITY_TABLE = TABLE_NAME_PREFIX + "entity";
+ static final String INDEX_TABLE = TABLE_NAME_PREFIX + "index";
+
+ static final byte[] START_TIME_COLUMN = "s".getBytes(UTF8_CHARSET);
+ static final byte[] INSERT_TIME_COLUMN = "t".getBytes(UTF8_CHARSET);
+
+ static final byte[] EVENTS_COLUMN = "e".getBytes(UTF8_CHARSET);
+ static final byte[] PRIMARY_FILTERS_COLUMN =
+ "f".getBytes(UTF8_CHARSET);
+ static final byte[] OTHER_INFO_COLUMN = "i".getBytes(UTF8_CHARSET);
+ static final byte[] RELATED_ENTITIES_COLUMN =
+ "r".getBytes(UTF8_CHARSET);
+
+ static final byte[] EMPTY_BYTES = new byte[0];
+
+ /**
+ * Default age-off time is one week (in seconds)
+ */
+ static final int DEFAULT_TTL = 60 * 60 * 24 * 7;
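+
+ // Key-encoding note: writeReverseOrderedLong() (GenericObjectMapper) encodes
+ // a long so that larger values sort *first* under lexicographic byte-order
+ // comparison, i.e. for start times t1 < t2, encode(t2) < encode(t1). This is
+ // what lets getEntities() below start a scan at the "endtime" key and run
+ // forward through progressively older entities. A sketch of such an encoding
+ // (the actual GenericObjectMapper format may differ in detail):
+ //   byte[] b = new byte[8];
+ //   b[0] = (byte) (0x7f ^ ((l >> 56) & 0xff)); // invert value bits, keep sign
+ //   for (int i = 1; i < 8; i++)
+ //     b[i] = (byte) (0xff ^ ((l >> (8 * (7 - i))) & 0xff)); // invert bytes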
+
+ public AbstractTimelineStore() {
+ super(AbstractTimelineStore.class.getName());
+ }
+
+ public AbstractTimelineStore(String name) {
+ super(name);
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ // Set up data store schema
+ // Timestamp table
+ ArrayList<byte[]> timestampTableCol = new ArrayList<byte[]>(1);
+ timestampTableCol.add(START_TIME_COLUMN);
+ checkAndMakeTable(START_TIME_TABLE, timestampTableCol);
+ // Entity table
+ ArrayList<byte[]> entityTableCol = new ArrayList<byte[]>(5);
+ entityTableCol.add(INSERT_TIME_COLUMN);
+ entityTableCol.add(EVENTS_COLUMN);
+ entityTableCol.add(PRIMARY_FILTERS_COLUMN);
+ entityTableCol.add(OTHER_INFO_COLUMN);
+ entityTableCol.add(RELATED_ENTITIES_COLUMN);
+ checkAndMakeTable(ENTITY_TABLE, entityTableCol);
+ // Index table has the same column schema as entity table
+ checkAndMakeTable(INDEX_TABLE, entityTableCol);
+
+ super.serviceInit(conf);
+ }
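+
+ // serviceInit() above relies on checkAndMakeTable(), and the read paths that
+ // follow on getFamilies(), getColumn(), scan() and cleanUp(): these are the
+ // storage primitives a concrete subclass (for example an HBase-backed store)
+ // supplies.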
+
+ @Override
+ protected void serviceStop() throws Exception {
+ super.serviceStop();
+ }
+
+ @Override
+ public TimelineEntity getEntity(String entityId, String entityType,
+ EnumSet<Field> fieldsToRetrieve) throws IOException {
+ byte[] revStartTime = getStartTime(entityId, entityType, null, null);
+ if (revStartTime == null) {
+ return null;
+ }
+
+ TimelineEntity entity = new TimelineEntity();
+ entity.setEntityId(entityId);
+ entity.setEntityType(entityType);
+ entity.setStartTime(readReverseOrderedLong(revStartTime, 0));
+
+ ArrayList<byte[]> columnFamilies = new ArrayList<byte[]>(Field.values().length);
+ if (fieldsToRetrieve == null) {
+ fieldsToRetrieve = EnumSet.allOf(Field.class);
+ }
+ if (fieldsToRetrieve.contains(Field.EVENTS) ||
+ fieldsToRetrieve.contains(Field.LAST_EVENT_ONLY)) {
+ columnFamilies.add(EVENTS_COLUMN);
+ }
+ if (fieldsToRetrieve.contains(Field.RELATED_ENTITIES)) {
+ columnFamilies.add(RELATED_ENTITIES_COLUMN);
+ }
+ if (fieldsToRetrieve.contains(Field.PRIMARY_FILTERS)) {
+ columnFamilies.add(PRIMARY_FILTERS_COLUMN);
+ }
+ if (fieldsToRetrieve.contains(Field.OTHER_INFO)) {
+ columnFamilies.add(OTHER_INFO_COLUMN);
+ }
+
+ try {
+ byte[] key = TimelineEntityKeyBuilder.createEntityRow(entityId,
+ entityType, revStartTime);
+ AbstractResult result = getFamilies(ENTITY_TABLE, key, columnFamilies);
+ TimelineEntityParser.parseEntityFromResult(entity, result, fieldsToRetrieve);
+ return entity;
+ } finally {
+ cleanUp(ENTITY_TABLE);
+ }
+ }
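+
+ // Usage sketch (hypothetical id and type, shown for illustration only):
+ //   TimelineEntity e = store.getEntity("app_1", "YARN_APPLICATION",
+ //       EnumSet.of(Field.EVENTS));
+ // getEntity() returns null when no start time has been recorded for the
+ // given id/type pair.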
+
+ @Override
+ public TimelineEntities getEntities(String entityType, Long limit,
+ Long starttime, Long endtime, String fromId, Long fromTs,
+ NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
+ EnumSet<Field> fieldsToRetrieve) throws IOException {
+
+ if (endtime == null) {
+ // if end time is null, place no restriction on end time
+ endtime = Long.MAX_VALUE;
+ }
+
+ // using the end time, construct the first key that the scan will seek to
+ byte[] firstRow = TimelineEntityKeyBuilder.createEntityStartOrEndRow(
+ entityType, writeReverseOrderedLong(endtime));
+ byte[] lastRow = TimelineEntityKeyBuilder.createEntityTypeEndRow(entityType);
+
+ if (starttime != null) {
+ // if start time is not null, set a last key that will not be
+ // iterated past
+ lastRow = TimelineEntityKeyBuilder.createEntityStartOrEndRow(entityType,
+ writeReverseOrderedLong(starttime));
+ }
+
+ if (fromId != null) {
+ // deal with fromId by updating firstRow
+ byte[] fromIdStartTimeRow = TimelineEntityKeyBuilder.createStartTimeRow(
+ fromId, entityType);
+
+ AbstractResult startTimeResult = getColumn(START_TIME_TABLE,
+ fromIdStartTimeRow, START_TIME_COLUMN, EMPTY_BYTES);
+
+ if (startTimeResult.isEmpty()) {
+ // no start time exists for the provided id, so return an empty set of entities
+ return new TimelineEntities();
+ } else {
+ Long fromIdStartTime = readReverseOrderedLong(
+ startTimeResult.getVal(START_TIME_COLUMN, EMPTY_BYTES), 0);
+ // override firstRow only if the fromId entity's start time is before
+ // endtime
+ // TODO: verify that this is the intended behavior of fromId; it is
+ // currently consistent with the behavior of the leveldb timeline storage
+ if (fromIdStartTime < endtime) {
+ firstRow = TimelineEntityKeyBuilder.createEntityRow(fromId, entityType,
+ writeReverseOrderedLong(fromIdStartTime));
+ }
+ }
+ }
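+
+ // Worked example of the fromId handling above: with endtime=100 and a fromId
+ // whose start time is 50, firstRow is moved to the fromId entity's own row;
+ // reverse ordering places that row after every row newer than 50. If the
+ // fromId start time were 200 (newer than endtime), firstRow is left alone so
+ // that entities newer than endtime stay excluded.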
+
+ if (limit == null) {
+ // if limit is not specified, use the default
+ limit = DEFAULT_LIMIT;
+ }
+
+ int entityOffset = 0;
+ String tableName = null;
+ AbstractScan scan = null;
+
+ try {
+ if (primaryFilter == null) {
+ tableName = ENTITY_TABLE;
+ } else {
+ tableName = INDEX_TABLE;
+ entityOffset = firstRow.length;
+ firstRow = TimelineEntityKeyBuilder.createIndexRow(
+ primaryFilter.getName(), primaryFilter.getValue(), firstRow);
+ entityOffset = firstRow.length - entityOffset;
+ lastRow = TimelineEntityKeyBuilder.createIndexRow(
+ primaryFilter.getName(), primaryFilter.getValue(), lastRow);
+ }
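+
+ // After the branch above, entityOffset equals the length of the index-key
+ // prefix that createIndexRow() prepends (the encoded primary-filter name and
+ // value); parseEntityRow() below skips exactly this prefix to recover the
+ // entity portion of each scanned row key.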
+
+ if (fieldsToRetrieve == null) {
+ fieldsToRetrieve = EnumSet.allOf(Field.class);
+ }
+ ArrayList<byte[]> columnsForScan = new ArrayList<byte[]>();
+ if (fieldsToRetrieve.contains(Field.EVENTS) ||
+ fieldsToRetrieve.contains(Field.LAST_EVENT_ONLY)) {
+ columnsForScan.add(EVENTS_COLUMN);
+ }
+ if (fieldsToRetrieve.contains(Field.RELATED_ENTITIES)) {
+ columnsForScan.add(RELATED_ENTITIES_COLUMN);
+ }
+ if (secondaryFilters != null ||
+ fieldsToRetrieve.contains(Field.PRIMARY_FILTERS)) {
+ columnsForScan.add(PRIMARY_FILTERS_COLUMN);
+ }
+ if (secondaryFilters != null ||
+ fieldsToRetrieve.contains(Field.OTHER_INFO)) {
+ columnsForScan.add(OTHER_INFO_COLUMN);
+ }
+ if (fromTs != null) {
+ columnsForScan.add(INSERT_TIME_COLUMN);
+ }
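+
+ // Note that PRIMARY_FILTERS and OTHER_INFO are fetched even when the caller
+ // did not request those fields if secondary filters are set, since the
+ // client-side filtering further down needs both to evaluate each filter.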
+
+ TimelineEntities entities = new TimelineEntities();
+ scan = scan(tableName, firstRow, lastRow, columnsForScan);
+ for (AbstractResult result = scan.next(); result != null; result = scan.next()) {
+ byte[] row = result.getRowKey();
+ TimelineEntity entity = TimelineEntityParser.parseEntityRow(row,
+ entityOffset, row.length - entityOffset);
+ // get and check insert time if fromTs is set
+ if (fromTs != null) {
+ long insertTime = readReverseOrderedLong(
+ result.getVal(INSERT_TIME_COLUMN, EMPTY_BYTES), 0);
+ // skip the row if it was inserted after the given timestamp
+ if (insertTime > fromTs) {
+ continue;
+ }
+ }
+ if (TimelineEntityParser.parseEntityFromResult(entity, result, fieldsToRetrieve)) {
+ //TODO: remove client-side filtering once server-side is working
+ // determine if the retrieved entity matches the provided secondary
+ // filters, and if so add it to the list of entities to return
+ boolean filterPassed = true;
+ if (secondaryFilters != null) {
+ for (NameValuePair filter : secondaryFilters) {
+ // check other info for filtered field
+ Object v = entity.getOtherInfo().get(filter.getName());
+ if (v == null) {
+ // if field is not found in other info, check in primary filters
+ Set