e, Scan scan,
+ RegionScanner scanner) throws IOException {
+ return new FlowScanner(region, scan.getBatch(), scanner);
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
new file mode 100644
index 0000000..b2b423c
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+
+/**
+ * Represents a rowkey for the flow run table.
+ */
+public class FlowRunRowKey {
+ // TODO: more methods are needed for this class like parse row key
+
+ /**
+ * Constructs a row key for the flow run table as follows:
+ * {clusterId!userId!flowId!Inverted Flow Run Id}
+ *
+ * @param clusterId identifier of the cluster on which the flow ran
+ * @param userId user who ran the flow
+ * @param flowId name or id of the flow
+ * @param flowRunId run id of this flow run
+ * @return byte array with the row key
+ */
+ public static byte[] getRowKey(String clusterId, String userId,
+ String flowId, Long flowRunId) {
+ byte[] first = Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId,
+ userId, flowId));
+ // Note that flowRunId is a long, so it cannot be encoded in the same
+ // joinEncoded call as the string fields above.
+ byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
+ return Separator.QUALIFIERS.join(first, second);
+ }
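+
+ /**
+ * Illustrative sketch only (not part of the original patch) of the row key
+ * parsing mentioned in the TODO above. The method name and the Object[]
+ * return type are placeholders; this merely shows how the components
+ * written by getRowKey could be split back out with the same helpers.
+ */
+ public static Object[] parseRowKeySketch(byte[] rowKey) {
+ byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, -1);
+ String clusterId = Bytes.toString(rowKeyComponents[0]);
+ String userId = Bytes.toString(rowKeyComponents[1]);
+ String flowId = Bytes.toString(rowKeyComponents[2]);
+ // undo the inversion applied in getRowKey to recover the original run id
+ Long flowRunId = TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[3]));
+ return new Object[] { clusterId, userId, flowId, flowRunId };
+ }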
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java
new file mode 100644
index 0000000..b1b93c1
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+
+/**
+ * The flow run table has a single column family, info, which stores
+ * per flow run information aggregated across applications.
+ *
+ * Metrics are also stored in the info column family.
+ *
+ * Example flow run table record:
+ *
+ *
+ * flow_run table
+ * |-------------------------------------------|
+ * | Row key | Column Family |
+ * | | info |
+ * |-------------------------------------------|
+ * | clusterId! | flow_version:version7 |
+ * | userName! | |
+ * | flowId! | running_apps:1 |
+ * | flowRunId | |
+ * | | min_start_time:1392995080000 |
+ * | | #0:"" |
+ * | | |
+ * | | min_start_time:1392995081012 |
+ * | | #0:appId2 |
+ * | | |
+ * | | min_start_time:1392993083210 |
+ * | | #0:appId3 |
+ * | | |
+ * | | |
+ * | | max_end_time:1392993084018 |
+ * | | #0:"" |
+ * | | |
+ * | | |
+ * | | m!mapInputRecords:127 |
+ * | | #0:"" |
+ * | | |
+ * | | m!mapInputRecords:31 |
+ * | | #2:appId2 |
+ * | | |
+ * | | m!mapInputRecords:37 |
+ * | | #1:appId3 |
+ * | | |
+ * | | |
+ * | | m!mapOutputRecords:181 |
+ * | | #0:"" |
+ * | | |
+ * | | m!mapOutputRecords:37 |
+ * | | #1:appId3 |
+ * | | |
+ * | | |
+ * |-------------------------------------------|
+ *
+ */
+public class FlowRunTable extends BaseTable<FlowRunTable> {
+ /** flow run table prefix */
+ private static final String PREFIX =
+ YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".flowrun";
+
+ /** config param name that specifies the flowrun table name */
+ public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+ /** default value for flowrun table name */
+ public static final String DEFAULT_TABLE_NAME = "timelineservice.flowrun";
+
+ private static final Log LOG = LogFactory.getLog(FlowRunTable.class);
+
+ /** default max number of versions */
+ public static final int DEFAULT_METRICS_MAX_VERSIONS = Integer.MAX_VALUE;
+
+ public FlowRunTable() {
+ super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTable#createTable
+ * (org.apache.hadoop.hbase.client.Admin,
+ * org.apache.hadoop.conf.Configuration)
+ */
+ public void createTable(Admin admin, Configuration hbaseConf)
+ throws IOException {
+
+ TableName table = getTableName(hbaseConf);
+ if (admin.tableExists(table)) {
+ // do not disable / delete existing table
+ // similar to the approach taken by map-reduce jobs when
+ // output directory exists
+ throw new IOException("Table " + table.getNameAsString()
+ + " already exists.");
+ }
+
+ HTableDescriptor flowRunTableDescp = new HTableDescriptor(table);
+ HColumnDescriptor infoCF =
+ new HColumnDescriptor(FlowRunColumnFamily.INFO.getBytes());
+ infoCF.setBloomFilterType(BloomType.ROWCOL);
+ flowRunTableDescp.addFamily(infoCF);
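+ // all cell versions are retained: each application writes its own version
+ // of a metric cell (tagged with its app id, see the javadoc table above),
+ // and the FlowScanner coprocessor aggregates across them at read time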
+ infoCF.setMinVersions(1);
+ infoCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS);
+
+ // TODO: figure the split policy
+ flowRunTableDescp.addCoprocessor(FlowRunCoprocessor.class
+ .getCanonicalName());
+ admin.createTable(flowRunTableDescp);
+ LOG.info("Status of table creation for " + table.getNameAsString() + "="
+ + admin.tableExists(table));
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
new file mode 100644
index 0000000..4143b76
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -0,0 +1,486 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Invoked via the coprocessor when a Get or a Scan is issued for the flow
+ * run table. Looks through the list of cells per row, checks their tags and
+ * operates on those cells as per the cell tags. Transforms reads of the
+ * stored metrics into calculated sums for each column. Also finds the min
+ * and max for start and end times in a flow run.
+ */
+class FlowScanner implements RegionScanner, Closeable {
+
+ private static final Log LOG = LogFactory.getLog(FlowScanner.class);
+
+ private final HRegion region;
+ private final InternalScanner flowRunScanner;
+ private RegionScanner regionScanner;
+ private final int limit;
+ private boolean hasMore;
+ private byte[] currentRow;
+ private List<Cell> availableCells = new ArrayList<>();
+ private int currentIndex;
+
+ FlowScanner(HRegion region, int limit, InternalScanner internalScanner) {
+ this.region = region;
+ this.limit = limit;
+ this.flowRunScanner = internalScanner;
+ if (internalScanner instanceof RegionScanner) {
+ this.regionScanner = (RegionScanner) internalScanner;
+ }
+ // TODO: note if it's compaction/flush
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getRegionInfo()
+ */
+ @Override
+ public HRegionInfo getRegionInfo() {
+ return region.getRegionInfo();
+ }
+
+ @Override
+ public boolean nextRaw(List<Cell> cells) throws IOException {
+ return nextRaw(cells, limit);
+ }
+
+ @Override
+ public boolean nextRaw(List<Cell> cells, int limit) throws IOException {
+ return nextInternal(cells, limit);
+ }
+
+ @Override
+ public boolean next(List<Cell> cells) throws IOException {
+ return next(cells, limit);
+ }
+
+ @Override
+ public boolean next(List<Cell> cells, int limit) throws IOException {
+ return nextInternal(cells, limit);
+ }
+
+ private String getAggregationCompactionDimension(List<Tag> tags) {
+ String appId = null;
+ for (Tag t : tags) {
+ if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
+ .getType()) {
+ appId = Bytes.toString(t.getValue());
+ }
+ }
+ return appId;
+ }
+
+ /**
+ * This method loops through the cells in a given row of the
+ * {@link FlowRunTable}. It looks at the tags of each cell to figure out how
+ * to process the contents. It then calculates the sum or min or max for each
+ * column or returns the cell as is.
+ *
+ * @param cells output list that the processed cells are added to
+ * @param limit batch limit (maximum number of cells to return)
+ * @return true if next row is available for the scanner, false otherwise
+ * @throws IOException
+ */
+ private boolean nextInternal(List<Cell> cells, int limit) throws IOException {
+ Cell cell = null;
+ startNext();
+ // Loop through all the cells in this row
+ // For min/max/metrics we do need to scan the entire set of cells to get the
+ // right one
+ // But with flush/compaction, the number of cells being scanned will go down
+ // cells are grouped per column qualifier then sorted by cell timestamp
+ // (latest to oldest) per column qualifier
+ // So all cells in one qualifier come one after the other before we see the
+ // next column qualifier
+ ByteArrayComparator comp = new ByteArrayComparator();
+ byte[] currentColumnQualifier = TimelineWriterUtils.EMPTY_BYTES;
+ AggregationOperation currentAggOp = null;
+ SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
+ Set<String> alreadySeenAggDim = new HashSet<>();
+ int addedCnt = 0;
+ while (((cell = peekAtNextCell(limit)) != null)
+ && (limit <= 0 || addedCnt < limit)) {
+ byte[] newColumnQualifier = CellUtil.cloneQualifier(cell);
+ if (comp.compare(currentColumnQualifier, newColumnQualifier) != 0) {
+ addedCnt += emitCells(cells, currentColumnCells, currentAggOp);
+ resetState(currentColumnCells, alreadySeenAggDim);
+ currentColumnQualifier = newColumnQualifier;
+ currentAggOp = getCurrentAggOp(cell);
+ }
+ collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim);
+ nextCell(limit);
+ }
+ if (!currentColumnCells.isEmpty()) {
+ emitCells(cells, currentColumnCells, currentAggOp);
+ }
+ return hasMore();
+ }
+
+ private AggregationOperation getCurrentAggOp(Cell cell) {
+ List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+ cell.getTagsLength());
+ // We assume that all the operations for a particular column are the same
+ return TimelineWriterUtils.getAggregationOperationFromTagsList(tags);
+ }
+
+ /**
+ * Resets the per-column state to an initialized state for the next loop
+ * iteration.
+ *
+ * @param currentColumnCells working set of cells for the current column
+ * @param alreadySeenAggDim aggregation dimensions seen so far for the column
+ */
+ private void resetState(SortedSet<Cell> currentColumnCells,
+ Set<String> alreadySeenAggDim) {
+ currentColumnCells.clear();
+ alreadySeenAggDim.clear();
+ }
+
+ private void collectCells(SortedSet<Cell> currentColumnCells,
+ AggregationOperation currentAggOp, Cell cell,
+ Set<String> alreadySeenAggDim) throws IOException {
+ if (currentAggOp == null) {
+ // not a min/max/metric cell, so just return it as is;
+ // the caller (nextInternal) advances the scanner once per collected cell
+ currentColumnCells.add(cell);
+ return;
+ }
+
+ switch (currentAggOp) {
+ case MIN:
+ if (currentColumnCells.size() == 0) {
+ currentColumnCells.add(cell);
+ } else {
+ Cell currentMinCell = currentColumnCells.first();
+ Cell newMinCell = compareCellValues(currentMinCell, cell, currentAggOp);
+ if (!currentMinCell.equals(newMinCell)) {
+ currentColumnCells.remove(currentMinCell);
+ currentColumnCells.add(newMinCell);
+ }
+ }
+ break;
+ case MAX:
+ if (currentColumnCells.size() == 0) {
+ currentColumnCells.add(cell);
+ } else {
+ Cell currentMaxCell = currentColumnCells.first();
+ Cell newMaxCell = compareCellValues(currentMaxCell, cell, currentAggOp);
+ if (!currentMaxCell.equals(newMaxCell)) {
+ currentColumnCells.remove(currentMaxCell);
+ currentColumnCells.add(newMaxCell);
+ }
+ }
+ break;
+ case SUM:
+ case SUM_FINAL:
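+ // SUM and SUM_FINAL fall through to the same handling in this patch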
+ // only if this app has not been seen yet, add to current column cells
+ List tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+ cell.getTagsLength());
+ String aggDim = getAggregationCompactionDimension(tags);
+ if (!alreadySeenAggDim.contains(aggDim)) {
+ // first (i.e. most recent) cell seen for this agg dimension; since cells
+ // arrive sorted latest-to-oldest, any later cell for the same dimension
+ // is older and is dropped
+ currentColumnCells.add(cell);
+ alreadySeenAggDim.add(aggDim);
+ }
+ break;
+ default:
+ break;
+ } // end of switch case
+ }
+
+ /*
+ * Processes the cells in the input set currentColumnCells and populates the
+ * output cells list based on the input AggregationOperation parameter.
+ */
+ private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells,
+ AggregationOperation currentAggOp) throws IOException {
+ if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) {
+ return 0;
+ }
+ if (currentAggOp == null) {
+ cells.addAll(currentColumnCells);
+ return currentColumnCells.size();
+ }
+
+ switch (currentAggOp) {
+ case MIN:
+ case MAX:
+ cells.addAll(currentColumnCells);
+ return currentColumnCells.size();
+ case SUM:
+ case SUM_FINAL:
+ Cell sumCell = processSummation(currentColumnCells);
+ cells.add(sumCell);
+ return 1;
+ default:
+ cells.addAll(currentColumnCells);
+ return currentColumnCells.size();
+ }
+ }
+
+ /*
+ * Returns a cell whose value is the sum of all cell values in the input set.
+ * The new cell created has the timestamp of the most recent metric cell. The
+ * sum of a metric for a flow run is the summation at the point of the last
+ * metric update in that flow till that time.
+ */
+ private Cell processSummation(SortedSet<Cell> currentColumnCells)
+ throws IOException {
+ Number sum = 0;
+ Number currentValue = 0;
+ long ts = 0L;
+ long mostCurrentTimestamp = 0L;
+ Cell mostRecentCell = null;
+ for (Cell cell : currentColumnCells) {
+ currentValue = (Number) GenericObjectMapper.read(CellUtil
+ .cloneValue(cell));
+ ts = cell.getTimestamp();
+ if (mostCurrentTimestamp < ts) {
+ mostCurrentTimestamp = ts;
+ mostRecentCell = cell;
+ }
+ sum = sum.longValue() + currentValue.longValue();
+ }
+ Cell sumCell = createNewCell(mostRecentCell, sum);
+ return sumCell;
+ }
+
+ /**
+ * Determines which cell is to be returned based on the values in each cell
+ * and the comparison operation MIN or MAX.
+ *
+ * @param previouslyChosenCell the cell chosen so far
+ * @param currentCell the new cell to compare against it
+ * @param currentAggOp the aggregation operation (MIN or MAX)
+ * @return the cell which is the min (or max) cell
+ * @throws IOException
+ */
+ private Cell compareCellValues(Cell previouslyChosenCell, Cell currentCell,
+ AggregationOperation currentAggOp) throws IOException {
+ if (previouslyChosenCell == null) {
+ return currentCell;
+ }
+ try {
+ long previouslyChosenCellValue = ((Number) GenericObjectMapper
+ .read(CellUtil.cloneValue(previouslyChosenCell))).longValue();
+ long currentCellValue = ((Number) GenericObjectMapper.read(CellUtil
+ .cloneValue(currentCell))).longValue();
+ switch (currentAggOp) {
+ case MIN:
+ if (currentCellValue < previouslyChosenCellValue) {
+ // new value is minimum, hence return this cell
+ return currentCell;
+ } else {
+ // previously chosen value is minimum, hence return previous min cell
+ return previouslyChosenCell;
+ }
+ case MAX:
+ if (currentCellValue > previouslyChosenCellValue) {
+ // new value is max, hence return this cell
+ return currentCell;
+ } else {
+ // previously chosen value is max, hence return previous max cell
+ return previouslyChosenCell;
+ }
+ default:
+ return currentCell;
+ }
+ } catch (IllegalArgumentException iae) {
+ LOG.error("caught iae during conversion to long ", iae);
+ return currentCell;
+ }
+ }
+
+ private Cell createNewCell(Cell origCell, Number number) throws IOException {
+ byte[] newValue = GenericObjectMapper.write(number);
+ return CellUtil.createCell(CellUtil.cloneRow(origCell),
+ CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
+ origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);
+ }
+
+ @Override
+ public void close() throws IOException {
+ flowRunScanner.close();
+ }
+
+ /**
+ * Called to signal the start of the next() call by the scanner.
+ */
+ public void startNext() {
+ currentRow = null;
+ }
+
+ /**
+ * Returns whether or not the underlying scanner has more rows.
+ */
+ public boolean hasMore() {
+ return currentIndex < availableCells.size() || hasMore;
+ }
+
+ /**
+ * Returns the next available cell for the current row and advances the
+ * pointer to the next cell. This method can be called multiple times in a row
+ * to advance through all the available cells.
+ *
+ * @param limit
+ * the limit of number of cells to return if the next batch must be
+ * fetched by the wrapped scanner
+ * @return the next available cell or null if no more cells are available for
+ * the current row
+ * @throws IOException
+ */
+ public Cell nextCell(int limit) throws IOException {
+ Cell cell = peekAtNextCell(limit);
+ if (cell != null) {
+ currentIndex++;
+ }
+ return cell;
+ }
+
+ /**
+ * Returns the next available cell for the current row, without advancing the
+ * pointer. Calling this method multiple times in a row will continue to
+ * return the same cell.
+ *
+ * @param limit
+ * the limit of number of cells to return if the next batch must be
+ * fetched by the wrapped scanner
+ * @return the next available cell or null if no more cells are available for
+ * the current row
+ * @throws IOException
+ */
+ public Cell peekAtNextCell(int limit) throws IOException {
+ if (currentIndex >= availableCells.size()) {
+ // done with current batch
+ availableCells.clear();
+ currentIndex = 0;
+ hasMore = flowRunScanner.next(availableCells, limit);
+ }
+ Cell cell = null;
+ if (currentIndex < availableCells.size()) {
+ cell = availableCells.get(currentIndex);
+ if (currentRow == null) {
+ currentRow = CellUtil.cloneRow(cell);
+ } else if (!CellUtil.matchingRow(cell, currentRow)) {
+ // moved on to the next row
+ // don't use the current cell
+ // also signal no more cells for this row
+ return null;
+ }
+ }
+ return cell;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMaxResultSize()
+ */
+ @Override
+ public long getMaxResultSize() {
+ if (regionScanner == null) {
+ throw new IllegalStateException(
+ "RegionScanner.isFilterDone() called when the flow "
+ + "scanner's scanner is not a RegionScanner");
+ }
+ return regionScanner.getMaxResultSize();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMvccReadPoint()
+ */
+ @Override
+ public long getMvccReadPoint() {
+ if (regionScanner == null) {
+ throw new IllegalStateException(
+ "RegionScanner.isFilterDone() called when the flow "
+ + "scanner's internal scanner is not a RegionScanner");
+ }
+ return regionScanner.getMvccReadPoint();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hbase.regionserver.RegionScanner#isFilterDone()
+ */
+ @Override
+ public boolean isFilterDone() throws IOException {
+ if (regionScanner == null) {
+ throw new IllegalStateException(
+ "RegionScanner.isFilterDone() called when the flow "
+ + "scanner's internal scanner is not a RegionScanner");
+ }
+ return regionScanner.isFilterDone();
+
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hbase.regionserver.RegionScanner#reseek(byte[])
+ */
+ @Override
+ public boolean reseek(byte[] bytes) throws IOException {
+ if (regionScanner == null) {
+ throw new IllegalStateException(
+ "RegionScanner.reseek() called when the flow "
+ + "scanner's internal scanner is not a RegionScanner");
+ }
+ return regionScanner.reseek(bytes);
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
index 2875e01..3962341 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
@@ -53,7 +53,6 @@
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
@@ -88,20 +87,15 @@ public static void setupBeforeClass() throws Exception {
}
private static void createSchema() throws IOException {
- new EntityTable()
- .createTable(util.getHBaseAdmin(), util.getConfiguration());
- new AppToFlowTable()
- .createTable(util.getHBaseAdmin(), util.getConfiguration());
- new ApplicationTable()
- .createTable(util.getHBaseAdmin(), util.getConfiguration());
+ TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
}
@Test
public void testWriteApplicationToHBase() throws Exception {
TimelineEntities te = new TimelineEntities();
ApplicationEntity entity = new ApplicationEntity();
- String id = "hello";
- entity.setId(id);
+ String appId = "application_1000178881110_2002";
+ entity.setId(appId);
long cTime = 1425016501000L;
long mTime = 1425026901000L;
entity.setCreatedTime(cTime);
@@ -173,12 +167,12 @@ public void testWriteApplicationToHBase() throws Exception {
String flow = "some_flow_name";
String flowVersion = "AB7822C10F1111";
long runid = 1002345678919L;
- hbi.write(cluster, user, flow, flowVersion, runid, id, te);
+ hbi.write(cluster, user, flow, flowVersion, runid, appId, te);
hbi.stop();
// retrieve the row
byte[] rowKey =
- ApplicationRowKey.getRowKey(cluster, user, flow, runid, id);
+ ApplicationRowKey.getRowKey(cluster, user, flow, runid, appId);
Get get = new Get(rowKey);
get.setMaxVersions(Integer.MAX_VALUE);
Connection conn = ConnectionFactory.createConnection(c1);
@@ -190,11 +184,11 @@ public void testWriteApplicationToHBase() throws Exception {
// check the row key
byte[] row1 = result.getRow();
assertTrue(isApplicationRowKeyCorrect(row1, cluster, user, flow, runid,
- id));
+ appId));
// check info column family
String id1 = ApplicationColumn.ID.readResult(result).toString();
- assertEquals(id, id1);
+ assertEquals(appId, id1);
Number val =
(Number) ApplicationColumn.CREATED_TIME.readResult(result);
@@ -252,17 +246,17 @@ public void testWriteApplicationToHBase() throws Exception {
assertEquals(metricValues, metricMap);
// read the timeline entity using the reader this time
- TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, id,
+ TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appId,
entity.getType(), entity.getId(),
EnumSet.of(TimelineReader.Field.ALL));
 Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
- id, entity.getType(), null, null, null, null, null, null, null,
+ appId, entity.getType(), null, null, null, null, null, null, null,
null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
assertNotNull(e1);
assertEquals(1, es1.size());
// verify attributes
- assertEquals(id, e1.getId());
+ assertEquals(appId, e1.getId());
assertEquals(TimelineEntityType.YARN_APPLICATION.toString(),
e1.getType());
assertEquals(cTime, e1.getCreatedTime());
@@ -576,7 +570,7 @@ public void testEvents() throws IOException {
String flow = "other_flow_name";
String flowVersion = "1111F01C2287BA";
long runid = 1009876543218L;
- String appName = "some app name";
+ String appName = "application_123465899910_1001";
hbi.write(cluster, user, flow, flowVersion, runid, appName, entities);
hbi.stop();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
new file mode 100644
index 0000000..3a1ab6d
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+
+/**
+ * Generates the data/entities for the FlowRun and FlowActivity Tables
+ */
+class TestFlowDataGenerator {
+
+ private final static String metric1 = "MAP_SLOT_MILLIS";
+ private final static String metric2 = "HDFS_BYTES_READ";
+
+
+ static TimelineEntity getEntityMetricsApp1() {
+ TimelineEntity entity = new TimelineEntity();
+ String id = "flowRunMetrics_test";
+ String type = TimelineEntityType.YARN_APPLICATION.toString();
+ entity.setId(id);
+ entity.setType(type);
+ Long cTime = 1425016501000L;
+ entity.setCreatedTime(cTime);
+
+ // add metrics
+ Set<TimelineMetric> metrics = new HashSet<>();
+ TimelineMetric m1 = new TimelineMetric();
+ m1.setId(metric1);
+ Map<Long, Number> metricValues = new HashMap<>();
+ long ts = System.currentTimeMillis();
+ metricValues.put(ts - 100000, 2);
+ metricValues.put(ts - 80000, 40);
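+ // the most recent value (40) is what the flow run level sum picks up for this app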
+ m1.setType(Type.TIME_SERIES);
+ m1.setValues(metricValues);
+ metrics.add(m1);
+
+ TimelineMetric m2 = new TimelineMetric();
+ m2.setId(metric2);
+ metricValues = new HashMap<>();
+ ts = System.currentTimeMillis();
+ metricValues.put(ts - 100000, 31);
+ metricValues.put(ts - 80000, 57);
+ m2.setType(Type.TIME_SERIES);
+ m2.setValues(metricValues);
+ metrics.add(m2);
+
+ entity.addMetrics(metrics);
+ return entity;
+ }
+
+ static TimelineEntity getEntityMetricsApp2() {
+ TimelineEntity entity = new TimelineEntity();
+ String id = "flowRunMetrics_test";
+ String type = TimelineEntityType.YARN_APPLICATION.toString();
+ entity.setId(id);
+ entity.setType(type);
+ Long cTime = 1425016501000L;
+ entity.setCreatedTime(cTime);
+ // add metrics
+ Set<TimelineMetric> metrics = new HashSet<>();
+ TimelineMetric m1 = new TimelineMetric();
+ m1.setId(metric1);
+ Map<Long, Number> metricValues = new HashMap<>();
+ long ts = System.currentTimeMillis();
+ metricValues.put(ts - 100000, 5L);
+ metricValues.put(ts - 80000, 101L);
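+ // the most recent value (101) contributes to the flow run sum (40 + 101 = 141)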
+ m1.setType(Type.TIME_SERIES);
+ m1.setValues(metricValues);
+ metrics.add(m1);
+ entity.addMetrics(metrics);
+ return entity;
+ }
+
+ static TimelineEntity getEntity1() {
+ TimelineEntity entity = new TimelineEntity();
+ String id = "flowRunHello";
+ String type = TimelineEntityType.YARN_APPLICATION.toString();
+ entity.setId(id);
+ entity.setType(type);
+ Long cTime = 20000000000000L;
+ Long mTime = 1425026901000L;
+ entity.setCreatedTime(cTime);
+ entity.setModifiedTime(mTime);
+ // add metrics
+ Set<TimelineMetric> metrics = new HashSet<>();
+ TimelineMetric m1 = new TimelineMetric();
+ m1.setId(metric1);
+ Map<Long, Number> metricValues = new HashMap<>();
+ long ts = System.currentTimeMillis();
+ metricValues.put(ts - 120000, 100000000);
+ metricValues.put(ts - 100000, 200000000);
+ metricValues.put(ts - 80000, 300000000);
+ metricValues.put(ts - 60000, 400000000);
+ metricValues.put(ts - 40000, 50000000000L);
+ metricValues.put(ts - 20000, 60000000000L);
+ m1.setType(Type.TIME_SERIES);
+ m1.setValues(metricValues);
+ metrics.add(m1);
+ entity.addMetrics(metrics);
+
+ TimelineEvent event = new TimelineEvent();
+ event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+ Long expTs = 1436512802000L;
+ event.setTimestamp(expTs);
+ String expKey = "foo_event";
+ Object expVal = "test";
+ event.addInfo(expKey, expVal);
+ entity.addEvent(event);
+
+ event = new TimelineEvent();
+ event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+ event.setTimestamp(1436512801000L);
+ event.addInfo(expKey, expVal);
+ entity.addEvent(event);
+
+ return entity;
+ }
+
+ static TimelineEntity getEntityGreaterStartTime() {
+ TimelineEntity entity = new TimelineEntity();
+ entity.setCreatedTime(30000000000000L);
+ entity.setId("flowRunHello with greater start time");
+ String type = TimelineEntityType.YARN_APPLICATION.toString();
+ entity.setType(type);
+ TimelineEvent event = new TimelineEvent();
+ event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+ long endTs = 1439379885000L;
+ event.setTimestamp(endTs);
+ String expKey = "foo_event_greater";
+ String expVal = "test_app_greater";
+ event.addInfo(expKey, expVal);
+ entity.addEvent(event);
+ return entity;
+ }
+
+ static TimelineEntity getEntityMaxEndTime(long endTs) {
+ TimelineEntity entity = new TimelineEntity();
+ entity.setId("flowRunHello Max End time");
+ entity.setType(TimelineEntityType.YARN_APPLICATION.toString());
+ TimelineEvent event = new TimelineEvent();
+ event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+ event.setTimestamp(endTs);
+ String expKey = "foo_even_max_ finished";
+ String expVal = "test_app_max_finished";
+ event.addInfo(expKey, expVal);
+ entity.addEvent(event);
+ return entity;
+ }
+
+ static TimelineEntity getEntityMinStartTime() {
+ TimelineEntity entity = new TimelineEntity();
+ String id = "flowRunHelloMInStartTime";
+ String type = TimelineEntityType.YARN_APPLICATION.toString();
+ entity.setId(id);
+ entity.setType(type);
+ Long cTime = 10000000000000L;
+ entity.setCreatedTime(cTime);
+ TimelineEvent event = new TimelineEvent();
+ event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+ event.setTimestamp(System.currentTimeMillis());
+ entity.addEvent(event);
+ return entity;
+ }
+
+
+ static TimelineEntity getFlowApp1() {
+ TimelineEntity entity = new TimelineEntity();
+ String id = "flowActivity_test";
+ String type = TimelineEntityType.YARN_APPLICATION.toString();
+ entity.setId(id);
+ entity.setType(type);
+ Long cTime = 1425016501000L;
+ entity.setCreatedTime(cTime);
+
+ TimelineEvent event = new TimelineEvent();
+ event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+ Long expTs = 1436512802000L;
+ event.setTimestamp(expTs);
+ String expKey = "foo_event";
+ Object expVal = "test";
+ event.addInfo(expKey, expVal);
+ entity.addEvent(event);
+
+ return entity;
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
new file mode 100644
index 0000000..f414fc5
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests the FlowRun and FlowActivity Tables
+ */
+public class TestHBaseStorageFlowActivity {
+
+ private static HBaseTestingUtility util;
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ util = new HBaseTestingUtility();
+ Configuration conf = util.getConfiguration();
+ conf.setInt("hfile.format.version", 3);
+ util.startMiniCluster();
+ createSchema();
+ }
+
+ private static void createSchema() throws IOException {
+ TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
+ }
+
+ /**
+ * Writes 4 timeline entities belonging to one flow run through the
+ * {@link HBaseTimelineWriterImpl}
+ *
+ * Checks the flow run table contents
+ *
+ * The first entity has a created event, metrics and a finish event.
+ *
+ * The second entity has a created event and this is the entity with smallest
+ * start time. This should be the start time for the flow run.
+ *
+ * The third entity has a finish event and this is the entity with the max end
+ * time. This should be the end time for the flow run.
+ *
+ * The fourth entity has a created event which has a start time that is
+ * greater than min start time.
+ *
+ * The test also checks in the flow activity table that one entry has been
+ * made for all of these 4 application entities since they belong to the same
+ * flow run.
+ */
+ @Test
+ public void testWriteFlowRunMinMax() throws Exception {
+
+ TimelineEntities te = new TimelineEntities();
+ te.addEntity(TestFlowDataGenerator.getEntity1());
+
+ HBaseTimelineWriterImpl hbi = null;
+ Configuration c1 = util.getConfiguration();
+ String cluster = "testWriteFlowRunMinMaxToHBase_cluster1";
+ String user = "testWriteFlowRunMinMaxToHBase_user1";
+ String flow = "testing_flowRun_flow_name";
+ String flowVersion = "CF7022C10F1354";
+ Long runid = 1002345678919L;
+ String appName = "application_100000000000_1111";
+ long endTs = 1439750690000L;
+ TimelineEntity entityMinStartTime = TestFlowDataGenerator
+ .getEntityMinStartTime();
+
+ try {
+ hbi = new HBaseTimelineWriterImpl(c1);
+ hbi.init(c1);
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+ // write another entity with the right min start time
+ te = new TimelineEntities();
+ te.addEntity(entityMinStartTime);
+ appName = "application_100000000000_3333";
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+ // write another entity for max end time
+ TimelineEntity entityMaxEndTime = TestFlowDataGenerator
+ .getEntityMaxEndTime(endTs);
+ te = new TimelineEntities();
+ te.addEntity(entityMaxEndTime);
+ appName = "application_100000000000_4444";
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+ // write another entity with greater start time
+ TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
+ .getEntityGreaterStartTime();
+ te = new TimelineEntities();
+ te.addEntity(entityGreaterStartTime);
+ appName = "application_1000000000000000_2222";
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+ // flush everything to hbase
+ hbi.flush();
+ } finally {
+ hbi.close();
+ }
+
+ Connection conn = ConnectionFactory.createConnection(c1);
+ // check in flow activity table
+ Table table1 = conn.getTable(TableName
+ .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
+ byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow);
+ Get g = new Get(startRow);
+ Result r1 = table1.get(g);
+ assertNotNull(r1);
+ assertTrue(!r1.isEmpty());
+ Map<byte[], byte[]> values = r1.getFamilyMap(FlowActivityColumnFamily.INFO
+ .getBytes());
+ assertEquals(1, values.size());
+ byte[] row = r1.getRow();
+ FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey.parseRowKey(row);
+ assertNotNull(flowActivityRowKey);
+ assertEquals(cluster, flowActivityRowKey.getClusterId());
+ assertEquals(user, flowActivityRowKey.getUserId());
+ assertEquals(flow, flowActivityRowKey.getFlowId());
+ long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+ .currentTimeMillis());
+ assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
+ assertEquals(1, values.size());
+ checkFlowActivityRunId(runid, flowVersion, values);
+ }
+
+ /**
+ * Write 1 application entity and checks the record for today in the flow
+ * activity table
+ */
+ @Test
+ public void testWriteFlowActivityOneFlow() throws Exception {
+ String cluster = "testWriteFlowActivityOneFlow_cluster1";
+ String user = "testWriteFlowActivityOneFlow_user1";
+ String flow = "flow_activity_test_flow_name";
+ String flowVersion = "A122110F135BC4";
+ Long runid = 1001111178919L;
+
+ TimelineEntities te = new TimelineEntities();
+ TimelineEntity entityApp1 = TestFlowDataGenerator.getFlowApp1();
+ te.addEntity(entityApp1);
+
+ HBaseTimelineWriterImpl hbi = null;
+ Configuration c1 = util.getConfiguration();
+ try {
+ hbi = new HBaseTimelineWriterImpl(c1);
+ hbi.init(c1);
+ String appName = "application_1111999999_1234";
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+ hbi.flush();
+ } finally {
+ hbi.close();
+ }
+ // check flow activity
+ checkFlowActivityTable(cluster, user, flow, flowVersion, runid, c1);
+ }
+
+ private void checkFlowActivityTable(String cluster, String user, String flow,
+ String flowVersion, Long runid, Configuration c1) throws IOException {
+ Scan s = new Scan();
+ s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
+ byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow);
+ s.setStartRow(startRow);
+ String clusterStop = cluster + "1";
+ byte[] stopRow = FlowActivityRowKey.getRowKey(clusterStop, user, flow);
+ s.setStopRow(stopRow);
+ Connection conn = ConnectionFactory.createConnection(c1);
+ Table table1 = conn.getTable(TableName
+ .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
+ ResultScanner scanner = table1.getScanner(s);
+ int rowCount = 0;
+ for (Result result : scanner) {
+ assertNotNull(result);
+ assertTrue(!result.isEmpty());
+ Map<byte[], byte[]> values = result
+ .getFamilyMap(FlowActivityColumnFamily.INFO.getBytes());
+ rowCount++;
+ byte[] row = result.getRow();
+ FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey
+ .parseRowKey(row);
+ assertNotNull(flowActivityRowKey);
+ assertEquals(cluster, flowActivityRowKey.getClusterId());
+ assertEquals(user, flowActivityRowKey.getUserId());
+ assertEquals(flow, flowActivityRowKey.getFlowId());
+ long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+ .currentTimeMillis());
+ assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
+ assertEquals(1, values.size());
+ checkFlowActivityRunId(runid, flowVersion, values);
+ }
+ assertEquals(1, rowCount);
+ }
+
+ /**
+ * Writes 3 applications each with a different run id and version for the same
+ * {cluster, user, flow}
+ *
+ * They should be getting inserted into one record in the flow activity table
+ * with 3 columns, one per run id
+ */
+ @Test
+ public void testFlowActivityTableOneFlowMultipleRunIds() throws IOException {
+ String cluster = "testManyRunsFlowActivity_cluster1";
+ String user = "testManyRunsFlowActivity_c_user1";
+ String flow = "flow_activity_test_flow_name";
+ String flowVersion1 = "A122110F135BC4";
+ Long runid1 = 11111111111L;
+
+ String flowVersion2 = "A12222222222C4";
+ long runid2 = 2222222222222L;
+
+ String flowVersion3 = "A1333333333C4";
+ long runid3 = 3333333333333L;
+
+ TimelineEntities te = new TimelineEntities();
+ TimelineEntity entityApp1 = TestFlowDataGenerator.getFlowApp1();
+ te.addEntity(entityApp1);
+
+ HBaseTimelineWriterImpl hbi = null;
+ Configuration c1 = util.getConfiguration();
+ try {
+ hbi = new HBaseTimelineWriterImpl(c1);
+ hbi.init(c1);
+ String appName = "application_11888888888_1111";
+ hbi.write(cluster, user, flow, flowVersion1, runid1, appName, te);
+
+ // write another application to this flow but with a different runid/version
+ te = new TimelineEntities();
+ te.addEntity(entityApp1);
+ appName = "application_11888888888_2222";
+ hbi.write(cluster, user, flow, flowVersion2, runid2, appName, te);
+
+ // write another application to this flow but with a different runid/version
+ te = new TimelineEntities();
+ te.addEntity(entityApp1);
+ appName = "application_11888888888_3333";
+ hbi.write(cluster, user, flow, flowVersion3, runid3, appName, te);
+
+ hbi.flush();
+ } finally {
+ hbi.close();
+ }
+ // check flow activity
+ checkFlowActivityTableSeveralRuns(cluster, user, flow, c1, flowVersion1,
+ runid1, flowVersion2, runid2, flowVersion3, runid3);
+
+ }
+
+ private void checkFlowActivityTableSeveralRuns(String cluster, String user,
+ String flow, Configuration c1, String flowVersion1, Long runid1,
+ String flowVersion2, Long runid2, String flowVersion3, Long runid3)
+ throws IOException {
+ Scan s = new Scan();
+ s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
+ byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow);
+ s.setStartRow(startRow);
+ String clusterStop = cluster + "1";
+ byte[] stopRow = FlowActivityRowKey.getRowKey(clusterStop, user, flow);
+ s.setStopRow(stopRow);
+ Connection conn = ConnectionFactory.createConnection(c1);
+ Table table1 = conn.getTable(TableName
+ .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
+ ResultScanner scanner = table1.getScanner(s);
+ int rowCount = 0;
+ for (Result result : scanner) {
+ assertNotNull(result);
+ assertTrue(!result.isEmpty());
+ byte[] row = result.getRow();
+ FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey
+ .parseRowKey(row);
+ assertNotNull(flowActivityRowKey);
+ assertEquals(cluster, flowActivityRowKey.getClusterId());
+ assertEquals(user, flowActivityRowKey.getUserId());
+ assertEquals(flow, flowActivityRowKey.getFlowId());
+ long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+ .currentTimeMillis());
+ assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
+
+ Map<byte[], byte[]> values = result
+ .getFamilyMap(FlowActivityColumnFamily.INFO.getBytes());
+ rowCount++;
+ assertEquals(3, values.size());
+ checkFlowActivityRunId(runid1, flowVersion1, values);
+ checkFlowActivityRunId(runid2, flowVersion2, values);
+ checkFlowActivityRunId(runid3, flowVersion3, values);
+ }
+ // the flow activity table is such that it will insert
+ // into current day's record
+ // hence, if this test runs across the midnight boundary,
+ // it may fail since it would insert into two records
+ // one for each day
+ assertEquals(1, rowCount);
+ }
+
+ private void checkFlowActivityRunId(Long runid, String flowVersion,
+ Map<byte[], byte[]> values) throws IOException {
+ byte[] rq = ColumnHelper.getColumnQualifier(
+ FlowActivityColumnPrefix.RUN_ID.getColumnPrefixBytes(),
+ GenericObjectMapper.write(runid));
+ for (Map.Entry<byte[], byte[]> k : values.entrySet()) {
+ String actualQ = Bytes.toString(k.getKey());
+ if (Bytes.toString(rq).equals(actualQ)) {
+ String actualV = (String) GenericObjectMapper.read(k.getValue());
+ assertEquals(flowVersion, actualV);
+ }
+ }
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ util.shutdownMiniCluster();
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
new file mode 100644
index 0000000..ee32c64
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests the FlowRun and FlowActivity Tables
+ */
+public class TestHBaseStorageFlowRun {
+
+ private static HBaseTestingUtility util;
+
+ private final String metric1 = "MAP_SLOT_MILLIS";
+ private final String metric2 = "HDFS_BYTES_READ";
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ util = new HBaseTestingUtility();
+ Configuration conf = util.getConfiguration();
+ conf.setInt("hfile.format.version", 3);
+ util.startMiniCluster();
+ createSchema();
+ }
+
+ private static void createSchema() throws IOException {
+ TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
+ }
+
+ /**
+ * Writes 4 timeline entities belonging to one flow run through the
+ * {@link HBaseTimelineWriterImpl}
+ *
+ * Checks the flow run table contents
+ *
+ * The first entity has a created event, metrics and a finish event.
+ *
+ * The second entity has a created event and this is the entity with smallest
+ * start time. This should be the start time for the flow run.
+ *
+ * The third entity has a finish event and this is the entity with the max end
+ * time. This should be the end time for the flow run.
+ *
+ * The fourth entity has a created event which has a start time that is
+ * greater than min start time.
+ *
+ */
+ @Test
+ public void testWriteFlowRunMinMax() throws Exception {
+
+ TimelineEntities te = new TimelineEntities();
+ te.addEntity(TestFlowDataGenerator.getEntity1());
+
+ HBaseTimelineWriterImpl hbi = null;
+ Configuration c1 = util.getConfiguration();
+ String cluster = "testWriteFlowRunMinMaxToHBase_cluster1";
+ String user = "testWriteFlowRunMinMaxToHBase_user1";
+ String flow = "testing_flowRun_flow_name";
+ String flowVersion = "CF7022C10F1354";
+ Long runid = 1002345678919L;
+ String appName = "application_100000000000_1111";
+ long endTs = 1439750690000L;
+ TimelineEntity entityMinStartTime = TestFlowDataGenerator
+ .getEntityMinStartTime();
+
+ try {
+ hbi = new HBaseTimelineWriterImpl(c1);
+ hbi.init(c1);
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+ // write another entity with the right min start time
+ te = new TimelineEntities();
+ te.addEntity(entityMinStartTime);
+ appName = "application_100000000000_3333";
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+ // write another entity for max end time
+ TimelineEntity entityMaxEndTime = TestFlowDataGenerator
+ .getEntityMaxEndTime(endTs);
+ te = new TimelineEntities();
+ te.addEntity(entityMaxEndTime);
+ appName = "application_100000000000_4444";
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+ // write another entity with greater start time
+ TimelineEntity entityGreaterStartTime = TestFlowDataGenerator
+ .getEntityGreaterStartTime();
+ te = new TimelineEntities();
+ te.addEntity(entityGreaterStartTime);
+ appName = "application_1000000000000000_2222";
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+ // flush everything to hbase
+ hbi.flush();
+ } finally {
+ hbi.close();
+ }
+
+ Connection conn = ConnectionFactory.createConnection(c1);
+ // check in flow run table
+ Table table1 = conn.getTable(TableName
+ .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+ // scan the table and see that we get back the right min and max
+ // timestamps
+ byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
+ Get g = new Get(startRow);
+ g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
+ FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes());
+ g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
+ FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes());
+ Result r1 = table1.get(g);
+ assertNotNull(r1);
+ assertTrue(!r1.isEmpty());
+ Map<byte[], byte[]> values = r1.getFamilyMap(FlowRunColumnFamily.INFO
+ .getBytes());
+
+ assertEquals(2, r1.size());
+ Long starttime = (Long) GenericObjectMapper.read(values
+ .get(FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()));
+ Long expmin = entityMinStartTime.getCreatedTime();
+ assertEquals(expmin, starttime);
+ assertEquals(endTs, GenericObjectMapper.read(values
+ .get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes())));
+ }
+
+ boolean isFlowRunRowKeyCorrect(byte[] rowKey, String cluster, String user,
+ String flow, Long runid) {
+ byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, -1);
+ assertTrue(rowKeyComponents.length == 4);
+ assertEquals(cluster, Bytes.toString(rowKeyComponents[0]));
+ assertEquals(user, Bytes.toString(rowKeyComponents[1]));
+ assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
+ assertEquals(TimelineWriterUtils.invert(runid),
+ Bytes.toLong(rowKeyComponents[3]));
+ return true;
+ }
+
+ /**
+ * Writes two application entities of the same flow run. The first application
+ * has two metrics, slot millis and hdfs bytes read; the second has only slot
+ * millis. Each metric has values at two timestamps.
+ *
+ * Checks the metric values of the flow in the flow run table. Flow metric
+ * values should be the sum of individual metric values that belong to the
+ * latest timestamp for that metric
+ */
+ @Test
+ public void testWriteFlowRunMetricsOneFlow() throws Exception {
+ String cluster = "testWriteFlowRunMetricsOneFlow_cluster1";
+ String user = "testWriteFlowRunMetricsOneFlow_user1";
+ String flow = "testing_flowRun_metrics_flow_name";
+ String flowVersion = "CF7022C10F1354";
+ Long runid = 1002345678919L;
+
+ TimelineEntities te = new TimelineEntities();
+ TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1();
+ te.addEntity(entityApp1);
+
+ HBaseTimelineWriterImpl hbi = null;
+ Configuration c1 = util.getConfiguration();
+ try {
+ hbi = new HBaseTimelineWriterImpl(c1);
+ hbi.init(c1);
+ String appName = "application_11111111111111_1111";
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+ // write another application with same metric to this flow
+ te = new TimelineEntities();
+ TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2();
+ te.addEntity(entityApp2);
+ appName = "application_11111111111111_2222";
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+ hbi.flush();
+ } finally {
+ hbi.close();
+ }
+
+ // check flow run
+ checkFlowRunTable(cluster, user, flow, runid, c1);
+ }
+
+ private void checkFlowRunTable(String cluster, String user, String flow,
+ long runid, Configuration c1) throws IOException {
+ Scan s = new Scan();
+ s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+ byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
+ s.setStartRow(startRow);
+ String clusterStop = cluster + "1";
+ byte[] stopRow = FlowRunRowKey.getRowKey(clusterStop, user, flow, runid);
+ s.setStopRow(stopRow);
+ Connection conn = ConnectionFactory.createConnection(c1);
+ Table table1 = conn.getTable(TableName
+ .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+ ResultScanner scanner = table1.getScanner(s);
+
+ int rowCount = 0;
+ for (Result result : scanner) {
+ assertNotNull(result);
+ assertTrue(!result.isEmpty());
+ Map<byte[], byte[]> values = result.getFamilyMap(FlowRunColumnFamily.INFO
+ .getBytes());
+ rowCount++;
+ // check metric1
+ byte[] q = ColumnHelper.getColumnQualifier(
+ FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric1);
+ assertTrue(values.containsKey(q));
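+ // 141 = 40 (latest MAP_SLOT_MILLIS value from app1)
+ // + 101 (latest value from app2), per TestFlowDataGenerator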
+ assertEquals(141, GenericObjectMapper.read(values.get(q)));
+
+ // check metric2
+ assertEquals(2, values.size());
+ q = ColumnHelper.getColumnQualifier(
+ FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric2);
+ assertTrue(values.containsKey(q));
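+ // 57 is app1's latest HDFS_BYTES_READ value; app2 does not write this
+ // metric, so the flow level value equals app1's contribution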
+ assertEquals(57, GenericObjectMapper.read(values.get(q)));
+ }
+ assertEquals(1, rowCount);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ util.shutdownMiniCluster();
+ }
+}