diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 6ac6fb9..56742fc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1757,6 +1757,17 @@ public static boolean isAclEnabled(Configuration conf) { public static final int DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS = 60; + /** + * The setting that controls how long the final value of a metric of a + * completed application is retained before being merged into the flow sum. + */ + public static final String APP_FINAL_VALUE_RETENTION_THRESHOLD = + TIMELINE_SERVICE_PREFIX + + "coprocessor.app-final-value-retention-milliseconds"; + + public static final long DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD = 7 * 24 + * 60 * 60 * 1000L; + public static final String ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS = TIMELINE_SERVICE_PREFIX + "app-collector.linger-period.ms"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index 997b175..0c7e965 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -256,7 +256,8 @@ private void storeFlowMetricsAppRunning(String clusterId, String userId, byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName, flowRunId); storeFlowMetrics(rowKey, metrics, - AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); + AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId), + AggregationOperation.SUM.getAttribute()); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java index 2328bba..7dced10 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java @@ -17,6 +17,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.common; +import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -27,6 +28,9 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import 
org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -494,4 +498,97 @@ public static boolean isIntegralValue(Object obj) { return (obj instanceof Short) || (obj instanceof Integer) || (obj instanceof Long); } + + /** + * creates a new cell based on the input cell but with the new value. + * + * @param origCell Original cell + * @param newValue new cell value + * @return cell + * @throws IOException while creating new cell. + */ + public static Cell createNewCell(Cell origCell, byte[] newValue) + throws IOException { + return CellUtil.createCell(CellUtil.cloneRow(origCell), + CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell), + origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue); + } + + /** + * creates a cell with the given inputs. + * + * @param row row of the cell to be created + * @param family column family name of the new cell + * @param qualifier qualifier for the new cell + * @param ts timestamp of the new cell + * @param newValue value of the new cell + * @param tags tags in the new cell + * @return cell + * @throws IOException while creating the cell. + */ + public static Cell createNewCell(byte[] row, byte[] family, byte[] qualifier, + long ts, byte[] newValue, byte[] tags) throws IOException { + return CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put, + newValue, tags); + } + + /** + * creates a readable string representation of a Cell. + * + * @param cell whose data is to be printed + * @return a string representation of a Cell. + */ + public static String printCellDetails(Cell cell) { + StringBuilder cellDetails = new StringBuilder(); + cellDetails.append(" Row="); + cellDetails.append(Bytes.toString(CellUtil.cloneRow(cell))); + cellDetails.append(" Family="); + cellDetails.append(Bytes.toString(CellUtil.cloneFamily(cell))); + cellDetails.append(" Qualifier="); + cellDetails.append(Bytes.toString(CellUtil.cloneQualifier(cell))); + cellDetails.append(" Value="); + cellDetails.append(Bytes.toString(CellUtil.cloneValue(cell))); + cellDetails.append(" Timestamp="); + cellDetails.append(cell.getTimestamp()); + cellDetails.append(" Tags="); + cellDetails.append(printTags(cell)); + return cellDetails.toString(); + } + + /** + * creates a readable string of all the tags of that cell. + * + * @param cell whose tags are to be printed + * @return string representation for the cell tags + */ + public static String printTags(Cell cell) { + StringBuilder tagDetails = new StringBuilder(); + List tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), + cell.getTagsLength()); + for (Tag t : tags) { + tagDetails.append(" type="); + tagDetails.append(t.getType()); + tagDetails.append(" value="); + tagDetails.append(Bytes.toString(t.getValue())); + } + return tagDetails.toString(); + } + + /** + * returns app id from the list of tags. 
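
The helpers above (createNewCell with tags, printCellDetails, printTags) are enough to build and inspect a tagged metric cell outside the scanner. A minimal sketch that reuses only calls appearing in this patch; the row key, qualifier and metric value are illustrative placeholders:

    List<Tag> tags = new ArrayList<>();
    tags.add(new Tag(AggregationOperation.SUM_FINAL.getTagType(),
        Bytes.toBytes("application_123_0001")));
    tags.add(new Tag(AggregationCompactionDimension.APPLICATION_ID.getTagType(),
        Bytes.toBytes("application_123_0001")));
    Cell cell = TimelineStorageUtils.createNewCell(
        Bytes.toBytes("cluster!user!flow!run"),      // illustrative row key
        FlowRunColumnFamily.INFO.getBytes(),
        Bytes.toBytes("m!MAP_SLOT_MILLIS"),          // illustrative qualifier
        System.currentTimeMillis(), Bytes.toBytes(40L), Tag.fromList(tags));
    System.out.println(TimelineStorageUtils.printCellDetails(cell));
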
+ * + * @param tags cell tags to be looked into + * @return App Id as the AggregationCompactionDimension + */ + public static String getAggregationCompactionDimension(List tags) { + String appId = null; + for (Tag t : tags) { + if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t + .getType()) { + appId = Bytes.toString(t.getValue()); + return appId; + } + } + return appId; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java index 7238efa..288046c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java @@ -33,7 +33,7 @@ * if this is changed, then reading cell timestamps written with older * multiplier value will not work */ - public static final long TS_MULTIPLIER = 1000L; + public static final long TS_MULTIPLIER = 1000000L; private final AtomicLong lastTimestamp = new AtomicLong(); @@ -74,13 +74,14 @@ public long getUniqueTimestamp() { } /** - * returns a timestamp multiplied with TS_MULTIPLIER and last few digits of - * application id + * Returns a timestamp multiplied with TS_MULTIPLIER and last few digits of + * application id. * * Unlikely scenario of generating a timestamp that is a duplicate: If more - * than a 1000 concurrent apps are running in one flow run AND write to same - * column at the same time, then say appId of 1001 will overlap with appId of - * 001 and there may be collisions for that flow run's specific column. + * than a 1M concurrent apps are running in one flow run AND write to same + * column at the same time, then say appId of 1M and 1 will overlap + * with appId of 001 and there may be collisions for that flow run's + * specific column. * * @param incomingTS Timestamp to be converted. * @param appId Application Id. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java index 6240e81..40cdd2c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java @@ -21,19 +21,19 @@ /** * Identifies the attributes to be set for puts into the {@link FlowRunTable}. - * The numbers used for tagType are prime numbers + * The numbers used for tagType are prime numbers. */ public enum AggregationOperation { /** * When the flow was started. */ - MIN((byte) 71), + GLOBAL_MIN((byte) 71), /** * When it ended. 
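
With the multiplier raised from 1000 to 1000000, a cell timestamp now carries six low-order digits taken from the application id, so a collision needs more than a million concurrent apps of one flow run writing the same column in the same millisecond. The arithmetic, roughly (the real getSupplementedTimestamp/getTruncatedTimestamp also guard against null or short app ids):

    long wallClockMs = 1449796654827L;                          // time of the write
    long appSuffix = 11202 % TimestampGenerator.TS_MULTIPLIER;  // low digits of the app id
    long cellTs = wallClockMs * TimestampGenerator.TS_MULTIPLIER + appSuffix;
    long recovered = cellTs / TimestampGenerator.TS_MULTIPLIER; // == wallClockMs again
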
*/ - MAX((byte) 73), + GLOBAL_MAX((byte) 73), /** * The metrics of the flow. @@ -46,9 +46,16 @@ SUM_FINAL((byte) 83), /** - * compact. + * Min value as per the latest timestamp + * seen for a given app. */ - COMPACT((byte) 89); + LATEST_MIN((byte) 89), + + /** + * Max value as per the latest timestamp + * seen for a given app. + */ + LATEST_MAX((byte) 97); private byte tagType; private byte[] inBytes; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java index 148a37f..d50bb16 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java @@ -41,14 +41,14 @@ * application start times. */ MIN_START_TIME(FlowRunColumnFamily.INFO, "min_start_time", - AggregationOperation.MIN, LongConverter.getInstance()), + AggregationOperation.GLOBAL_MIN, LongConverter.getInstance()), /** * When the flow ended. This is the maximum of currently known application end * times. */ MAX_END_TIME(FlowRunColumnFamily.INFO, "max_end_time", - AggregationOperation.MAX, LongConverter.getInstance()), + AggregationOperation.GLOBAL_MAX, LongConverter.getInstance()), /** * The version of the flow that this flow belongs to. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java index 3d7c40e..fa94aae 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java @@ -40,7 +40,7 @@ /** * To store flow run info values. 
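
Each constant's tagType byte is what gets attached to a put as an HBase tag, and it is also how the scanner recognizes the operation again later; FlowScanner below relies on an existing getCurrentAggOp(cell) helper for that. The reverse lookup amounts to something like this sketch:

    AggregationOperation found = null;
    for (Tag t : Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
        cell.getTagsLength())) {
      for (AggregationOperation op : AggregationOperation.values()) {
        if (op.getTagType() == t.getType()) {
          found = op;   // e.g. SUM_FINAL for a completed app's last value
        }
      }
    }
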
*/ - METRIC(FlowRunColumnFamily.INFO, "m", AggregationOperation.SUM, + METRIC(FlowRunColumnFamily.INFO, "m", null, LongConverter.getInstance()); private final ColumnHelper column; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java index 9698f06..e847de7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java @@ -40,7 +40,12 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; @@ -51,7 +56,6 @@ */ public class FlowRunCoprocessor extends BaseRegionObserver { - @SuppressWarnings("unused") private static final Log LOG = LogFactory.getLog(FlowRunCoprocessor.class); private HRegion region; @@ -160,8 +164,8 @@ public void preGetOp(ObserverContext e, scan.setMaxVersions(); RegionScanner scanner = null; try { - scanner = new FlowScanner(region, scan.getBatch(), - region.getScanner(scan)); + scanner = new FlowScanner(e.getEnvironment(), scan.getBatch(), + region.getScanner(scan), FlowScannerOperation.READ); scanner.next(results); e.bypass(); } finally { @@ -209,6 +213,64 @@ public RegionScanner preScannerOpen( public RegionScanner postScannerOpen( ObserverContext e, Scan scan, RegionScanner scanner) throws IOException { - return new FlowScanner(region, scan.getBatch(), scanner); + return new FlowScanner(e.getEnvironment(), scan.getBatch(), + scanner, FlowScannerOperation.READ); + } + + @Override + public InternalScanner preFlush( + ObserverContext c, Store store, + InternalScanner scanner) throws IOException { + if (LOG.isDebugEnabled()) { + if (store != null) { + LOG.debug("preFlush store = " + store.getColumnFamilyName() + + " flushableSize=" + store.getFlushableSize() + + " flushedCellsCount=" + store.getFlushedCellsCount() + + " compactedCellsCount=" + store.getCompactedCellsCount() + + " majorCompactedCellsCount=" + + store.getMajorCompactedCellsCount() + " memstoreFlushSize=" + + store.getMemstoreFlushSize() + " memstoreSize=" + + store.getMemStoreSize() + " size=" + store.getSize() + + " storeFilesCount=" + store.getStorefilesCount()); + } + } + return new FlowScanner(c.getEnvironment(), -1, scanner, + FlowScannerOperation.FLUSH); + } + + @Override + public void postFlush(ObserverContext c, + Store store, StoreFile resultFile) { + if (LOG.isDebugEnabled()) { + if (store != null) { 
+ LOG.debug("postFlush store = " + store.getColumnFamilyName() + + " flushableSize=" + store.getFlushableSize() + + " flushedCellsCount=" + store.getFlushedCellsCount() + + " compactedCellsCount=" + store.getCompactedCellsCount() + + " majorCompactedCellsCount=" + + store.getMajorCompactedCellsCount() + " memstoreFlushSize=" + + store.getMemstoreFlushSize() + " memstoreSize=" + + store.getMemStoreSize() + " size=" + store.getSize() + + " storeFilesCount=" + store.getStorefilesCount()); + } + } + } + + @Override + public InternalScanner preCompact( + ObserverContext e, Store store, + InternalScanner scanner, ScanType scanType, CompactionRequest request) + throws IOException { + + FlowScannerOperation requestOp = FlowScannerOperation.MAJOR_COMPACTION; + if (request != null) { + LOG.info("Compactionrequest= " + request.toString() + " " + + request.isMajor() + " for ObserverContext " + + e.getEnvironment().getRegion().getRegionNameAsString()); + requestOp = (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION + : FlowScannerOperation.MINOR_COMPACTION); + } + + return new FlowScanner(e.getEnvironment(), -1, scanner, requestOp); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java index 6fefd15..3d4760e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java @@ -29,20 +29,26 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator; + +import com.google.common.annotations.VisibleForTesting; /** * Invoked via the coprocessor when a Get or a Scan is issued for flow run @@ -55,23 +61,42 @@ private static final Log LOG = LogFactory.getLog(FlowScanner.class); + /** + * use a special application id to represent the flow id this is needed since + * TimestampGenerator parses the app id to generate a cell timestamp. 
+ */ + private static final String FLOW_APP_ID = "application_00000000000_0000"; + private final HRegion region; private final InternalScanner flowRunScanner; - private RegionScanner regionScanner; private final int limit; + private final long appFinalValueRetentionThreshold; + private RegionScanner regionScanner; private boolean hasMore; private byte[] currentRow; private List availableCells = new ArrayList<>(); private int currentIndex; + private FlowScannerOperation action = FlowScannerOperation.READ; - FlowScanner(HRegion region, int limit, InternalScanner internalScanner) { - this.region = region; + FlowScanner(RegionCoprocessorEnvironment env, int limit, + InternalScanner internalScanner, FlowScannerOperation action) { this.limit = limit; this.flowRunScanner = internalScanner; if (internalScanner instanceof RegionScanner) { this.regionScanner = (RegionScanner) internalScanner; } - // TODO: note if it's compaction/flush + this.action = action; + if (env == null) { + this.appFinalValueRetentionThreshold = + YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD; + this.region = null; + } else { + this.region = env.getRegion(); + Configuration hbaseConf = env.getConfiguration(); + this.appFinalValueRetentionThreshold = hbaseConf.getLong( + YarnConfiguration.APP_FINAL_VALUE_RETENTION_THRESHOLD, + YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD); + } } /* @@ -104,17 +129,6 @@ public boolean next(List cells, int cellLimit) throws IOException { return nextInternal(cells, cellLimit); } - private String getAggregationCompactionDimension(List tags) { - String appId = null; - for (Tag t : tags) { - if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t - .getType()) { - appId = Bytes.toString(t.getValue()); - } - } - return appId; - } - /** * Get value converter associated with a column or a column prefix. If nothing * matches, generic converter is returned. @@ -161,7 +175,7 @@ private static boolean isNumericConverter(ValueConverter converter) { * column or returns the cell as is. * * @param cells - * @param cellLimit + * @param limit * @return true if next row is available for the scanner, false otherwise * @throws IOException */ @@ -183,14 +197,18 @@ private boolean nextInternal(List cells, int cellLimit) SortedSet currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR); Set alreadySeenAggDim = new HashSet<>(); int addedCnt = 0; + long currentTimestamp = System.currentTimeMillis(); ValueConverter converter = null; - while (((cell = peekAtNextCell(cellLimit)) != null) - && (cellLimit <= 0 || addedCnt < cellLimit)) { + while (cellLimit <= 0 || addedCnt < cellLimit) { + cell = peekAtNextCell(cellLimit); + if (cell == null) { + break; + } byte[] newColumnQualifier = CellUtil.cloneQualifier(cell); if (comp.compare(currentColumnQualifier, newColumnQualifier) != 0) { if (converter != null && isNumericConverter(converter)) { addedCnt += emitCells(cells, currentColumnCells, currentAggOp, - (NumericValueConverter)converter); + (NumericValueConverter)converter, currentTimestamp); } resetState(currentColumnCells, alreadySeenAggDim); currentColumnQualifier = newColumnQualifier; @@ -199,7 +217,7 @@ private boolean nextInternal(List cells, int cellLimit) } // No operation needs to be performed on non numeric converters. 
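
Since the constructor above reads the threshold from the coprocessor environment's Configuration, the retention window can be tuned per cluster without code changes; the property expands to yarn.timeline-service.coprocessor.app-final-value-retention-milliseconds. A sketch with a purely illustrative three-day value, assuming conf is the configuration the HBase region servers actually load:

    conf.setLong(YarnConfiguration.APP_FINAL_VALUE_RETENTION_THRESHOLD,
        3L * 24 * 60 * 60 * 1000);   // e.g. 3 days instead of the 7 day default
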
if (!isNumericConverter(converter)) { - nextCell(cellLimit); + nextCell(limit); continue; } collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim, @@ -207,8 +225,16 @@ private boolean nextInternal(List cells, int cellLimit) nextCell(cellLimit); } if (!currentColumnCells.isEmpty()) { - emitCells(cells, currentColumnCells, currentAggOp, - (NumericValueConverter)converter); + addedCnt += emitCells(cells, currentColumnCells, currentAggOp, + (NumericValueConverter)converter, currentTimestamp); + if (LOG.isDebugEnabled()) { + if (addedCnt > 0) { + LOG.debug("emitted cells. " + addedCnt + " for " + this.action + + " rowKey="); + } else { + LOG.debug("emitted no cells for " + this.action); + } + } } return hasMore(); } @@ -247,7 +273,7 @@ private void collectCells(SortedSet currentColumnCells, } switch (currentAggOp) { - case MIN: + case GLOBAL_MIN: if (currentColumnCells.size() == 0) { currentColumnCells.add(cell); } else { @@ -260,7 +286,7 @@ private void collectCells(SortedSet currentColumnCells, } } break; - case MAX: + case GLOBAL_MAX: if (currentColumnCells.size() == 0) { currentColumnCells.add(cell); } else { @@ -275,16 +301,32 @@ private void collectCells(SortedSet currentColumnCells, break; case SUM: case SUM_FINAL: + if (LOG.isTraceEnabled()) { + LOG.trace("In collect cells " + + " FlowSannerOperation=" + + this.action + + " currentAggOp=" + + currentAggOp + + " cell qualifier=" + + Bytes.toString(CellUtil.cloneQualifier(cell)) + + " cell value= " + + (Number) converter.decodeValue(CellUtil.cloneValue(cell)) + + " timestamp=" + cell.getTimestamp()); + } + // only if this app has not been seen yet, add to current column cells List tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()); - String aggDim = getAggregationCompactionDimension(tags); - - // If this agg dimension has already been seen, since they show up in - // sorted order, we drop the rest which are older. In other words, this - // cell is older than previously seen cells for that agg dim. + String aggDim = TimelineStorageUtils + .getAggregationCompactionDimension(tags); if (!alreadySeenAggDim.contains(aggDim)) { - // Not seen this agg dim, hence consider this cell in our working set + // if this agg dimension has already been seen, + // since they show up in sorted order + // we drop the rest which are older + // in other words, this cell is older than previously seen cells + // for that agg dim + // but when this agg dim is not seen, + // consider this cell in our working set currentColumnCells.add(cell); alreadySeenAggDim.add(aggDim); } @@ -300,8 +342,8 @@ private void collectCells(SortedSet currentColumnCells, * parameter. 
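
Because HBase returns the versions of a column newest-timestamp-first, the first cell seen for a given application id is that app's latest value, and everything older for the same app can be dropped from the working set. A compact restatement of that logic (a sketch, not the method itself; cellsForOneColumnNewestFirst is an assumed input):

    Set<String> seenApps = new HashSet<>();
    SortedSet<Cell> workingSet = new TreeSet<>(KeyValue.COMPARATOR);
    for (Cell c : cellsForOneColumnNewestFirst) {
      List<Tag> tags = Tag.asList(c.getTagsArray(), c.getTagsOffset(),
          c.getTagsLength());
      String appId = TimelineStorageUtils.getAggregationCompactionDimension(tags);
      if (seenApps.add(appId)) {    // first == newest cell for this app
        workingSet.add(c);
      }
    }
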
*/ private int emitCells(List cells, SortedSet currentColumnCells, - AggregationOperation currentAggOp, NumericValueConverter converter) - throws IOException { + AggregationOperation currentAggOp, NumericValueConverter converter, + long currentTimestamp) throws IOException { if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) { return 0; } @@ -309,17 +351,36 @@ private int emitCells(List cells, SortedSet currentColumnCells, cells.addAll(currentColumnCells); return currentColumnCells.size(); } + if (LOG.isTraceEnabled()) { + LOG.trace("In emitCells " + this.action + " currentColumnCells size= " + + currentColumnCells.size() + " currentAggOp" + currentAggOp); + } switch (currentAggOp) { - case MIN: - case MAX: + case GLOBAL_MIN: + case GLOBAL_MAX: cells.addAll(currentColumnCells); return currentColumnCells.size(); case SUM: case SUM_FINAL: - Cell sumCell = processSummation(currentColumnCells, converter); - cells.add(sumCell); - return 1; + switch (action) { + case FLUSH: + case MINOR_COMPACTION: + cells.addAll(currentColumnCells); + return currentColumnCells.size(); + case READ: + Cell sumCell = processSummation(currentColumnCells, converter); + cells.add(sumCell); + return 1; + case MAJOR_COMPACTION: + List finalCells = processSummationMajorCompaction( + currentColumnCells, converter, currentTimestamp); + cells.addAll(finalCells); + return finalCells.size(); + default: + cells.addAll(currentColumnCells); + return currentColumnCells.size(); + } default: cells.addAll(currentColumnCells); return currentColumnCells.size(); @@ -349,10 +410,119 @@ private Cell processSummation(SortedSet currentColumnCells, sum = converter.add(sum, currentValue); } byte[] sumBytes = converter.encodeValue(sum); - Cell sumCell = createNewCell(mostRecentCell, sumBytes); + Cell sumCell = TimelineStorageUtils.createNewCell(mostRecentCell, sumBytes); return sumCell; } + + /** + * Returns a list of cells that contains + * + * A) the latest cells for applications that haven't finished yet + * B) summation + * for the flow, based on applications that have completed and are older than + * a certain time + * + * The new cell created has the timestamp of the most recent metric cell. The + * sum of a metric for a flow run is the summation at the point of the last + * metric update in that flow till that time. 
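
On the read path the per-application latest values collected above are simply added: one generated app's last MAP_SLOT_MILLIS value is 40 and the other's is 101, which is why the compaction test further down asserts 141 for that column. The addition itself, as a small sketch using the converter these metric columns use (a fragment; converter.add is declared to throw IOException):

    NumericValueConverter converter = LongConverter.getInstance();
    Number sum = 0;
    sum = converter.add(sum, 40L);    // latest value written by the first app
    sum = converter.add(sum, 101L);   // latest value written by the second app
    // sum.longValue() == 141, the single value emitted for the column
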
+ */ + @VisibleForTesting + List processSummationMajorCompaction( + SortedSet currentColumnCells, NumericValueConverter converter, + long currentTimestamp) + throws IOException { + Number sum = 0; + Number currentValue = 0; + long ts = 0L; + List finalCells = new ArrayList(); + if (currentColumnCells == null) { + return finalCells; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("In processSummationMajorCompaction," + + " will drop cells older than " + currentTimestamp + + " CurrentColumnCells size=" + currentColumnCells.size()); + } + + for (Cell cell : currentColumnCells) { + AggregationOperation cellAggOp = getCurrentAggOp(cell); + // if this is the existing flow sum cell + List tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), + cell.getTagsLength()); + String appId = TimelineStorageUtils + .getAggregationCompactionDimension(tags); + if (appId == FLOW_APP_ID) { + sum = converter.add(sum, currentValue); + if (LOG.isTraceEnabled()) { + LOG.trace("reading flow app id sum=" + sum); + } + } else { + currentValue = (Number) converter.decodeValue(CellUtil + .cloneValue(cell)); + // read the timestamp truncated by the generator + ts = TimestampGenerator.getTruncatedTimestamp(cell.getTimestamp()); + if ((cellAggOp == AggregationOperation.SUM_FINAL) + && ((ts + this.appFinalValueRetentionThreshold) + < currentTimestamp)) { + sum = converter.add(sum, currentValue); + if (LOG.isTraceEnabled()) { + LOG.trace("MAJOR COMPACTION loop sum= " + sum + + " discarding now: " + " qualifier=" + + Bytes.toString(CellUtil.cloneQualifier(cell)) + " value=" + + (Number) converter.decodeValue(CellUtil.cloneValue(cell)) + + " timestamp=" + cell.getTimestamp() + " " + this.action); + } + } else { + // not a final value but it's the latest cell for this app + // so include this cell in the list of cells to write back + finalCells.add(cell); + } + } + } + if (sum.longValue() > 0L) { + Cell anyCell = currentColumnCells.first(); + List tags = new ArrayList(); + Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), + Bytes.toBytes(FLOW_APP_ID)); + tags.add(t); + t = new Tag(AggregationCompactionDimension.APPLICATION_ID.getTagType(), + Bytes.toBytes(FLOW_APP_ID)); + tags.add(t); + byte[] tagByteArray = Tag.fromList(tags); + Cell sumCell = TimelineStorageUtils.createNewCell( + CellUtil.cloneRow(anyCell), + CellUtil.cloneFamily(anyCell), + CellUtil.cloneQualifier(anyCell), + TimestampGenerator.getSupplementedTimestamp( + System.currentTimeMillis(), FLOW_APP_ID), + converter.encodeValue(sum), tagByteArray); + finalCells.add(sumCell); + if (LOG.isTraceEnabled()) { + LOG.trace("MAJOR COMPACTION final sum= " + sum + " for " + + Bytes.toString(CellUtil.cloneQualifier(sumCell)) + + " " + this.action); + } + LOG.info("After major compaction for qualifier=" + + Bytes.toString(CellUtil.cloneQualifier(sumCell)) + + " with currentColumnCells.size=" + + currentColumnCells.size() + + " returning finalCells.size=" + finalCells.size() + + " with sum=" + sum.longValue() + + " with cell timestamp " + sumCell.getTimestamp()); + } else { + String qualifier = ""; + LOG.info("After major compaction for qualifier=" + qualifier + + " with currentColumnCells.size=" + + currentColumnCells.size() + + " returning finalCells.size=" + finalCells.size() + + " with zero sum=" + + sum.longValue()); + } + return finalCells; + } + /** * Determines which cell is to be returned based on the values in each cell * and the comparison operation MIN or MAX. 
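
The decision of whether a completed app's final value is old enough to fold is plain arithmetic on the truncated cell timestamp: with the 7-day default, a SUM_FINAL cell written more than a week before the major compaction is absorbed into the FLOW_APP_ID sum cell, while anything newer is written back untouched. A sketch of that check, using the names from the method above:

    long now = System.currentTimeMillis();
    long writtenAtMs =
        TimestampGenerator.getTruncatedTimestamp(cell.getTimestamp());
    boolean fold = (cellAggOp == AggregationOperation.SUM_FINAL)
        && (writtenAtMs + appFinalValueRetentionThreshold < now);
    // fold == true  -> the value joins the running flow sum, the cell is dropped
    // fold == false -> the cell is kept as that app's latest value
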
@@ -375,7 +545,7 @@ private Cell compareCellValues(Cell previouslyChosenCell, Cell currentCell, Number currentCellValue = (Number) converter.decodeValue(CellUtil .cloneValue(currentCell)); switch (currentAggOp) { - case MIN: + case GLOBAL_MIN: if (converter.compare( currentCellValue, previouslyChosenCellValue) < 0) { // new value is minimum, hence return this cell @@ -384,7 +554,7 @@ private Cell compareCellValues(Cell previouslyChosenCell, Cell currentCell, // previously chosen value is miniumum, hence return previous min cell return previouslyChosenCell; } - case MAX: + case GLOBAL_MAX: if (converter.compare( currentCellValue, previouslyChosenCellValue) > 0) { // new value is max, hence return this cell @@ -402,16 +572,13 @@ private Cell compareCellValues(Cell previouslyChosenCell, Cell currentCell, } } - private Cell createNewCell(Cell origCell, byte[] newValue) - throws IOException { - return CellUtil.createCell(CellUtil.cloneRow(origCell), - CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell), - origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue); - } - @Override public void close() throws IOException { - flowRunScanner.close(); + if (flowRunScanner != null) { + flowRunScanner.close(); + } else { + LOG.warn("scanner close called but scanner is null"); + } } /** @@ -423,8 +590,6 @@ public void startNext() { /** * Returns whether or not the underlying scanner has more rows. - * - * @return true, if there are more cells to return, false otherwise. */ public boolean hasMore() { return currentIndex < availableCells.size() ? true : hasMore; @@ -440,8 +605,7 @@ public boolean hasMore() { * fetched by the wrapped scanner * @return the next available cell or null if no more cells are available for * the current row - * @throws IOException if any problem is encountered while grabbing the next - * cell. + * @throws IOException */ public Cell nextCell(int cellLimit) throws IOException { Cell cell = peekAtNextCell(cellLimit); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java new file mode 100644 index 0000000..30a0324 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScannerOperation.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + + +/** + * Identifies the scanner operation on the {@link FlowRunTable}. + */ +public enum FlowScannerOperation { + + /** + * If the scanner is opened for reading + * during preGet or preScan. + */ + READ, + + /** + * If the scanner is opened during preFlush. + */ + FLUSH, + + /** + * If the scanner is opened during minor Comapction. + */ + MINOR_COMPACTION, + + /** + * If the scanner is opened during major Comapction. + */ + MAJOR_COMPACTION +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java index a4c06f2..78de937 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.yarn.server.timelineservice.storage.flow; - import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -29,17 +28,60 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.conf.Configuration; /** * Generates the data/entities for the FlowRun and FlowActivity Tables */ class TestFlowDataGenerator { - private final static String metric1 = "MAP_SLOT_MILLIS"; - private final static String metric2 = "HDFS_BYTES_READ"; + private static final String metric1 = "MAP_SLOT_MILLIS"; + private static final String metric2 = "HDFS_BYTES_READ"; + public static final long END_TS_INCR = 10000L; + + static TimelineEntity getEntityMetricsApp1(long insertTs, Configuration c1) { + TimelineEntity entity = new TimelineEntity(); + String id = "flowRunMetrics_test"; + String type = TimelineEntityType.YARN_APPLICATION.toString(); + entity.setId(id); + entity.setType(type); + long cTime = 1425016501000L; + entity.setCreatedTime(cTime); + // add metrics + Set metrics = new HashSet<>(); + TimelineMetric m1 = new TimelineMetric(); + m1.setId(metric1); + Map metricValues = new HashMap(); + long ts = insertTs; - static TimelineEntity getEntityMetricsApp1() { + for (int k=1; k< 100 ; k++) { + metricValues.put(ts - k*200000, 20L); + } + metricValues.put(ts - 80000, 40L); + m1.setType(Type.TIME_SERIES); + m1.setValues(metricValues); + metrics.add(m1); + + TimelineMetric m2 = new TimelineMetric(); + m2.setId(metric2); + metricValues = new HashMap(); + ts = System.currentTimeMillis(); + for (int k=1; k< 100 ; k++) { + metricValues.put(ts - k*100000, 31L); + } + + metricValues.put(ts - 80000, 57L); + m2.setType(Type.TIME_SERIES); + m2.setValues(metricValues); + metrics.add(m2); + + entity.addMetrics(metrics); + return entity; + } + + + static TimelineEntity getEntityMetricsApp1Complete(long insertTs, Configuration c1) { TimelineEntity entity = new TimelineEntity(); String id = "flowRunMetrics_test"; String type = TimelineEntityType.YARN_APPLICATION.toString(); @@ -53,7 
+95,48 @@ static TimelineEntity getEntityMetricsApp1() { TimelineMetric m1 = new TimelineMetric(); m1.setId(metric1); Map metricValues = new HashMap(); - long ts = System.currentTimeMillis(); + long ts = insertTs; + + metricValues.put(ts - 80000, 40L); + m1.setType(Type.TIME_SERIES); + m1.setValues(metricValues); + metrics.add(m1); + + TimelineMetric m2 = new TimelineMetric(); + m2.setId(metric2); + metricValues = new HashMap(); + ts = insertTs; + metricValues.put(ts - 80000, 57L); + m2.setType(Type.TIME_SERIES); + m2.setValues(metricValues); + metrics.add(m2); + + entity.addMetrics(metrics); + + TimelineEvent event = new TimelineEvent(); + event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); + event.setTimestamp(insertTs); + event.addInfo("done", "insertTs=" + insertTs); + entity.addEvent(event); + return entity; + } + + + static TimelineEntity getEntityMetricsApp1(long insertTs) { + TimelineEntity entity = new TimelineEntity(); + String id = "flowRunMetrics_test"; + String type = TimelineEntityType.YARN_APPLICATION.toString(); + entity.setId(id); + entity.setType(type); + long cTime = 1425016501000L; + entity.setCreatedTime(cTime); + + // add metrics + Set metrics = new HashSet<>(); + TimelineMetric m1 = new TimelineMetric(); + m1.setId(metric1); + Map metricValues = new HashMap(); + long ts = insertTs; metricValues.put(ts - 100000, 2L); metricValues.put(ts - 80000, 40L); m1.setType(Type.TIME_SERIES); @@ -63,7 +146,7 @@ static TimelineEntity getEntityMetricsApp1() { TimelineMetric m2 = new TimelineMetric(); m2.setId(metric2); metricValues = new HashMap(); - ts = System.currentTimeMillis(); + ts = insertTs; metricValues.put(ts - 100000, 31L); metricValues.put(ts - 80000, 57L); m2.setType(Type.TIME_SERIES); @@ -74,7 +157,8 @@ static TimelineEntity getEntityMetricsApp1() { return entity; } - static TimelineEntity getEntityMetricsApp2() { + + static TimelineEntity getEntityMetricsApp2(long insertTs) { TimelineEntity entity = new TimelineEntity(); String id = "flowRunMetrics_test"; String type = TimelineEntityType.YARN_APPLICATION.toString(); @@ -87,7 +171,7 @@ static TimelineEntity getEntityMetricsApp2() { TimelineMetric m1 = new TimelineMetric(); m1.setId(metric1); Map metricValues = new HashMap(); - long ts = System.currentTimeMillis(); + long ts = insertTs; metricValues.put(ts - 100000, 5L); metricValues.put(ts - 80000, 101L); m1.setType(Type.TIME_SERIES); @@ -141,6 +225,54 @@ static TimelineEntity getEntity1() { return entity; } + static TimelineEntity getAFullEntity(long ts, long endTs) { + TimelineEntity entity = new TimelineEntity(); + String id = "flowRunFullEntity"; + String type = TimelineEntityType.YARN_APPLICATION.toString(); + entity.setId(id); + entity.setType(type); + entity.setCreatedTime(ts); + // add metrics + Set metrics = new HashSet<>(); + TimelineMetric m1 = new TimelineMetric(); + m1.setId(metric1); + Map metricValues = new HashMap(); + metricValues.put(ts - 120000, 100000000L); + metricValues.put(ts - 100000, 200000000L); + metricValues.put(ts - 80000, 300000000L); + metricValues.put(ts - 60000, 400000000L); + metricValues.put(ts - 40000, 50000000000L); + metricValues.put(ts - 20000, 60000000000L); + m1.setType(Type.TIME_SERIES); + m1.setValues(metricValues); + metrics.add(m1); + TimelineMetric m2 = new TimelineMetric(); + m2.setId(metric2); + metricValues = new HashMap(); + metricValues.put(ts - 900000, 31L); + metricValues.put(ts - 30000, 57L); + m2.setType(Type.TIME_SERIES); + m2.setValues(metricValues); + metrics.add(m2); + entity.addMetrics(metrics); + + 
TimelineEvent event = new TimelineEvent(); + event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); + event.setTimestamp(ts); + String expKey = "foo_event"; + Object expVal = "test"; + event.addInfo(expKey, expVal); + entity.addEvent(event); + + event = new TimelineEvent(); + event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); + event.setTimestamp(endTs); + event.addInfo(expKey, expVal); + entity.addEvent(event); + + return entity; + } + static TimelineEntity getEntityGreaterStartTime(long startTs) { TimelineEntity entity = new TimelineEntity(); entity.setCreatedTime(startTs); @@ -186,6 +318,34 @@ static TimelineEntity getEntityMinStartTime(long startTs) { return entity; } + static TimelineEntity getMinFlushEntity(long startTs) { + TimelineEntity entity = new TimelineEntity(); + String id = "flowRunHelloFlushEntityMin"; + String type = TimelineEntityType.YARN_APPLICATION.toString(); + entity.setId(id); + entity.setType(type); + entity.setCreatedTime(startTs); + TimelineEvent event = new TimelineEvent(); + event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); + event.setTimestamp(startTs); + entity.addEvent(event); + return entity; + } + + static TimelineEntity getMaxFlushEntity(long startTs) { + TimelineEntity entity = new TimelineEntity(); + String id = "flowRunHelloFlushEntityMax"; + String type = TimelineEntityType.YARN_APPLICATION.toString(); + entity.setId(id); + entity.setType(type); + entity.setCreatedTime(startTs); + + TimelineEvent event = new TimelineEvent(); + event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); + event.setTimestamp(startTs + END_TS_INCR); + entity.addEvent(event); + return entity; + } static TimelineEntity getFlowApp1() { TimelineEntity entity = new TimelineEntity(); @@ -207,5 +367,4 @@ static TimelineEntity getFlowApp1() { return entity; } - } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java index 9504799..c83ea3c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java @@ -216,7 +216,8 @@ public void testWriteFlowRunMetricsOneFlow() throws Exception { long runid = 1002345678919L; TimelineEntities te = new TimelineEntities(); - TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(); + TimelineEntity entityApp1 = TestFlowDataGenerator + .getEntityMetricsApp1(System.currentTimeMillis()); te.addEntity(entityApp1); HBaseTimelineWriterImpl hbi = null; @@ -228,7 +229,8 @@ public void testWriteFlowRunMetricsOneFlow() throws Exception { hbi.write(cluster, user, flow, flowVersion, runid, appName, te); // write another application with same metric to this flow te = new TimelineEntities(); - TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2(); + TimelineEntity entityApp2 = TestFlowDataGenerator + .getEntityMetricsApp2(System.currentTimeMillis()); te.addEntity(entityApp2); appName = "application_11111111111111_2222"; hbi.write(cluster, user, 
flow, flowVersion, runid, appName, te); @@ -323,7 +325,8 @@ public void testWriteFlowRunMetricsPrefix() throws Exception { long runid = 1002345678919L; TimelineEntities te = new TimelineEntities(); - TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(); + TimelineEntity entityApp1 = TestFlowDataGenerator + .getEntityMetricsApp1(System.currentTimeMillis()); te.addEntity(entityApp1); HBaseTimelineWriterImpl hbi = null; @@ -335,7 +338,8 @@ public void testWriteFlowRunMetricsPrefix() throws Exception { hbi.write(cluster, user, flow, flowVersion, runid, appName, te); // write another application with same metric to this flow te = new TimelineEntities(); - TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2(); + TimelineEntity entityApp2 = TestFlowDataGenerator + .getEntityMetricsApp2(System.currentTimeMillis()); te.addEntity(entityApp2); appName = "application_11111111111111_2222"; hbi.write(cluster, user, flow, flowVersion, runid, appName, te); @@ -420,7 +424,8 @@ public void testWriteFlowRunsMetricFields() throws Exception { long runid = 1002345678919L; TimelineEntities te = new TimelineEntities(); - TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(); + TimelineEntity entityApp1 = TestFlowDataGenerator + .getEntityMetricsApp1(System.currentTimeMillis()); te.addEntity(entityApp1); HBaseTimelineWriterImpl hbi = null; @@ -432,7 +437,8 @@ public void testWriteFlowRunsMetricFields() throws Exception { hbi.write(cluster, user, flow, flowVersion, runid, appName, te); // write another application with same metric to this flow te = new TimelineEntities(); - TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2(); + TimelineEntity entityApp2 = TestFlowDataGenerator + .getEntityMetricsApp2(System.currentTimeMillis()); te.addEntity(entityApp2); appName = "application_11111111111111_2222"; hbi.write(cluster, user, flow, flowVersion, runid, appName, te); @@ -494,6 +500,98 @@ public void testWriteFlowRunsMetricFields() throws Exception { } } + @Test + public void testWriteFlowRunFlush() throws Exception { + String cluster = "atestFlushFlowRun_cluster1"; + String user = "atestFlushFlowRun__user1"; + String flow = "atestFlushFlowRun_flow_name"; + String flowVersion = "AF1021C19F1351"; + long runid = 1449526652000L; + + int start = 10; + int count = 20000; + int appIdSuffix = 1; + HBaseTimelineWriterImpl hbi = null; + long insertTs = 1449796654827L - count; + long minTS = insertTs + 1; + long startTs = insertTs; + Configuration c1 = util.getConfiguration(); + TimelineEntities te1 = null; + TimelineEntity entityApp1 = null; + TimelineEntity entityApp2 = null; + try { + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + + for (int i = start; i < count; i++) { + String appName = "application_1060350000000_" + appIdSuffix; + insertTs++; + te1 = new TimelineEntities(); + entityApp1 = TestFlowDataGenerator.getMinFlushEntity(insertTs); + te1.addEntity(entityApp1); + entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs); + te1.addEntity(entityApp2); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); + Thread.sleep(1); + + appName = "application_1001199480000_7" + appIdSuffix; + insertTs++; + appIdSuffix++; + te1 = new TimelineEntities(); + entityApp1 = TestFlowDataGenerator.getMinFlushEntity(insertTs); + te1.addEntity(entityApp1); + entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs); + te1.addEntity(entityApp2); + + hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); + if (i % 1000 == 
0) { + hbi.flush(); + checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow, + runid, false); + } + } + } finally { + hbi.flush(); + hbi.close(); + checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow, runid, + true); + } + } + + private void checkMinMaxFlush(Configuration c1, long minTS, long startTs, + int count, String cluster, String user, String flow, long runid, + boolean checkMax) throws IOException { + Connection conn = ConnectionFactory.createConnection(c1); + // check in flow run table + Table table1 = conn.getTable(TableName + .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + // scan the table and see that we get back the right min and max + // timestamps + byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid); + Get g = new Get(startRow); + g.addColumn(FlowRunColumnFamily.INFO.getBytes(), + FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()); + g.addColumn(FlowRunColumnFamily.INFO.getBytes(), + FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes()); + + Result r1 = table1.get(g); + assertNotNull(r1); + assertTrue(!r1.isEmpty()); + Map values = r1.getFamilyMap(FlowRunColumnFamily.INFO + .getBytes()); + int start = 10; + assertEquals(2, r1.size()); + long starttime = Bytes.toLong(values + .get(FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes())); + assertEquals(minTS, starttime); + if (checkMax) { + assertEquals(startTs + 2 * (count - start) + + TestFlowDataGenerator.END_TS_INCR, + Bytes.toLong(values + .get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes()))); + } + } + @AfterClass public static void tearDownAfterClass() throws Exception { util.shutdownMiniCluster(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java new file mode 100644 index 0000000..a5ca458 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java @@ -0,0 +1,638 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotEquals; + +import java.io.IOException; +import java.util.Map; +import java.util.List; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.ArrayList; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; + +/** + * Tests the FlowRun and FlowActivity Tables + */ +public class TestHBaseStorageFlowRunCompaction { + + private static HBaseTestingUtility util; + + private final String metric1 = "MAP_SLOT_MILLIS"; + private final String metric2 = "HDFS_BYTES_READ"; + + private final byte[] aRowKey = Bytes.toBytes("a"); + private final byte[] aFamily = Bytes.toBytes("family"); + private final byte[] aQualifier = Bytes.toBytes("qualifier"); + + @BeforeClass + public static void setupBeforeClass() throws Exception { + util = new HBaseTestingUtility(); + Configuration conf = util.getConfiguration(); + conf.setInt("hfile.format.version", 3); + util.startMiniCluster(); + createSchema(); + } + + private static void createSchema() throws IOException { + TimelineSchemaCreator.createAllTables(util.getConfiguration(), false); + } + + @Test + public void testWriteFlowRunCompaction() throws Exception { + String cluster = "kompaction_cluster1"; + String user = "kompaction_FlowRun__user1"; + String flow = "kompaction_flowRun_flow_name"; + String flowVersion = "AF1021C19F1351"; + long runid = 1449526652000L; + + int start = 10; + int count = 2000; + int appIdSuffix = 1; + HBaseTimelineWriterImpl hbi = null; + long insertTs = System.currentTimeMillis() - count; + Configuration c1 = util.getConfiguration(); + 
TimelineEntities te1 = null; + TimelineEntity entityApp1 = null; + try { + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + // now insert count * ( 100 + 100) metrics + // each call to getEntityMetricsApp1 brings back 100 values + // of metric1 and 100 of metric2 + for (int i = start; i < start + count; i++) { + String appName = "application_10240000000000_" + appIdSuffix; + insertTs++; + te1 = new TimelineEntities(); + entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1(insertTs, c1); + te1.addEntity(entityApp1); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); + + appName = "application_2048000000000_7" + appIdSuffix; + insertTs++; + te1 = new TimelineEntities(); + entityApp1 = TestFlowDataGenerator.getEntityMetricsApp2(insertTs); + te1.addEntity(entityApp1); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); + } + } finally { + String appName = "application_10240000000000_" + appIdSuffix; + te1 = new TimelineEntities(); + entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1Complete( + insertTs + 1, c1); + te1.addEntity(entityApp1); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); + hbi.flush(); + hbi.close(); + } + + // check in flow run table + HRegionServer server = util.getRSForFirstRegionInTable(TableName + .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + List regions = server.getOnlineRegions(TableName + .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + assertTrue("Didn't find any regions for primary table!", regions.size() > 0); + // flush and compact all the regions of the primary table + for (HRegion region : regions) { + region.flushcache(); + region.compactStores(true); + } + + // check flow run for one flow many apps + checkFlowRunTable(cluster, user, flow, runid, c1, 3); + } + + + private void checkFlowRunTable(String cluster, String user, String flow, + long runid, Configuration c1, int valueCount) throws IOException { + Scan s = new Scan(); + s.addFamily(FlowRunColumnFamily.INFO.getBytes()); + byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid); + s.setStartRow(startRow); + String clusterStop = cluster + "1"; + byte[] stopRow = FlowRunRowKey.getRowKey(clusterStop, user, flow, runid); + s.setStopRow(stopRow); + Connection conn = ConnectionFactory.createConnection(c1); + Table table1 = conn.getTable(TableName + .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + ResultScanner scanner = table1.getScanner(s); + + int rowCount = 0; + for (Result result : scanner) { + assertNotNull(result); + assertTrue(!result.isEmpty()); + Map values = result.getFamilyMap(FlowRunColumnFamily.INFO + .getBytes()); + assertEquals(valueCount, values.size()); + + rowCount++; + // check metric1 + byte[] q = ColumnHelper.getColumnQualifier( + FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric1); + assertTrue(values.containsKey(q)); + assertEquals(141, Bytes.toLong(values.get(q))); + + // check metric2 + q = ColumnHelper.getColumnQualifier( + FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric2); + assertTrue(values.containsKey(q)); + assertEquals(57, Bytes.toLong(values.get(q))); + } + assertEquals(1, rowCount); + } + + + private FlowScanner getFlowScannerForTestingCompaction() { + // create a FlowScanner object with the sole purpose of invoking a process + // summation; + CompactionRequest request = new CompactionRequest(); + request.setIsMajor(true, true); + // okay to pass in nulls for the constructor arguments + // because all we want to do is invoke the process summation + FlowScanner fs = new 
+  private FlowScanner getFlowScannerForTestingCompaction() {
+    // create a FlowScanner object with the sole purpose of invoking a process
+    // summation
+    CompactionRequest request = new CompactionRequest();
+    request.setIsMajor(true, true);
+    // okay to pass in nulls for the constructor arguments
+    // because all we want to do is invoke the process summation
+    FlowScanner fs = new FlowScanner(null, -1, null,
+        (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION
+            : FlowScannerOperation.MINOR_COMPACTION));
+    assertNotNull(fs);
+    return fs;
+  }
+
+  @Test
+  public void checkProcessSummationMoreCellsSumFinal2()
+      throws IOException {
+    long cellValue1 = 1236L;
+    long cellValue2 = 28L;
+    long cellValue3 = 1236L;
+    long cellValue4 = 1236L;
+    FlowScanner fs = getFlowScannerForTestingCompaction();
+
+    // note down the current timestamp
+    long currentTimestamp = System.currentTimeMillis();
+    long cell1Ts = 1200120L;
+    long cell2Ts = TimestampGenerator.getSupplementedTimestamp(
+        System.currentTimeMillis(), "application_123746661110_11202");
+    long cell3Ts = 1277719L;
+    long cell4Ts = currentTimestamp - 10;
+
+    SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
+
+    List<Tag> tags = new ArrayList<>();
+    Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+        "application_1234588888_91188");
+    tags.add(t);
+    byte[] tagByteArray = Tag.fromList(tags);
+    // create a cell with a VERY old timestamp and attribute SUM_FINAL
+    Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+        cell1Ts, Bytes.toBytes(cellValue1), tagByteArray);
+    currentColumnCells.add(c1);
+
+    tags = new ArrayList<>();
+    t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+        "application_12700000001_29102");
+    tags.add(t);
+    tagByteArray = Tag.fromList(tags);
+    // create a cell with a recent timestamp and attribute SUM_FINAL
+    Cell c2 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+        cell2Ts, Bytes.toBytes(cellValue2), tagByteArray);
+    currentColumnCells.add(c2);
+
+    tags = new ArrayList<>();
+    t = new Tag(AggregationOperation.SUM.getTagType(),
+        "application_191780000000001_8195");
+    tags.add(t);
+    tagByteArray = Tag.fromList(tags);
+    // create a cell with a VERY old timestamp but has attribute SUM
+    Cell c3 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+        cell3Ts, Bytes.toBytes(cellValue3), tagByteArray);
+    currentColumnCells.add(c3);
+
+    tags = new ArrayList<>();
+    t = new Tag(AggregationOperation.SUM.getTagType(),
+        "application_191780000000001_98104");
+    tags.add(t);
+    tagByteArray = Tag.fromList(tags);
+    // create a cell with a VERY old timestamp but has attribute SUM
+    Cell c4 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+        cell4Ts, Bytes.toBytes(cellValue4), tagByteArray);
+    currentColumnCells.add(c4);
+
+    List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
+        LongConverter.getInstance(), currentTimestamp);
+    assertNotNull(cells);
+
+    // we should be getting back 4 cells
+    // one is the flow sum cell
+    // two are the cells with SUM attribute
+    // one cell with SUM_FINAL
+    assertEquals(4, cells.size());
+
+    for (int i = 0; i < cells.size(); i++) {
+      Cell returnedCell = cells.get(i);
+      assertNotNull(returnedCell);
+
+      long returnTs = returnedCell.getTimestamp();
+      long returnValue = Bytes.toLong(CellUtil
+          .cloneValue(returnedCell));
+      // cellValue1, cellValue3 and cellValue4 are identical, so the retained
+      // SUM cells are told apart from the new flow sum cell by timestamp
+      if (returnValue == cellValue2) {
+        assertTrue(returnTs == cell2Ts);
+      } else if (returnTs == cell3Ts) {
+        assertTrue(returnValue == cellValue3);
+      } else if (returnTs == cell4Ts) {
+        assertTrue(returnValue == cellValue4);
+      } else if (returnValue == cellValue1) {
+        // this is the newly created flow sum cell that replaces the expired
+        // SUM_FINAL cell
+        assertTrue(returnTs != cell1Ts);
+        assertTrue(returnTs > cell1Ts);
+        assertTrue(returnTs >= currentTimestamp);
+      } else {
+        // raise a failure since we expect only these values back
+        Assert.fail();
+      }
+    }
+  }
+
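+  // Note: TimestampGenerator.getSupplementedTimestamp(...) is used above and
+  // below to fabricate timestamps that fold in an application id; such
+  // supplemented timestamps are far larger than a plain millisecond value,
+  // which is presumably why those cells never fall below the compaction
+  // cut-off passed to processSummationMajorCompaction.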
+  // tests with many cells of type SUM and SUM_FINAL
+  // all of the SUM_FINAL cells will expire
+  @Test
+  public void checkProcessSummationMoreCellsSumFinalMany() throws IOException {
+    FlowScanner fs = getFlowScannerForTestingCompaction();
+    int count = 200000;
+
+    long cellValueFinal = 1000L;
+    long cellValueNotFinal = 28L;
+
+    // note down the current timestamp
+    long currentTimestamp = System.currentTimeMillis();
+    long cellTsFinalStart = 10001120L;
+    long cellTsFinal = cellTsFinalStart;
+    long cellTsNotFinalStart = currentTimestamp - 5;
+    long cellTsNotFinal = cellTsNotFinalStart;
+
+    SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
+    List<Tag> tags = null;
+    Tag t = null;
+    Cell c1 = null;
+
+    // insert SUM_FINAL cells
+    for (int i = 0; i < count; i++) {
+      tags = new ArrayList<>();
+      t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+          "application_123450000" + i + "01_19" + i);
+      tags.add(t);
+      byte[] tagByteArray = Tag.fromList(tags);
+      // create a cell with a VERY old timestamp and attribute SUM_FINAL
+      c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+          cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray);
+      currentColumnCells.add(c1);
+      cellTsFinal++;
+    }
+
+    // add SUM cells
+    for (int i = 0; i < count; i++) {
+      tags = new ArrayList<>();
+      t = new Tag(AggregationOperation.SUM.getTagType(),
+          "application_1987650000" + i + "83_911" + i);
+      tags.add(t);
+      byte[] tagByteArray = Tag.fromList(tags);
+      // create a cell with attribute SUM
+      c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+          cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray);
+      currentColumnCells.add(c1);
+      cellTsNotFinal++;
+    }
+
+    List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
+        LongConverter.getInstance(), currentTimestamp);
+    assertNotNull(cells);
+
+    // we should be getting back count + 1 cells
+    // one is the flow sum cell
+    // others are the cells with SUM attribute
+    assertEquals(count + 1, cells.size());
+
+    for (int i = 0; i < cells.size(); i++) {
+      Cell returnedCell = cells.get(i);
+      assertNotNull(returnedCell);
+
+      long returnTs = returnedCell.getTimestamp();
+      long returnValue = Bytes.toLong(CellUtil
+          .cloneValue(returnedCell));
+      if (returnValue == (count * cellValueFinal)) {
+        assertTrue(returnTs > (cellTsFinalStart + count));
+        assertTrue(returnTs >= currentTimestamp);
+      } else if ((returnValue >= cellValueNotFinal)
+          && (returnValue <= cellValueNotFinal * count)) {
+        assertTrue(returnTs >= cellTsNotFinalStart);
+        assertTrue(returnTs <= cellTsNotFinalStart + count);
+      } else {
+        // raise a failure since we expect only these values back
+        Assert.fail();
+      }
+    }
+  }
+
+  // tests with many cells of type SUM and SUM_FINAL
+  // only some of the SUM_FINAL cells will expire
+  @Test
+  public void checkProcessSummationMoreCellsSumFinalVariedTags()
+      throws IOException {
+    FlowScanner fs = getFlowScannerForTestingCompaction();
+    int countFinal = 20100;
+    int countNotFinal = 1000;
+    int countFinalNotExpire = 7009;
+
+    long cellValueFinal = 1000L;
+    long cellValueNotFinal = 28L;
+
+    // note down the current timestamp
+    long currentTimestamp = System.currentTimeMillis();
+    long cellTsFinalStart = 10001120L;
+    long cellTsFinal = cellTsFinalStart;
+
+    long cellTsFinalStartNotExpire = TimestampGenerator.getSupplementedTimestamp(
+        System.currentTimeMillis(), "application_10266666661166_118821");
+    long cellTsFinalNotExpire = cellTsFinalStartNotExpire;
+
+    long cellTsNotFinalStart = currentTimestamp - 5;
+    long cellTsNotFinal = cellTsNotFinalStart;
+
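+    // Three batches of cells are added below: SUM_FINAL cells with very old
+    // timestamps (expected to be compacted away into a single summed cell),
+    // SUM_FINAL cells with supplemented, recent timestamps (expected to be
+    // retained), and plain SUM cells (always retained).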
+    SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
+    List<Tag> tags = null;
+    Tag t = null;
+    Cell c1 = null;
+
+    // insert SUM_FINAL cells which will expire
+    for (int i = 0; i < countFinal; i++) {
+      tags = new ArrayList<>();
+      t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+          "application_123450000" + i + "01_19" + i);
+      tags.add(t);
+      byte[] tagByteArray = Tag.fromList(tags);
+      // create a cell with a VERY old timestamp and attribute SUM_FINAL
+      c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+          cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray);
+      currentColumnCells.add(c1);
+      cellTsFinal++;
+    }
+
+    // insert SUM_FINAL cells which will NOT expire
+    for (int i = 0; i < countFinalNotExpire; i++) {
+      tags = new ArrayList<>();
+      t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+          "application_123450000" + i + "01_19" + i);
+      tags.add(t);
+      byte[] tagByteArray = Tag.fromList(tags);
+      // create a cell with a VERY old timestamp and attribute SUM_FINAL
+      c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+          cellTsFinalNotExpire, Bytes.toBytes(cellValueFinal), tagByteArray);
+      currentColumnCells.add(c1);
+      cellTsFinalNotExpire++;
+    }
+
+    // add SUM cells
+    for (int i = 0; i < countNotFinal; i++) {
+      tags = new ArrayList<>();
+      t = new Tag(AggregationOperation.SUM.getTagType(),
+          "application_1987650000" + i + "83_911" + i);
+      tags.add(t);
+      byte[] tagByteArray = Tag.fromList(tags);
+      // create a cell with attribute SUM
+      c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+          cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray);
+      currentColumnCells.add(c1);
+      cellTsNotFinal++;
+    }
+
+    List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
+        LongConverter.getInstance(), currentTimestamp);
+    assertNotNull(cells);
+
+    // we should be getting back
+    // countNotFinal + countFinalNotExpire + 1 cells
+    // one is the flow sum cell
+    // countNotFinal are the cells with the SUM attribute
+    // countFinalNotExpire are the SUM_FINAL cells that have not expired
+    assertEquals(countFinalNotExpire + countNotFinal + 1, cells.size());
+
+    for (int i = 0; i < cells.size(); i++) {
+      Cell returnedCell = cells.get(i);
+      assertNotNull(returnedCell);
+
+      long returnTs = returnedCell.getTimestamp();
+      long returnValue = Bytes.toLong(CellUtil
+          .cloneValue(returnedCell));
+      if (returnValue == (countFinal * cellValueFinal)) {
+        assertTrue(returnTs > (cellTsFinalStart + countFinal));
+        assertTrue(returnTs >= currentTimestamp);
+      } else if (returnValue == cellValueNotFinal) {
+        assertTrue(returnTs >= cellTsNotFinalStart);
+        assertTrue(returnTs <= cellTsNotFinalStart + countNotFinal);
+      } else if (returnValue == cellValueFinal) {
+        assertTrue(returnTs >= cellTsFinalStartNotExpire);
+        assertTrue(returnTs <= cellTsFinalStartNotExpire + countFinalNotExpire);
+      } else {
+        // raise a failure since we expect only these values back
+        Assert.fail();
+      }
+    }
+  }
+
+  @Test
+  public void testProcessSummationMoreCellsSumFinal() throws IOException {
+    FlowScanner fs = getFlowScannerForTestingCompaction();
+    // note down the current timestamp
+    long currentTimestamp = System.currentTimeMillis();
+    long cellValue1 = 1236L;
+    long cellValue2 = 28L;
+
+    List<Tag> tags = new ArrayList<>();
+    Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+        "application_1234588888_999888");
+    tags.add(t);
+    byte[] tagByteArray = Tag.fromList(tags);
+    SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
+
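+    // TimelineStorageUtils.createNewCell(...) is assumed here to build a cell
+    // whose tag bytes carry the AggregationOperation, which is what lets the
+    // compaction scanner tell SUM cells apart from SUM_FINAL cells.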
+    // create a cell with a VERY old timestamp and attribute SUM_FINAL
+    Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+        120L, Bytes.toBytes(cellValue1), tagByteArray);
+    currentColumnCells.add(c1);
+
+    tags = new ArrayList<>();
+    t = new Tag(AggregationOperation.SUM.getTagType(),
+        "application_100000000001_119101");
+    tags.add(t);
+    tagByteArray = Tag.fromList(tags);
+
+    // create a cell with a VERY old timestamp but has attribute SUM
+    Cell c2 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+        130L, Bytes.toBytes(cellValue2), tagByteArray);
+    currentColumnCells.add(c2);
+    List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
+        LongConverter.getInstance(), currentTimestamp);
+    assertNotNull(cells);
+
+    // we should be getting back two cells
+    // one is the flow sum cell
+    // another is the cell with SUM attribute
+    assertEquals(2, cells.size());
+
+    Cell returnedCell = cells.get(0);
+    assertNotNull(returnedCell);
+    long inputTs1 = c1.getTimestamp();
+    long inputTs2 = c2.getTimestamp();
+
+    long returnTs = returnedCell.getTimestamp();
+    long returnValue = Bytes.toLong(CellUtil
+        .cloneValue(returnedCell));
+    // the returned Ts will be far greater than input ts as well as the noted
+    // current timestamp
+    if (returnValue == cellValue2) {
+      assertTrue(returnTs == inputTs2);
+    } else if (returnValue == cellValue1) {
+      assertTrue(returnTs >= currentTimestamp);
+      assertTrue(returnTs != inputTs1);
+    } else {
+      // raise a failure since we expect only these two values back
+      Assert.fail();
+    }
+  }
+
+  @Test
+  public void testProcessSummationOneCellSumFinal() throws IOException {
+    FlowScanner fs = getFlowScannerForTestingCompaction();
+
+    // note down the current timestamp
+    long currentTimestamp = System.currentTimeMillis();
+    List<Tag> tags = new ArrayList<>();
+    Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+        "application_123458888888_999888");
+    tags.add(t);
+    byte[] tagByteArray = Tag.fromList(tags);
+    SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
+
+    // create a cell with a VERY old timestamp
+    Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+        120L, Bytes.toBytes(1110L), tagByteArray);
+    currentColumnCells.add(c1);
+
+    List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
+        LongConverter.getInstance(), currentTimestamp);
+    assertNotNull(cells);
+    // we should not get the same cell back
+    // but we get back the flow cell
+    assertEquals(1, cells.size());
+
+    Cell returnedCell = cells.get(0);
+    // it's NOT the same cell
+    assertNotEquals(c1, returnedCell);
+    long inputTs = c1.getTimestamp();
+    long returnTs = returnedCell.getTimestamp();
+    // the returned Ts will be far greater than input ts as well as the noted
+    // current timestamp
+    assertTrue(returnTs > inputTs);
+    assertTrue(returnTs >= currentTimestamp);
+  }
+
+  @Test
+  public void testProcessSummationOneCell() throws IOException {
+    FlowScanner fs = getFlowScannerForTestingCompaction();
+
+    // note down the current timestamp
+    long currentTimestamp = System.currentTimeMillis();
+
+    // try for 1 cell with tag SUM
+    List<Tag> tags = new ArrayList<>();
+    Tag t = new Tag(AggregationOperation.SUM.getTagType(),
+        "application_123458888888_999888");
+    tags.add(t);
+    byte[] tagByteArray = Tag.fromList(tags);
+
+    SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
+
+    Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
+        currentTimestamp, Bytes.toBytes(1110L), tagByteArray);
+    currentColumnCells.add(c1);
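+    // a single SUM cell stamped with the current time should pass through the
+    // major compaction summation untouched, which is what the assertions
+    // below verify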
+    List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
+        LongConverter.getInstance(), currentTimestamp);
+    assertNotNull(cells);
+    // we expect the same cell back
+    assertEquals(1, cells.size());
+    Cell c2 = cells.get(0);
+    assertEquals(c1, c2);
+    assertEquals(currentTimestamp, c2.getTimestamp());
+  }
+
+  @Test
+  public void testProcessSummationEmpty() throws IOException {
+    FlowScanner fs = getFlowScannerForTestingCompaction();
+    long currentTimestamp = System.currentTimeMillis();
+
+    SortedSet<Cell> currentColumnCells = null;
+    List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
+        LongConverter.getInstance(), currentTimestamp);
+    assertNotNull(cells);
+    assertEquals(0, cells.size());
+
+    currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
+    cells = fs.processSummationMajorCompaction(currentColumnCells,
+        LongConverter.getInstance(), currentTimestamp);
+    assertNotNull(cells);
+    assertEquals(0, cells.size());
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    util.shutdownMiniCluster();
+  }
+}