diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 6a3854a..4f083db 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1610,6 +1610,17 @@ private static void addDeprecatedKeys() {
   public static final int
       DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS = 60;
 
+  /**
+   * The setting that controls how long the final value of a metric of a
+   * completed app is retained before merging into the flow-level sum.
+   */
+  public static final String APP_FINAL_VALUE_RETENTION_THRESHOLD =
+      TIMELINE_SERVICE_PREFIX
+      + "coprocessor.app-final-value-retention-milliseconds";
+
+  public static final long DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD = 7 * 24
+      * 60 * 60 * 1000L;
+
   // mark app-history related configs @Private as application history is going
   // to be integrated into the timeline service
   @Private
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index a57be55..eccbf08 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -255,7 +255,8 @@ private void storeFlowMetricsAppRunning(String clusterId, String userId,
       byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName,
           flowRunId);
       storeFlowMetrics(rowKey, metrics,
-          AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
+          AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId),
+          AggregationOperation.SUM.getAttribute());
     }
   }
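
Side note on the new configuration pair above: it is read through the ordinary Hadoop Configuration API, so whatever configuration the region server hands to the coprocessor can override the seven-day default. A minimal sketch of setting and reading it (the three-day override is an invented example, not part of this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class RetentionThresholdSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Hypothetical override: retain final values for 3 days instead of
        // the 7-day default before they merge into the flow-level sum.
        conf.setLong(YarnConfiguration.APP_FINAL_VALUE_RETENTION_THRESHOLD,
            3 * 24 * 60 * 60 * 1000L);
        long threshold = conf.getLong(
            YarnConfiguration.APP_FINAL_VALUE_RETENTION_THRESHOLD,
            YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD);
        System.out.println("app final value retention (ms) = " + threshold);
      }
    }
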
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
index e30f699..dba7bf3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
@@ -24,10 +24,14 @@
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.Map.Entry;
+import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
@@ -483,4 +487,93 @@ public static boolean isIntegralValue(Object obj) {
     return (obj instanceof Short) || (obj instanceof Integer) ||
         (obj instanceof Long);
   }
+
+  /**
+   * Creates a new cell based on the input cell but with the new value.
+   *
+   * @param origCell the cell whose row, family, qualifier and timestamp are
+   *          carried over
+   * @param newValue the value to store in the new cell
+   * @return cell
+   * @throws IOException
+   */
+  public static Cell createNewCell(Cell origCell, byte[] newValue)
+      throws IOException {
+    return CellUtil.createCell(CellUtil.cloneRow(origCell),
+        CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
+        origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);
+  }
+
+  /**
+   * Creates a cell with the given inputs.
+   *
+   * @param row row key
+   * @param family column family
+   * @param qualifier column qualifier
+   * @param ts cell timestamp
+   * @param newValue cell value
+   * @param tags cell tags
+   * @return cell
+   * @throws IOException
+   */
+  public static Cell createNewCell(byte[] row, byte[] family, byte[] qualifier,
+      long ts, byte[] newValue, byte[] tags) throws IOException {
+    return CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put,
+        newValue, tags);
+  }
+
+  /**
+   * Creates a readable string representation of a cell.
+   *
+   * @param cell the cell to describe
+   * @return a string representation of a cell
+   */
+  public static String printCellDetails(Cell cell) {
+    StringBuilder cellDetails = new StringBuilder();
+    cellDetails.append(" Row=");
+    cellDetails.append(Bytes.toString(CellUtil.cloneRow(cell)));
+    cellDetails.append(" Family=");
+    cellDetails.append(Bytes.toString(CellUtil.cloneFamily(cell)));
+    cellDetails.append(" Qualifier=");
+    cellDetails.append(Bytes.toString(CellUtil.cloneQualifier(cell)));
+    cellDetails.append(" Value=");
+    cellDetails.append(Bytes.toString(CellUtil.cloneValue(cell)));
+    cellDetails.append(" Timestamp=");
+    cellDetails.append(cell.getTimestamp());
+    cellDetails.append(" Tags=");
+    cellDetails.append(printTags(cell));
+    return cellDetails.toString();
+  }
+
+  /**
+   * Creates a readable string of all the tags of a cell.
+   *
+   * @param cell the cell whose tags are described
+   * @return string representation of the cell tags
+   */
+  public static String printTags(Cell cell) {
+    StringBuilder tagDetails = new StringBuilder();
+    List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+        cell.getTagsLength());
+    for (Tag t : tags) {
+      tagDetails.append(" type=");
+      tagDetails.append(t.getType());
+      tagDetails.append(" value=");
+      tagDetails.append(Bytes.toString(t.getValue()));
+    }
+    return tagDetails.toString();
+  }
+
+  /**
+   * Returns the app id from the list of tags, or null if no
+   * AggregationCompactionDimension tag is present.
+   *
+   * @param tags the cell tags to inspect
+   * @return the app id stored as the AggregationCompactionDimension
+   */
+  public static String getAggregationCompactionDimension(List<Tag> tags) {
+    for (Tag t : tags) {
+      if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
+          .getType()) {
+        return Bytes.toString(t.getValue());
+      }
+    }
+    return null;
+  }
 }
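
A quick usage sketch for the helpers added above; the row, family and qualifier literals are invented for illustration, and CellUtil.createCell is the stock HBase factory:

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;

    public class CellHelperSketch {
      public static void main(String[] args) throws Exception {
        Cell cell = CellUtil.createCell(Bytes.toBytes("aRowKey"),
            Bytes.toBytes("info"), Bytes.toBytes("aQualifier"));
        // Rewrite the value while keeping the row, family, qualifier and
        // timestamp of the original cell.
        Cell updated = TimelineStorageUtils.createNewCell(cell,
            Bytes.toBytes(42L));
        // Dump the cell in a human-readable form, tags included.
        System.out.println(TimelineStorageUtils.printCellDetails(updated));
      }
    }
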
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java
index 555b64e..777f0b4 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java
@@ -33,7 +33,7 @@
    * if this is changed, then reading cell timestamps written with older
    * multiplier value will not work
    */
-  public static final long TS_MULTIPLIER = 1000L;
+  public static final long TS_MULTIPLIER = 1000000L;
 
   private final AtomicLong lastTimestamp = new AtomicLong();
 
@@ -74,9 +74,10 @@ public long getUniqueTimestamp() {
    * application id
    *
    * Unlikely scenario of generating a timestamp that is a duplicate: If more
-   * than a 1000 concurrent apps are running in one flow run AND write to same
-   * column at the same time, then say appId of 1001 will overlap with appId of
-   * 001 and there may be collisions for that flow run's specific column.
+   * than 1M concurrent apps are running in one flow run AND write to the
+   * same column at the same time, then say an appId of 1000001 (1M and 1)
+   * will overlap with an appId of 001 and there may be collisions for that
+   * flow run's specific column.
    *
    * @param incomingTS
    * @param appId
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java
index c635ce6..18336d5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java
@@ -28,12 +28,12 @@
   /**
    * When the flow was started.
    */
-  MIN((byte) 71),
+  GLOBAL_MIN((byte) 71),
 
   /**
    * When it ended.
    */
-  MAX((byte) 73),
+  GLOBAL_MAX((byte) 73),
 
   /**
    * The metrics of the flow
@@ -46,9 +46,16 @@
   SUM_FINAL((byte) 83),
 
   /**
-   * compact
+   * Min value as per the latest timestamp
+   * seen for a given app
    */
-  COMPACT((byte) 89);
+  LATEST_MIN((byte) 89),
+
+  /**
+   * Max value as per the latest timestamp
+   * seen for a given app
+   */
+  LATEST_MAX((byte) 97);
 
   private byte tagType;
   private byte[] inBytes;
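
The multiplier change is easier to see with concrete numbers. A self-contained sketch of the idea (simplified; the real logic lives in TimestampGenerator.getSupplementedTimestamp and getTruncatedTimestamp):

    public class TimestampMathSketch {
      static final long TS_MULTIPLIER = 1000000L;

      // Scale the wall-clock time and pack an app-specific suffix into the
      // low-order digits (a simplified model of the generator's behavior).
      static long supplement(long wallClockMs, long appSuffix) {
        return wallClockMs * TS_MULTIPLIER + (appSuffix % TS_MULTIPLIER);
      }

      // Recover the wall-clock portion, as the coprocessor does when it
      // applies the retention threshold.
      static long truncate(long cellTs) {
        return cellTs / TS_MULTIPLIER;
      }

      public static void main(String[] args) {
        long ts = supplement(1449796654827L, 42L);
        System.out.println(ts + " -> " + truncate(ts));
        // Suffixes 42 and 1000042 collide modulo 1M, which is the ~1M
        // concurrent apps caveat in the javadoc above. Prints true:
        System.out.println(supplement(1449796654827L, 42L)
            == supplement(1449796654827L, 1000042L));
      }
    }
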
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
index 148a37f..d50bb16 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
@@ -41,14 +41,14 @@
    * application start times.
    */
   MIN_START_TIME(FlowRunColumnFamily.INFO, "min_start_time",
-      AggregationOperation.MIN, LongConverter.getInstance()),
+      AggregationOperation.GLOBAL_MIN, LongConverter.getInstance()),
 
   /**
    * When the flow ended. This is the maximum of currently known application
    * end times.
    */
   MAX_END_TIME(FlowRunColumnFamily.INFO, "max_end_time",
-      AggregationOperation.MAX, LongConverter.getInstance()),
+      AggregationOperation.GLOBAL_MAX, LongConverter.getInstance()),
 
   /**
    * The version of the flow that this flow belongs to.
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
index e3bb52d..015b9a3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
@@ -40,8 +40,7 @@
   /**
    * To store flow run info values.
    */
-  METRIC(FlowRunColumnFamily.INFO, "m", AggregationOperation.SUM,
-      LongConverter.getInstance());
+  METRIC(FlowRunColumnFamily.INFO, "m", null, LongConverter.getInstance());
 
   private final ColumnHelper<FlowRunTable> column;
   private final ColumnFamily<FlowRunTable> columnFamily;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
index 1984157..dcc6274 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
 
 import java.io.IOException;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -42,13 +43,17 @@
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
 
 public class FlowRunCoprocessor extends BaseRegionObserver {
 
-  @SuppressWarnings("unused")
   private static final Log LOG = LogFactory.getLog(FlowRunCoprocessor.class);
 
   private HRegion region;
@@ -156,8 +161,8 @@ public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e,
     scan.setMaxVersions();
     RegionScanner scanner = null;
     try {
-      scanner = new FlowScanner(region, scan.getBatch(),
-          region.getScanner(scan));
+      scanner = new FlowScanner(e.getEnvironment(), scan.getBatch(),
+          region.getScanner(scan), FlowScannerOperation.READ);
       scanner.next(results);
       e.bypass();
     } finally {
@@ -205,6 +210,63 @@ public RegionScanner preScannerOpen(
   public RegionScanner postScannerOpen(
       ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
       RegionScanner scanner) throws IOException {
-    return new FlowScanner(region, scan.getBatch(), scanner);
+    return new FlowScanner(e.getEnvironment(), scan.getBatch(), scanner,
+        FlowScannerOperation.READ);
+  }
+
+  @Override
+  public InternalScanner preFlush(
+      ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+      InternalScanner scanner) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      if (store != null) {
+        LOG.debug("preFlush store = " + store.getColumnFamilyName()
+            + " flushableSize=" + store.getFlushableSize()
+            + " flushedCellsCount=" + store.getFlushedCellsCount()
+            + " compactedCellsCount=" + store.getCompactedCellsCount()
+            + " majorCompactedCellsCount="
+            + store.getMajorCompactedCellsCount()
+            + " memstoreFlushSize=" + store.getMemstoreFlushSize()
+            + " memstoreSize=" + store.getMemStoreSize()
+            + " size=" + store.getSize()
+            + " storeFilesCount=" + store.getStorefilesCount());
+      }
+    }
+    return new FlowScanner(c.getEnvironment(), -1, scanner,
+        FlowScannerOperation.FLUSH);
+  }
+
+  @Override
+  public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
+      Store store, StoreFile resultFile) {
+    if (LOG.isDebugEnabled()) {
+      if (store != null) {
+        LOG.debug("postFlush store = " + store.getColumnFamilyName()
+            + " flushableSize=" + store.getFlushableSize()
+            + " flushedCellsCount=" + store.getFlushedCellsCount()
+            + " compactedCellsCount=" + store.getCompactedCellsCount()
+            + " majorCompactedCellsCount="
+            + store.getMajorCompactedCellsCount()
+            + " memstoreFlushSize=" + store.getMemstoreFlushSize()
+            + " memstoreSize=" + store.getMemStoreSize()
+            + " size=" + store.getSize()
+            + " storeFilesCount=" + store.getStorefilesCount());
+      }
+    }
+  }
+
+  @Override
+  public InternalScanner preCompact(
+      ObserverContext<RegionCoprocessorEnvironment> e, Store store,
+      InternalScanner scanner, ScanType scanType, CompactionRequest request)
+      throws IOException {
+
+    FlowScannerOperation requestOp = FlowScannerOperation.MAJOR_COMPACTION;
+    if (request != null) {
+      LOG.info("Compaction request=" + request.toString() + " "
+          + request.isMajor() + " for ObserverContext "
+          + e.getEnvironment().getRegion().getRegionNameAsString());
+      requestOp = (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION
+          : FlowScannerOperation.MINOR_COMPACTION);
+    }
+    return new FlowScanner(e.getEnvironment(), -1, scanner, requestOp);
   }
 }
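
For completeness: the coprocessor only runs if it is attached to the flow run table. A hedged sketch of what that attachment might look like with the plain HBase admin API (the table name string is an assumption here; the actual wiring is done by the flow run table's own create logic):

    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;

    public class CoprocessorAttachSketch {
      static HTableDescriptor flowRunDescriptor() throws Exception {
        HTableDescriptor desc = new HTableDescriptor(
            TableName.valueOf("timelineservice.flowrun"));
        // Load FlowRunCoprocessor from the region server classpath so its
        // preGetOp/preFlush/preCompact hooks fire for this table.
        desc.addCoprocessor("org.apache.hadoop.yarn.server.timelineservice"
            + ".storage.flow.FlowRunCoprocessor");
        return desc;
      }
    }
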
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
index d541df0..aa0fcf0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -29,20 +29,27 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Invoked via the coprocessor when a Get or a Scan is issued for flow run
@@ -55,23 +62,42 @@
 
   private static final Log LOG = LogFactory.getLog(FlowScanner.class);
 
+  /**
+   * Use a special application id to represent the flow id; this is needed
+   * since TimestampGenerator parses the app id to generate a cell timestamp.
+   */
+  private static final String FLOW_APP_ID = "application_00000000000_0000";
+
   private final HRegion region;
   private final InternalScanner flowRunScanner;
-  private RegionScanner regionScanner;
   private final int limit;
+  private final long appFinalValueRetentionThreshold;
+  private RegionScanner regionScanner;
   private boolean hasMore;
   private byte[] currentRow;
   private List<Cell> availableCells = new ArrayList<>();
   private int currentIndex;
+  private FlowScannerOperation action = FlowScannerOperation.READ;
 
-  FlowScanner(HRegion region, int limit, InternalScanner internalScanner) {
-    this.region = region;
+  FlowScanner(RegionCoprocessorEnvironment env, int limit,
+      InternalScanner internalScanner, FlowScannerOperation action) {
     this.limit = limit;
     this.flowRunScanner = internalScanner;
     if (internalScanner instanceof RegionScanner) {
       this.regionScanner = (RegionScanner) internalScanner;
     }
-    // TODO: note if it's compaction/flush
+    this.action = action;
+    if (env == null) {
+      this.appFinalValueRetentionThreshold =
+          YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD;
+      this.region = null;
+    } else {
+      this.region = env.getRegion();
+      Configuration hbaseConf = env.getConfiguration();
+      this.appFinalValueRetentionThreshold = hbaseConf.getLong(
+          YarnConfiguration.APP_FINAL_VALUE_RETENTION_THRESHOLD,
+          YarnConfiguration.DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD);
+    }
   }
 
   /*
@@ -104,17 +130,6 @@ public boolean next(List<Cell> cells, int limit) throws IOException {
     return nextInternal(cells, limit);
   }
 
-  private String getAggregationCompactionDimension(List<Tag> tags) {
-    String appId = null;
-    for (Tag t : tags) {
-      if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
-          .getType()) {
-        appId = Bytes.toString(t.getValue());
-      }
-    }
-    return appId;
-  }
-
   /**
    * Get value converter associated with a column or a column prefix. If
    * nothing matches, generic converter is returned.
@@ -182,6 +197,7 @@ private boolean nextInternal(List<Cell> cells, int limit) throws IOException {
     SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
     Set<String> alreadySeenAggDim = new HashSet<>();
     int addedCnt = 0;
+    long currentTimestamp = System.currentTimeMillis();
     ValueConverter converter = null;
     while (((cell = peekAtNextCell(limit)) != null)
         && (limit <= 0 || addedCnt < limit)) {
@@ -189,7 +205,7 @@
       if (comp.compare(currentColumnQualifier, newColumnQualifier) != 0) {
         if (converter != null && isNumericConverter(converter)) {
           addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
-              (NumericValueConverter)converter);
+              (NumericValueConverter)converter, currentTimestamp);
         }
         resetState(currentColumnCells, alreadySeenAggDim);
         currentColumnQualifier = newColumnQualifier;
@@ -206,8 +222,16 @@
       nextCell(limit);
     }
     if (!currentColumnCells.isEmpty()) {
-      emitCells(cells, currentColumnCells, currentAggOp,
-          (NumericValueConverter)converter);
+      addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
+          (NumericValueConverter)converter, currentTimestamp);
+      if (LOG.isDebugEnabled()) {
+        if (addedCnt > 0) {
+          LOG.debug("emitted cells. " + addedCnt + " for " + this.action
+              + " rowKey=" + Bytes.toString(currentRow));
+        } else {
+          LOG.debug("emitted no cells for " + this.action);
+        }
+      }
     }
     return hasMore();
   }
" + addedCnt + " for " + + this.action + " rowKey="); + } else { + LOG.debug("emitted no cells for " + this.action); + } + } } return hasMore(); } @@ -246,7 +270,7 @@ private void collectCells(SortedSet currentColumnCells, } switch (currentAggOp) { - case MIN: + case GLOBAL_MIN: if (currentColumnCells.size() == 0) { currentColumnCells.add(cell); } else { @@ -259,7 +283,7 @@ private void collectCells(SortedSet currentColumnCells, } } break; - case MAX: + case GLOBAL_MAX: if (currentColumnCells.size() == 0) { currentColumnCells.add(cell); } else { @@ -274,10 +298,23 @@ private void collectCells(SortedSet currentColumnCells, break; case SUM: case SUM_FINAL: + if (LOG.isTraceEnabled()) { + LOG.trace("In collect cells " + + " FlowSannerOperation=" + + this.action + + " currentAggOp=" + + currentAggOp + + " cell qualifier=" + + Bytes.toString(CellUtil.cloneQualifier(cell)) + + " cell value= " + + (Number) converter.decodeValue(CellUtil.cloneValue(cell)) + + " timestamp=" + cell.getTimestamp()); + } + // only if this app has not been seen yet, add to current column cells List tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()); - String aggDim = getAggregationCompactionDimension(tags); + String aggDim = TimelineStorageUtils.getAggregationCompactionDimension(tags); if (alreadySeenAggDim.contains(aggDim)) { // if this agg dimension has already been seen, // since they show up in sorted order @@ -301,8 +338,8 @@ private void collectCells(SortedSet currentColumnCells, * parameter. */ private int emitCells(List cells, SortedSet currentColumnCells, - AggregationOperation currentAggOp, NumericValueConverter converter) - throws IOException { + AggregationOperation currentAggOp, NumericValueConverter converter, + long currentTimestamp) throws IOException { if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) { return 0; } @@ -310,17 +347,33 @@ private int emitCells(List cells, SortedSet currentColumnCells, cells.addAll(currentColumnCells); return currentColumnCells.size(); } + if (LOG.isTraceEnabled()) { + LOG.trace("In emitCells " + this.action + " currentColumnCells size= " + + currentColumnCells.size() + " currentAggOp" + currentAggOp); + } switch (currentAggOp) { - case MIN: - case MAX: + case GLOBAL_MIN: + case GLOBAL_MAX: cells.addAll(currentColumnCells); return currentColumnCells.size(); case SUM: case SUM_FINAL: - Cell sumCell = processSummation(currentColumnCells, converter); - cells.add(sumCell); - return 1; + switch (action) { + case FLUSH: + case MINOR_COMPACTION: + cells.addAll(currentColumnCells); + return currentColumnCells.size(); + case READ: + Cell sumCell = processSummation(currentColumnCells, converter); + cells.add(sumCell); + return 1; + case MAJOR_COMPACTION: + List finalCells = processSummationMajorCompaction( + currentColumnCells, converter, currentTimestamp); + cells.addAll(finalCells); + return finalCells.size(); + } default: cells.addAll(currentColumnCells); return currentColumnCells.size(); @@ -350,10 +403,117 @@ private Cell processSummation(SortedSet currentColumnCells, sum = converter.add(sum, currentValue); } byte[] sumBytes = converter.encodeValue(sum); - Cell sumCell = createNewCell(mostRecentCell, sumBytes); + Cell sumCell = TimelineStorageUtils.createNewCell(mostRecentCell, sumBytes); return sumCell; } + + /** + * Returns a list of cells that contains + * + * A) the latest cells for applications that haven't finished yet + * B) summation + * for the flow, based on applications that have completed and are older than + * a 
@@ -350,10 +403,117 @@ private Cell processSummation(SortedSet<Cell> currentColumnCells,
       sum = converter.add(sum, currentValue);
     }
     byte[] sumBytes = converter.encodeValue(sum);
-    Cell sumCell = createNewCell(mostRecentCell, sumBytes);
+    Cell sumCell = TimelineStorageUtils.createNewCell(mostRecentCell,
+        sumBytes);
     return sumCell;
   }
 
+  /**
+   * Returns a list of cells that contains
+   *
+   * A) the latest cells for applications that haven't finished yet, and
+   *
+   * B) the summation for the flow, based on applications that have completed
+   * and are older than a certain time.
+   *
+   * The new cell created has the timestamp of the most recent metric cell.
+   * The sum of a metric for a flow run is the summation at the point of the
+   * last metric update in that flow till that time.
+   */
+  @VisibleForTesting
+  List<Cell> processSummationMajorCompaction(
+      SortedSet<Cell> currentColumnCells, NumericValueConverter converter,
+      long currentTimestamp) throws IOException {
+    Number sum = 0;
+    Number currentValue = 0;
+    long ts = 0L;
+    List<Cell> finalCells = new ArrayList<Cell>();
+    if (currentColumnCells == null) {
+      return finalCells;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("In processSummationMajorCompaction,"
+          + " will drop cells older than " + currentTimestamp
+          + " CurrentColumnCells size=" + currentColumnCells.size());
+    }
+
+    for (Cell cell : currentColumnCells) {
+      AggregationOperation cellAggOp = getCurrentAggOp(cell);
+      // if this is the existing flow sum cell
+      List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+          cell.getTagsLength());
+      String appId = TimelineStorageUtils
+          .getAggregationCompactionDimension(tags);
+      if (FLOW_APP_ID.equals(appId)) {
+        // string equality (not ==) since the app id is decoded from bytes;
+        // fold the previously compacted flow-level sum into the new sum
+        currentValue = (Number) converter.decodeValue(CellUtil
+            .cloneValue(cell));
+        sum = converter.add(sum, currentValue);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("reading flow app id sum=" + sum);
+        }
+      } else {
+        currentValue = (Number) converter.decodeValue(CellUtil
+            .cloneValue(cell));
+        // read the timestamp truncated by the generator
+        ts = TimestampGenerator.getTruncatedTimestamp(cell.getTimestamp());
+        if ((cellAggOp == AggregationOperation.SUM_FINAL)
+            && ((ts + this.appFinalValueRetentionThreshold)
+                < currentTimestamp)) {
+          sum = converter.add(sum, currentValue);
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("MAJOR COMPACTION loop sum= " + sum
+                + " discarding now: qualifier="
+                + Bytes.toString(CellUtil.cloneQualifier(cell)) + " value="
+                + (Number) converter.decodeValue(CellUtil.cloneValue(cell))
+                + " timestamp=" + cell.getTimestamp() + " " + this.action);
+          }
+        } else {
+          // not a final value but it's the latest cell for this app
+          // so include this cell in the list of cells to write back
+          finalCells.add(cell);
+        }
+      }
+    }
+    if (sum.longValue() > 0L) {
+      Cell anyCell = currentColumnCells.first();
+      List<Tag> tags = new ArrayList<Tag>();
+      Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+          Bytes.toBytes(FLOW_APP_ID));
+      tags.add(t);
+      t = new Tag(AggregationCompactionDimension.APPLICATION_ID.getTagType(),
+          Bytes.toBytes(FLOW_APP_ID));
+      tags.add(t);
+      byte[] tagByteArray = Tag.fromList(tags);
+      Cell sumCell = TimelineStorageUtils.createNewCell(
+          CellUtil.cloneRow(anyCell),
+          CellUtil.cloneFamily(anyCell),
+          CellUtil.cloneQualifier(anyCell),
+          TimestampGenerator.getSupplementedTimestamp(
+              System.currentTimeMillis(), FLOW_APP_ID),
+          converter.encodeValue(sum), tagByteArray);
+      finalCells.add(sumCell);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("MAJOR COMPACTION final sum= " + sum + " for "
+            + Bytes.toString(CellUtil.cloneQualifier(sumCell))
+            + " " + this.action);
+      }
+      LOG.info("After major compaction for qualifier="
+          + Bytes.toString(CellUtil.cloneQualifier(sumCell))
+          + " with currentColumnCells.size=" + currentColumnCells.size()
+          + " returning finalCells.size=" + finalCells.size()
+          + " with sum=" + sum.longValue()
+          + " with cell timestamp " + sumCell.getTimestamp());
+    } else {
+      LOG.info("After major compaction with currentColumnCells.size="
+          + currentColumnCells.size()
+          + " returning finalCells.size=" + finalCells.size()
+          + " with zero sum=" + sum.longValue());
+    }
+    return finalCells;
+  }
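+
+  // The retention check above boils down to a single comparison. A
+  // standalone sketch of that predicate with concrete numbers follows
+  // (simplified names, not the actual method):
+  //
+  //   static boolean foldIntoFlowSum(boolean isSumFinal,
+  //       long cellWallClockMs, long retentionMs, long nowMs) {
+  //     // A completed app's SUM_FINAL cell is merged into the flow-level
+  //     // sum only once its (truncated) timestamp is past the threshold.
+  //     return isSumFinal && (cellWallClockMs + retentionMs < nowMs);
+  //   }
+  //
+  //   long now = System.currentTimeMillis();
+  //   long sevenDaysMs = 7 * 24 * 60 * 60 * 1000L;
+  //   long eightDaysAgo = now - 8 * 24 * 60 * 60 * 1000L;
+  //   foldIntoFlowSum(true, eightDaysAgo, sevenDaysMs, now); // true: merged
+  //   foldIntoFlowSum(true, now - 1000L, sevenDaysMs, now);  // false: kept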
+
   /**
    * Determines which cell is to be returned based on the values in each cell
    * and the comparison operation MIN or MAX.
@@ -376,7 +536,7 @@ private Cell compareCellValues(Cell previouslyChosenCell, Cell currentCell,
     Number currentCellValue = (Number) converter.decodeValue(CellUtil
         .cloneValue(currentCell));
     switch (currentAggOp) {
-    case MIN:
+    case GLOBAL_MIN:
       if (converter.compare(currentCellValue, previouslyChosenCellValue) < 0) {
         // new value is minimum, hence return this cell
@@ -385,7 +545,7 @@
       } else {
         // previously chosen value is minimum, hence return previous min cell
         return previouslyChosenCell;
       }
-    case MAX:
+    case GLOBAL_MAX:
       if (converter.compare(currentCellValue, previouslyChosenCellValue) > 0) {
         // new value is max, hence return this cell
@@ -403,16 +563,13 @@
     }
   }
 
-  private Cell createNewCell(Cell origCell, byte[] newValue)
-      throws IOException {
-    return CellUtil.createCell(CellUtil.cloneRow(origCell),
-        CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell),
-        origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue);
-  }
-
   @Override
   public void close() throws IOException {
-    flowRunScanner.close();
+    if (flowRunScanner != null) {
+      flowRunScanner.close();
+    } else {
+      LOG.warn("scanner close called but scanner is null");
+    }
   }
 
   /**
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
index d18613a..3a5bdec 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
 
-
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -29,17 +28,60 @@
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.conf.Configuration;
 
 /**
  * Generates the data/entities for the FlowRun and FlowActivity Tables
 */
 class TestFlowDataGenerator {
 
-  private final static String metric1 = "MAP_SLOT_MILLIS";
-  private final static String metric2 = "HDFS_BYTES_READ";
+  private static final String metric1 = "MAP_SLOT_MILLIS";
+  private static final String metric2 = "HDFS_BYTES_READ";
+  public static final long END_TS_INCR = 10000L;
+
+  static TimelineEntity getEntityMetricsApp1(long insertTs,
+      Configuration c1) {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowRunMetrics_test";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    long cTime = 1425016501000L;
+    entity.setCreatedTime(cTime);
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId(metric1);
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    long ts = insertTs;
 
-  static TimelineEntity getEntityMetricsApp1() {
+    for (int k = 1; k < 100; k++) {
+      metricValues.put(ts - k * 200000, 20L);
+    }
+    metricValues.put(ts - 80000, 40L);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+
+    TimelineMetric m2 = new TimelineMetric();
+    m2.setId(metric2);
+    metricValues = new HashMap<Long, Number>();
+    ts = System.currentTimeMillis();
+    for (int k = 1; k < 100; k++) {
+      metricValues.put(ts - k * 100000, 31L);
+    }
+
+    metricValues.put(ts - 80000, 57L);
+    m2.setType(Type.TIME_SERIES);
+    m2.setValues(metricValues);
+    metrics.add(m2);
+
+    entity.addMetrics(metrics);
+    return entity;
+  }
+
+  static TimelineEntity getEntityMetricsApp1Complete(long insertTs,
+      Configuration c1) {
     TimelineEntity entity = new TimelineEntity();
     String id = "flowRunMetrics_test";
     String type = TimelineEntityType.YARN_APPLICATION.toString();
@@ -53,7 +95,48 @@
     TimelineMetric m1 = new TimelineMetric();
     m1.setId(metric1);
     Map<Long, Number> metricValues = new HashMap<Long, Number>();
-    long ts = System.currentTimeMillis();
+    long ts = insertTs;
+
+    metricValues.put(ts - 80000, 40L);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+
+    TimelineMetric m2 = new TimelineMetric();
+    m2.setId(metric2);
+    metricValues = new HashMap<Long, Number>();
+    ts = insertTs;
+    metricValues.put(ts - 80000, 57L);
+    m2.setType(Type.TIME_SERIES);
+    m2.setValues(metricValues);
+    metrics.add(m2);
+
+    entity.addMetrics(metrics);
+
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+    event.setTimestamp(insertTs);
+    event.addInfo("done", "insertTs=" + insertTs);
+    entity.addEvent(event);
+    return entity;
+  }
+
+  static TimelineEntity getEntityMetricsApp1(long insertTs) {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowRunMetrics_test";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    long cTime = 1425016501000L;
+    entity.setCreatedTime(cTime);
+
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId(metric1);
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    long ts = insertTs;
     metricValues.put(ts - 100000, 2L);
     metricValues.put(ts - 80000, 40L);
     m1.setType(Type.TIME_SERIES);
@@ -63,7 +146,7 @@
     TimelineMetric m2 = new TimelineMetric();
     m2.setId(metric2);
     metricValues = new HashMap<Long, Number>();
-    ts = System.currentTimeMillis();
+    ts = insertTs;
     metricValues.put(ts - 100000, 31L);
     metricValues.put(ts - 80000, 57L);
     m2.setType(Type.TIME_SERIES);
@@ -74,7 +157,8 @@
     return entity;
   }
 
-  static TimelineEntity getEntityMetricsApp2() {
+
+  static TimelineEntity getEntityMetricsApp2(long insertTs) {
     TimelineEntity entity = new TimelineEntity();
     String id = "flowRunMetrics_test";
     String type = TimelineEntityType.YARN_APPLICATION.toString();
@@ -87,7 +171,7 @@
     TimelineMetric m1 = new TimelineMetric();
     m1.setId(metric1);
     Map<Long, Number> metricValues = new HashMap<Long, Number>();
-    long ts = System.currentTimeMillis();
+    long ts = insertTs;
     metricValues.put(ts - 100000, 5L);
     metricValues.put(ts - 80000, 101L);
     m1.setType(Type.TIME_SERIES);
@@ -142,6 +226,55 @@ static TimelineEntity getEntity1() {
     return entity;
   }
 
+  static TimelineEntity getAFullEntity(long ts, long endTs) {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowRunFullEntity";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    entity.setCreatedTime(ts);
+    entity.setModifiedTime(endTs);
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId(metric1);
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    metricValues.put(ts - 120000, 100000000L);
+    metricValues.put(ts - 100000, 200000000L);
+    metricValues.put(ts - 80000, 300000000L);
+    metricValues.put(ts - 60000, 400000000L);
+    metricValues.put(ts - 40000, 50000000000L);
+    metricValues.put(ts - 20000, 60000000000L);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+    TimelineMetric m2 = new TimelineMetric();
+    m2.setId(metric2);
+    metricValues = new HashMap<Long, Number>();
+    metricValues.put(ts - 900000, 31L);
+    metricValues.put(ts - 30000, 57L);
+    m2.setType(Type.TIME_SERIES);
+    m2.setValues(metricValues);
+    metrics.add(m2);
+    entity.addMetrics(metrics);
+
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    event.setTimestamp(ts);
+    String expKey = "foo_event";
+    Object expVal = "test";
+    event.addInfo(expKey, expVal);
+    entity.addEvent(event);
+
+    event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+    event.setTimestamp(endTs);
+    event.addInfo(expKey, expVal);
+    entity.addEvent(event);
+
+    return entity;
+  }
+
   static TimelineEntity getEntityGreaterStartTime(long startTs) {
     TimelineEntity entity = new TimelineEntity();
     entity.setCreatedTime(startTs);
@@ -187,6 +320,34 @@ static TimelineEntity getEntityMinStartTime(long startTs) {
     return entity;
   }
 
+  static TimelineEntity getMinFlushEntity(long startTs) {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowRunHelloFlushEntityMin";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    entity.setCreatedTime(startTs);
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    event.setTimestamp(startTs);
+    entity.addEvent(event);
+    return entity;
+  }
+
+  static TimelineEntity getMaxFlushEntity(long startTs) {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowRunHelloFlushEntityMax";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    entity.setCreatedTime(startTs);
+
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+    event.setTimestamp(startTs + END_TS_INCR);
+    entity.addEvent(event);
+    return entity;
+  }
 
   static TimelineEntity getFlowApp1() {
     TimelineEntity entity = new TimelineEntity();
@@ -208,5 +369,4 @@ static TimelineEntity getFlowApp1() {
     return entity;
   }
-
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
index 5da0192..067519f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -72,6 +72,7 @@ public static void setupBeforeClass() throws Exception {
     util = new HBaseTestingUtility();
     Configuration conf = util.getConfiguration();
     conf.setInt("hfile.format.version", 3);
+    conf.setInt("hbase.hregion.memstore.flush.size", 1024 * 1024);
     util.startMiniCluster();
     createSchema();
   }
@@ -212,7 +213,8 @@ public void testWriteFlowRunMetricsOneFlow() throws Exception {
     long runid = 1002345678919L;
 
     TimelineEntities te = new TimelineEntities();
-    TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1();
+    TimelineEntity entityApp1 = TestFlowDataGenerator
+        .getEntityMetricsApp1(System.currentTimeMillis());
     te.addEntity(entityApp1);
 
     HBaseTimelineWriterImpl hbi = null;
@@ -224,7 +226,8 @@
       hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
       // write another application with same metric to this flow
       te = new TimelineEntities();
-      TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2();
+      TimelineEntity entityApp2 = TestFlowDataGenerator
+          .getEntityMetricsApp2(System.currentTimeMillis());
       te.addEntity(entityApp2);
       appName = "application_11111111111111_2222";
       hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
@@ -317,7 +320,8 @@ public void testWriteFlowRunMetricsPrefix() throws Exception {
     long runid = 1002345678919L;
 
     TimelineEntities te = new TimelineEntities();
-    TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1();
+    TimelineEntity entityApp1 = TestFlowDataGenerator
+        .getEntityMetricsApp1(System.currentTimeMillis());
     te.addEntity(entityApp1);
 
     HBaseTimelineWriterImpl hbi = null;
@@ -329,7 +333,8 @@
       hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
       // write another application with same metric to this flow
       te = new TimelineEntities();
-      TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2();
+      TimelineEntity entityApp2 = TestFlowDataGenerator
+          .getEntityMetricsApp2(System.currentTimeMillis());
       te.addEntity(entityApp2);
       appName = "application_11111111111111_2222";
       hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
@@ -412,7 +417,8 @@ public void testWriteFlowRunsMetricFields() throws Exception {
     long runid = 1002345678919L;
 
     TimelineEntities te = new TimelineEntities();
-    TimelineEntity entityApp1 = TestFlowDataGenerator.getEntityMetricsApp1();
+    TimelineEntity entityApp1 = TestFlowDataGenerator
+        .getEntityMetricsApp1(System.currentTimeMillis());
     te.addEntity(entityApp1);
 
     HBaseTimelineWriterImpl hbi = null;
@@ -424,7 +430,8 @@
       hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
       // write another application with same metric to this flow
       te = new TimelineEntities();
-      TimelineEntity entityApp2 = TestFlowDataGenerator.getEntityMetricsApp2();
+      TimelineEntity entityApp2 = TestFlowDataGenerator
+          .getEntityMetricsApp2(System.currentTimeMillis());
       te.addEntity(entityApp2);
       appName = "application_11111111111111_2222";
       hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
@@ -483,6 +490,98 @@
     }
   }
 
"atestFlushFlowRun_cluster1"; + String user = "atestFlushFlowRun__user1"; + String flow = "atestFlushFlowRun_flow_name"; + String flowVersion = "AF1021C19F1351"; + long runid = 1449526652000L; + + int start = 10; + int count = 20000; + int appIdSuffix = 1; + HBaseTimelineWriterImpl hbi = null; + long insertTs = 1449796654827L - count; + long minTS = insertTs + 1; + long startTs = insertTs; + Configuration c1 = util.getConfiguration(); + TimelineEntities te1 = null; + TimelineEntity entityApp1 = null; + TimelineEntity entityApp2 = null; + try { + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + + for (int i = start; i < count; i++) { + String appName = "application_1060350000000_" + appIdSuffix; + insertTs++; + te1 = new TimelineEntities(); + entityApp1 = TestFlowDataGenerator.getMinFlushEntity(insertTs); + te1.addEntity(entityApp1); + entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs); + te1.addEntity(entityApp2); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); + Thread.sleep(1); + + appName = "application_1001199480000_7" + appIdSuffix; + insertTs++; + appIdSuffix++; + te1 = new TimelineEntities(); + entityApp1 = TestFlowDataGenerator.getMinFlushEntity(insertTs); + te1.addEntity(entityApp1); + entityApp2 = TestFlowDataGenerator.getMaxFlushEntity(insertTs); + te1.addEntity(entityApp2); + + hbi.write(cluster, user, flow, flowVersion, runid, appName, te1); + if (i % 1000 == 0) { + hbi.flush(); + checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow, + runid, false); + } + } + } finally { + hbi.flush(); + hbi.close(); + checkMinMaxFlush(c1, minTS, startTs, count, cluster, user, flow, runid, + true); + } + } + + private void checkMinMaxFlush(Configuration c1, long minTS, long startTs, + int count, String cluster, String user, String flow, long runid, + boolean checkMax) throws IOException { + Connection conn = ConnectionFactory.createConnection(c1); + // check in flow run table + Table table1 = conn.getTable(TableName + .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + // scan the table and see that we get back the right min and max + // timestamps + byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid); + Get g = new Get(startRow); + g.addColumn(FlowRunColumnFamily.INFO.getBytes(), + FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()); + g.addColumn(FlowRunColumnFamily.INFO.getBytes(), + FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes()); + + Result r1 = table1.get(g); + assertNotNull(r1); + assertTrue(!r1.isEmpty()); + Map values = r1.getFamilyMap(FlowRunColumnFamily.INFO + .getBytes()); + int start = 10; + assertEquals(2, r1.size()); + long starttime = Bytes.toLong(values + .get(FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes())); + assertEquals(minTS, starttime); + if (checkMax) { + assertEquals(startTs + 2 * (count - start) + + TestFlowDataGenerator.END_TS_INCR, + Bytes.toLong(values + .get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes()))); + } + } @AfterClass public static void tearDownAfterClass() throws Exception { util.shutdownMiniCluster();