diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml index 71605a5..cc4a16d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/pom.xml @@ -65,6 +65,17 @@ test + org.apache.hadoop + hadoop-hdfs + test + + + org.apache.hadoop + hadoop-hdfs + test-jar + test + + com.google.inject.extensions guice-servlet @@ -159,6 +170,70 @@ org.fusesource.leveldbjni leveldbjni-all - + + + org.apache.hbase + hbase-client + 0.98.0-hadoop2 + + + org.apache.hadoop + hadoop-common + + + org.apache.hadoop + hadoop-auth + + + org.apache.hadoop + hadoop-mapreduce-client-core + + + org.apache.hadoop + hadoop-annotations + + + + + + org.apache.hbase + hbase-testing-util + 0.98.0-hadoop2 + + + org.apache.hadoop + hadoop-common + + + org.apache.hadoop + hadoop-auth + + + org.apache.hadoop + hadoop-client + + + org.apache.hadoop + hadoop-mapreduce-client-core + + + org.apache.hadoop + hadoop-mapreduce-client-jobclient + + + org.apache.hadoop + hadoop-hdfs + + + org.apache.hadoop + hadoop-annotations + + + org.apache.hadoop + hadoop-minicluster + + + test + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/AbstractTimelineStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/AbstractTimelineStore.java new file mode 100644 index 0000000..cc8aadf --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/AbstractTimelineStore.java @@ -0,0 +1,808 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.timeline; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong; +import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.writeReverseOrderedLong; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; +import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.WritableComparator; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.Map.Entry; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public abstract class AbstractTimelineStore extends AbstractService implements + TimelineStore { + + static final Log LOG = LogFactory.getLog(AbstractTimelineStore.class); + + public static final Charset UTF8_CHARSET = Charset.forName("UTF-8"); + + public static final byte NULL_BYTE = 0x0; + public static final byte ONE_BYTE = 0x1; + + private static final String TABLE_NAME_PREFIX = "timeline."; + + static final String START_TIME_TABLE = TABLE_NAME_PREFIX + + "starttime"; + static final String ENTITY_TABLE = TABLE_NAME_PREFIX + "entity"; + static final String INDEX_TABLE = TABLE_NAME_PREFIX + "index"; + + static final byte[] START_TIME_COLUMN = "s".getBytes(UTF8_CHARSET); + static final byte[] INSERT_TIME_COLUMN = "t".getBytes(UTF8_CHARSET); + + static final byte[] EVENTS_COLUMN = "e".getBytes(UTF8_CHARSET); + static final byte[] PRIMARY_FILTERS_COLUMN = + "f".getBytes(UTF8_CHARSET); + static final byte[] OTHER_INFO_COLUMN = "i".getBytes(UTF8_CHARSET); + static final byte[] RELATED_ENTITIES_COLUMN = + "r".getBytes(UTF8_CHARSET); + + static final byte[] EMPTY_BYTES = new byte[0]; + + /** + * Default age off time is one week + */ + static final int DEFAULT_TTL = 60 * 60 * 24 * 7; + + public AbstractTimelineStore() { + super(AbstractTimelineStore.class.getName()); + } + + public AbstractTimelineStore(String name) { + super(name); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + // Set up data store schema + // Timestamp table + ArrayList timestampTableCol = new ArrayList(1); + timestampTableCol.add(START_TIME_COLUMN); + checkAndMakeTable(START_TIME_TABLE, timestampTableCol); + // Entity table + ArrayList entityTableCol = new ArrayList(5); + entityTableCol.add(INSERT_TIME_COLUMN); + entityTableCol.add(EVENTS_COLUMN); + entityTableCol.add(PRIMARY_FILTERS_COLUMN); + entityTableCol.add(OTHER_INFO_COLUMN); + entityTableCol.add(RELATED_ENTITIES_COLUMN); 
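+      // Together with the insert-time family added above, the entity table
+      // carries five column families: t (insert time), e (events),
+      // f (primary filters), i (other info) and r (related entities).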
+ checkAndMakeTable(ENTITY_TABLE, entityTableCol); + // Index table has the same column schema as entity table + checkAndMakeTable(INDEX_TABLE, entityTableCol); + + super.serviceInit(conf); + } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + } + + @Override + public TimelineEntity getEntity(String entityId, String entityType, + EnumSet fieldsToRetrieve) throws IOException { + byte[] revStartTime = getStartTime(entityId, entityType, null, null); + if (revStartTime == null) { + return null; + } + + TimelineEntity entity = new TimelineEntity(); + entity.setEntityId(entityId); + entity.setEntityType(entityType); + entity.setStartTime(readReverseOrderedLong(revStartTime, 0)); + + ArrayList columnFamilies = new ArrayList(Field.values().length); + if (fieldsToRetrieve == null) { + fieldsToRetrieve = EnumSet.allOf(Field.class); + } + if (fieldsToRetrieve.contains(Field.EVENTS) || + fieldsToRetrieve.contains(Field.LAST_EVENT_ONLY)) { + columnFamilies.add(EVENTS_COLUMN); + } + if (fieldsToRetrieve.contains(Field.RELATED_ENTITIES)) { + columnFamilies.add(RELATED_ENTITIES_COLUMN); + } + if (fieldsToRetrieve.contains(Field.PRIMARY_FILTERS)) { + columnFamilies.add(PRIMARY_FILTERS_COLUMN); + } + if (fieldsToRetrieve.contains(Field.OTHER_INFO)) { + columnFamilies.add(OTHER_INFO_COLUMN); + } + + try { + byte[] key = TimelineEntityKeyBuilder.createEntityRow(entityId, + entityType, revStartTime); + AbstractResult result = getFamilies(ENTITY_TABLE, key, columnFamilies); + TimelineEntityParser.parseEntityFromResult(entity, result, fieldsToRetrieve); + return entity; + } finally { + cleanUp(ENTITY_TABLE); + } + } + + @Override + public TimelineEntities getEntities(String entityType, Long limit, + Long starttime, Long endtime, String fromId, Long fromTs, + NameValuePair primaryFilter, Collection secondaryFilters, + EnumSet fieldsToRetrieve) throws IOException { + + if (endtime == null) { + // if end time is null, place no restriction on end time + endtime = Long.MAX_VALUE; + } + + // using end time, construct a first key that will be seeked to + byte[] firstRow = TimelineEntityKeyBuilder.createEntityStartOrEndRow( + entityType, writeReverseOrderedLong(endtime)); + byte[] lastRow = TimelineEntityKeyBuilder.createEntityTypeEndRow(entityType); + + if (starttime != null) { + // if start time is not null, set a last key that will not be + // iterated past + lastRow = TimelineEntityKeyBuilder.createEntityStartOrEndRow(entityType, + writeReverseOrderedLong(starttime)); + } + + if (fromId != null) { + // deal with fromId by updating firstRow + byte[] fromIdStartTimeRow = TimelineEntityKeyBuilder.createStartTimeRow( + fromId, entityType); + + AbstractResult startTimeResult = getColumn(START_TIME_TABLE, + fromIdStartTimeRow, START_TIME_COLUMN, EMPTY_BYTES); + + if (startTimeResult.isEmpty()) { + // no start time for provided id, so return empty entities + return new TimelineEntities(); + } else { + Long fromIdStartTime = readReverseOrderedLong( + startTimeResult.getVal(START_TIME_COLUMN, EMPTY_BYTES), 0); + // override endtime only if the fromId entity's start time is before + // endtime + // TODO: verify if this is the supposed behavior of fromID. 
currently + // consistent with the behavior in leveldb timeline storage + if (fromIdStartTime < endtime) { + firstRow = TimelineEntityKeyBuilder.createEntityRow(fromId, entityType, + writeReverseOrderedLong(fromIdStartTime)); + } + } + } + + if (limit == null) { + // if limit is not specified, use the default + limit = DEFAULT_LIMIT; + } + + int entityOffset = 0; + String tableName = null; + AbstractScan scan = null; + + try { + if (primaryFilter == null) { + tableName = ENTITY_TABLE; + } else { + tableName = INDEX_TABLE; + entityOffset = firstRow.length; + firstRow = TimelineEntityKeyBuilder.createIndexRow( + primaryFilter.getName(), primaryFilter.getValue(), firstRow); + entityOffset = firstRow.length - entityOffset; + lastRow = TimelineEntityKeyBuilder.createIndexRow( + primaryFilter.getName(), primaryFilter.getValue(), lastRow); + } + + if (fieldsToRetrieve == null) { + fieldsToRetrieve = EnumSet.allOf(Field.class); + } + ArrayList columnsForScan = new ArrayList(); + if (fieldsToRetrieve.contains(Field.EVENTS) || + fieldsToRetrieve.contains(Field.LAST_EVENT_ONLY)) { + columnsForScan.add(EVENTS_COLUMN); + } + if (fieldsToRetrieve.contains(Field.RELATED_ENTITIES)) { + columnsForScan.add(RELATED_ENTITIES_COLUMN); + } + if (secondaryFilters != null || + fieldsToRetrieve.contains(Field.PRIMARY_FILTERS)) { + columnsForScan.add(PRIMARY_FILTERS_COLUMN); + } + if (secondaryFilters != null || + fieldsToRetrieve.contains(Field.OTHER_INFO)) { + columnsForScan.add(OTHER_INFO_COLUMN); + } + if (fromTs != null) { + columnsForScan.add(INSERT_TIME_COLUMN); + } + + TimelineEntities entities = new TimelineEntities(); + scan = scan(tableName, firstRow, lastRow, columnsForScan); + for (AbstractResult result = scan.next(); result != null; result = scan.next()) { + byte[] row = result.getRowKey(); + TimelineEntity entity = TimelineEntityParser.parseEntityRow(row, + entityOffset, row.length - entityOffset); + // get and check insert time if fromTs is set + if (fromTs != null) { + long insertTime = readReverseOrderedLong( + result.getVal(INSERT_TIME_COLUMN, EMPTY_BYTES), 0); + // skip if the current row is inserted after the given time + if (insertTime > fromTs) { + continue; + } + } + if (TimelineEntityParser.parseEntityFromResult(entity, result, fieldsToRetrieve)) { + //TODO: remove client-side filtering once server-side is working + // determine if the retrieved entity matches the provided secondary + // filters, and if so add it to the list of entities to return + boolean filterPassed = true; + if (secondaryFilters != null) { + for (NameValuePair filter : secondaryFilters) { + // check other info for filtered field + Object v = entity.getOtherInfo().get(filter.getName()); + if (v == null) { + // if field is not found in other info, check in primary filters + Set vs = entity.getPrimaryFilters() + .get(filter.getName()); + if (vs == null || !vs.contains(filter.getValue())) { + // if field is not found in primary filters, or if it is found + // with a different value, do not return the entity + filterPassed = false; + break; + } + } else if (!v.equals(filter.getValue())) { + // if field is found in other info with a different value, + // do not return the entity + filterPassed = false; + break; + } + } + } + if (filterPassed) { + entities.addEntity(entity); + } + } + if (entities.getEntities().size() >= limit) { + break; + } + } + return entities; + } finally { + cleanUp(tableName); + if (scan != null) { + scan.cleanUp(); + } + } + } + + @Override + public TimelineEvents getEntityTimelines(String entityType, 
+ SortedSet entityIds, Long limit, Long windowStart, + Long windowEnd, Set eventTypes) throws IOException { + TimelineEvents events = new TimelineEvents(); + if (entityIds == null || entityIds.isEmpty()) { + return events; + } + // create a lexicographically-ordered map from start time to entities + Map> startTimeMap = new TreeMap>(new Comparator() { + @Override + public int compare(byte[] o1, byte[] o2) { + return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0, + o2.length); + } + }); + + if (windowEnd == null) { + windowEnd = Long.MAX_VALUE; + } + if (limit == null) { + limit = DEFAULT_LIMIT; + } + + try { + // look up start times for the specified entities + // skip entities with no start time + for (String entityId : entityIds) { + byte[] startTime = getStartTime(entityId, entityType, null, null); + if (startTime != null) { + List entities = startTimeMap.get(startTime); + if (entities == null) { + entities = new ArrayList(); + startTimeMap.put(startTime, entities); + } + entities.add(new EntityIdentifier(entityId, entityType)); + } + } + for (Entry> entry : + startTimeMap.entrySet()) { + // look up the events matching the given parameters (limit, + // start time, end time, event types) for entities whose start times + // were found and add the entities to the return list + byte[] revStartTime = entry.getKey(); + for (EntityIdentifier entityIdentifier : entry.getValue()) { + EventsOfOneEntity entity = new EventsOfOneEntity(); + entity.setEntityId(entityIdentifier.getId()); + entity.setEntityType(entityType); + events.addEvent(entity); + + byte[] entityRow = TimelineEntityKeyBuilder.createEntityRow( + entityIdentifier.getId(), entityType, revStartTime); + ArrayList scanFamily = new ArrayList(1); + scanFamily.add(EVENTS_COLUMN); + // Note: Here, setting end row to the same as begin row should return + // the only row + AbstractScan scan = scan(ENTITY_TABLE, entityRow, entityRow, scanFamily); + // TODO: Enable filter support in abstract timeline store + byte[] first = writeReverseOrderedLong(windowEnd); + byte[] last = (windowStart == null) + ? 
null + : writeReverseOrderedLong(windowStart); + for (AbstractResult result = scan.next(); result != null; result = scan.next()) { + for (AbstractResult cell = result.next(); cell != null; cell = result.next()) { + byte[] family = cell.getFamily(); + byte[] qualifier = cell.getQualifier(); + if (WritableComparator.compareBytes(qualifier, 0, first.length, + first, 0, first.length) < 0) { + continue; + } + if (last != null && + WritableComparator.compareBytes(qualifier, 0, last.length, + last, 0, last.length) >= 0) { + break; + } + byte[] value = cell.getVal(family, qualifier); + TimelineEvent event = TimelineEntityParser.getEvent(qualifier, 0, + qualifier.length, value, 0, value.length); + if (eventTypes == null + || eventTypes.contains(event.getEventType())) { + entity.addEvent(event); + } + if (entity.getEvents().size() >= limit) { + break; + } + } + if (entity.getEvents().size() >= limit) { + break; + } + } + scan.cleanUp(); + } + } + } finally { + cleanUp(ENTITY_TABLE); + } + return events; + } + + @Override + public TimelinePutResponse put(TimelineEntities data) throws IOException { + TimelinePutResponse response = new TimelinePutResponse(); + for (TimelineEntity entity : data.getEntities()) { + put(entity, response); + } + return response; + } + + protected byte[] getStartTime(String entityId, String entityType, + Long startTime, List events) throws IOException { + try { + byte[] row = TimelineEntityKeyBuilder.createStartTimeRow(entityId, + entityType); + AbstractResult startTimeResult = getColumn(START_TIME_TABLE, row, + START_TIME_COLUMN, EMPTY_BYTES); + if (startTimeResult.isEmpty()) { + if (startTime == null) { + if (events != null) { + Long min = Long.MAX_VALUE; + for (TimelineEvent e : events) { + if (min > e.getTimestamp()) { + min = e.getTimestamp(); + } + } + startTime = min; + } + } + if (startTime == null) { + return null; + } + byte[] value = writeReverseOrderedLong(startTime); + AbstractPut startTimePut = createPut(START_TIME_TABLE, row); + startTimePut.addVal(START_TIME_COLUMN, EMPTY_BYTES, value); + + if (testAndPut(START_TIME_TABLE, row, START_TIME_COLUMN, + EMPTY_BYTES, null, startTimePut)) { + return value; + } else { + startTimeResult = getColumn(START_TIME_TABLE, row, + START_TIME_COLUMN, EMPTY_BYTES); + if (startTimeResult.isEmpty()) { + throw new IOException("Couldn't retrieve or set start time"); + } else { + return startTimeResult.getVal(START_TIME_COLUMN, EMPTY_BYTES); + } + } + } else { + return startTimeResult.getVal(START_TIME_COLUMN, EMPTY_BYTES); + } + } finally { + cleanUp(START_TIME_TABLE); + } + } + + private void put(TimelineEntity entity, TimelinePutResponse response) { + try { + List events = entity.getEvents(); + // look up the start time for the entity + byte[] revStartTime = getStartTime(entity.getEntityId(), + entity.getEntityType(), entity.getStartTime(), events); + if (revStartTime == null) { + // if no start time is found, add an error and return + TimelinePutError error = new TimelinePutError(); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); + error.setErrorCode(TimelinePutError.NO_START_TIME); + response.addError(error); + return; + } + + Long revStartTimeLong = readReverseOrderedLong(revStartTime, 0); + Map> primaryFilters = entity.getPrimaryFilters(); + + byte[] entityRow = TimelineEntityKeyBuilder.createEntityRow( + entity.getEntityId(), entity.getEntityType(), revStartTime); + AbstractPut entityPut = createPut(ENTITY_TABLE, entityRow); + List entityPuts = new ArrayList(); + 
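+      // Collect the entity put plus any related-entity puts built below so
+      // they can be committed against the entity table in a single batch.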
entityPuts.add(entityPut); + + // create index puts + List indexPuts = new ArrayList(); + if (primaryFilters != null && !primaryFilters.isEmpty()) { + for (Entry> primaryFilter : + primaryFilters.entrySet()) { + for (Object primaryFilterValue : primaryFilter.getValue()) { + AbstractPut indexPut = createPut(INDEX_TABLE, + TimelineEntityKeyBuilder.createIndexRow(primaryFilter.getKey(), + primaryFilterValue, entityRow)); + indexPuts.add(indexPut); + } + } + } + + // add insert time to entity put and index put if it's a new row + AbstractResult insertTimeResult = getColumn(ENTITY_TABLE, entityRow, + INSERT_TIME_COLUMN, EMPTY_BYTES); + if (insertTimeResult.isEmpty()) { + entityPut.addVal(INSERT_TIME_COLUMN, EMPTY_BYTES, + writeReverseOrderedLong(System.currentTimeMillis())); + for (AbstractPut indexPut : indexPuts) { + indexPut.addVal(INSERT_TIME_COLUMN, EMPTY_BYTES, + writeReverseOrderedLong(System.currentTimeMillis())); + } + } + + // add events to entity put + if (events != null && !events.isEmpty()) { + for (TimelineEvent event : events) { + byte[] revts = writeReverseOrderedLong(event.getTimestamp()); + byte[] columnQualifier = + TimelineEntityKeyBuilder.createEventColumnQualifier(revts, + event.getEventType()); + byte[] value = GenericObjectMapper.write(event.getEventInfo()); + entityPut.addVal(EVENTS_COLUMN, columnQualifier, value); + for (AbstractPut indexPut : indexPuts) { + indexPut.addVal(EVENTS_COLUMN, columnQualifier, value); + } + } + } + + // create related entity puts + Map> relatedEntities = + entity.getRelatedEntities(); + if (relatedEntities != null && !relatedEntities.isEmpty()) { + for (Entry> relatedEntityList : + relatedEntities.entrySet()) { + String relatedEntityType = relatedEntityList.getKey(); + for (String relatedEntityId : relatedEntityList.getValue()) { + // look up start time of related entity + byte[] relatedEntityStartTime = getStartTime(relatedEntityId, + relatedEntityType, revStartTimeLong, null); + // write "forward" entry (related entity -> entity) + AbstractPut relatedEntityPut = createPut(ENTITY_TABLE, + TimelineEntityKeyBuilder.createEntityRow(relatedEntityId, + relatedEntityType, relatedEntityStartTime)); + relatedEntityPut.addVal(RELATED_ENTITIES_COLUMN, + TimelineEntityKeyBuilder.createRelatedEntityColumnQualifier( + entity.getEntityId(), entity.getEntityType()), + EMPTY_BYTES); + entityPuts.add(relatedEntityPut); + } + } + } + + // add primary filters to entity put + if (primaryFilters != null && !primaryFilters.isEmpty()) { + for (Entry> primaryFilter : + primaryFilters.entrySet()) { + for (Object primaryFilterValue : primaryFilter.getValue()) { + byte[] columnQualifier = + TimelineEntityKeyBuilder.createPrimaryFilterColumnQualifier( + primaryFilter.getKey(), primaryFilterValue); + entityPut.addVal(PRIMARY_FILTERS_COLUMN, columnQualifier, + EMPTY_BYTES); + for (AbstractPut indexPut : indexPuts) { + indexPut.addVal(PRIMARY_FILTERS_COLUMN, columnQualifier, + EMPTY_BYTES); + } + } + } + } + + // add other info to entity put + Map otherInfo = entity.getOtherInfo(); + if (otherInfo != null && !otherInfo.isEmpty()) { + for (Entry i : otherInfo.entrySet()) { + byte[] columnQualifier = i.getKey().getBytes(UTF8_CHARSET); + byte[] value = GenericObjectMapper.write(i.getValue()); + entityPut.addVal(OTHER_INFO_COLUMN, columnQualifier, value); + for (AbstractPut indexPut : indexPuts) { + indexPut.addVal(OTHER_INFO_COLUMN, columnQualifier, value); + } + } + } + + if (entityPut.isEmpty()) { + entityPuts.remove(entityPut); + } + if (entityPuts.size() > 0) { + 
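+        // Entity rows and index rows live in different tables, so each group
+        // is committed as its own batch.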
batchCommitPuts(entityPuts); + } + if (indexPuts.size() > 0) { + batchCommitPuts(indexPuts); + } + } catch (IOException e) { + LOG.error("Error putting entity " + entity.getEntityId() + + " of type " + entity.getEntityType(), e); + TimelinePutError error = new TimelinePutError(); + error.setEntityId(entity.getEntityId()); + error.setEntityType(entity.getEntityType()); + error.setErrorCode(TimelinePutError.IO_EXCEPTION); + response.addError(error); + } finally { + cleanUp(ENTITY_TABLE); + cleanUp(INDEX_TABLE); + } + } + + // Abstract methods and interfaces + /** + * If the table with tableName does not exist in the db, create it with the + * given column names in columnFamilies. Or else, do nothing. + * + * @param tableName + * @param columnFamilies + */ + protected abstract void checkAndMakeTable(String tableName, + List columnFamilies) throws Exception; + + /** + * Select a column from tableName, where row equals to key + * + * @param key + * @param tableName + * @param columnFamilies + * @return query result + * @throws Exception + */ + protected abstract AbstractResult getColumn(String tableName, byte[] key, + byte[] family, byte[] qualifier) throws IOException; + + /** + * Select columnFamilies from tableName where key equals to the given key + * + * @param tableName + * @param key + * @param columnFamilies + * @return query results + * @throws IOException + */ + protected abstract AbstractResult getFamilies(String tableName, byte[] key, + List columnFamilies) throws IOException; + + /** + * Create an abstract put object to tableName with row key equals to the given + * key + * + * @param tableName + * @param key + * @return the created abstract put object + * @throws IOException + */ + protected abstract AbstractPut createPut(String tableName, byte[] key) + throws IOException; + + /** + * Commit a group of abstract puts + * + * @param puts + * @throws IOException + */ + protected abstract void batchCommitPuts(List puts) + throws IOException; + + /** + * Cleaning up the connection to a table with a given name + * + * @param tableName + */ + protected abstract void cleanUp(String tableName); + + /** + * Atomically checks if in table tableName, a given cell on + * (row, family:qualifier) equals to value, and if so execute the put. + * + * Note: will automatically "commit" the put if succeeded + * + * @param tableName + * @param row + * @param family + * @param qualifier + * @param value + * @param put + * @return true only if the put succeeded, otherwise false. + */ + protected abstract boolean testAndPut(String tableName, byte[] row, + byte[] family, byte[] qualifier, byte[] value, AbstractPut put) + throws IOException; + + /** + * Return an abstract scan object to scan the storage, from begin to end, with + * specified column families. 
+ * + * @param tableName + * @param begin row key + * @param end row key + * @param families + * @return the abstract scan object to scan through the results + * @throws IOException + */ + protected abstract AbstractScan scan(String tableName, + byte[] begin, byte[] end, List families) throws IOException; + + /** + * Abstract put interface for storage updates + */ + interface AbstractPut { + /** + * Add a value on family:qualifier with a given value + * + * @param family + * @param qualifier + * @param value + */ + void addVal(byte[] family, byte[] qualifier, byte[] value); + + /** + * Commit the updates in this put + * + * @throws IOException + */ + void commit() throws IOException; + + /** + * Returns if this abstract put object is empty + * @return true if the put object is empty, otherwise false + */ + boolean isEmpty(); + } + + interface AbstractResult { + /** + * Returns the query result, in byte array, for the given column family and + * qualifier + * + * @param family + * @param qualifier + * @return query result + */ + byte[] getVal(byte[] family, byte[] qualifier); + + /** + * Returns the row key of this result + * + * @return row key + */ + byte[] getRowKey(); + + /** + * Return the column family of the "current" element in a result. "Current" + * element is the first element in the result, unless next() method has been + * called. + * + * @return column family of current element + */ + byte[] getFamily(); + + /** + * Return the column qualifier of the "current" element in a result. "Current" + * element is the first element in the result, unless next() method has been + * called. + * + * @return column qualifier of current element + */ + byte[] getQualifier(); + + /** + * Move the "current" poiter to the next element in the result + * + * @return the next element in the result, wrapped as an abstract result object + * @throws IOException + */ + AbstractResult next() throws IOException; + + /** + * Returns if this abstract result object is empty + * + * @return true if the result is empty, otherwise false + */ + boolean isEmpty(); + } + + interface AbstractScan { + /** + * Move the "current" pointer to the next abstract result in the results of + * a scan + * + * @return the next abstract result in the scan, wrapped as an abstract + * result object + * @throws IOException + */ + AbstractResult next() throws IOException; + + /** + * Cleaning up the scan object if a persistent connection has been established + */ + void cleanUp(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/HBaseTimelineStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/HBaseTimelineStore.java new file mode 100644 index 0000000..aec96ff --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/HBaseTimelineStore.java @@ -0,0 +1,361 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timeline; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.ColumnRangeFilter; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FilterList.Operator; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timeline.AbstractTimelineStore.AbstractResult; +import org.apache.hadoop.yarn.server.timeline.AbstractTimelineStore.AbstractPut; +import org.apache.hadoop.yarn.server.timeline.AbstractTimelineStore.AbstractScan; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeMap; + +import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong; +import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.writeReverseOrderedLong; + +import com.google.common.annotations.VisibleForTesting; + +public class HBaseTimelineStore extends AbstractTimelineStore { + + static final Log LOG = LogFactory.getLog(HBaseTimelineStore.class); + + public static final String HBASE_TTL_PROPERTY = + YarnConfiguration.TIMELINE_SERVICE_PREFIX + "hbase-ttl"; + public static final String 
HBASE_MASTER_PROPERTY = + YarnConfiguration.TIMELINE_SERVICE_PREFIX + "hbase-master"; + + private HConnection connection; + private HBaseAdmin hbase; + private int ttl = DEFAULT_TTL; + + private Map openTables; + + public HBaseTimelineStore() { + super(HBaseTimelineStore.class.getName()); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + // Set up hbase connection and ttl + hbase = initHBase(conf); + ttl = conf.getInt(HBASE_TTL_PROPERTY, DEFAULT_TTL); + + super.serviceInit(conf); + } + + @Override + protected void serviceStop() throws Exception { + IOUtils.cleanup(LOG, connection); + super.serviceStop(); + } + + @VisibleForTesting + protected HBaseAdmin initHBase(Configuration conf) throws IOException { + String master = conf.get(HBASE_MASTER_PROPERTY); + // Use HBase default master setting unless specified + Configuration hbaseConf = HBaseConfiguration.create(); + if (master != null) { + hbaseConf.set("hbase.master", master); + } + + connection = HConnectionManager.createConnection(hbaseConf); + return new HBaseAdmin(hbaseConf); + } + + @VisibleForTesting + protected HTableInterface getTable(String tableName) throws IOException { + if (openTables == null) { + openTables = new HashMap(); + } + // TODO: Currently, for each table, can only support one opened instance + // for each store instance + HTableInterface table = openTables.get(tableName); + if (table != null) { + LOG.error("More than one active instances for the same table! "); + } else { + table = connection.getTable(tableName); + openTables.put(tableName, table); + } + return table; + } + + @Override + protected void checkAndMakeTable(String tableName, + List columnFamilies) throws Exception { + TableName table = TableName.valueOf(tableName); + if (!hbase.tableExists(table)) { + HTableDescriptor desc = new HTableDescriptor(table); + for (byte[] col : columnFamilies) { + desc.addFamily(createFamily(col, ttl)); + } + hbase.createTable(desc); + LOG.info("Created hbase table " + tableName); + } + } + + @Override + protected AbstractResult getColumn(String tableName, byte[] key, + byte[] columnFamily, byte[] qualifier) throws IOException { + HTableInterface table = getTable(tableName); + Get get = new Get(key); + get.addColumn(columnFamily, qualifier); + Result result = table.get(get); + return new HBaseGetResult(result); + } + + @Override + protected AbstractResult getFamilies(String tableName, byte[] key, + List columnFamilies) throws IOException { + HTableInterface table = getTable(tableName); + Get get = new Get(key); + for (byte[] col : columnFamilies) { + get.addFamily(col); + } + Result result = table.get(get); + return new HBaseGetResult(result); + } + + @Override + protected void batchCommitPuts(List puts) throws IOException { + // Return on an empty changelist + if ((puts == null) || (puts.size() == 0)) { + return; + } + + String currTable = null; + // Create a hbase changelist + List hbasePuts = new ArrayList(puts.size()); + for (AbstractPut p : puts) { + // Throw an exception on any type mismatching. + HBasePut hp = (HBasePut) p; + if (currTable == null) { + currTable = hp.tableName; + } else { + // All puts should operate on the same HBase table + if (!currTable.equals(hp.tableName)) { + LOG.error("More than one table in one batch commit! Abort. 
"); + return; + } + } + hbasePuts.add(hp.put); + } + + // Commit changes + HTableInterface table = getTable(currTable); + table.put(hbasePuts); + table.flushCommits(); + return; + } + + @Override + protected void cleanUp(String tableName) { + if (openTables != null) { + HTableInterface table = openTables.get(tableName); + openTables.remove(tableName); + IOUtils.cleanup(LOG, table); + } + } + + @Override + protected boolean testAndPut(String tableName, byte[] row, + byte[] family, byte[] qualifier, byte[] value, AbstractPut put) + throws IOException { + // Throw an exception on any type mismatching. + HBasePut hp = (HBasePut) put; + Put p = hp.put; + HTableInterface table = getTable(tableName); + boolean isSuccessful = table.checkAndPut(row, family, qualifier, value, p); + if (isSuccessful) { + table.flushCommits(); + } + return isSuccessful; + } + + @Override + protected AbstractPut createPut(String tableName, byte[] key) + throws IOException { + return new HBasePut(tableName, key); + } + + @Override + protected AbstractScan scan(String tableName, + byte[] begin, byte[] end, List families) throws IOException { + Scan scan = new Scan(begin, end); + for (byte[] family : families) { + scan.addFamily(family); + } + HTableInterface table = getTable(tableName); + ResultScanner rs = table.getScanner(scan); + return new HBaseScanResult(rs); + } + + class HBaseGetResult implements AbstractResult { + + private Result result; + CellScanner cScanner; + + public HBaseGetResult(Result r) { + result = r; + cScanner = r.cellScanner(); + } + + @Override + public AbstractResult next() throws IOException { + boolean flag = cScanner.advance(); + if (flag) { + return this; + } else { + // Finished, indicated by flag + return null; + } + } + + @Override + public byte[] getVal(byte[] colFamilyName, byte[] qualifier) { + return result.getValue(colFamilyName, qualifier); + } + + @Override + public byte[] getRowKey() { + return result.getRow(); + } + + @Override + public byte[] getFamily() { + Cell currCell = cScanner.current(); + return CellUtil.cloneFamily(currCell); + } + + @Override + public byte[] getQualifier() { + Cell currCell = cScanner.current(); + return CellUtil.cloneQualifier(currCell); + } + + @Override + public boolean isEmpty() { + return (result != null) && result.isEmpty(); + } + } + + class HBasePut implements AbstractPut { + private String tableName; + private Put put; + private byte[] key; + + public HBasePut(String tn, byte[] k) { + tableName = tn; + key = k; + put = new Put(k); + } + + @Override + public void addVal(byte[] family, byte[] qualifier, byte[] value) { + if (put != null) { + put.add(family, qualifier, value); + } + } + + @Override + public void commit() throws IOException { + HTableInterface table = getTable(tableName); + table.put(put); + table.flushCommits(); + } + + @Override + public boolean isEmpty() { + return (put != null) && put.isEmpty(); + } + } + + class HBaseScanResult implements AbstractScan { + private ResultScanner resultScanner; + + public HBaseScanResult(ResultScanner rs) { + resultScanner = rs; + } + + @Override + public HBaseGetResult next() throws IOException { + Result nextResult = resultScanner.next(); + if (nextResult == null) { + return null; + } + return new HBaseGetResult(nextResult); + } + + @Override + public void cleanUp() { + if (resultScanner != null) { + IOUtils.cleanup(LOG, resultScanner); + } + } + } + + private HColumnDescriptor createFamily(byte[] b, int ttl) { + HColumnDescriptor column = new HColumnDescriptor(b); + column.setTimeToLive(ttl); + 
return column; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineEntityKeyBuilder.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineEntityKeyBuilder.java new file mode 100644 index 0000000..170bdaf --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineEntityKeyBuilder.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timeline; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import static org.apache.hadoop.yarn.server.timeline.AbstractTimelineStore.UTF8_CHARSET; +import static org.apache.hadoop.yarn.server.timeline.AbstractTimelineStore.NULL_BYTE; +import static org.apache.hadoop.yarn.server.timeline.AbstractTimelineStore.ONE_BYTE; + +/** + * Key builder class for timeline storage with key-value storage interface + */ +class TimelineEntityKeyBuilder { + + public static byte[] createStartTimeRow(String entityId, String entityType) + throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(entityId.length() + + entityType.length() + 1); + baos.write(entityType.getBytes(UTF8_CHARSET)); + baos.write(NULL_BYTE); + baos.write(entityId.getBytes(UTF8_CHARSET)); + return baos.toByteArray(); + } + + public static byte[] createEntityRow(String entityId, String entityType, + byte[] revStartTime) throws IOException { + return createEntityRow(entityId, entityType, revStartTime, false); + } + + public static byte[] createEntityRow(String entityId, String entityType, + byte[] revStartTime, boolean forLookup) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream( + entityId.length() + entityType.length() + revStartTime.length + 1); + baos.write(entityType.getBytes(UTF8_CHARSET)); + baos.write(NULL_BYTE); + baos.write(revStartTime); + baos.write(entityId.getBytes(UTF8_CHARSET)); + if (forLookup) { + baos.write(NULL_BYTE); + } + return baos.toByteArray(); + } + + public static byte[] createEntityStartOrEndRow(String entityType, + byte[] revStartTime) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream( + entityType.length() + revStartTime.length + 1); + baos.write(entityType.getBytes(UTF8_CHARSET)); + baos.write(NULL_BYTE); + baos.write(revStartTime); + return baos.toByteArray(); + } + + public static byte[] createEntityTypeEndRow(String entityType) + throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream( + 
entityType.length() + 1); + baos.write(entityType.getBytes(UTF8_CHARSET)); + baos.write(ONE_BYTE); + return baos.toByteArray(); + } + + public static byte[] createIndexRow(String name, Object value, + byte[] entityRow) throws IOException { + byte[] mappedValue = GenericObjectMapper.write(value); + ByteArrayOutputStream baos = new ByteArrayOutputStream( + name.length() + mappedValue.length + entityRow.length + 2); + baos.write(name.getBytes(UTF8_CHARSET)); + baos.write(NULL_BYTE); + baos.write(mappedValue); + baos.write(NULL_BYTE); + baos.write(entityRow); + return baos.toByteArray(); + } + + public static byte[] createEventColumnQualifier(byte[] revTimestamp, + String eventType) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream( + revTimestamp.length + eventType.length()); + baos.write(revTimestamp); + baos.write(eventType.getBytes(UTF8_CHARSET)); + return baos.toByteArray(); + } + + public static byte[] createRelatedEntityColumnQualifier(String entityId, + String entityType) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(entityId.length() + + entityType.length() + 1); + baos.write(entityType.getBytes(UTF8_CHARSET)); + baos.write(NULL_BYTE); + baos.write(entityId.getBytes(UTF8_CHARSET)); + return baos.toByteArray(); + } + + public static byte[] createPrimaryFilterColumnQualifier( + String primaryFilterName, Object primaryFilterValue) throws IOException { + byte[] mappedValue = GenericObjectMapper.write(primaryFilterValue); + ByteArrayOutputStream baos = new ByteArrayOutputStream( + primaryFilterName.length() + mappedValue.length + 1); + baos.write(primaryFilterName.getBytes(UTF8_CHARSET)); + baos.write(NULL_BYTE); + baos.write(mappedValue); + return baos.toByteArray(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineEntityParser.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineEntityParser.java new file mode 100644 index 0000000..2e726a9 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineEntityParser.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.timeline; + +import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.hadoop.yarn.server.timeline.AbstractTimelineStore.AbstractResult; +import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field; + +import static org.apache.hadoop.yarn.server.timeline.AbstractTimelineStore.UTF8_CHARSET; +import static org.apache.hadoop.yarn.server.timeline.AbstractTimelineStore.NULL_BYTE; +import static org.apache.hadoop.yarn.server.timeline.AbstractTimelineStore.ONE_BYTE; + +/** + * Entity parser class for timeline storage with key-value storage interface + */ +public class TimelineEntityParser { + + public static TimelineEntity parseEntityRow(byte[] b, int offset, + int length) { + String entityType = getString(b, offset, length); + long startTime = GenericObjectMapper.readReverseOrderedLong(b, + offset + entityType.length() + 1); + String entityId = getString(b, offset + entityType.length() + 9, + length - entityType.length() - 9); + TimelineEntity entity = new TimelineEntity(); + entity.setEntityId(entityId); + entity.setEntityType(entityType); + entity.setStartTime(startTime); + return entity; + } + + public static boolean parseEntityFromResult(TimelineEntity entity, + AbstractResult result, EnumSet fieldsToRetrieve) + throws IOException { + if (!fieldsToRetrieve.contains(Field.EVENTS) && + !fieldsToRetrieve.contains(Field.LAST_EVENT_ONLY)) { + entity.setEvents(null); + } + if (!fieldsToRetrieve.contains(Field.RELATED_ENTITIES)) { + entity.setRelatedEntities(null); + } + if (!fieldsToRetrieve.contains(Field.PRIMARY_FILTERS)) { + entity.setPrimaryFilters(null); + } + if (!fieldsToRetrieve.contains(Field.OTHER_INFO)) { + entity.setOtherInfo(null); + } + + boolean lastEventOnly = fieldsToRetrieve.contains(Field.LAST_EVENT_ONLY) && + !fieldsToRetrieve.contains(Field.EVENTS); + boolean haveEvent = false; + System.out.println(fieldsToRetrieve); + int counter = 0; + for (AbstractResult cell = result.next(); cell != null; cell = result.next()) { + counter++; + AbstractTimelineStore.LOG.info("cell "+cell); + byte[] family = cell.getFamily(); + final byte firstByteOfFamily = family[0]; + AbstractTimelineStore.LOG.info("family "+((char) family[0])); + byte[] qualifier = cell.getQualifier(); + byte[] value = cell.getVal(family, qualifier); + if (firstByteOfFamily == AbstractTimelineStore.EVENTS_COLUMN[0]) { + if (lastEventOnly && haveEvent) { + continue; + } + AbstractTimelineStore.LOG.warn("event "+qualifier.length+" "+value.length); + addEvent(entity, qualifier, value); + haveEvent = true; + } else if (firstByteOfFamily + == AbstractTimelineStore.RELATED_ENTITIES_COLUMN[0]) { + addRelatedEntity(entity, qualifier); + } else if (firstByteOfFamily + == AbstractTimelineStore.PRIMARY_FILTERS_COLUMN[0]) { + if (fieldsToRetrieve.contains(Field.PRIMARY_FILTERS)) { + AbstractTimelineStore.LOG.warn("filter "+qualifier); + addPrimaryFilter(entity, qualifier); + } + } else if (firstByteOfFamily + == AbstractTimelineStore.OTHER_INFO_COLUMN[0]) { + if (fieldsToRetrieve.contains(Field.OTHER_INFO)) { + addOtherInfo(entity, qualifier, value); + } + } else if (firstByteOfFamily + == AbstractTimelineStore.INSERT_TIME_COLUMN[0]) { + // Do nothing for insert time + } else { + 
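+        // Unknown column family: warn and keep parsing rather than failing
+        // the whole entity.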
AbstractTimelineStore.LOG.warn( + "Found unexpected column family starting with " + + (char)firstByteOfFamily); + } + } + AbstractTimelineStore.LOG.warn("parser returns with counter "+counter); + return (counter != 0); + } + + public static TimelineEvent getEvent(byte[] b, int offset, + int length, byte[] value, int valueOffset, int valueLength) + throws IOException { + TimelineEvent timelineEvent = new TimelineEvent(); + timelineEvent.setTimestamp(GenericObjectMapper.readReverseOrderedLong(b, + offset)); + timelineEvent.setEventType(new String(b, offset + 8, length - 8, + UTF8_CHARSET)); + if (valueLength != 0) { + @SuppressWarnings("unchecked") + Map eventInfo = + (Map) GenericObjectMapper.read(value, valueOffset); + timelineEvent.setEventInfo(eventInfo); + } else { + timelineEvent.setEventInfo(null); + } + return timelineEvent; + } + + protected static void addEvent(TimelineEntity entity, byte[] b, byte[] value) + throws IOException { + entity.addEvent(getEvent(b, 0, b.length, value, 0, value.length)); + } + + protected static void addRelatedEntity(TimelineEntity entity, byte[] qualifier) { + String relatedEntityType = getString(qualifier, 0, qualifier.length); + String relatedEntityId = getString(qualifier, + relatedEntityType.length() + 1, + qualifier.length - relatedEntityType.length() - 1); + entity.addRelatedEntity(relatedEntityType, relatedEntityId); + } + + protected static void addPrimaryFilter(TimelineEntity entity, byte[] qualifier) throws IOException { + String primaryFilterName = getString(qualifier, 0, qualifier.length); + Object primaryFilterValue = GenericObjectMapper.read(qualifier, + primaryFilterName.length() + 1); + entity.addPrimaryFilter(primaryFilterName, primaryFilterValue); + } + + protected static void addOtherInfo(TimelineEntity entity, byte[] qualifier, byte[] value) throws IOException { + entity.addOtherInfo(new String(qualifier, 0, qualifier.length, UTF8_CHARSET), + GenericObjectMapper.read(value, 0)); + } + + private static String getString(byte[] b, int offset, int length) { + int i = 0; + while (i < length && b[offset+i] != 0x0) { + i++; + } + return new String(b, offset, i, UTF8_CHARSET); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestHBaseTimelineStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestHBaseTimelineStore.java new file mode 100644 index 0000000..4015ea8 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestHBaseTimelineStore.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timeline; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timeline.HBaseTimelineStore; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.Ignore; + +import java.io.IOException; + +public class TestHBaseTimelineStore extends TimelineStoreTestUtils { + private class HBaseTestStore extends HBaseTimelineStore { + @Override + protected HBaseAdmin initHBase(Configuration conf) throws IOException { + return utility.getHBaseAdmin(); + } + + @Override + protected HTableInterface getTable(String tableName) throws IOException { + return new HTable(utility.getConfiguration(), tableName); + } + } + + private HBaseTestingUtility utility; + + @Before + public void setup() throws Exception { + utility = new HBaseTestingUtility(); + utility.startMiniCluster(); + + store = new HBaseTestStore(); + store.init(new YarnConfiguration()); + store.start(); + loadTestData(); + loadVerificationData(); + } + + @After + public void tearDown() throws Exception { + store.stop(); + utility.shutdownMiniCluster(); + } + + @Ignore("Not work with trunk branch, should work with branch2") + @Test + public void test() throws IOException { + // all tests are in the same method so that the hbase minicluster is only + // started once + super.testGetSingleEntity(); + super.testGetEntities(); + super.testGetEntitiesWithPrimaryFilters(); + super.testGetEntitiesWithSecondaryFilters(); + super.testGetEvents(); + // execute tests for fromId and fromTs once implemented + super.testGetEntitiesWithFromId(); + super.testGetEntitiesWithFromTs(); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestHBaseTimelineStoreUtil.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestHBaseTimelineStoreUtil.java new file mode 100644 index 0000000..6c64da6 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestHBaseTimelineStoreUtil.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.timeline; + +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.server.timeline.TimelineEntityKeyBuilder; +import org.apache.hadoop.yarn.server.timeline.TimelineEntityParser; +import org.junit.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class TestHBaseTimelineStoreUtil { + @Test + public void testEntityRow() throws IOException { + byte[] b = TimelineEntityKeyBuilder.createEntityRow("id", "type", + GenericObjectMapper.writeReverseOrderedLong(123l)); + TimelineEntity entity = TimelineEntityParser.parseEntityRow(b, 0, + b.length); + assertEquals("id", entity.getEntityId()); + assertEquals("type", entity.getEntityType()); + assertEquals((Long)123l, entity.getStartTime()); + } + + @Test + public void testEvent() throws IOException { + byte[] b = TimelineEntityKeyBuilder.createEventColumnQualifier + (GenericObjectMapper.writeReverseOrderedLong(123l), "type"); + Map eventInfo = new HashMap(); + eventInfo.put("key", "value"); + byte[] value = GenericObjectMapper.write(eventInfo); + TimelineEntity entity = new TimelineEntity(); + TimelineEntityParser.addEvent(entity, b, null); + TimelineEntityParser.addEvent(entity, b, value); + assertEquals(2, entity.getEvents().size()); + assertEquals(123l, entity.getEvents().get(0).getTimestamp()); + assertEquals("type", entity.getEvents().get(0).getEventType()); + assertNull(entity.getEvents().get(0).getEventInfo()); + assertEquals(123l, entity.getEvents().get(1).getTimestamp()); + assertEquals("type", entity.getEvents().get(1).getEventType()); + assertEquals(eventInfo, entity.getEvents().get(1).getEventInfo()); + } + + @Test + public void testRelatedEntity() throws IOException { + byte[] b = TimelineEntityKeyBuilder.createRelatedEntityColumnQualifier("id", + "type"); + TimelineEntity entity = new TimelineEntity(); + TimelineEntityParser.addRelatedEntity(entity, b); + assertEquals(1, entity.getRelatedEntities().size()); + assertTrue(entity.getRelatedEntities().containsKey("type")); + assertEquals(1, entity.getRelatedEntities().get("type").size()); + assertTrue(entity.getRelatedEntities().get("type").contains("id")); + } + + @Test + public void testPrimaryFilter() throws IOException { + byte[] b1 = TimelineEntityKeyBuilder.createPrimaryFilterColumnQualifier( + "name", "value"); + byte[] b2 = TimelineEntityKeyBuilder.createPrimaryFilterColumnQualifier( + "name", 123); + TimelineEntity entity = new TimelineEntity(); + TimelineEntityParser.addPrimaryFilter(entity, b1); + TimelineEntityParser.addPrimaryFilter(entity, b2); + assertEquals(1, entity.getPrimaryFilters().size()); + assertTrue(entity.getPrimaryFilters().containsKey("name")); + assertEquals(2, entity.getPrimaryFilters().get("name").size()); + assertTrue(entity.getPrimaryFilters().get("name").contains("value")); + assertTrue(entity.getPrimaryFilters().get("name").contains(123)); + } + +}
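
Notes (reviewer commentary, not part of the patch):

Row-key layout, as built by TimelineEntityKeyBuilder: start-time rows are entityType 0x0 entityId; entity rows are entityType 0x0 reverseOrderedStartTime entityId; index rows are filterName 0x0 serializedFilterValue 0x0 entityRow. Start times are written reverse-ordered so that a forward scan over an entity type returns the most recently started entities first, and the per-family TTL set in createFamily() lets HBase age off old timeline cells automatically.

Below is a minimal sketch of wiring the new store into a timeline server, assuming the standard yarn.timeline-service.store-class setting; the class name HBaseTimelineStoreSetupSketch, the HBase master address, and the TTL value are illustrative placeholders, not defaults introduced by this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timeline.HBaseTimelineStore;
import org.apache.hadoop.yarn.server.timeline.TimelineStore;

public class HBaseTimelineStoreSetupSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new YarnConfiguration();
    // Ask the timeline server to load the HBase-backed store instead of leveldb
    conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE,
        HBaseTimelineStore.class, TimelineStore.class);
    // Optional overrides added by this patch; both values below are placeholders
    conf.set(HBaseTimelineStore.HBASE_MASTER_PROPERTY, "hbase-master.example.com:60000");
    conf.setInt(HBaseTimelineStore.HBASE_TTL_PROPERTY, 60 * 60 * 24 * 7);

    // Standalone use of the store through the common Service lifecycle
    // (the timeline server normally instantiates the store itself)
    TimelineStore store = new HBaseTimelineStore();
    store.init(conf);
    store.start();
    // ... put and query TimelineEntities via the TimelineStore API ...
    store.stop();
  }
}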