diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 6ac6fb9..4093d30 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1757,6 +1757,22 @@ public static boolean isAclEnabled(Configuration conf) {
public static final int
DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS = 60;
+  /**
+   * The name of the setting that controls how long the final value of
+   * a metric of a completed app is retained before merging
+   * into the flow sum.
+   */
+ public static final String APP_FINAL_VALUE_RETENTION_THRESHOLD =
+ TIMELINE_SERVICE_PREFIX
+ + "coprocessor.app-final-value-retention-milliseconds";
+
+  /**
+   * The default value (3 days, in milliseconds) for how long the final
+   * value of a metric of a completed app is retained before merging
+   * into the flow sum.
+   */
+ public static final long DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD = 3 * 24
+ * 60 * 60 * 1000L;
+
public static final String ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS =
TIMELINE_SERVICE_PREFIX + "app-collector.linger-period.ms";
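For reference, a minimal sketch of how this pair of constants is consumed (the same lookup the flow scanner performs against the region server's Configuration further down in this patch; the standalone main() is only for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class RetentionThresholdLookup {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Falls back to the 3-day default (259200000 ms) when the key is unset.
    long retentionMs = conf.getLong(
        YarnConfiguration.APP_FINAL_VALUE_RETENTION_THRESHOLD,
        YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD);
    System.out.println("app final value retention (ms) = " + retentionMs);
  }
}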
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 2cbc836..86287f6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2067,6 +2067,7 @@
     <value>604800</value>
   </property>
+
   <property>
     <description>The setting that controls how often the timeline collector
     flushes the timeline writer.</description>
@@ -2088,6 +2089,15 @@
     <name>yarn.timeline-service.timeline-client.number-of-async-entities-to-merge</name>
     <value>10</value>
   </property>
+
+  <property>
+    <description>The setting that controls how long the final value
+    of a metric of a completed app is retained before merging into
+    the flow sum.</description>
+    <name>yarn.timeline-service.coprocessor.app-final-value-retention-milliseconds</name>
+    <value>259200000</value>
+  </property>
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index 1afe878..b75007d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -261,7 +261,8 @@ private void storeFlowMetricsAppRunning(String clusterId, String userId,
byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName,
flowRunId);
storeFlowMetrics(rowKey, metrics,
- AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
+ AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId),
+ AggregationOperation.SUM.getAttribute());
}
}
@@ -500,4 +501,4 @@ protected void serviceStop() throws Exception {
super.serviceStop();
}
-}
\ No newline at end of file
+}
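The extra SUM attribute attached to each metric Put here is what the flow run coprocessor later turns into a cell tag, so the scanner can tell how each cell should be aggregated. A small sketch of that tag round-trip using HBase's Tag API (the tag type byte is SUM_FINAL's value from AggregationOperation below; the application id is a made-up example):

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.util.Bytes;

public class TagRoundTrip {
  public static void main(String[] args) {
    byte sumFinalTagType = (byte) 83; // AggregationOperation.SUM_FINAL
    List<Tag> tags = new ArrayList<>();
    tags.add(new Tag(sumFinalTagType,
        Bytes.toBytes("application_1423587915_0001")));
    // serialize the way cell tags are stored...
    byte[] tagBytes = Tag.fromList(tags);
    // ...and read them back, as the FlowScanner does per cell
    for (Tag t : Tag.asList(tagBytes, 0, tagBytes.length)) {
      System.out.println(t.getType() + " -> " + Bytes.toString(t.getValue()));
    }
  }
}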
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
index 605dbe7..b5fc214 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
@@ -24,9 +24,13 @@
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
+import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -475,4 +479,55 @@ public static boolean isIntegralValue(Object obj) {
return (obj instanceof Short) || (obj instanceof Integer) ||
(obj instanceof Long);
}
+
+ /**
+   * Creates a new cell based on the input cell but with the new value.
+ *
+ * @param origCell Original cell
+ * @param newValue new cell value
+ * @return cell
+ * @throws IOException while creating new cell.
+ */
+ public static Cell createNewCell(Cell origCell, byte[] newValue)
+ throws IOException {
+ return CellUtil.createCell(CellUtil.cloneRow(origCell),
+ CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
+ origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);
+ }
+
+ /**
+   * Creates a cell with the given inputs.
+ *
+ * @param row row of the cell to be created
+ * @param family column family name of the new cell
+ * @param qualifier qualifier for the new cell
+ * @param ts timestamp of the new cell
+ * @param newValue value of the new cell
+ * @param tags tags in the new cell
+ * @return cell
+ * @throws IOException while creating the cell.
+ */
+ public static Cell createNewCell(byte[] row, byte[] family, byte[] qualifier,
+ long ts, byte[] newValue, byte[] tags) throws IOException {
+ return CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put,
+ newValue, tags);
+ }
+
+ /**
+   * Returns the app id from the list of tags.
+ *
+ * @param tags cell tags to be looked into
+ * @return App Id as the AggregationCompactionDimension
+ */
+  public static String getAggregationCompactionDimension(List<Tag> tags) {
+ String appId = null;
+ for (Tag t : tags) {
+ if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
+ .getType()) {
+ appId = Bytes.toString(t.getValue());
+ return appId;
+ }
+ }
+ return appId;
+ }
}
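A short usage sketch of the helper above, assuming AggregationCompactionDimension.APPLICATION_ID.getTagType() is accessible the way the coprocessor code uses it (the app id value is an example):

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;

public class AggregationDimensionLookup {
  public static void main(String[] args) {
    List<Tag> tags = new ArrayList<>();
    tags.add(new Tag(
        AggregationCompactionDimension.APPLICATION_ID.getTagType(),
        Bytes.toBytes("application_1423587915_0001"))); // example app id
    // walks the tags for the APPLICATION_ID dimension; null if absent
    String appId = TimelineStorageUtils.getAggregationCompactionDimension(tags);
    System.out.println("appId = " + appId);
  }
}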
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java
index 7238efa..288046c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java
@@ -33,7 +33,7 @@
* if this is changed, then reading cell timestamps written with older
* multiplier value will not work
*/
- public static final long TS_MULTIPLIER = 1000L;
+ public static final long TS_MULTIPLIER = 1000000L;
private final AtomicLong lastTimestamp = new AtomicLong();
@@ -74,13 +74,14 @@ public long getUniqueTimestamp() {
}
/**
- * returns a timestamp multiplied with TS_MULTIPLIER and last few digits of
- * application id
+ * Returns a timestamp multiplied with TS_MULTIPLIER and last few digits of
+ * application id.
*
* Unlikely scenario of generating a timestamp that is a duplicate: If more
- * than a 1000 concurrent apps are running in one flow run AND write to same
- * column at the same time, then say appId of 1001 will overlap with appId of
- * 001 and there may be collisions for that flow run's specific column.
+   * than 1M concurrent apps are running in one flow run AND write to the
+   * same column at the same time, then an appId of 1000001 will overlap
+   * with an appId of 000001 and there may be collisions for that flow
+   * run's specific column.
*
* @param incomingTS Timestamp to be converted.
* @param appId Application Id.
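The effect of the larger multiplier is easier to see with the arithmetic written out. A self-contained sketch of the scheme (the real logic lives in TimestampGenerator; extracting the app id's sequence number is simplified here to a plain long):

public class SupplementedTimestampSketch {
  static final long TS_MULTIPLIER = 1000000L; // must match TimestampGenerator

  // Shift the wall-clock millis left by six decimal digits and fill the
  // low digits with (a simplified form of) the app id's sequence number.
  static long supplement(long tsMillis, long appSequenceNumber) {
    return tsMillis * TS_MULTIPLIER + (appSequenceNumber % TS_MULTIPLIER);
  }

  // Recover the original millis, as the compaction path does per cell.
  static long truncate(long cellTimestamp) {
    return cellTimestamp / TS_MULTIPLIER;
  }

  public static void main(String[] args) {
    long ts = 1449526652000L;
    long cellTs = supplement(ts, 42L);          // e.g. application_xxx_0042
    System.out.println(cellTs);                 // 1449526652000000042
    System.out.println(truncate(cellTs) == ts); // true
  }
}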
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java
index 6240e81..40cdd2c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java
@@ -21,19 +21,19 @@
/**
* Identifies the attributes to be set for puts into the {@link FlowRunTable}.
- * The numbers used for tagType are prime numbers
+ * The numbers used for tagType are prime numbers.
*/
public enum AggregationOperation {
/**
* When the flow was started.
*/
- MIN((byte) 71),
+ GLOBAL_MIN((byte) 71),
/**
* When it ended.
*/
- MAX((byte) 73),
+ GLOBAL_MAX((byte) 73),
/**
* The metrics of the flow.
@@ -46,9 +46,16 @@
SUM_FINAL((byte) 83),
/**
- * compact.
+ * Min value as per the latest timestamp
+ * seen for a given app.
*/
- COMPACT((byte) 89);
+ LATEST_MIN((byte) 89),
+
+ /**
+ * Max value as per the latest timestamp
+ * seen for a given app.
+ */
+ LATEST_MAX((byte) 97);
private byte tagType;
private byte[] inBytes;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
index 148a37f..d50bb16 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
@@ -41,14 +41,14 @@
* application start times.
*/
MIN_START_TIME(FlowRunColumnFamily.INFO, "min_start_time",
- AggregationOperation.MIN, LongConverter.getInstance()),
+ AggregationOperation.GLOBAL_MIN, LongConverter.getInstance()),
/**
* When the flow ended. This is the maximum of currently known application end
* times.
*/
MAX_END_TIME(FlowRunColumnFamily.INFO, "max_end_time",
- AggregationOperation.MAX, LongConverter.getInstance()),
+ AggregationOperation.GLOBAL_MAX, LongConverter.getInstance()),
/**
* The version of the flow that this flow belongs to.
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
index 3d7c40e..fa94aae 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
@@ -40,7 +40,7 @@
/**
* To store flow run info values.
*/
- METRIC(FlowRunColumnFamily.INFO, "m", AggregationOperation.SUM,
+ METRIC(FlowRunColumnFamily.INFO, "m", null,
LongConverter.getInstance());
private final ColumnHelper column;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
index 9698f06..450640a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
@@ -40,7 +40,12 @@
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
@@ -51,7 +56,6 @@
*/
public class FlowRunCoprocessor extends BaseRegionObserver {
- @SuppressWarnings("unused")
private static final Log LOG = LogFactory.getLog(FlowRunCoprocessor.class);
private HRegion region;
@@ -160,8 +164,8 @@ public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e,
scan.setMaxVersions();
RegionScanner scanner = null;
try {
- scanner = new FlowScanner(region, scan.getBatch(),
- region.getScanner(scan));
+ scanner = new FlowScanner(e.getEnvironment(), scan.getBatch(),
+ region.getScanner(scan), FlowScannerOperation.READ);
scanner.next(results);
e.bypass();
} finally {
@@ -209,6 +213,64 @@ public RegionScanner preScannerOpen(
public RegionScanner postScannerOpen(
      ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
RegionScanner scanner) throws IOException {
- return new FlowScanner(region, scan.getBatch(), scanner);
+ return new FlowScanner(e.getEnvironment(), scan.getBatch(),
+ scanner, FlowScannerOperation.READ);
+ }
+
+ @Override
+ public InternalScanner preFlush(
+      ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+ InternalScanner scanner) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ if (store != null) {
+ LOG.debug("preFlush store = " + store.getColumnFamilyName()
+ + " flushableSize=" + store.getFlushableSize()
+ + " flushedCellsCount=" + store.getFlushedCellsCount()
+ + " compactedCellsCount=" + store.getCompactedCellsCount()
+ + " majorCompactedCellsCount="
+ + store.getMajorCompactedCellsCount() + " memstoreFlushSize="
+ + store.getMemstoreFlushSize() + " memstoreSize="
+ + store.getMemStoreSize() + " size=" + store.getSize()
+ + " storeFilesCount=" + store.getStorefilesCount());
+ }
+ }
+ return new FlowScanner(c.getEnvironment(), -1, scanner,
+ FlowScannerOperation.FLUSH);
+ }
+
+ @Override
+  public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
+ Store store, StoreFile resultFile) {
+ if (LOG.isDebugEnabled()) {
+ if (store != null) {
+ LOG.debug("postFlush store = " + store.getColumnFamilyName()
+ + " flushableSize=" + store.getFlushableSize()
+ + " flushedCellsCount=" + store.getFlushedCellsCount()
+ + " compactedCellsCount=" + store.getCompactedCellsCount()
+ + " majorCompactedCellsCount="
+ + store.getMajorCompactedCellsCount() + " memstoreFlushSize="
+ + store.getMemstoreFlushSize() + " memstoreSize="
+ + store.getMemStoreSize() + " size=" + store.getSize()
+ + " storeFilesCount=" + store.getStorefilesCount());
+ }
+ }
+ }
+
+ @Override
+ public InternalScanner preCompact(
+      ObserverContext<RegionCoprocessorEnvironment> e, Store store,
+ InternalScanner scanner, ScanType scanType, CompactionRequest request)
+ throws IOException {
+
+ FlowScannerOperation requestOp = FlowScannerOperation.MINOR_COMPACTION;
+ if (request != null) {
+ requestOp = (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION
+ : FlowScannerOperation.MINOR_COMPACTION);
+ LOG.info("Compactionrequest= " + request.toString() + " "
+ + requestOp.toString() + " RegionName="
+ + e.getEnvironment().getRegion().getRegionNameAsString());
+ }
+
+ return new FlowScanner(e.getEnvironment(), -1, scanner, requestOp);
}
}
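For context on how these hooks get invoked at all: the observer has to be registered on the flow run table at creation time. A hedged sketch of that registration (the real setup is done by the schema creator; the table and family names here are placeholders):

import java.io.IOException;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;

public class FlowRunTableSetupSketch {
  static void createWithCoprocessor(Admin admin) throws IOException {
    HTableDescriptor desc =
        new HTableDescriptor(TableName.valueOf("timelineservice.flowrun"));
    desc.addFamily(new HColumnDescriptor("i")); // INFO family placeholder
    // region servers load the observer for every read/flush/compaction
    desc.addCoprocessor("org.apache.hadoop.yarn.server.timelineservice"
        + ".storage.flow.FlowRunCoprocessor");
    admin.createTable(desc);
  }
}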
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
index 6fefd15..55179e6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -29,20 +29,26 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
+
+import com.google.common.annotations.VisibleForTesting;
/**
* Invoked via the coprocessor when a Get or a Scan is issued for flow run
@@ -55,23 +61,42 @@
private static final Log LOG = LogFactory.getLog(FlowScanner.class);
+  /**
+   * Use a special application id to represent the flow id. This is needed
+   * since TimestampGenerator parses the app id to generate a cell timestamp.
+   */
+  private static final String FLOW_APP_ID = "application_00000000000_0000";
+
private final HRegion region;
private final InternalScanner flowRunScanner;
- private RegionScanner regionScanner;
private final int limit;
+ private final long appFinalValueRetentionThreshold;
+ private RegionScanner regionScanner;
private boolean hasMore;
private byte[] currentRow;
  private List<Cell> availableCells = new ArrayList<>();
private int currentIndex;
+ private FlowScannerOperation action = FlowScannerOperation.READ;
- FlowScanner(HRegion region, int limit, InternalScanner internalScanner) {
- this.region = region;
+ FlowScanner(RegionCoprocessorEnvironment env, int limit,
+ InternalScanner internalScanner, FlowScannerOperation action) {
this.limit = limit;
this.flowRunScanner = internalScanner;
if (internalScanner instanceof RegionScanner) {
this.regionScanner = (RegionScanner) internalScanner;
}
- // TODO: note if it's compaction/flush
+ this.action = action;
+ if (env == null) {
+ this.appFinalValueRetentionThreshold =
+ YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD;
+ this.region = null;
+ } else {
+ this.region = env.getRegion();
+ Configuration hbaseConf = env.getConfiguration();
+ this.appFinalValueRetentionThreshold = hbaseConf.getLong(
+ YarnConfiguration.APP_FINAL_VALUE_RETENTION_THRESHOLD,
+ YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD);
+ }
}
/*
@@ -104,17 +129,6 @@ public boolean next(List<Cell> cells, int cellLimit) throws IOException {
return nextInternal(cells, cellLimit);
}
-  private String getAggregationCompactionDimension(List<Tag> tags) {
- String appId = null;
- for (Tag t : tags) {
- if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
- .getType()) {
- appId = Bytes.toString(t.getValue());
- }
- }
- return appId;
- }
-
/**
* Get value converter associated with a column or a column prefix. If nothing
* matches, generic converter is returned.
@@ -183,14 +197,18 @@ private boolean nextInternal(List<Cell> cells, int cellLimit)
     SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
     Set<String> alreadySeenAggDim = new HashSet<>();
int addedCnt = 0;
+ long currentTimestamp = System.currentTimeMillis();
ValueConverter converter = null;
- while (((cell = peekAtNextCell(cellLimit)) != null)
- && (cellLimit <= 0 || addedCnt < cellLimit)) {
+ while (cellLimit <= 0 || addedCnt < cellLimit) {
+ cell = peekAtNextCell(cellLimit);
+ if (cell == null) {
+ break;
+ }
byte[] newColumnQualifier = CellUtil.cloneQualifier(cell);
if (comp.compare(currentColumnQualifier, newColumnQualifier) != 0) {
if (converter != null && isNumericConverter(converter)) {
addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
- (NumericValueConverter)converter);
+ (NumericValueConverter)converter, currentTimestamp);
}
resetState(currentColumnCells, alreadySeenAggDim);
currentColumnQualifier = newColumnQualifier;
@@ -207,8 +225,16 @@ private boolean nextInternal(List<Cell> cells, int cellLimit)
nextCell(cellLimit);
}
if (!currentColumnCells.isEmpty()) {
- emitCells(cells, currentColumnCells, currentAggOp,
- (NumericValueConverter)converter);
+ addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
+ (NumericValueConverter)converter, currentTimestamp);
+ if (LOG.isDebugEnabled()) {
+ if (addedCnt > 0) {
+ LOG.debug("emitted cells. " + addedCnt + " for " + this.action
+ + " rowKey=");
+ } else {
+ LOG.debug("emitted no cells for " + this.action);
+ }
+ }
}
return hasMore();
}
@@ -247,7 +273,7 @@ private void collectCells(SortedSet<Cell> currentColumnCells,
}
switch (currentAggOp) {
- case MIN:
+ case GLOBAL_MIN:
if (currentColumnCells.size() == 0) {
currentColumnCells.add(cell);
} else {
@@ -260,7 +286,7 @@ private void collectCells(SortedSet<Cell> currentColumnCells,
}
}
break;
- case MAX:
+ case GLOBAL_MAX:
if (currentColumnCells.size() == 0) {
currentColumnCells.add(cell);
} else {
@@ -275,16 +301,32 @@ private void collectCells(SortedSet<Cell> currentColumnCells,
break;
case SUM:
case SUM_FINAL:
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("In collect cells "
+ + " FlowSannerOperation="
+ + this.action
+ + " currentAggOp="
+ + currentAggOp
+ + " cell qualifier="
+ + Bytes.toString(CellUtil.cloneQualifier(cell))
+ + " cell value= "
+ + (Number) converter.decodeValue(CellUtil.cloneValue(cell))
+ + " timestamp=" + cell.getTimestamp());
+ }
+
// only if this app has not been seen yet, add to current column cells
      List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
- String aggDim = getAggregationCompactionDimension(tags);
-
- // If this agg dimension has already been seen, since they show up in
- // sorted order, we drop the rest which are older. In other words, this
- // cell is older than previously seen cells for that agg dim.
+ String aggDim = TimelineStorageUtils
+ .getAggregationCompactionDimension(tags);
if (!alreadySeenAggDim.contains(aggDim)) {
- // Not seen this agg dim, hence consider this cell in our working set
+          // Since cells show up in sorted order, if this agg dimension had
+          // already been seen we would drop the rest, which are older; in
+          // other words, such cells are older than previously seen cells
+          // for that agg dim. This agg dim has not been seen yet, so
+          // consider this cell in our working set.
currentColumnCells.add(cell);
alreadySeenAggDim.add(aggDim);
}
@@ -300,8 +342,8 @@ private void collectCells(SortedSet<Cell> currentColumnCells,
* parameter.
*/
  private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells,
- AggregationOperation currentAggOp, NumericValueConverter converter)
- throws IOException {
+ AggregationOperation currentAggOp, NumericValueConverter converter,
+ long currentTimestamp) throws IOException {
if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) {
return 0;
}
@@ -309,17 +351,36 @@ private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells,
cells.addAll(currentColumnCells);
return currentColumnCells.size();
}
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("In emitCells " + this.action + " currentColumnCells size= "
+ + currentColumnCells.size() + " currentAggOp" + currentAggOp);
+ }
switch (currentAggOp) {
- case MIN:
- case MAX:
+ case GLOBAL_MIN:
+ case GLOBAL_MAX:
cells.addAll(currentColumnCells);
return currentColumnCells.size();
case SUM:
case SUM_FINAL:
- Cell sumCell = processSummation(currentColumnCells, converter);
- cells.add(sumCell);
- return 1;
+ switch (action) {
+ case FLUSH:
+ case MINOR_COMPACTION:
+ cells.addAll(currentColumnCells);
+ return currentColumnCells.size();
+ case READ:
+ Cell sumCell = processSummation(currentColumnCells, converter);
+ cells.add(sumCell);
+ return 1;
+ case MAJOR_COMPACTION:
+ List finalCells = processSummationMajorCompaction(
+ currentColumnCells, converter, currentTimestamp);
+ cells.addAll(finalCells);
+ return finalCells.size();
+ default:
+ cells.addAll(currentColumnCells);
+ return currentColumnCells.size();
+ }
default:
cells.addAll(currentColumnCells);
return currentColumnCells.size();
@@ -349,10 +410,122 @@ private Cell processSummation(SortedSet<Cell> currentColumnCells,
sum = converter.add(sum, currentValue);
}
byte[] sumBytes = converter.encodeValue(sum);
- Cell sumCell = createNewCell(mostRecentCell, sumBytes);
+ Cell sumCell = TimelineStorageUtils.createNewCell(mostRecentCell, sumBytes);
return sumCell;
}
+
+  /**
+   * Returns a list of cells that contains:
+   *
+   * A) the latest cells for applications that haven't finished yet, and
+   * B) the summation for the flow, based on applications that have
+   * completed and are older than a certain time.
+   *
+   * The new cell created has the timestamp of the most recent metric cell.
+   * The sum of a metric for a flow run is the summation at the point of the
+   * last metric update in that flow till that time.
+   */
+ @VisibleForTesting
+  List<Cell> processSummationMajorCompaction(
+      SortedSet<Cell> currentColumnCells, NumericValueConverter converter,
+      long currentTimestamp)
+ throws IOException {
+ Number sum = 0;
+ Number currentValue = 0;
+ long ts = 0L;
+ boolean summationDone = false;
+    List<Cell> finalCells = new ArrayList<>();
+ if (currentColumnCells == null) {
+ return finalCells;
+ }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("In processSummationMajorCompaction,"
+          + " will fold SUM_FINAL cells written before "
+          + (currentTimestamp - appFinalValueRetentionThreshold)
+          + " currentColumnCells size=" + currentColumnCells.size());
+    }
+
+ for (Cell cell : currentColumnCells) {
+ AggregationOperation cellAggOp = getCurrentAggOp(cell);
+ // if this is the existing flow sum cell
+      List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+ cell.getTagsLength());
+ String appId = TimelineStorageUtils
+ .getAggregationCompactionDimension(tags);
+      if (FLOW_APP_ID.equals(appId)) {
+        // fold the previously compacted flow sum cell's own value in
+        sum = converter.add(sum,
+            (Number) converter.decodeValue(CellUtil.cloneValue(cell)));
+        summationDone = true;
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("reading flow app id sum=" + sum);
+ }
+ } else {
+ currentValue = (Number) converter.decodeValue(CellUtil
+ .cloneValue(cell));
+ // read the timestamp truncated by the generator
+ ts = TimestampGenerator.getTruncatedTimestamp(cell.getTimestamp());
+ if ((cellAggOp == AggregationOperation.SUM_FINAL)
+ && ((ts + this.appFinalValueRetentionThreshold)
+ < currentTimestamp)) {
+ sum = converter.add(sum, currentValue);
+ summationDone = true;
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("MAJOR COMPACTION loop sum= " + sum
+ + " discarding now: " + " qualifier="
+ + Bytes.toString(CellUtil.cloneQualifier(cell)) + " value="
+ + (Number) converter.decodeValue(CellUtil.cloneValue(cell))
+ + " timestamp=" + cell.getTimestamp() + " " + this.action);
+ }
+ } else {
+ // not a final value but it's the latest cell for this app
+ // so include this cell in the list of cells to write back
+ finalCells.add(cell);
+ }
+ }
+ }
+ if (summationDone) {
+ Cell anyCell = currentColumnCells.first();
+      List<Tag> tags = new ArrayList<>();
+ Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ Bytes.toBytes(FLOW_APP_ID));
+ tags.add(t);
+ t = new Tag(AggregationCompactionDimension.APPLICATION_ID.getTagType(),
+ Bytes.toBytes(FLOW_APP_ID));
+ tags.add(t);
+ byte[] tagByteArray = Tag.fromList(tags);
+ Cell sumCell = TimelineStorageUtils.createNewCell(
+ CellUtil.cloneRow(anyCell),
+ CellUtil.cloneFamily(anyCell),
+ CellUtil.cloneQualifier(anyCell),
+ TimestampGenerator.getSupplementedTimestamp(
+ System.currentTimeMillis(), FLOW_APP_ID),
+ converter.encodeValue(sum), tagByteArray);
+ finalCells.add(sumCell);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("MAJOR COMPACTION final sum= " + sum + " for "
+ + Bytes.toString(CellUtil.cloneQualifier(sumCell))
+ + " " + this.action);
+ }
+ LOG.info("After major compaction for qualifier="
+ + Bytes.toString(CellUtil.cloneQualifier(sumCell))
+ + " with currentColumnCells.size="
+ + currentColumnCells.size()
+ + " returning finalCells.size=" + finalCells.size()
+ + " with sum=" + sum.longValue()
+ + " with cell timestamp " + sumCell.getTimestamp());
+    } else {
+      LOG.info("After major compaction with currentColumnCells.size="
+          + currentColumnCells.size()
+          + " returning finalCells.size=" + finalCells.size()
+          + " with zero sum=" + sum.longValue());
+    }
+ return finalCells;
+ }
+
/**
* Determines which cell is to be returned based on the values in each cell
* and the comparison operation MIN or MAX.
@@ -375,7 +548,7 @@ private Cell compareCellValues(Cell previouslyChosenCell, Cell currentCell,
Number currentCellValue = (Number) converter.decodeValue(CellUtil
.cloneValue(currentCell));
switch (currentAggOp) {
- case MIN:
+ case GLOBAL_MIN:
if (converter.compare(
currentCellValue, previouslyChosenCellValue) < 0) {
// new value is minimum, hence return this cell
@@ -384,7 +557,7 @@ private Cell compareCellValues(Cell previouslyChosenCell, Cell currentCell,
        // previously chosen value is minimum, hence return previous min cell
return previouslyChosenCell;
}
- case MAX:
+ case GLOBAL_MAX:
if (converter.compare(
currentCellValue, previouslyChosenCellValue) > 0) {
// new value is max, hence return this cell
@@ -402,16 +575,13 @@ private Cell compareCellValues(Cell previouslyChosenCell, Cell currentCell,
}
}
- private Cell createNewCell(Cell origCell, byte[] newValue)
- throws IOException {
- return CellUtil.createCell(CellUtil.cloneRow(origCell),
- CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
- origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);
- }
-
@Override
public void close() throws IOException {
- flowRunScanner.close();
+ if (flowRunScanner != null) {
+ flowRunScanner.close();
+ } else {
+ LOG.warn("scanner close called but scanner is null");
+ }
}
/**
@@ -423,8 +593,6 @@ public void startNext() {
/**
* Returns whether or not the underlying scanner has more rows.
- *
- * @return true, if there are more cells to return, false otherwise.
*/
public boolean hasMore() {
return currentIndex < availableCells.size() ? true : hasMore;
@@ -440,8 +608,7 @@ public boolean hasMore() {
* fetched by the wrapped scanner
* @return the next available cell or null if no more cells are available for
* the current row
- * @throws IOException if any problem is encountered while grabbing the next
- * cell.
+   * @throws IOException if any problem is encountered while grabbing the cell.
*/
public Cell nextCell(int cellLimit) throws IOException {
Cell cell = peekAtNextCell(cellLimit);
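The core retention decision in the major-compaction path above boils down to a single comparison. A minimal sketch of that predicate with the shipped 3-day default (the names here are illustrative, not the patch's own):

public class RetentionDecisionSketch {
  // Mirrors the SUM_FINAL branch of processSummationMajorCompaction: a
  // completed app's final value is folded into the flow sum (and its cell
  // dropped) only once it is older than the retention threshold.
  static boolean foldIntoFlowSum(long cellWriteTimeMillis,
      long retentionThresholdMillis, long nowMillis) {
    return cellWriteTimeMillis + retentionThresholdMillis < nowMillis;
  }

  public static void main(String[] args) {
    long threeDaysMs = 3 * 24 * 60 * 60 * 1000L; // the default threshold
    long now = System.currentTimeMillis();
    long fourDaysAgo = now - 4 * 24 * 60 * 60 * 1000L;
    long oneMinuteAgo = now - 60 * 1000L;
    System.out.println(foldIntoFlowSum(fourDaysAgo, threeDaysMs, now));  // true
    System.out.println(foldIntoFlowSum(oneMinuteAgo, threeDaysMs, now)); // false
  }
}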
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java
new file mode 100644
index 0000000..73c666f
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+
+/**
+ * Identifies the scanner operation on the {@link FlowRunTable}.
+ */
+public enum FlowScannerOperation {
+
+ /**
+ * If the scanner is opened for reading
+ * during preGet or preScan.
+ */
+ READ,
+
+ /**
+ * If the scanner is opened during preFlush.
+ */
+ FLUSH,
+
+ /**
+ * If the scanner is opened during minor Compaction.
+ */
+ MINOR_COMPACTION,
+
+ /**
+ * If the scanner is opened during major Compaction.
+ */
+ MAJOR_COMPACTION
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
index d45df57..0016485 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
-
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -29,17 +28,60 @@
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.conf.Configuration;
/**
* Generates the data/entities for the FlowRun and FlowActivity Tables
*/
class TestFlowDataGenerator {
- private final static String metric1 = "MAP_SLOT_MILLIS";
- private final static String metric2 = "HDFS_BYTES_READ";
+ private static final String metric1 = "MAP_SLOT_MILLIS";
+ private static final String metric2 = "HDFS_BYTES_READ";
+ public static final long END_TS_INCR = 10000L;
+
+ static TimelineEntity getEntityMetricsApp1(long insertTs, Configuration c1) {
+ TimelineEntity entity = new TimelineEntity();
+ String id = "flowRunMetrics_test";
+ String type = TimelineEntityType.YARN_APPLICATION.toString();
+ entity.setId(id);
+ entity.setType(type);
+ long cTime = 1425016501000L;
+ entity.setCreatedTime(cTime);
+ // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+ TimelineMetric m1 = new TimelineMetric();
+ m1.setId(metric1);
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+ long ts = insertTs;
- static TimelineEntity getEntityMetricsApp1() {
+    for (int k = 1; k < 100; k++) {
+      metricValues.put(ts - k * 200000, 20L);
+    }
+ metricValues.put(ts - 80000, 40L);
+ m1.setType(Type.TIME_SERIES);
+ m1.setValues(metricValues);
+ metrics.add(m1);
+
+ TimelineMetric m2 = new TimelineMetric();
+ m2.setId(metric2);
+    metricValues = new HashMap<Long, Number>();
+    ts = System.currentTimeMillis();
+    for (int k = 1; k < 100; k++) {
+      metricValues.put(ts - k * 100000, 31L);
+    }
+
+ metricValues.put(ts - 80000, 57L);
+ m2.setType(Type.TIME_SERIES);
+ m2.setValues(metricValues);
+ metrics.add(m2);
+
+ entity.addMetrics(metrics);
+ return entity;
+ }
+
+
+  static TimelineEntity getEntityMetricsApp1Complete(long insertTs,
+      Configuration c1) {
TimelineEntity entity = new TimelineEntity();
String id = "flowRunMetrics_test";
String type = TimelineEntityType.YARN_APPLICATION.toString();
@@ -53,7 +95,48 @@ static TimelineEntity getEntityMetricsApp1() {
TimelineMetric m1 = new TimelineMetric();
m1.setId(metric1);
    Map<Long, Number> metricValues = new HashMap<Long, Number>();
- long ts = System.currentTimeMillis();
+ long ts = insertTs;
+
+ metricValues.put(ts - 80000, 40L);
+ m1.setType(Type.TIME_SERIES);
+ m1.setValues(metricValues);
+ metrics.add(m1);
+
+ TimelineMetric m2 = new TimelineMetric();
+ m2.setId(metric2);
+    metricValues = new HashMap<Long, Number>();
+ ts = insertTs;
+ metricValues.put(ts - 80000, 57L);
+ m2.setType(Type.TIME_SERIES);
+ m2.setValues(metricValues);
+ metrics.add(m2);
+
+ entity.addMetrics(metrics);
+
+ TimelineEvent event = new TimelineEvent();
+ event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+ event.setTimestamp(insertTs);
+ event.addInfo("done", "insertTs=" + insertTs);
+ entity.addEvent(event);
+ return entity;
+ }
+
+
+ static TimelineEntity getEntityMetricsApp1(long insertTs) {
+ TimelineEntity entity = new TimelineEntity();
+ String id = "flowRunMetrics_test";
+ String type = TimelineEntityType.YARN_APPLICATION.toString();
+ entity.setId(id);
+ entity.setType(type);
+ long cTime = 1425016501000L;
+ entity.setCreatedTime(cTime);
+
+ // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+ TimelineMetric m1 = new TimelineMetric();
+ m1.setId(metric1);
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+ long ts = insertTs;
metricValues.put(ts - 100000, 2L);
metricValues.put(ts - 80000, 40L);
m1.setType(Type.TIME_SERIES);
@@ -63,7 +146,7 @@ static TimelineEntity getEntityMetricsApp1() {
TimelineMetric m2 = new TimelineMetric();
m2.setId(metric2);
    metricValues = new HashMap<Long, Number>();
- ts = System.currentTimeMillis();
+ ts = insertTs;
metricValues.put(ts - 100000, 31L);
metricValues.put(ts - 80000, 57L);
m2.setType(Type.TIME_SERIES);
@@ -74,7 +157,8 @@ static TimelineEntity getEntityMetricsApp1() {
return entity;
}
- static TimelineEntity getEntityMetricsApp2() {
+
+ static TimelineEntity getEntityMetricsApp2(long insertTs) {
TimelineEntity entity = new TimelineEntity();
String id = "flowRunMetrics_test";
String type = TimelineEntityType.YARN_APPLICATION.toString();
@@ -87,7 +171,7 @@ static TimelineEntity getEntityMetricsApp2() {
TimelineMetric m1 = new TimelineMetric();
m1.setId(metric1);
    Map<Long, Number> metricValues = new HashMap<Long, Number>();
- long ts = System.currentTimeMillis();
+ long ts = insertTs;
metricValues.put(ts - 100000, 5L);
metricValues.put(ts - 80000, 101L);
m1.setType(Type.TIME_SERIES);
@@ -140,6 +224,55 @@ static TimelineEntity getEntity1() {
return entity;
}
+ static TimelineEntity getAFullEntity(long ts, long endTs) {
+ TimelineEntity entity = new TimelineEntity();
+ String id = "flowRunFullEntity";
+ String type = TimelineEntityType.YARN_APPLICATION.toString();
+ entity.setId(id);
+ entity.setType(type);
+ entity.setCreatedTime(ts);
+ // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+ TimelineMetric m1 = new TimelineMetric();
+ m1.setId(metric1);
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+ metricValues.put(ts - 120000, 100000000L);
+ metricValues.put(ts - 100000, 200000000L);
+ metricValues.put(ts - 80000, 300000000L);
+ metricValues.put(ts - 60000, 400000000L);
+ metricValues.put(ts - 40000, 50000000000L);
+ metricValues.put(ts - 20000, 60000000000L);
+ m1.setType(Type.TIME_SERIES);
+ m1.setValues(metricValues);
+ metrics.add(m1);
+ TimelineMetric m2 = new TimelineMetric();
+ m2.setId(metric2);
+    metricValues = new HashMap<Long, Number>();
+ metricValues.put(ts - 900000, 31L);
+ metricValues.put(ts - 30000, 57L);
+ m2.setType(Type.TIME_SERIES);
+ m2.setValues(metricValues);
+ metrics.add(m2);
+ entity.addMetrics(metrics);
+
+ TimelineEvent event = new TimelineEvent();
+ event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+ event.setTimestamp(ts);
+ String expKey = "foo_event";
+ Object expVal = "test";
+ event.addInfo(expKey, expVal);
+ entity.addEvent(event);
+
+ event = new TimelineEvent();
+ event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+    long expTs = ts + 21600000; // start time + 6hrs
+ event.setTimestamp(expTs);
+ event.addInfo(expKey, expVal);
+ entity.addEvent(event);
+
+ return entity;
+ }
+
static TimelineEntity getEntityGreaterStartTime(long startTs) {
TimelineEntity entity = new TimelineEntity();
entity.setCreatedTime(startTs);
@@ -179,11 +312,39 @@ static TimelineEntity getEntityMinStartTime(long startTs) {
entity.setCreatedTime(startTs);
TimelineEvent event = new TimelineEvent();
event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+ event.setTimestamp(System.currentTimeMillis());
+ entity.addEvent(event);
+ return entity;
+ }
+
+ static TimelineEntity getMinFlushEntity(long startTs) {
+ TimelineEntity entity = new TimelineEntity();
+ String id = "flowRunHelloFlushEntityMin";
+ String type = TimelineEntityType.YARN_APPLICATION.toString();
+ entity.setId(id);
+ entity.setType(type);
+ entity.setCreatedTime(startTs);
+ TimelineEvent event = new TimelineEvent();
+ event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
event.setTimestamp(startTs);
entity.addEvent(event);
return entity;
}
+ static TimelineEntity getMaxFlushEntity(long startTs) {
+ TimelineEntity entity = new TimelineEntity();
+ String id = "flowRunHelloFlushEntityMax";
+ String type = TimelineEntityType.YARN_APPLICATION.toString();
+ entity.setId(id);
+ entity.setType(type);
+ entity.setCreatedTime(startTs);
+
+ TimelineEvent event = new TimelineEvent();
+ event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+ event.setTimestamp(startTs + END_TS_INCR);
+ entity.addEvent(event);
+ return entity;
+ }
static TimelineEntity getFlowApp1(long appCreatedTime) {
TimelineEntity entity = new TimelineEntity();
@@ -203,5 +364,4 @@ static TimelineEntity getFlowApp1(long appCreatedTime) {
return entity;
}
-
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
index b234bfd..f04dd48 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -52,8 +52,8 @@
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -216,7 +216,8 @@ public void testWriteFlowRunMetricsOneFlow() throws Exception {
long runid = 1002345678919L;
TimelineEntities te = new TimelineEntities();
- TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1();
+ TimelineEntity entityApp1 = TestFlowDataGenerator
+ .getEntityMetricsApp1(System.currentTimeMillis());
te.addEntity(entityApp1);
HBaseTimelineWriterImpl hbi = null;
@@ -228,7 +229,8 @@ public void testWriteFlowRunMetricsOneFlow() throws Exception {
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
// write another application with same metric to this flow
te = new TimelineEntities();
- TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2();
+ TimelineEntity entityApp2 = TestFlowDataGenerator
+ .getEntityMetricsApp2(System.currentTimeMillis());
te.addEntity(entityApp2);
appName = "application_11111111111111_2222";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
@@ -323,7 +325,8 @@ public void testWriteFlowRunMetricsPrefix() throws Exception {
long runid = 1002345678919L;
TimelineEntities te = new TimelineEntities();
- TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1();
+ TimelineEntity entityApp1 = TestFlowDataGenerator
+ .getEntityMetricsApp1(System.currentTimeMillis());
te.addEntity(entityApp1);
HBaseTimelineWriterImpl hbi = null;
@@ -335,7 +338,8 @@ public void testWriteFlowRunMetricsPrefix() throws Exception {
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
// write another application with same metric to this flow
te = new TimelineEntities();
- TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2();
+ TimelineEntity entityApp2 = TestFlowDataGenerator
+ .getEntityMetricsApp2(System.currentTimeMillis());
te.addEntity(entityApp2);
appName = "application_11111111111111_2222";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
@@ -420,7 +424,8 @@ public void testWriteFlowRunsMetricFields() throws Exception {
long runid = 1002345678919L;
TimelineEntities te = new TimelineEntities();
- TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1();
+ TimelineEntity entityApp1 = TestFlowDataGenerator
+ .getEntityMetricsApp1(System.currentTimeMillis());
te.addEntity(entityApp1);
HBaseTimelineWriterImpl hbi = null;
@@ -432,7 +437,8 @@ public void testWriteFlowRunsMetricFields() throws Exception {
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
// write another application with same metric to this flow
te = new TimelineEntities();
- TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2();
+ TimelineEntity entityApp2 = TestFlowDataGenerator
+ .getEntityMetricsApp2(System.currentTimeMillis());
te.addEntity(entityApp2);
appName = "application_11111111111111_2222";
hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
@@ -494,6 +500,98 @@ public void testWriteFlowRunsMetricFields() throws Exception {
}
}
+ @Test
+ public void testWriteFlowRunFlush() throws Exception {
+ String cluster = "atestFlushFlowRun_cluster1";
+ String user = "atestFlushFlowRun__user1";
+ String flow = "atestFlushFlowRun_flow_name";
+ String flowVersion = "AF1021C19F1351";
+ long runid = 1449526652000L;
+
+ int start = 10;
+ int count = 20000;
+ int appIdSuffix = 1;
+ HBaseTimelineWriterImpl hbi = null;
+ long insertTs = 1449796654827L - count;
+ long minTS = insertTs + 1;
+ long startTs = insertTs;
+ Configuration c1 = util.getConfiguration();
+ TimelineEntities te1 = null;
+ TimelineEntity entityApp1 = null;
+ TimelineEntity entityApp2 = null;
+ try {
+ hbi = new HBaseTimelineWriterImpl(c1);
+ hbi.init(c1);
+
+ for (int i = start; i < count; i++) {
+ String appName = "application_1060350000000_" + appIdSuffix;
+ insertTs++;
+ te1 = new TimelineEntities();
+ entityApp1 = TestFlowDataGenerator.getMinFlushEntity(insertTs);
+ te1.addEntity(entityApp1);
+ entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs);
+ te1.addEntity(entityApp2);
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
+ Thread.sleep(1);
+
+ appName = "application_1001199480000_7" + appIdSuffix;
+ insertTs++;
+ appIdSuffix++;
+ te1 = new TimelineEntities();
+ entityApp1 = TestFlowDataGenerator.getMinFlushEntity(insertTs);
+ te1.addEntity(entityApp1);
+ entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs);
+ te1.addEntity(entityApp2);
+
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
+ if (i % 1000 == 0) {
+ hbi.flush();
+ checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow,
+ runid, false);
+ }
+ }
+ } finally {
+ hbi.flush();
+ hbi.close();
+ checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow, runid,
+ true);
+ }
+ }
+
+ private void checkMinMaxFlush(Configuration c1, long minTS, long startTs,
+ int count, String cluster, String user, String flow, long runid,
+ boolean checkMax) throws IOException {
+ Connection conn = ConnectionFactory.createConnection(c1);
+ // check in flow run table
+ Table table1 = conn.getTable(TableName
+ .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+ // scan the table and see that we get back the right min and max
+ // timestamps
+ byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
+ Get g = new Get(startRow);
+ g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
+ FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes());
+ g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
+ FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes());
+
+ Result r1 = table1.get(g);
+ assertNotNull(r1);
+ assertTrue(!r1.isEmpty());
+    Map<byte[], byte[]> values = r1.getFamilyMap(FlowRunColumnFamily.INFO
+        .getBytes());
+ int start = 10;
+ assertEquals(2, r1.size());
+ long starttime = Bytes.toLong(values
+ .get(FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()));
+ assertEquals(minTS, starttime);
+ if (checkMax) {
+ assertEquals(startTs + 2 * (count - start)
+ + TestFlowDataGenerator.END_TS_INCR,
+ Bytes.toLong(values
+ .get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes())));
+ }
+ }
+
@AfterClass
public static void tearDownAfterClass() throws Exception {
util.shutdownMiniCluster();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
new file mode 100644
index 0000000..a5ca458
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
@@ -0,0 +1,638 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotEquals;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+
+/**
+ * Tests the compaction of the FlowRun table.
+ */
+public class TestHBaseStorageFlowRunCompaction {
+
+ private static HBaseTestingUtility util;
+
+ private final String metric1 = "MAP_SLOT_MILLIS";
+ private final String metric2 = "HDFS_BYTES_READ";
+
+ private final byte[] aRowKey = Bytes.toBytes("a");
+ private final byte[] aFamily = Bytes.toBytes("family");
+ private final byte[] aQualifier = Bytes.toBytes("qualifier");
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ util = new HBaseTestingUtility();
+ Configuration conf = util.getConfiguration();
+ conf.setInt("hfile.format.version", 3);
+ util.startMiniCluster();
+ createSchema();
+ }
+
+ private static void createSchema() throws IOException {
+ TimelineSchemaCreator.createAllTables(util.getConfiguration(), false);
+ }
+
+ @Test
+ public void testWriteFlowRunCompaction() throws Exception {
+ String cluster = "kompaction_cluster1";
+ String user = "kompaction_FlowRun__user1";
+ String flow = "kompaction_flowRun_flow_name";
+ String flowVersion = "AF1021C19F1351";
+ long runid = 1449526652000L;
+
+ int start = 10;
+ int count = 2000;
+ int appIdSuffix = 1;
+ HBaseTimelineWriterImpl hbi = null;
+ long insertTs = System.currentTimeMillis() - count;
+ Configuration c1 = util.getConfiguration();
+ TimelineEntities te1 = null;
+ TimelineEntity entityApp1 = null;
+ try {
+ hbi = new HBaseTimelineWriterImpl(c1);
+ hbi.init(c1);
+      // now insert count * (100 + 100) metric values
+ // each call to getEntityMetricsApp1 brings back 100 values
+ // of metric1 and 100 of metric2
+ for (int i = start; i < start + count; i++) {
+ String appName = "application_10240000000000_" + appIdSuffix;
+ insertTs++;
+ te1 = new TimelineEntities();
+ entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(insertTs, c1);
+ te1.addEntity(entityApp1);
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
+
+ appName = "application_2048000000000_7" + appIdSuffix;
+ insertTs++;
+ te1 = new TimelineEntities();
+ entityApp1 = TestFlowDataGenerator.getEntityMetricsApp2(insertTs);
+ te1.addEntity(entityApp1);
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
+ }
+ } finally {
+ String appName = "application_10240000000000_" + appIdSuffix;
+ te1 = new TimelineEntities();
+ entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1Complete(
+ insertTs + 1, c1);
+ te1.addEntity(entityApp1);
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te1);
+ hbi.flush();
+ hbi.close();
+ }
+
+ // check in flow run table
+ HRegionServer server = util.getRSForFirstRegionInTable(TableName
+ .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+ List<HRegion> regions = server.getOnlineRegions(TableName
+ .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+ assertTrue("Didn't find any regions for primary table!", regions.size() > 0);
+ // flush and compact all the regions of the primary table
+ for (HRegion region : regions) {
+ region.flushcache();
+ region.compactStores(true);
+ }
+
+ // check flow run for one flow many apps
+ checkFlowRunTable(cluster, user, flow, runid, c1, 3);
+ }
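+
+ // A minimal sketch of the path exercised above, assuming the flow run
+ // coprocessor wraps the compaction scanner: the major compaction forced
+ // by region.compactStores(true) hands each column's cells to a
+ // FlowScanner, which collapses the expired SUM_FINAL cells into a
+ // single flow sum cell, roughly:
+ //
+ // SortedSet<Cell> columnCells = ...; // all cells of one metric column
+ // List<Cell> compacted = flowScanner.processSummationMajorCompaction(
+ // columnCells, LongConverter.getInstance(), System.currentTimeMillis());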
+
+ private void checkFlowRunTable(String cluster, String user, String flow,
+ long runid, Configuration c1, int valueCount) throws IOException {
+ Scan s = new Scan();
+ s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+ byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
+ s.setStartRow(startRow);
+ String clusterStop = cluster + "1";
+ byte[] stopRow = FlowRunRowKey.getRowKey(clusterStop, user, flow, runid);
+ s.setStopRow(stopRow);
+ Connection conn = ConnectionFactory.createConnection(c1);
+ Table table1 = conn.getTable(TableName
+ .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+ ResultScanner scanner = table1.getScanner(s);
+
+ int rowCount = 0;
+ for (Result result : scanner) {
+ assertNotNull(result);
+ assertTrue(!result.isEmpty());
+ Map<byte[], byte[]> values = result.getFamilyMap(FlowRunColumnFamily.INFO
+ .getBytes());
+ assertEquals(valueCount, values.size());
+
+ rowCount++;
+ // check metric1
+ byte[] q = ColumnHelper.getColumnQualifier(
+ FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric1);
+ assertTrue(values.containsKey(q));
+ assertEquals(141, Bytes.toLong(values.get(q)));
+
+ // check metric2
+ q = ColumnHelper.getColumnQualifier(
+ FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric2);
+ assertTrue(values.containsKey(q));
+ assertEquals(57, Bytes.toLong(values.get(q)));
+ }
+ assertEquals(1, rowCount);
+ }
+
+ private FlowScanner getFlowScannerForTestingCompaction() {
+ // create a FlowScanner object with the sole purpose of invoking
+ // the process summation
+ CompactionRequest request = new CompactionRequest();
+ request.setIsMajor(true, true);
+ // okay to pass in nulls for the constructor arguments
+ // because all we want to do is invoke the process summation
+ FlowScanner fs = new FlowScanner(null, -1, null,
+ request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION
+ : FlowScannerOperation.MINOR_COMPACTION);
+ assertNotNull(fs);
+ return fs;
+ }
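+
+ // For reference, a hedged sketch of the expiry rule the tests below
+ // rely on: a SUM_FINAL cell is merged away only once it is older than
+ // the app-final-value-retention threshold (259200000 ms, i.e. 3 days,
+ // by default). Assuming the scanner reads the YarnConfiguration
+ // constants introduced by this patch, the check is roughly:
+ //
+ // long retention = conf.getLong(
+ // YarnConfiguration.APP_FINAL_VALUE_RETENTION_THRESHOLD,
+ // YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD);
+ // boolean expired = cellTimestamp < (currentTimestamp - retention);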
+
+ @Test
+ public void checkProcessSummationMoreCellsSumFinal2()
+ throws IOException {
+ long cellValue1 = 1236L;
+ long cellValue2 = 28L;
+ long cellValue3 = 1236L;
+ long cellValue4 = 1236L;
+ FlowScanner fs = getFlowScannerForTestingCompaction();
+
+ // note down the current timestamp
+ long currentTimestamp = System.currentTimeMillis();
+ long cell1Ts = 1200120L;
+ long cell2Ts = TimestampGenerator.getSupplementedTimestamp(
+ System.currentTimeMillis(), "application_123746661110_11202");
+ long cell3Ts = 1277719L;
+ long cell4Ts = currentTimestamp - 10;
+
+ SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
+
+ List<Tag> tags = new ArrayList<>();
+ Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ "application_1234588888_91188");
+ tags.add(t);
+ byte[] tagByteArray = Tag.fromList(tags);
+ // create a cell with a VERY old timestamp and attribute SUM_FINAL
+ Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+ cell1Ts, Bytes.toBytes(cellValue1), tagByteArray);
+ currentColumnCells.add(c1);
+
+ tags = new ArrayList<>();
+ t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ "application_12700000001_29102");
+ tags.add(t);
+ tagByteArray = Tag.fromList(tags);
+ // create a cell with a recent timestamp and attribute SUM_FINAL
+ Cell c2 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+ cell2Ts, Bytes.toBytes(cellValue2), tagByteArray);
+ currentColumnCells.add(c2);
+
+ tags = new ArrayList<>();
+ t = new Tag(AggregationOperation.SUM.getTagType(),
+ "application_191780000000001_8195");
+ tags.add(t);
+ tagByteArray = Tag.fromList(tags);
+ // create a cell with a VERY old timestamp but has attribute SUM
+ Cell c3 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+ cell3Ts, Bytes.toBytes(cellValue3), tagByteArray);
+ currentColumnCells.add(c3);
+
+ tags = new ArrayList<>();
+ t = new Tag(AggregationOperation.SUM.getTagType(),
+ "application_191780000000001_98104");
+ tags.add(t);
+ tagByteArray = Tag.fromList(tags);
+ // create a cell with a VERY old timestamp but has attribute SUM
+ Cell c4 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+ cell4Ts, Bytes.toBytes(cellValue4), tagByteArray);
+ currentColumnCells.add(c4);
+
+ List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
+ LongConverter.getInstance(), currentTimestamp);
+ assertNotNull(cells);
+
+ // we should be getting back 4 cells
+ // one is the flow sum cell
+ // two are the cells with SUM attribute
+ // one cell with SUM_FINAL
+ assertEquals(4, cells.size());
+
+ for (int i = 0; i < cells.size(); i++) {
+ Cell returnedCell = cells.get(i);
+ assertNotNull(returnedCell);
+
+ long returnTs = returnedCell.getTimestamp();
+ long returnValue = Bytes.toLong(CellUtil
+ .cloneValue(returnedCell));
+ if (returnValue == cellValue2) {
+ assertTrue(returnTs == cell2Ts);
+ } else if (returnValue == cellValue1) {
+ // cellValue1, cellValue3 and cellValue4 are all 1236L, so this branch
+ // covers c3, c4 and the newly created flow sum cell; the flow sum
+ // cell carries a fresh timestamp at or beyond the noted current time
+ assertTrue(returnTs == cell3Ts || returnTs == cell4Ts
+ || returnTs >= currentTimestamp);
+ } else {
+ // raise a failure since we expect only these values back
+ Assert.fail();
+ }
+ }
+ }
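+
+ // Note on cell2Ts above: TimestampGenerator.getSupplementedTimestamp
+ // derives it from the current wall-clock time (presumably folding the
+ // app id into the low-order digits), so that SUM_FINAL cell sits
+ // inside the retention window and must come back unmerged; that is
+ // why the loop asserts returnTs == cell2Ts for cellValue2.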
+
+ // tests with many cells of type SUM and SUM_FINAL;
+ // all cells with SUM_FINAL will expire
+ @Test
+ public void checkProcessSummationMoreCellsSumFinalMany() throws IOException {
+ FlowScanner fs = getFlowScannerForTestingCompaction();
+ int count = 200000;
+
+ long cellValueFinal = 1000L;
+ long cellValueNotFinal = 28L;
+
+ // note down the current timestamp
+ long currentTimestamp = System.currentTimeMillis();
+ long cellTsFinalStart = 10001120L;
+ long cellTsFinal = cellTsFinalStart;
+ long cellTsNotFinalStart = currentTimestamp - 5;
+ long cellTsNotFinal = cellTsNotFinalStart;
+
+ SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
+ List<Tag> tags = null;
+ Tag t = null;
+ Cell c1 = null;
+
+ // insert SUM_FINAL cells
+ for (int i = 0; i < count; i++) {
+ tags = new ArrayList<>();
+ t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ "application_123450000" + i + "01_19" + i);
+ tags.add(t);
+ byte[] tagByteArray = Tag.fromList(tags);
+ // create a cell with a VERY old timestamp and attribute SUM_FINAL
+ c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+ cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray);
+ currentColumnCells.add(c1);
+ cellTsFinal++;
+ }
+
+ // add SUM cells
+ for (int i = 0; i < count; i++) {
+ tags = new ArrayList<>();
+ t = new Tag(AggregationOperation.SUM.getTagType(),
+ "application_1987650000" + i + "83_911" + i);
+ tags.add(t);
+ byte[] tagByteArray = Tag.fromList(tags);
+ // create a cell with attribute SUM
+ c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+ cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray);
+ currentColumnCells.add(c1);
+ cellTsNotFinal++;
+ }
+
+ List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
+ LongConverter.getInstance(), currentTimestamp);
+ assertNotNull(cells);
+
+ // we should be getting back count + 1 cells
+ // one is the flow sum cell
+ // others are the cells with SUM attribute
+ assertEquals(count + 1, cells.size());
+
+ for (int i = 0; i < cells.size(); i++) {
+ Cell returnedCell = cells.get(i);
+ assertNotNull(returnedCell);
+
+ long returnTs = returnedCell.getTimestamp();
+ long returnValue = Bytes.toLong(CellUtil
+ .cloneValue(returnedCell));
+ if (returnValue == (count * cellValueFinal)) {
+ assertTrue(returnTs > (cellTsFinalStart + count));
+ assertTrue(returnTs >= currentTimestamp);
+ } else if ((returnValue >= cellValueNotFinal)
+ && (returnValue <= cellValueNotFinal * count)) {
+ assertTrue(returnTs >= cellTsNotFinalStart);
+ assertTrue(returnTs <= cellTsNotFinalStart + count);
+ } else {
+ // raise a failure since we expect only these values back
+ Assert.fail();
+ }
+ }
+ }
+
+ // tests with many cells of type SUM and SUM_FINAL;
+ // NOT all cells of SUM_FINAL will expire
+ @Test
+ public void checkProcessSummationMoreCellsSumFinalVariedTags()
+ throws IOException {
+ FlowScanner fs = getFlowScannerForTestingCompaction();
+ int countFinal = 20100;
+ int countNotFinal = 1000;
+ int countFinalNotExpire = 7009;
+
+ long cellValueFinal = 1000L;
+ long cellValueNotFinal = 28L;
+
+ // note down the current timestamp
+ long currentTimestamp = System.currentTimeMillis();
+ long cellTsFinalStart = 10001120L;
+ long cellTsFinal = cellTsFinalStart;
+
+ long cellTsFinalStartNotExpire = TimestampGenerator.getSupplementedTimestamp(
+ System.currentTimeMillis(), "application_10266666661166_118821");
+ long cellTsFinalNotExpire = cellTsFinalStartNotExpire;
+
+ long cellTsNotFinalStart = currentTimestamp - 5;
+ long cellTsNotFinal = cellTsNotFinalStart;
+
+ SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
+ List<Tag> tags = null;
+ Tag t = null;
+ Cell c1 = null;
+
+ // insert SUM_FINAL cells which will expire
+ for (int i = 0; i < countFinal; i++) {
+ tags = new ArrayList<>();
+ t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ "application_123450000" + i + "01_19" + i);
+ tags.add(t);
+ byte[] tagByteArray = Tag.fromList(tags);
+ // create a cell with a VERY old timestamp and attribute SUM_FINAL
+ c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+ cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray);
+ currentColumnCells.add(c1);
+ cellTsFinal++;
+ }
+
+ // insert SUM_FINAL cells which will NOT expire
+ for (int i = 0; i < countFinalNotExpire; i++) {
+ tags = new ArrayList<>();
+ t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ "application_123450000" + i + "01_19" + i);
+ tags.add(t);
+ byte[] tagByteArray = Tag.fromList(tags);
+ // create a cell with a VERY old timestamp and attribute SUM_FINAL
+ c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+ cellTsFinalNotExpire, Bytes.toBytes(cellValueFinal), tagByteArray);
+ currentColumnCells.add(c1);
+ cellTsFinalNotExpire++;
+ }
+
+ // add SUM cells
+ for (int i = 0; i < countNotFinal; i++) {
+ tags = new ArrayList<>();
+ t = new Tag(AggregationOperation.SUM.getTagType(),
+ "application_1987650000" + i + "83_911" + i);
+ tags.add(t);
+ byte[] tagByteArray = Tag.fromList(tags);
+ // create a cell with attribute SUM
+ c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+ cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray);
+ currentColumnCells.add(c1);
+ cellTsNotFinal++;
+ }
+
+ List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
+ LongConverter.getInstance(), currentTimestamp);
+ assertNotNull(cells);
+
+ // we should be getting back
+ // countNotFinal + countFinalNotExpire + 1 cells:
+ // one is the flow sum cell,
+ // countNotFinal cells carry the SUM attribute, and
+ // countFinalNotExpire cells carry SUM_FINAL but have not expired
+ assertEquals(countFinalNotExpire + countNotFinal + 1, cells.size());
+
+ for (int i = 0; i < cells.size(); i++) {
+ Cell returnedCell = cells.get(i);
+ assertNotNull(returnedCell);
+
+ long returnTs = returnedCell.getTimestamp();
+ long returnValue = Bytes.toLong(CellUtil
+ .cloneValue(returnedCell));
+ if (returnValue == (countFinal * cellValueFinal)) {
+ assertTrue(returnTs > (cellTsFinalStart + countFinal));
+ assertTrue(returnTs >= currentTimestamp);
+ } else if (returnValue == cellValueNotFinal) {
+ assertTrue(returnTs >= cellTsNotFinalStart);
+ assertTrue(returnTs <= cellTsNotFinalStart + countNotFinal);
+ } else if (returnValue == cellValueFinal) {
+ assertTrue(returnTs >= cellTsFinalStartNotExpire);
+ assertTrue(returnTs <= cellTsFinalStartNotExpire + countFinalNotExpire);
+ } else {
+ // raise a failure since we expect only these values back
+ Assert.fail();
+ }
+ }
+ }
+
+ @Test
+ public void testProcessSummationMoreCellsSumFinal() throws IOException {
+ FlowScanner fs = getFlowScannerForTestingCompaction();
+ // note down the current timestamp
+ long currentTimestamp = System.currentTimeMillis();
+ long cellValue1 = 1236L;
+ long cellValue2 = 28L;
+
+ List<Tag> tags = new ArrayList<>();
+ Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ "application_1234588888_999888");
+ tags.add(t);
+ byte[] tagByteArray = Tag.fromList(tags);
+ SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
+
+ // create a cell with a VERY old timestamp and attribute SUM_FINAL
+ Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+ 120L, Bytes.toBytes(cellValue1), tagByteArray);
+ currentColumnCells.add(c1);
+
+ tags = new ArrayList<>();
+ t = new Tag(AggregationOperation.SUM.getTagType(),
+ "application_100000000001_119101");
+ tags.add(t);
+ tagByteArray = Tag.fromList(tags);
+
+ // create a cell with a VERY old timestamp but has attribute SUM
+ Cell c2 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+ 130L, Bytes.toBytes(cellValue2), tagByteArray);
+ currentColumnCells.add(c2);
+ List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
+ LongConverter.getInstance(), currentTimestamp);
+ assertNotNull(cells);
+
+ // we should be getting back two cells
+ // one is the flow sum cell
+ // another is the cell with SUM attribute
+ assertEquals(2, cells.size());
+
+ Cell returnedCell = cells.get(0);
+ assertNotNull(returnedCell);
+ long inputTs1 = c1.getTimestamp();
+ long inputTs2 = c2.getTimestamp();
+
+ long returnTs = returnedCell.getTimestamp();
+ long returnValue = Bytes.toLong(CellUtil
+ .cloneValue(returnedCell));
+ // for the flow sum cell, the returned ts will be far greater than the
+ // input ts as well as the noted current timestamp
+ if (returnValue == cellValue2) {
+ assertTrue(returnTs == inputTs2);
+ } else if (returnValue == cellValue1) {
+ assertTrue(returnTs >= currentTimestamp);
+ assertTrue(returnTs != inputTs1);
+ } else {
+ // raise a failure since we expect only these two values back
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testProcessSummationOneCellSumFinal() throws IOException {
+ FlowScanner fs = getFlowScannerForTestingCompaction();
+
+ // note down the current timestamp
+ long currentTimestamp = System.currentTimeMillis();
+ List<Tag> tags = new ArrayList<>();
+ Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ "application_123458888888_999888");
+ tags.add(t);
+ byte[] tagByteArray = Tag.fromList(tags);
+ SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
+
+ // create a cell with a VERY old timestamp
+ Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+ 120L, Bytes.toBytes(1110L), tagByteArray);
+ currentColumnCells.add(c1);
+
+ List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
+ LongConverter.getInstance(), currentTimestamp);
+ assertNotNull(cells);
+ // we should not get the same cell back
+ // but we get back the flow cell
+ assertEquals(1, cells.size());
+
+ Cell returnedCell = cells.get(0);
+ // it's NOT the same cell
+ assertNotEquals(c1, returnedCell);
+ long inputTs = c1.getTimestamp();
+ long returnTs = returnedCell.getTimestamp();
+ // the returned Ts will be far greater than input ts as well as the noted
+ // current timestamp
+ assertTrue(returnTs > inputTs);
+ assertTrue(returnTs >= currentTimestamp);
+ }
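+
+ // The flow sum cell created above does not inherit c1's timestamp;
+ // presumably the scanner stamps it with a fresh (supplemented)
+ // timestamp taken at compaction time, which is why returnTs lands at
+ // or beyond the noted currentTimestamp.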
+
+ @Test
+ public void testProcessSummationOneCell() throws IOException {
+ FlowScanner fs = getFlowScannerForTestingCompaction();
+
+ // note down the current timestamp
+ long currentTimestamp = System.currentTimeMillis();
+
+ // try for 1 cell with tag SUM
+ List<Tag> tags = new ArrayList<>();
+ Tag t = new Tag(AggregationOperation.SUM.getTagType(),
+ "application_123458888888_999888");
+ tags.add(t);
+ byte[] tagByteArray = Tag.fromList(tags);
+
+ SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
+
+ Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+ currentTimestamp, Bytes.toBytes(1110L), tagByteArray);
+ currentColumnCells.add(c1);
+ List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
+ LongConverter.getInstance(), currentTimestamp);
+ assertNotNull(cells);
+ // we expect the same cell back
+ assertEquals(1, cells.size());
+ Cell c2 = cells.get(0);
+ assertEquals(c1, c2);
+ assertEquals(currentTimestamp, c2.getTimestamp());
+ }
+
+ @Test
+ public void testProcessSummationEmpty() throws IOException {
+ FlowScanner fs = getFlowScannerForTestingCompaction();
+ long currentTimestamp = System.currentTimeMillis();
+
+ SortedSet<Cell> currentColumnCells = null;
+ List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
+ LongConverter.getInstance(), currentTimestamp);
+ assertNotNull(cells);
+ assertEquals(0, cells.size());
+
+ currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
+ cells = fs.processSummationMajorCompaction(currentColumnCells,
+ LongConverter.getInstance(), currentTimestamp);
+ assertNotNull(cells);
+ assertEquals(0, cells.size());
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ util.shutdownMiniCluster();
+ }
+}