diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index 772002d..7c4a5da 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -33,11 +33,10 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; -import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; @@ -53,23 +52,36 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; /** - * This implements a hbase based backend for storing application timeline entity + * This implements a hbase based backend for storing the timeline entity * information. 
+ * It writes to multiple tables at the backend */ @InterfaceAudience.Private @InterfaceStability.Unstable public class HBaseTimelineWriterImpl extends AbstractService implements TimelineWriter { + private static final Log LOG = LogFactory + .getLog(HBaseTimelineWriterImpl.class); + private Connection conn; private TypedBufferedMutator entityTable; private TypedBufferedMutator appToFlowTable; private TypedBufferedMutator applicationTable; - - private static final Log LOG = LogFactory - .getLog(HBaseTimelineWriterImpl.class); + private TypedBufferedMutator flowActivityTable; + private TypedBufferedMutator flowRunTable; public HBaseTimelineWriterImpl() { super(HBaseTimelineWriterImpl.class.getName()); @@ -91,6 +103,8 @@ protected void serviceInit(Configuration conf) throws Exception { entityTable = new EntityTable().getTableMutator(hbaseConf, conn); appToFlowTable = new AppToFlowTable().getTableMutator(hbaseConf, conn); applicationTable = new ApplicationTable().getTableMutator(hbaseConf, conn); + flowRunTable = new FlowRunTable().getTableMutator(hbaseConf, conn); + flowActivityTable = new FlowActivityTable().getTableMutator(hbaseConf, conn); } /** @@ -111,7 +125,7 @@ public TimelineWriteResponse write(String clusterId, String userId, // if the entity is the application, the destination is the application // table - boolean isApplication = isApplicationEntity(te); + boolean isApplication = TimelineWriterUtils.isApplicationEntity(te); byte[] rowKey = isApplication ? ApplicationRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId) : @@ -124,37 +138,139 @@ public TimelineWriteResponse write(String clusterId, String userId, storeMetrics(rowKey, te.getMetrics(), isApplication); storeRelations(rowKey, te, isApplication); - if (isApplicationCreated(te)) { - onApplicationCreated( - clusterId, userId, flowName, flowVersion, flowRunId, appId, te); + if (isApplication) { + if (TimelineWriterUtils.isApplicationCreated(te)) { + onApplicationCreated(clusterId, userId, flowName, flowVersion, + flowRunId, appId, te); + } + // if it's an application entity, store metrics + storeFlowMetricsAppRunning(clusterId, userId, flowName, flowRunId, + appId, te); + // if application has finished, store it's finish time and write final + // values + // of all metrics + if (TimelineWriterUtils.isApplicationFinished(te)) { + onApplicationFinished(clusterId, userId, flowName, flowVersion, + flowRunId, appId, te); + } } } return putStatus; } - private static boolean isApplicationEntity(TimelineEntity te) { - return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString()); + private void onApplicationCreated(String clusterId, String userId, + String flowName, String flowVersion, long flowRunId, String appId, + TimelineEntity te) throws IOException { + // store in App to flow table + storeInAppToFlowTable(clusterId, userId, flowName, flowVersion, flowRunId, + appId, te); + // store in flow run table + storeAppCreatedInFlowRunTable(clusterId, userId, flowName, flowVersion, + flowRunId, appId, te); + // store in flow activity table + storeInFlowActivityTable(clusterId, userId, flowName, flowVersion, + flowRunId, appId, te); } - private static boolean isApplicationCreated(TimelineEntity te) { - if (isApplicationEntity(te)) { - for (TimelineEvent event : te.getEvents()) { - if (event.getId().equals( - ApplicationMetricsConstants.CREATED_EVENT_TYPE)) { - return true; - } - } - } - return false; + /* + * updates the {@link FlowActivityTable} with the Application TimelineEntity + * information + */ + private void 
storeInFlowActivityTable(String clusterId, String userId, + String flowName, String flowVersion, long flowRunId, String appId, + TimelineEntity te) throws IOException { + byte[] rowKey = FlowActivityRowKey.getRowKey(clusterId, userId, flowName); + byte[] qualifier = GenericObjectMapper.write(flowRunId); + FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier, + null, flowVersion, + AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); } - private void onApplicationCreated(String clusterId, String userId, + /* + * updates the {@link FlowRunTable} with Application Created information + */ + private void storeAppCreatedInFlowRunTable(String clusterId, String userId, + String flowName, String flowVersion, long flowRunId, String appId, + TimelineEntity te) throws IOException { + byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName, + flowRunId); + FlowRunColumn.MIN_START_TIME.store(rowKey, flowRunTable, null, + te.getCreatedTime(), + AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); + } + + private void storeInAppToFlowTable(String clusterId, String userId, String flowName, String flowVersion, long flowRunId, String appId, TimelineEntity te) throws IOException { byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId); AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTable, null, flowName); - AppToFlowColumn.FLOW_RUN_ID.store( - rowKey, appToFlowTable, null, flowRunId); + AppToFlowColumn.FLOW_RUN_ID.store(rowKey, appToFlowTable, null, flowRunId); + } + + /* + * updates the {@link FlowRunTable} and {@link FlowActivityTable} when an + * application has finished + */ + private void onApplicationFinished(String clusterId, String userId, + String flowName, String flowVersion, long flowRunId, String appId, + TimelineEntity te) throws IOException { + // store in flow run table + storeAppFinishedInFlowRunTable(clusterId, userId, flowName, flowRunId, + appId, te); + + // indicate in the flow activity table that the app has finished + storeInFlowActivityTable(clusterId, userId, flowName, flowVersion, + flowRunId, appId, te); + } + + /* + * Update the {@link FlowRunTable} with Application Finished information + */ + private void storeAppFinishedInFlowRunTable(String clusterId, String userId, + String flowName, long flowRunId, String appId, TimelineEntity te) + throws IOException { + byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName, + flowRunId); + Attribute attributeAppId = AggregationCompactionDimension.APPLICATION_ID + .getAttribute(appId); + FlowRunColumn.MAX_END_TIME.store(rowKey, flowRunTable, null, + TimelineWriterUtils.getApplicationFinishedTime(te), attributeAppId); + + // store the final value of metrics since application has finished + Set metrics = te.getMetrics(); + if (metrics != null) { + storeFlowMetrics(rowKey, metrics, attributeAppId, + AggregationOperation.SUM_FINAL.getAttribute()); + } + } + + /* + * Updates the {@link FlowRunTable} with Application Metrics + */ + private void storeFlowMetricsAppRunning(String clusterId, String userId, + String flowName, long flowRunId, String appId, TimelineEntity te) + throws IOException { + Set metrics = te.getMetrics(); + if (metrics != null) { + byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName, + flowRunId); + storeFlowMetrics(rowKey, metrics, + AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); + } + } + + private void storeFlowMetrics(byte[] rowKey, Set metrics, + Attribute... 
attributes) throws IOException { + for (TimelineMetric metric : metrics) { + String metricColumnQualifier = metric.getId(); + Map timeseries = metric.getValues(); + for (Map.Entry timeseriesEntry : timeseries.entrySet()) { + Long timestamp = timeseriesEntry.getKey(); + FlowRunColumnPrefix.METRIC.store(rowKey, flowRunTable, + metricColumnQualifier, timestamp, timeseriesEntry.getValue(), + attributes); + } + } } private void storeRelations(byte[] rowKey, TimelineEntity te, @@ -184,7 +300,6 @@ private void storeRelations(byte[] rowKey, TimelineEntity te, // id3?id4?id5 String compoundValue = Separator.VALUES.joinEncoded(connectedEntity.getValue()); - columnPrefix.store(rowKey, table, connectedEntity.getKey(), null, compoundValue); } @@ -342,6 +457,8 @@ public void flush() throws IOException { entityTable.flush(); appToFlowTable.flush(); applicationTable.flush(); + flowRunTable.flush(); + flowActivityTable.flush(); } /** @@ -364,6 +481,16 @@ protected void serviceStop() throws Exception { LOG.info("closing the application table"); applicationTable.close(); } + if (flowRunTable != null) { + LOG.info("closing the flow run table"); + // The close API performs flushing and releases any resources held + flowRunTable.close(); + } + if (flowActivityTable != null) { + LOG.info("closing the flowActivityTable table"); + // The close API performs flushing and releases any resources held + flowActivityTable.close(); + } if (conn != null) { LOG.info("closing the hbase Connection"); conn.close(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java index 5120856..9ba9f0c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java @@ -42,6 +42,8 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; /** * This creates the schema for a hbase based backend for storing application @@ -172,7 +174,7 @@ private static CommandLine parseArgs(String[] args) throws ParseException { return commandLine; } - private static void createAllTables(Configuration hbaseConf) + static void createAllTables(Configuration hbaseConf) throws IOException { Connection conn = null; @@ -185,6 +187,8 @@ private static void createAllTables(Configuration hbaseConf) new EntityTable().createTable(admin, hbaseConf); new AppToFlowTable().createTable(admin, hbaseConf); new ApplicationTable().createTable(admin, hbaseConf); + new FlowRunTable().createTable(admin, hbaseConf); + new FlowActivityTable().createTable(admin, hbaseConf); } finally { if (conn != null) { conn.close(); diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java index c028386..802626d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; /** * Identifies fully qualified columns for the {@link ApplicationTable}. @@ -76,9 +77,9 @@ private String getColumnQualifier() { public void store(byte[] rowKey, TypedBufferedMutator tableMutator, Long timestamp, - Object inputValue) throws IOException { + Object inputValue, Attribute... attributes) throws IOException { column.store(rowKey, tableMutator, columnQualifierBytes, timestamp, - inputValue); + inputValue, attributes); } public Object readResult(Result result) throws IOException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java index ad1def6..d7b5773 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; /** * Identifies partially qualified columns for the application table. @@ -112,7 +113,8 @@ private String getColumnPrefix() { */ public void store(byte[] rowKey, TypedBufferedMutator tableMutator, byte[] qualifier, - Long timestamp, Object inputValue) throws IOException { + Long timestamp, Object inputValue, Attribute... 
attributes) + throws IOException { // Null check if (qualifier == null) { @@ -123,8 +125,9 @@ public void store(byte[] rowKey, byte[] columnQualifier = ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); - column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue); - } + column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, + attributes); + } /* * (non-Javadoc) @@ -137,7 +140,8 @@ public void store(byte[] rowKey, */ public void store(byte[] rowKey, TypedBufferedMutator tableMutator, String qualifier, - Long timestamp, Object inputValue) throws IOException { + Long timestamp, Object inputValue, Attribute...attributes) + throws IOException { // Null check if (qualifier == null) { @@ -148,7 +152,8 @@ public void store(byte[] rowKey, byte[] columnQualifier = ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); - column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue); + column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, + attributes); } /* diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java index 423037a..859fdca 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java @@ -25,8 +25,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; import java.io.IOException; +import java.util.Map; /** * Identifies fully qualified columns for the {@link AppToFlowTable}. @@ -67,9 +69,9 @@ private String getColumnQualifier() { public void store(byte[] rowKey, TypedBufferedMutator tableMutator, Long timestamp, - Object inputValue) throws IOException { + Object inputValue, Attribute... 
attributes) throws IOException { column.store(rowKey, tableMutator, columnQualifierBytes, timestamp, - inputValue); + inputValue, attributes); } public Object readResult(Result result) throws IOException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java index 3397d62..8f68e21 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; /** * A Column represents the way to store a fully qualified column in a specific @@ -38,12 +39,15 @@ * column. * @param timestamp version timestamp. When null the server timestamp will be * used. + * @param attributes Map of attributes for this mutation. used in the coprocessor + * to set/read the cell tags * @param inputValue the value to write to the rowKey and column qualifier. * Nothing gets written when null. * @throws IOException */ public void store(byte[] rowKey, TypedBufferedMutator tableMutator, - Long timestamp, Object inputValue) throws IOException; + Long timestamp, Object inputValue, Attribute... attributes) + throws IOException; /** * Get the latest version of this specified column. Note: this call clones the diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java index f1b7c58..3a2e088 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java @@ -31,7 +31,8 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; - +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; /** * This class is meant to be used only by explicit Columns, and not directly to * write by clients. @@ -58,31 +59,66 @@ public ColumnHelper(ColumnFamily columnFamily) { * Sends a Mutation to the table. The mutations will be buffered and sent over * the wire as part of a batch. * - * @param rowKey identifying the row to write. Nothing gets written when null. - * @param tableMutator used to modify the underlying HBase table - * @param columnQualifier column qualifier. Nothing gets written when null. - * @param timestamp version timestamp. 
When null the server timestamp will be - * used. - * @param inputValue the value to write to the rowKey and column qualifier. - * Nothing gets written when null. + * @param rowKey + * identifying the row to write. Nothing gets written when null. + * @param tableMutator + * used to modify the underlying HBase table + * @param columnQualifier + * column qualifier. Nothing gets written when null. + * @param timestamp + * version timestamp. When null the current timestamp multiplied with + * TimestampGenerator.TS_MULTIPLIER and added with last 3 digits of + * app id will be used + * @param inputValue + * the value to write to the rowKey and column qualifier. Nothing + * gets written when null. * @throws IOException */ public void store(byte[] rowKey, TypedBufferedMutator tableMutator, - byte[] columnQualifier, Long timestamp, Object inputValue) - throws IOException { + byte[] columnQualifier, Long timestamp, Object inputValue, + Attribute... attributes) throws IOException { if ((rowKey == null) || (columnQualifier == null) || (inputValue == null)) { return; } Put p = new Put(rowKey); + timestamp = getPutTimestamp(timestamp, attributes); + p.addColumn(columnFamilyBytes, columnQualifier, timestamp, + GenericObjectMapper.write(inputValue)); + if ((attributes != null) && (attributes.length > 0)) { + for (Attribute attribute : attributes) { + p.setAttribute(attribute.getName(), attribute.getValue()); + } + } + tableMutator.mutate(p); + } + /* + * Figures out the cell timestamp used in the Put For storing into flow run + * table. We would like to left shift the timestamp and supplement it with the + * AppId id so that there are no collisions in the flow run table's cells + */ + private long getPutTimestamp(Long timestamp, Attribute[] attributes) { if (timestamp == null) { - p.addColumn(columnFamilyBytes, columnQualifier, - GenericObjectMapper.write(inputValue)); - } else { - p.addColumn(columnFamilyBytes, columnQualifier, timestamp, - GenericObjectMapper.write(inputValue)); + timestamp = System.currentTimeMillis(); } - tableMutator.mutate(p); + String appId = getAppIdFromAttributes(attributes); + long supplementedTS = TimestampGenerator.getSupplementedTimestamp( + timestamp, appId); + return supplementedTS; + } + + private String getAppIdFromAttributes(Attribute[] attributes) { + if (attributes == null) { + return null; + } + String appId = null; + for (Attribute attribute : attributes) { + if (AggregationCompactionDimension.APPLICATION_ID.toString().equals( + attribute.getName())) { + appId = Bytes.toString(attribute.getValue()); + } + } + return appId; } /** @@ -171,7 +207,9 @@ public Object readResult(Result result, byte[] columnQualifierBytes) for (Entry cell : cells.entrySet()) { V value = (V) GenericObjectMapper.read(cell.getValue()); - cellResults.put(cell.getKey(), value); + cellResults.put( + TimestampGenerator.getTruncatedTimestamp(cell.getKey()), + value); } } results.put(columnName, cellResults); @@ -315,6 +353,27 @@ public Object readResult(Result result, byte[] columnQualifierBytes) /** * @param columnPrefixBytes The byte representation for the column prefix. * Should not contain {@link Separator#QUALIFIERS}. + * @param qualifier for the remainder of the column. + * @return fully sanitized column qualifier that is a combination of prefix + * and qualifier. If prefix is null, the result is simply the encoded + * qualifier without any separator. 
+ */ + public static byte[] getColumnQualifier(byte[] columnPrefixBytes, + long qualifier) { + + if (columnPrefixBytes == null) { + return Bytes.toBytes(qualifier); + } + + // Convert qualifier to lower case, strip of separators and tag on column + // prefix. + byte[] columnQualifier = + Separator.QUALIFIERS.join(columnPrefixBytes, Bytes.toBytes(qualifier)); + return columnQualifier; + } + /** + * @param columnPrefixBytes The byte representation for the column prefix. + * Should not contain {@link Separator#QUALIFIERS}. * @param qualifier the byte representation for the remainder of the column. * @return fully sanitized column qualifier that is a combination of prefix * and qualifier. If prefix is null, the result is simply the encoded diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java index 509ff49..db49098 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; /** * Used to represent a partially qualified column, where the actual column name @@ -43,12 +44,36 @@ * @param qualifier column qualifier. Nothing gets written when null. * @param timestamp version timestamp. When null the server timestamp will be * used. + *@param attributes attributes for the mutation that are used by the coprocessor + * to set/read the cell tags * @param inputValue the value to write to the rowKey and column qualifier. * Nothing gets written when null. * @throws IOException */ public void store(byte[] rowKey, TypedBufferedMutator tableMutator, - String qualifier, Long timestamp, Object inputValue) throws IOException; + byte[] qualifier, Long timestamp, Object inputValue, + Attribute... attributes) throws IOException; + + /** + * Sends a Mutation to the table. The mutations will be buffered and sent over + * the wire as part of a batch. + * + * @param rowKey identifying the row to write. Nothing gets written when null. + * @param tableMutator used to modify the underlying HBase table. Caller is + * responsible to pass a mutator for the table that actually has this + * column. + * @param qualifier column qualifier. Nothing gets written when null. + * @param timestamp version timestamp. When null the server timestamp will be + * used. + *@param attributes attributes for the mutation that are used by the coprocessor + * to set/read the cell tags + * @param inputValue the value to write to the rowKey and column qualifier. + * Nothing gets written when null. + * @throws IOException + */ + public void store(byte[] rowKey, TypedBufferedMutator tableMutator, + String qualifier, Long timestamp, Object inputValue, + Attribute... attributes) throws IOException; /** * Get the latest version of this specified column. 
Note: this call clones the @@ -81,4 +106,5 @@ public void store(byte[] rowKey, TypedBufferedMutator tableMutator, */ public NavigableMap> readResultsWithTimestamps(Result result) throws IOException; + } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java index 58bdedc7e..775cdea 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java @@ -19,9 +19,19 @@ import java.util.ArrayList; import java.util.List; +import java.util.SortedSet; +import java.util.Map.Entry; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; /** * bunch of utility functions used across TimelineWriter classes @@ -36,6 +46,9 @@ /** indicator for no limits for splitting */ public static final int NO_LIMIT_SPLIT = -1; + /** milliseconds in one day */ + public static final long MILLIS_ONE_DAY = 86400000L; + /** * Splits the source array into multiple array segments using the given * separator, up to a maximum of count items. This will naturally produce @@ -140,4 +153,175 @@ public static long invert(Long key) { return Long.MAX_VALUE - key; } + /** + * returns the timestamp of that day's start (which is midnight 00:00:00 AM) + * for a given input timestamp + * + * @param ts + * @return timestamp of that day's beginning (midnight) + */ + public static long getTopOfTheDayTimestamp(long ts) { + long dayTimestamp = ts - (ts % MILLIS_ONE_DAY); + return dayTimestamp; + } + + /** + * this method combines the input array of attributes and the input + * aggregation operation into a new array of attributes + * + * @param attributes + * @param aggOp + * @return array of combined attributes + */ + public static Attribute[] combineAttributes(Attribute[] attributes, + AggregationOperation aggOp) { + int newLength = getNewLengthCombinedAttributes(attributes, aggOp); + Attribute[] combinedAttributes = new Attribute[newLength]; + + if (attributes != null) { + System.arraycopy(attributes, 0, combinedAttributes, 0, attributes.length); + } + + if (aggOp != null) { + Attribute a2 = aggOp.getAttribute(); + combinedAttributes[newLength - 1] = a2; + } + return combinedAttributes; + } + + /** + * returns a number for the new array size. 
The new array is the combination
+   * of the input array of attributes and the input aggregation operation.
+   *
+   * @param attributes
+   * @param aggOp
+   * @return the size of the combined array
+   */
+  private static int getNewLengthCombinedAttributes(Attribute[] attributes,
+      AggregationOperation aggOp) {
+    int oldLength = getAttributesLength(attributes);
+    int aggLength = getAppOpLength(aggOp);
+    return oldLength + aggLength;
+  }
+
+  private static int getAppOpLength(AggregationOperation aggOp) {
+    if (aggOp != null) {
+      return 1;
+    }
+    return 0;
+  }
+
+  private static int getAttributesLength(Attribute[] attributes) {
+    if (attributes != null) {
+      return attributes.length;
+    }
+    return 0;
+  }
+
+  /**
+   * Checks if an application has finished.
+   *
+   * @param te
+   * @return true if the application has finished, false otherwise
+   */
+  public static boolean isApplicationFinished(TimelineEntity te) {
+    SortedSet<TimelineEvent> allEvents = te.getEvents();
+    if ((allEvents != null) && (allEvents.size() > 0)) {
+      TimelineEvent event = allEvents.last();
+      if (event.getId().equals(
+          ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Gets the time at which an application finished.
+   *
+   * @param te
+   * @return the application's finish time if it has finished, 0 otherwise
+   */
+  public static long getApplicationFinishedTime(TimelineEntity te) {
+    SortedSet<TimelineEvent> allEvents = te.getEvents();
+    if ((allEvents != null) && (allEvents.size() > 0)) {
+      TimelineEvent event = allEvents.last();
+      if (event.getId().equals(
+          ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
+        return event.getTimestamp();
+      }
+    }
+    return 0L;
+  }
+
+  /**
+   * Checks if the input TimelineEntity object is an ApplicationEntity.
+   *
+   * @param te
+   * @return true if input is an ApplicationEntity, false otherwise
+   */
+  public static boolean isApplicationEntity(TimelineEntity te) {
+    return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString());
+  }
+
+  /**
+   * Checks for the APPLICATION_CREATED event.
+   *
+   * @param te
+   * @return true if the application-created event exists, false otherwise
+   */
+  public static boolean isApplicationCreated(TimelineEntity te) {
+    if (isApplicationEntity(te)) {
+      for (TimelineEvent event : te.getEvents()) {
+        if (event.getId()
+            .equals(ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Returns the first aggregation operation found in the list of input tags,
+   * or null otherwise.
+   *
+   * @param tags
+   * @return AggregationOperation
+   */
+  public static AggregationOperation getAggregationOperationFromTagsList(
+      List<Tag> tags) {
+    for (AggregationOperation aggOp : AggregationOperation.values()) {
+      for (Tag tag : tags) {
+        if (tag.getType() == aggOp.getTagType()) {
+          return aggOp;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Creates a {@link Tag} from the input attribute.
+   *
+   * @param attribute
+   * @return Tag
+   */
+  public static Tag getTagFromAttribute(Entry<String, byte[]> attribute) {
+    // attribute could be either an Aggregation Operation or
+    // an Aggregation Dimension
+    // Get the Tag type from either
+    AggregationOperation aggOp = AggregationOperation
+        .getAggregationOperation(attribute.getKey());
+    if (aggOp != null) {
+      Tag t = new Tag(aggOp.getTagType(), attribute.getValue());
+      return t;
+    }
+
+    AggregationCompactionDimension aggCompactDim =
+        AggregationCompactionDimension.getAggregationCompactionDimension(
+            attribute.getKey());
+    if (aggCompactDim != null) {
+      Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue());
+      return t;
+    }
+    return null;
+  }
+
+}
\ No newline at end of file
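A quick illustration of how the two helpers above compose. This sketch is not part of the patch; the timestamp and application id below are invented:

// Day bucketing for the flow activity table row key: any two timestamps
// within the same UTC day map to the same value (midnight of that day).
long eventTs = 1437550134000L;   // hypothetical event time in ms
long dayStart = TimelineWriterUtils.getTopOfTheDayTimestamp(eventTs);
// dayStart == eventTs - (eventTs % 86400000L)

// combineAttributes appends the aggregation operation to the attribute
// array: [APPLICATION_ID] becomes [APPLICATION_ID, SUM_FINAL].
Attribute[] combined = TimelineWriterUtils.combineAttributes(
    new Attribute[] { AggregationCompactionDimension.APPLICATION_ID
        .getAttribute("application_1437550134000_0001") },
    AggregationOperation.SUM_FINAL);
// combined.length == 2; combined[1] is the SUM_FINAL attribute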
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java
new file mode 100644
index 0000000..39afe48
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimestampGenerator.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+/**
+ * Utility class that allows HBase coprocessors to interact with unique
+ * timestamps.
+ */
+public class TimestampGenerator {
+
+  /*
+   * If this is changed, reading cell timestamps written with an older
+   * multiplier value will not work.
+   */
+  public static final long TS_MULTIPLIER = 1000L;
+
+  private final AtomicLong lastTimestamp = new AtomicLong();
+
+  /**
+   * Returns the current wall clock time in milliseconds, multiplied by the
+   * required precision.
+   */
+  public long currentTime() {
+    // We want to align cell timestamps with current time.
+    // Cell timestamps should not be less than
+    // System.currentTimeMillis() * TS_MULTIPLIER.
+    return System.currentTimeMillis() * TS_MULTIPLIER;
+  }
+
+  /**
+   * Returns a timestamp value unique within the scope of this
+   * {@code TimestampGenerator} instance. For usage by HBase
+   * {@code RegionObserver} coprocessors, this normally means unique within a
+   * given region.
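+   * Returned values are monotonically increasing: the compare-and-set loop
+   * below never returns a value less than or equal to the previously
+   * returned one, even when the wall clock does not advance between calls.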
+   */
+  public long getUniqueTimestamp() {
+    long lastTs;
+    long nextTs;
+    do {
+      lastTs = lastTimestamp.get();
+      nextTs = Math.max(lastTs + 1, currentTime());
+    } while (!lastTimestamp.compareAndSet(lastTs, nextTs));
+    return nextTs;
+  }
+
+  /**
+   * Returns a timestamp that combines the incoming timestamp (multiplied by
+   * TS_MULTIPLIER) with the last few digits of the application id.
+   *
+   * @param incomingTS
+   * @param appId
+   * @return the supplemented timestamp
+   */
+  public static long getSupplementedTimestamp(long incomingTS, String appId) {
+    long suffix = getAppIdSuffix(appId);
+    long outgoingTS = incomingTS * TS_MULTIPLIER + suffix;
+    return outgoingTS;
+  }
+
+  private static long getAppIdSuffix(String appIdStr) {
+    if (appIdStr == null) {
+      return 0L;
+    }
+    ApplicationId appId = ConverterUtils.toApplicationId(appIdStr);
+    long id = appId.getId() % TS_MULTIPLIER;
+    return id;
+  }
+
+  /**
+   * Truncates the last few digits of the timestamp that were supplemented by
+   * {@link TimestampGenerator#getSupplementedTimestamp(long, String)},
+   * recovering the original timestamp.
+   *
+   * @param incomingTS
+   * @return the truncated timestamp
+   */
+  public static long getTruncatedTimestamp(long incomingTS) {
+    return incomingTS / TS_MULTIPLIER;
+  }
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/coprocessor/FlowRunCoprocessor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/coprocessor/FlowRunCoprocessor.java
new file mode 100644
index 0000000..065beb6
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/coprocessor/FlowRunCoprocessor.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.yarn.server.timelineservice.storage.coprocessor; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; + +public class FlowRunCoprocessor extends BaseRegionObserver { + + @SuppressWarnings("unused") + private static final Log LOG = LogFactory.getLog(FlowRunCoprocessor.class); + + private HRegion region; + private final TimestampGenerator timestampGenerator = new TimestampGenerator(); + + @Override + public void start(CoprocessorEnvironment e) throws IOException { + if (e instanceof RegionCoprocessorEnvironment) { + RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e; + this.region = env.getRegion(); + } + } + + /* + * (non-Javadoc) + * + * This method adds the tags onto the cells in the Put. It is presumed that + * all the cells in one Put have the same set of Tags. The existing cell + * timestamp is overwritten for non-metric cells and each such cell gets a new + * unique timestamp generated by {@link TimestampGenerator} + * + * @see + * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#prePut(org.apache + * .hadoop.hbase.coprocessor.ObserverContext, + * org.apache.hadoop.hbase.client.Put, + * org.apache.hadoop.hbase.regionserver.wal.WALEdit, + * org.apache.hadoop.hbase.client.Durability) + */ + @Override + public void prePut(ObserverContext e, Put put, + WALEdit edit, Durability durability) throws IOException { + Map attributes = put.getAttributesMap(); + + // Assumption is that all the cells in a put are the same operation. 
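+    // Each attribute placed on the Put by ColumnHelper#store is either an
+    // AggregationOperation or an AggregationCompactionDimension; each is
+    // converted to an HBase cell Tag below and attached to every cell in
+    // this Put.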
+    List<Tag> tags = new ArrayList<>();
+    if ((attributes != null) && (attributes.size() > 0)) {
+      for (Map.Entry<String, byte[]> attribute : attributes.entrySet()) {
+        Tag t = TimelineWriterUtils.getTagFromAttribute(attribute);
+        tags.add(t);
+      }
+      byte[] tagByteArray = Tag.fromList(tags);
+      NavigableMap<byte[], List<Cell>> newFamilyMap = new TreeMap<>(
+          Bytes.BYTES_COMPARATOR);
+      for (Map.Entry<byte[], List<Cell>> entry : put.getFamilyCellMap()
+          .entrySet()) {
+        List<Cell> newCells = new ArrayList<>(entry.getValue().size());
+        for (Cell cell : entry.getValue()) {
+          // For each cell in the put, add the tags.
+          // Assumption is that all the cells in
+          // one put are the same operation.
+          // Also, get a unique cell timestamp for non-metric cells;
+          // this way we don't inadvertently overwrite cell versions.
+          long cellTimestamp = getCellTimestamp(cell.getTimestamp(), tags);
+          newCells.add(CellUtil.createCell(CellUtil.cloneRow(cell),
+              CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell),
+              cellTimestamp, KeyValue.Type.Put, CellUtil.cloneValue(cell),
+              tagByteArray));
+        }
+        newFamilyMap.put(entry.getKey(), newCells);
+      } // for each entry
+      // Update the family map for the Put
+      put.setFamilyCellMap(newFamilyMap);
+    }
+  }
+
+  /**
+   * Determines whether the current cell's timestamp is to be used or whether
+   * a new unique cell timestamp is to be generated. This is done to avoid
+   * inadvertently overwriting cells when writes come in very fast. But for
+   * metric cells, the cell timestamp signifies the metric timestamp, hence
+   * we don't want to overwrite it.
+   *
+   * @param timestamp
+   * @param tags
+   * @return cell timestamp
+   */
+  private long getCellTimestamp(Long timestamp, List<Tag> tags) {
+    // if ts is not set, then use the generator
+    if (timestamp == HConstants.LATEST_TIMESTAMP) {
+      return timestampGenerator.getUniqueTimestamp();
+    } else {
+      return timestamp;
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Creates a {@link FlowScanner} so that it can correctly process the
+   * contents of the {@link FlowRunTable}.
+   *
+   * @see
+   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preGetOp(org.apache
+   * .hadoop.hbase.coprocessor.ObserverContext,
+   * org.apache.hadoop.hbase.client.Get, java.util.List)
+   */
+  @Override
+  public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e,
+      Get get, List<Cell> results) throws IOException {
+    Scan scan = new Scan(get);
+    scan.setMaxVersions();
+    RegionScanner scanner = null;
+    try {
+      scanner = new FlowScanner(region, scan.getBatch(),
+          region.getScanner(scan));
+      scanner.next(results);
+      e.bypass();
+    } finally {
+      if (scanner != null) {
+        scanner.close();
+      }
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Ensures that max versions are set for the Scan so that metrics can be
+   * correctly aggregated and min/max can be correctly determined.
+   *
+   * @see
+   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preScannerOpen(org
+   * .apache.hadoop.hbase.coprocessor.ObserverContext,
+   * org.apache.hadoop.hbase.client.Scan,
+   * org.apache.hadoop.hbase.regionserver.RegionScanner)
+   */
+  @Override
+  public RegionScanner preScannerOpen(
+      ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
+      RegionScanner s) throws IOException {
+    // set max versions for the scan so that we can see all
+    // versions to aggregate for metrics
+    scan.setMaxVersions();
+    return s;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * Creates a {@link FlowScanner} so that it can correctly process the
+   * contents of the {@link FlowRunTable}.
+   *
+   * @see
+   * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#postScannerOpen(
+   * org.apache.hadoop.hbase.coprocessor.ObserverContext,
+   * org.apache.hadoop.hbase.client.Scan,
+   *
org.apache.hadoop.hbase.regionserver.RegionScanner) + */ + @Override + public RegionScanner postScannerOpen( + ObserverContext e, Scan scan, + RegionScanner scanner) throws IOException { + return new FlowScanner(region, scan.getBatch(), scanner); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/coprocessor/FlowScanner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/coprocessor/FlowScanner.java new file mode 100644 index 0000000..cca90fc --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/coprocessor/FlowScanner.java @@ -0,0 +1,499 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.coprocessor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator; +import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.Iterator; + +/** + * Invoked via the coprocessor when a Get or a Scan is issued for flow run table + * Looks through the list of cells per row, checks their tags and does operation + * on those cells as per the cell tags Transforms reads of the stored metrics + * into calculated sums for each column Also, finds the min and max for start + * and end times in a flow run + */ +class FlowScanner implements RegionScanner, Closeable { + + private static final Log LOG = LogFactory.getLog(FlowScanner.class); + + private final HRegion region; + private final InternalScanner flowRunScanner; + private 
RegionScanner regionScanner; + private final int limit; + private boolean hasMore; + private byte[] currentRow; + private List availableCells = new ArrayList<>(); + private int currentIndex; + + FlowScanner(HRegion region, int limit, InternalScanner internalScanner) { + this.region = region; + this.limit = limit; + this.flowRunScanner = internalScanner; + if (internalScanner instanceof RegionScanner) { + this.regionScanner = (RegionScanner) internalScanner; + } + // TODO: note if it's compaction/flush + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getRegionInfo() + */ + @Override + public HRegionInfo getRegionInfo() { + return region.getRegionInfo(); + } + + @Override + public boolean nextRaw(List cells) throws IOException { + return nextRaw(cells, limit); + } + + @Override + public boolean nextRaw(List cells, int limit) throws IOException { + return nextInternal(cells, limit); + } + + @Override + public boolean next(List cells) throws IOException { + return next(cells, limit); + } + + @Override + public boolean next(List cells, int limit) throws IOException { + return nextInternal(cells, limit); + } + + private String getAggregationCompactionDimension(List tags) { + String appId = null; + for (Tag t : tags) { + if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t + .getType()) { + appId = Bytes.toString(t.getValue()); + } + } + return appId; + } + + /** + * This method loops through the cells in a given row of the + * {@link FlowRunTable}. It looks at the tags of each cell to figure out how + * to process the contents. It then calculates the sum or min or max for each + * column + * + * @param cells + * @param limit + * @return + * @throws IOException + */ + private boolean nextInternal(List cells, int limit) throws IOException { + Cell cell = null; + startNext(); + // loop through all the cells in this row + // for min/max/metrics we do need to scan the entire set of cells to get + // the right one + // but with flush/compaction, the number of cells being scanned will go + // down + // cells are grouped per column qualifier then sorted by cell timestamp + // (latest to oldest) + // so all cells in one qualifier come one after the other before we see the + // next column qualifier + ByteArrayComparator comp = new ByteArrayComparator(); + byte[] currentColumnQualifier = TimelineWriterUtils.EMPTY_BYTES; + AggregationOperation currentAggOp = null; // We assume that all the + // operations for a particular + // column are the same + SortedSet currentColumnCells = null; + Set alreadySeenAggDim = null; + int addedCnt = 0; + boolean firstLoop = true; + boolean collectedButNotEmitted = false; + while ((cell = peekAtNextCell(limit)) != null) { + byte[] newColumnQualifier = CellUtil.cloneQualifier(cell); + // if it's the first time in the loop or it's a new column + if (firstLoop + || (comp.compare(currentColumnQualifier, newColumnQualifier) != 0)) { + // this is a new column + firstLoop = false; + if (comp.compare(currentColumnQualifier, newColumnQualifier) != 0) { + addedCnt += emitCells(cells, currentColumnCells, currentAggOp); + collectedButNotEmitted = false; + if ((limit > 0) && (addedCnt >= limit)) { + // we have emitted the limit number of cells, hence break + break; + } + } + currentColumnQualifier = newColumnQualifier; + List tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), + cell.getTagsLength()); + currentAggOp = TimelineWriterUtils + .getAggregationOperationFromTagsList(tags); + // We assume that all the 
operations for a particular column are the + // same + currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR); + alreadySeenAggDim = new HashSet<>(); + } + + collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim); + collectedButNotEmitted = true; + nextCell(limit); + } + if (collectedButNotEmitted) { + emitCells(cells, currentColumnCells, currentAggOp); + } + return hasMore(); + } + + private void collectCells(SortedSet currentColumnCells, + AggregationOperation currentAggOp, Cell cell, + Set alreadySeenAggDim) throws IOException { + if (currentAggOp == null) { + // not a min/max/metric cell, so just return it as is + currentColumnCells.add(cell); + nextCell(limit); + return; + } + + switch (currentAggOp) { + case MIN: + if (currentColumnCells.size() == 0) { + currentColumnCells.add(cell); + } else { + Cell currentMinCell = currentColumnCells.first(); + Cell newMinCell = findMinCell(currentMinCell, cell); + + if (!currentMinCell.equals(newMinCell)) { + currentColumnCells.remove(currentMinCell); + currentColumnCells.add(newMinCell); + } + } + break; + case MAX: + + if (currentColumnCells.size() == 0) { + currentColumnCells.add(cell); + + } else { + Cell currentMaxCell = currentColumnCells.first(); + Cell newMaxCell = findMaxCell(currentMaxCell, cell); + if (!currentMaxCell.equals(newMaxCell)) { + currentColumnCells.remove(currentMaxCell); + currentColumnCells.add(newMaxCell); + } + } + break; + case SUM: + case SUM_FINAL: + // only if this app has not been seen yet, add to current column cells + List tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), + cell.getTagsLength()); + String aggDim = getAggregationCompactionDimension(tags); + if (alreadySeenAggDim.contains(aggDim)) { + // if this agg dimension has already been seen, + // since they show up in sorted order + // we drop the rest which are older + // in other words, this cell is older than previously seen cells + // for that agg dim + } else { + // not seen this agg dim, hence consider this cell in our working set + currentColumnCells.add(cell); + alreadySeenAggDim.add(aggDim); + } + break; + default: + break; + } // end of switch case + } + + /* + * Processes the cells in input param currentColumnCells and populates + * List cells as the output based on the input AggregationOperation + * parameter. + */ + private int emitCells(List cells, SortedSet currentColumnCells, + AggregationOperation currentAggOp) throws IOException { + if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) { + return 0; + } + if (currentAggOp == null) { + cells.addAll(currentColumnCells); + return currentColumnCells.size(); + } + + switch (currentAggOp) { + case MIN: + case MAX: + cells.addAll(currentColumnCells); + return currentColumnCells.size(); + case SUM: + case SUM_FINAL: + Cell sumCell = processSummation(currentColumnCells); + cells.add(sumCell); + return 1; + default: + cells.addAll(currentColumnCells); + return currentColumnCells.size(); + } + } + + /* + * returns a cell whose value is the sum of all cell values in the input set. + * The new cell created has the timestamp of the most recent metric cell. 
The + * sum of a metric for a flow run is the summation at the point of the last + * metric update in that flow till that time + */ + private Cell processSummation(SortedSet currentColumnCells) + throws IOException { + Cell cell; + Number sum = 0; + Number currentValue = 0; + long ts = 0L; + long mostCurrentTimestamp = 0l; + Cell mostRecentCell = null; + Iterator it = currentColumnCells.iterator(); + while (it.hasNext()) { + cell = it.next(); + currentValue = (Number) GenericObjectMapper.read(CellUtil + .cloneValue(cell)); + ts = cell.getTimestamp(); + if (mostCurrentTimestamp < ts) { + mostCurrentTimestamp = ts; + mostRecentCell = cell; + } + sum = sum.longValue() + currentValue.longValue(); + } + Cell sumCell = createNewCell(mostRecentCell, sum); + return sumCell; + } + + private Cell findMinCell(Cell currentMinCell, Cell cell) throws IOException { + if (currentMinCell == null) { + return cell; + } + try { + long currentMinValue = ((Number) GenericObjectMapper.read(CellUtil + .cloneValue(currentMinCell))).longValue(); + long currentCellValue = ((Number) GenericObjectMapper.read(CellUtil + .cloneValue(cell))).longValue(); + if (currentCellValue < currentMinValue) { + // new value is minimum, hence return this cell + return cell; + } else { + // exising min value is miniumum, hence return existing min cell + return currentMinCell; + } + } catch (IllegalArgumentException iae) { + LOG.error("caught iae during conversion to long ", iae); + return cell; + } + } + + private Cell findMaxCell(Cell currentMaxCell, Cell cell) throws IOException { + if (currentMaxCell == null) { + return cell; + } + try { + long currentMaxCellValue = ((Number) GenericObjectMapper.read(CellUtil + .cloneValue(currentMaxCell))).longValue(); + long currentCellValue = ((Number) GenericObjectMapper.read(CellUtil + .cloneValue(cell))).longValue(); + if (currentCellValue > currentMaxCellValue) { + // new value is max, hence return this cell + return cell; + } else { + // exising min value is max, hence return existing max cell + return currentMaxCell; + } + } catch (IllegalArgumentException e) { + LOG.error("Caught iae during long conversion ", e); + return cell; + } + } + + private Cell createNewCell(Cell origCell, Number number) throws IOException { + byte[] newValue = GenericObjectMapper.write(number); + return CellUtil.createCell(CellUtil.cloneRow(origCell), + CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell), + origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue); + } + + @Override + public void close() throws IOException { + flowRunScanner.close(); + } + + /** + * Wraps the underlying store or region scanner in an API that hides the + * details of calling and managing the buffered batch of results. + */ + + /** + * Called to signal the start of the next() call by the scanner. + */ + public void startNext() { + currentRow = null; + } + + /** + * Returns whether or not the underlying scanner has more rows. + */ + public boolean hasMore() { + return currentIndex < availableCells.size() ? true : hasMore; + } + + /** + * Returns the next available cell for the current row and advances the + * pointer to the next cell. This method can be called multiple times in a row + * to advance through all the available cells. 
+ * + * @param limit + * the limit of the number of cells to return if the next batch must be + * fetched by the wrapped scanner + * @return the next available cell, or null if no more cells are available for + * the current row + * @throws IOException if the next batch cannot be fetched from the wrapped + * scanner + */ + public Cell nextCell(int limit) throws IOException { + Cell cell = peekAtNextCell(limit); + if (cell != null) { + currentIndex++; + } + return cell; + } + + /** + * Returns the next available cell for the current row, without advancing the + * pointer. Calling this method multiple times in a row will continue to + * return the same cell. + * + * @param limit + * the limit of the number of cells to return if the next batch must be + * fetched by the wrapped scanner + * @return the next available cell, or null if no more cells are available for + * the current row + * @throws IOException if the next batch cannot be fetched from the wrapped + * scanner + */ + public Cell peekAtNextCell(int limit) throws IOException { + if (currentIndex >= availableCells.size()) { + // done with current batch + availableCells.clear(); + currentIndex = 0; + hasMore = flowRunScanner.next(availableCells, limit); + } + Cell cell = null; + if (currentIndex < availableCells.size()) { + cell = availableCells.get(currentIndex); + if (currentRow == null) { + currentRow = CellUtil.cloneRow(cell); + } else if (!CellUtil.matchingRow(cell, currentRow)) { + // moved on to the next row + // don't use the current cell + // also signal no more cells for this row + return null; + } + } + return cell; + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMaxResultSize() + */ + @Override + public long getMaxResultSize() { + if (regionScanner == null) { + throw new IllegalStateException( + "RegionScanner.getMaxResultSize() called when the flow " + + "scanner's internal scanner is not a RegionScanner"); + } + return regionScanner.getMaxResultSize(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMvccReadPoint() + */ + @Override + public long getMvccReadPoint() { + if (regionScanner == null) { + throw new IllegalStateException( + "RegionScanner.getMvccReadPoint() called when the flow " + + "scanner's internal scanner is not a RegionScanner"); + } + return regionScanner.getMvccReadPoint(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.regionserver.RegionScanner#isFilterDone() + */ + @Override + public boolean isFilterDone() throws IOException { + if (regionScanner == null) { + throw new IllegalStateException( + "RegionScanner.isFilterDone() called when the flow " + + "scanner's internal scanner is not a RegionScanner"); + } + return regionScanner.isFilterDone(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.regionserver.RegionScanner#reseek(byte[]) + */ + @Override + public boolean reseek(byte[] bytes) throws IOException { + if (regionScanner == null) { + throw new IllegalStateException( + "RegionScanner.reseek() called when the flow " + + "scanner's internal scanner is not a RegionScanner"); + } + return regionScanner.reseek(bytes); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java index 26e7748..8ae19b8 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.entity; import java.io.IOException; +import java.util.Map; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; @@ -26,6 +27,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; /** * Identifies fully qualified columns for the {@link EntityTable}. @@ -81,9 +83,9 @@ private String getColumnQualifier() { public void store(byte[] rowKey, TypedBufferedMutator tableMutator, Long timestamp, - Object inputValue) throws IOException { + Object inputValue, Attribute... attributes) throws IOException { column.store(rowKey, tableMutator, columnQualifierBytes, timestamp, - inputValue); + inputValue, attributes); } public Object readResult(Result result) throws IOException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java index 75ff742..781746a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; /** * Identifies partially qualified columns for the entity table. @@ -103,16 +104,18 @@ public String getColumnPrefix() { /* * (non-Javadoc) - * + * * @see * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix * #store(byte[], * org.apache.hadoop.yarn.server.timelineservice.storage.common. - * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object) + * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object, + * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[]) */ public void store(byte[] rowKey, TypedBufferedMutator tableMutator, String qualifier, - Long timestamp, Object inputValue) throws IOException { + Long timestamp, Object inputValue, Attribute... 
attributes) + throws IOException { // Null check if (qualifier == null) { @@ -123,8 +126,9 @@ public void store(byte[] rowKey, byte[] columnQualifier = ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); - column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue); - } + column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, + attributes); + } /* * (non-Javadoc) @@ -137,7 +141,8 @@ public void store(byte[] rowKey, */ public void store(byte[] rowKey, TypedBufferedMutator tableMutator, byte[] qualifier, - Long timestamp, Object inputValue) throws IOException { + Long timestamp, Object inputValue, Attribute... attributes) + throws IOException { // Null check if (qualifier == null) { @@ -148,8 +153,9 @@ public void store(byte[] rowKey, byte[] columnQualifier = ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); - column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue); - } + column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, + attributes); + } /* * (non-Javadoc) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java new file mode 100644 index 0000000..ff12c7b --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Identifies the compaction dimensions for the data in the {@link FlowRunTable} + * . 
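+ * <p>
+ * For illustration only (the application id below is a made-up example
+ * value): the writer can attach the application id to a put so that the
+ * coprocessor can later tell which application produced a given cell:
+ * <pre>
+ * // hypothetical application id, shown only as an example
+ * Attribute appIdDim = AggregationCompactionDimension.APPLICATION_ID
+ *     .getAttribute("application_1111111111_0001");
+ * </pre>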
+ */ +public enum AggregationCompactionDimension { + + /** + * The application id. + */ + APPLICATION_ID((byte) 101); + + private final byte tagType; + private final byte[] inBytes; + + private AggregationCompactionDimension(byte tagType) { + this.tagType = tagType; + this.inBytes = Bytes.toBytes(this.name()); + } + + public Attribute getAttribute(String attributeValue) { + return new Attribute(this.name(), Bytes.toBytes(attributeValue)); + } + + public byte getTagType() { + return tagType; + } + + public byte[] getInBytes() { + return this.inBytes.clone(); + } + + public static AggregationCompactionDimension getAggregationCompactionDimension( + String aggCompactDimStr) { + for (AggregationCompactionDimension aggDim : AggregationCompactionDimension + .values()) { + if (aggDim.name().equals(aggCompactDimStr)) { + return aggDim; + } + } + return null; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java new file mode 100644 index 0000000..58c6c3e --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperation.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Identifies the attributes to be set for puts into the {@link FlowRunTable}. + * The numbers used for tagType are prime numbers. + */ +public enum AggregationOperation { + + /** + * Keeps the minimum value seen; for example, when the flow was started. + */ + MIN((byte) 71), + + /** + * Keeps the maximum value seen; for example, when the flow ended. 
+ */ + MAX((byte) 73), + + /** + * Sums the metric values of the flow while its applications are running. + */ + SUM((byte) 79), + + /** + * The final value of a metric, written once an application has finished. + */ + SUM_FINAL((byte) 83), + + /** + * Marker used for compaction. + */ + COMPACT((byte) 89); + + private final byte tagType; + private final byte[] inBytes; + + private AggregationOperation(byte tagType) { + this.tagType = tagType; + this.inBytes = Bytes.toBytes(this.name()); + } + + public Attribute getAttribute() { + return new Attribute(this.name(), this.inBytes); + } + + public byte getTagType() { + return tagType; + } + + public byte[] getInBytes() { + return this.inBytes.clone(); + } + + /** + * Returns the {@link AggregationOperation} enum that represents the given + * string. + * + * @param aggOpStr the string representation of the operation + * @return the matching {@link AggregationOperation}, or null if there is none + */ + public static AggregationOperation getAggregationOperation(String aggOpStr) { + for (AggregationOperation aggOp : AggregationOperation.values()) { + if (aggOp.name().equals(aggOpStr)) { + return aggOp; + } + } + return null; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java new file mode 100644 index 0000000..0a2e28f --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +/** + * Defines the attribute tuple to be set for puts into the {@link FlowRunTable}. 
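+ * <p>
+ * A minimal usage sketch (illustrative, using the AggregationOperation enum
+ * added in this patch):
+ * <pre>
+ * Attribute sumAttr = AggregationOperation.SUM.getAttribute();
+ * String name = sumAttr.getName();   // "SUM"
+ * byte[] value = sumAttr.getValue(); // defensive copy of the bytes
+ * </pre>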
+ */ +public class Attribute { + private final String name; + private final byte[] value; + + public Attribute(String name, byte[] value) { + this.name = name; + this.value = value.clone(); + } + + public String getName() { + return name; + } + + public byte[] getValue() { + return value.clone(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java new file mode 100644 index 0000000..d991b42 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; + +/** + * Represents the flow activity table column families. + */ +public enum FlowActivityColumnFamily implements ColumnFamily { + + /** + * Info column family houses known columns, specifically ones included in + * column family filters. + */ + INFO("i"); + + /** + * Byte representation of this column family. + */ + private final byte[] bytes; + + /** + * @param value + * create a column family with this name. Must be lower case and + * without spaces. + */ + private FlowActivityColumnFamily(String value) { + // column families should be lower case and not contain any spaces. 
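+ // Separator.SPACE.encode escapes any space characters in the name
+ // rather than writing them raw.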
+ this.bytes = Bytes.toBytes(Separator.SPACE.encode(value)); + } + + public byte[] getBytes() { + return Bytes.copy(bytes); + } + +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java new file mode 100644 index 0000000..a44d2a7 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java @@ -0,0 +1,243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import java.io.IOException; +import java.util.Map; +import java.util.NavigableMap; + +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; + +/** + * Identifies partially qualified columns for the {@link FlowActivityTable} + */ +public enum FlowActivityColumnPrefix implements ColumnPrefix { + + /** + * To store run ids of the flows + */ + RUN_ID(FlowActivityColumnFamily.INFO, "r", null); + + private final ColumnHelper column; + private final ColumnFamily columnFamily; + + /** + * Can be null for those cases where the provided column qualifier is the + * entire column name. + */ + private final String columnPrefix; + private final byte[] columnPrefixBytes; + + private final AggregationOperation aggOp; + + /** + * Private constructor, meant to be used by the enum definition. + * + * @param columnFamily + * that this column is stored in. + * @param columnPrefix + * for this column. + */ + private FlowActivityColumnPrefix( + ColumnFamily columnFamily, String columnPrefix, + AggregationOperation aggOp) { + column = new ColumnHelper(columnFamily); + this.columnFamily = columnFamily; + this.columnPrefix = columnPrefix; + if (columnPrefix == null) { + this.columnPrefixBytes = null; + } else { + // Future-proof by ensuring the right column prefix hygiene. 
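+ // that is, encode the prefix so that any space characters in it are
+ // escaped rather than stored raw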
+ this.columnPrefixBytes = Bytes.toBytes(Separator.SPACE + .encode(columnPrefix)); + } + this.aggOp = aggOp; + } + + /** + * @return the column name value + */ + public String getColumnPrefix() { + return columnPrefix; + } + + public byte[] getColumnPrefixBytes() { + return columnPrefixBytes.clone(); + } + + public AggregationOperation getAttribute() { + return aggOp; + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #store(byte[], + * org.apache.hadoop.yarn.server.timelineservice.storage.common. + * TypedBufferedMutator, byte[], java.lang.Long, java.lang.Object, + * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[]) + */ + @Override + public void store(byte[] rowKey, + TypedBufferedMutator tableMutator, byte[] qualifier, + Long timestamp, Object inputValue, Attribute... attributes) + throws IOException { + // Null check + if (qualifier == null) { + throw new IOException("Cannot store column with null qualifier in " + + tableMutator.getName().getNameAsString()); + } + + byte[] columnQualifier = ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifier); + Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes( + attributes, this.aggOp); + column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, + combinedAttributes); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String) + */ + public Object readResult(Result result, String qualifier) throws IOException { + byte[] columnQualifier = ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifier); + return column.readResult(result, columnQualifier); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #readResults(org.apache.hadoop.hbase.client.Result) + */ + public Map readResults(Result result) throws IOException { + return column.readResults(result, columnPrefixBytes); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result) + */ + public NavigableMap> readResultsWithTimestamps( + Result result) throws IOException { + return column.readResultsWithTimestamps(result, columnPrefixBytes); + } + + /** + * Retrieve an {@link FlowActivityColumnPrefix} given a name, or null if there + * is no match. The following holds true: {@code columnFor(x) == columnFor(y)} + * if and only if {@code x.equals(y)} or {@code (x == y == null)} + * + * @param columnPrefix + * Name of the column to retrieve + * @return the corresponding {@link FlowActivityColumnPrefix} or null + */ + public static final FlowActivityColumnPrefix columnFor(String columnPrefix) { + + // Match column based on value, assume column family matches. + for (FlowActivityColumnPrefix flowActivityColPrefix : FlowActivityColumnPrefix + .values()) { + // Find a match based only on name. + if (flowActivityColPrefix.getColumnPrefix().equals(columnPrefix)) { + return flowActivityColPrefix; + } + } + // Default to null + return null; + } + + /** + * Retrieve an {@link FlowActivityColumnPrefix} given a name, or null if there + * is no match. 
The following holds true: + * {@code columnFor(a,x) == columnFor(b,y)} if and only if + * {@code (x == y == null)} or {@code a.equals(b) && x.equals(y)} + * + * @param columnFamily + * The columnFamily for which to retrieve the column. + * @param columnPrefix + * Name of the column to retrieve + * @return the corresponding {@link FlowActivityColumnPrefix} or null if both + * arguments don't match. + */ + public static final FlowActivityColumnPrefix columnFor( + FlowActivityColumnFamily columnFamily, String columnPrefix) { + + // TODO: needs unit test to confirm and need to update javadoc to explain + // null prefix case. + + for (FlowActivityColumnPrefix flowActivityColumnPrefix : FlowActivityColumnPrefix + .values()) { + // Find a match based on column family and name. + if (flowActivityColumnPrefix.columnFamily.equals(columnFamily) + && (((columnPrefix == null) && (flowActivityColumnPrefix + .getColumnPrefix() == null)) || (flowActivityColumnPrefix + .getColumnPrefix().equals(columnPrefix)))) { + return flowActivityColumnPrefix; + } + } + // Default to null + return null; + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #store(byte[], + * org.apache.hadoop.yarn.server.timelineservice.storage.common. + * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object, + * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[]) + */ + @Override + public void store(byte[] rowKey, + TypedBufferedMutator tableMutator, String qualifier, + Long timestamp, Object inputValue, Attribute... attributes) + throws IOException { + // Null check + if (qualifier == null) { + throw new IOException("Cannot store column with null qualifier in " + + tableMutator.getName().getNameAsString()); + } + + byte[] columnQualifier = ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifier); + Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes( + attributes, this.aggOp); + column.store(rowKey, tableMutator, columnQualifier, null, inputValue, + combinedAttributes); + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java new file mode 100644 index 0000000..e8509b2 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; + +/** + * Represents a rowkey for the flow activity table. + */ +public class FlowActivityRowKey { + + private final String clusterId; + private final long dayTs; + private final String userId; + private final String flowId; + + public FlowActivityRowKey(String clusterId, long dayTs, String userId, + String flowId) { + this.clusterId = clusterId; + this.dayTs = dayTs; + this.userId = userId; + this.flowId = flowId; + } + + public String getClusterId() { + return clusterId; + } + + public long getDayTimestamp() { + return dayTs; + } + + public String getUserId() { + return userId; + } + + public String getFlowId() { + return flowId; + } + + /** + * Constructs a row key for the flow activity table as follows: + * {@code clusterId!dayTimestamp!user!flowId} + * + * Uses the top of the current day as the day timestamp, so the entry is + * inserted into the current day's record in the table. + * + * @param clusterId the cluster the flow ran on + * @param userId the user that ran the flow + * @param flowId the flow name + * @return byte array with the row key + */ + public static byte[] getRowKey(String clusterId, String userId, String flowId) { + long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System + .currentTimeMillis()); + return getRowKey(clusterId, dayTs, userId, flowId); + } + + /** + * Constructs a row key for the flow activity table as follows: + * {@code clusterId!dayTimestamp!user!flowId} + * + * @param clusterId the cluster the flow ran on + * @param dayTs the top-of-the-day timestamp of the activity being recorded + * @param userId the user that ran the flow + * @param flowId the flow name + * @return byte array with the row key + */ + public static byte[] getRowKey(String clusterId, long dayTs, String userId, + String flowId) { + return Separator.QUALIFIERS.join( + Bytes.toBytes(Separator.QUALIFIERS.encode(clusterId)), + Bytes.toBytes(TimelineWriterUtils.invert(dayTs)), + Bytes.toBytes(Separator.QUALIFIERS.encode(userId)), + Bytes.toBytes(Separator.QUALIFIERS.encode(flowId))); + } + + /** + * Given the raw row key as bytes, returns the row key as an object. 
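+ * <p>
+ * Round-trip sketch (the values below are illustrative only):
+ * <pre>
+ * byte[] rk = FlowActivityRowKey.getRowKey("cluster1", dayTimestamp,
+ *     "user1", "flow_name");
+ * FlowActivityRowKey key = FlowActivityRowKey.parseRowKey(rk);
+ * // key.getUserId() returns "user1"; the day timestamp is stored inverted
+ * // so that the most recent day sorts first
+ * </pre>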
+ */ + public static FlowActivityRowKey parseRowKey(byte[] rowKey) { + byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey); + + if (rowKeyComponents.length < 4) { + throw new IllegalArgumentException("the row key is not valid for " + + "a flow activity"); + } + + String clusterId = Separator.QUALIFIERS.decode(Bytes + .toString(rowKeyComponents[0])); + long dayTs = TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[1])); + String userId = Separator.QUALIFIERS.decode(Bytes + .toString(rowKeyComponents[2])); + String flowId = Separator.QUALIFIERS.decode(Bytes + .toString(rowKeyComponents[3])); + return new FlowActivityRowKey(clusterId, dayTs, userId, flowId); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTable.java new file mode 100644 index 0000000..af8df99 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTable.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; + +/** + * The flow activity table has column family info. + * Stores the daily activity record for flows. + * Useful as a quick lookup of what flows were + * running on a given day. + * + * Example flow activity table record: + *
+ * <pre>
+ * |-------------------------------------------|
+ * |  Row key   | Column Family                |
+ * |            | info                         |
+ * |-------------------------------------------|
+ * | clusterId! | r!runid1:version1            |
+ * | inv Top of |                              |
+ * | Day!       | r!runid2:version7            |
+ * | userName!  |                              |
+ * | flowId     |                              |
+ * |-------------------------------------------|
+ * </pre>
+ */ +public class FlowActivityTable extends BaseTable { + /** flow activity table prefix */ + private static final String PREFIX = + YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".flowactivity"; + + /** config param name that specifies the flowactivity table name */ + public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name"; + + /** default value for flowactivity table name */ + public static final String DEFAULT_TABLE_NAME = "timelineservice.flowactivity"; + + private static final Log LOG = LogFactory.getLog(FlowActivityTable.class); + + /** default max number of versions */ + public static final int DEFAULT_METRICS_MAX_VERSIONS = Integer.MAX_VALUE; + + public FlowActivityTable() { + super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTable#createTable + * (org.apache.hadoop.hbase.client.Admin, + * org.apache.hadoop.conf.Configuration) + */ + public void createTable(Admin admin, Configuration hbaseConf) + throws IOException { + + TableName table = getTableName(hbaseConf); + if (admin.tableExists(table)) { + // do not disable / delete existing table + // similar to the approach taken by map-reduce jobs when + // output directory exists + throw new IOException("Table " + table.getNameAsString() + + " already exists."); + } + + HTableDescriptor flowActivityTableDescp = new HTableDescriptor(table); + HColumnDescriptor infoCF = + new HColumnDescriptor(FlowActivityColumnFamily.INFO.getBytes()); + infoCF.setBloomFilterType(BloomType.ROWCOL); + infoCF.setMinVersions(1); + infoCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS); + flowActivityTableDescp.addFamily(infoCF); + + // TODO: figure the split policy before running in production + admin.createTable(flowActivityTableDescp); + LOG.info("Status of table creation for " + table.getNameAsString() + "=" + + admin.tableExists(table)); + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java new file mode 100644 index 0000000..93bb444 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import java.io.IOException; + +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; + +/** + * Identifies fully qualified columns for the {@link FlowRunTable}. + */ +public enum FlowRunColumn implements Column { + + /** + * When the flow was started. This is the minimum of currently known + * application start times. + */ + MIN_START_TIME(FlowRunColumnFamily.INFO, "min_start_time", + AggregationOperation.MIN), + + /** + * When the flow ended. This is the maximum of currently known application end + * times. + */ + MAX_END_TIME(FlowRunColumnFamily.INFO, "max_end_time", + AggregationOperation.MAX), + + /** + * The version of the flow that this flow belongs to. + */ + FLOW_VERSION(FlowRunColumnFamily.INFO, "flow_version", null); + + private final ColumnHelper column; + private final ColumnFamily columnFamily; + private final String columnQualifier; + private final byte[] columnQualifierBytes; + private final AggregationOperation aggOp; + + private FlowRunColumn(ColumnFamily columnFamily, + String columnQualifier, AggregationOperation aggOp) { + this.columnFamily = columnFamily; + this.columnQualifier = columnQualifier; + this.aggOp = aggOp; + // Future-proof by ensuring the right column prefix hygiene. + this.columnQualifierBytes = Bytes.toBytes(Separator.SPACE + .encode(columnQualifier)); + this.column = new ColumnHelper(columnFamily); + } + + /** + * @return the column name value + */ + private String getColumnQualifier() { + return columnQualifier; + } + + public byte[] getColumnQualifierBytes() { + return columnQualifierBytes.clone(); + } + + public AggregationOperation getAggregationOperation() { + return aggOp; + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.Column#store + * (byte[], org.apache.hadoop.yarn.server.timelineservice.storage.common. + * TypedBufferedMutator, java.lang.Long, java.lang.Object, + * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[]) + */ + public void store(byte[] rowKey, + TypedBufferedMutator tableMutator, Long timestamp, + Object inputValue, Attribute... attributes) throws IOException { + + Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes( + attributes, aggOp); + column.store(rowKey, tableMutator, columnQualifierBytes, timestamp, + inputValue, combinedAttributes); + } + + public Object readResult(Result result) throws IOException { + return column.readResult(result, columnQualifierBytes); + } + + /** + * Retrieve an {@link FlowRunColumn} given a name, or null if there is no + * match. 
The following holds true: {@code columnFor(x) == columnFor(y)} if + * and only if {@code x.equals(y)} or {@code (x == y == null)} + * + * @param columnQualifier + * Name of the column to retrieve + * @return the corresponding {@link FlowRunColumn} or null + */ + public static final FlowRunColumn columnFor(String columnQualifier) { + + // Match column based on value, assume column family matches. + for (FlowRunColumn ec : FlowRunColumn.values()) { + // Find a match based only on name. + if (ec.getColumnQualifier().equals(columnQualifier)) { + return ec; + } + } + + // Default to null + return null; + } + + /** + * Retrieve an {@link FlowRunColumn} given a name, or null if there is no + * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)} + * if and only if {@code a.equals(b) && x.equals(y)} or + * {@code (x == y == null)} + * + * @param columnFamily + * The columnFamily for which to retrieve the column. + * @param name + * Name of the column to retrieve + * @return the corresponding {@link FlowRunColumn} or null if both arguments + * don't match. + */ + public static final FlowRunColumn columnFor(FlowRunColumnFamily columnFamily, + String name) { + + for (FlowRunColumn ec : FlowRunColumn.values()) { + // Find a match based on column family and name. + if (ec.columnFamily.equals(columnFamily) + && ec.getColumnQualifier().equals(name)) { + return ec; + } + } + + // Default to null + return null; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnFamily.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnFamily.java new file mode 100644 index 0000000..8faf5f8 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnFamily.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; + +/** + * Represents the flow run table column families. + */ +public enum FlowRunColumnFamily implements ColumnFamily { + + /** + * Info column family houses known columns, specifically ones included in + * column family filters. + */ + INFO("i"); + + /** + * Byte representation of this column family. 
+ */ + private final byte[] bytes; + + /** + * @param value + * create a column family with this name. Must be lower case and + * without spaces. + */ + private FlowRunColumnFamily(String value) { + // column families should be lower case and not contain any spaces. + this.bytes = Bytes.toBytes(Separator.SPACE.encode(value)); + } + + public byte[] getBytes() { + return Bytes.copy(bytes); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java new file mode 100644 index 0000000..3b3a97f --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import java.io.IOException; +import java.util.Map; +import java.util.NavigableMap; + +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; + +/** + * Identifies partially qualified columns for the {@link FlowRunTable}. + */ +public enum FlowRunColumnPrefix implements ColumnPrefix { + + /** + * To store flow run info values. + */ + METRIC(FlowRunColumnFamily.INFO, "m", AggregationOperation.SUM); + + private final ColumnHelper column; + private final ColumnFamily columnFamily; + + /** + * Can be null for those cases where the provided column qualifier is the + * entire column name. + */ + private final String columnPrefix; + private final byte[] columnPrefixBytes; + + private final AggregationOperation aggOp; + + /** + * Private constructor, meant to be used by the enum definition. + * + * @param columnFamily + * that this column is stored in. + * @param columnPrefix + * for this column. 
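+ * @param fra
+ *          the aggregation operation that applies to cells stored under
+ *          this prefix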
+ */ + private FlowRunColumnPrefix(ColumnFamily columnFamily, + String columnPrefix, AggregationOperation fra) { + column = new ColumnHelper(columnFamily); + this.columnFamily = columnFamily; + this.columnPrefix = columnPrefix; + if (columnPrefix == null) { + this.columnPrefixBytes = null; + } else { + // Future-proof by ensuring the right column prefix hygiene. + this.columnPrefixBytes = Bytes.toBytes(Separator.SPACE + .encode(columnPrefix)); + } + this.aggOp = fra; + } + + /** + * @return the column name value + */ + public String getColumnPrefix() { + return columnPrefix; + } + + public byte[] getColumnPrefixBytes() { + return columnPrefixBytes.clone(); + } + + public AggregationOperation getAttribute() { + return aggOp; + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #store(byte[], + * org.apache.hadoop.yarn.server.timelineservice.storage.common. + * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object) + */ + public void store(byte[] rowKey, + TypedBufferedMutator tableMutator, String qualifier, + Long timestamp, Object inputValue, Attribute... attributes) + throws IOException { + + // Null check + if (qualifier == null) { + throw new IOException("Cannot store column with null qualifier in " + + tableMutator.getName().getNameAsString()); + } + + byte[] columnQualifier = ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifier); + Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes( + attributes, this.aggOp); + column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, + combinedAttributes); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #store(byte[], + * org.apache.hadoop.yarn.server.timelineservice.storage.common. + * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object) + */ + public void store(byte[] rowKey, + TypedBufferedMutator tableMutator, byte[] qualifier, + Long timestamp, Object inputValue, Attribute... 
attributes) + throws IOException { + + // Null check + if (qualifier == null) { + throw new IOException("Cannot store column with null qualifier in " + + tableMutator.getName().getNameAsString()); + } + + byte[] columnQualifier = ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifier); + Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes( + attributes, this.aggOp); + column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, + combinedAttributes); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String) + */ + public Object readResult(Result result, String qualifier) throws IOException { + byte[] columnQualifier = ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifier); + return column.readResult(result, columnQualifier); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #readResults(org.apache.hadoop.hbase.client.Result) + */ + public Map<String, Object> readResults(Result result) throws IOException { + return column.readResults(result, columnPrefixBytes); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result) + */ + public NavigableMap<String, NavigableMap<Long, Object>> readResultsWithTimestamps( + Result result) throws IOException { + return column.readResultsWithTimestamps(result, columnPrefixBytes); + } + + /** + * Retrieve an {@link FlowRunColumnPrefix} given a name, or null if there is + * no match. The following holds true: {@code columnFor(x) == columnFor(y)} if + * and only if {@code x.equals(y)} or {@code (x == y == null)} + * + * @param columnPrefix + * Name of the column to retrieve + * @return the corresponding {@link FlowRunColumnPrefix} or null + */ + public static final FlowRunColumnPrefix columnFor(String columnPrefix) { + + // Match column based on value, assume column family matches. + for (FlowRunColumnPrefix frcp : FlowRunColumnPrefix.values()) { + // Find a match based only on name. + if (frcp.getColumnPrefix().equals(columnPrefix)) { + return frcp; + } + } + + // Default to null + return null; + } + + /** + * Retrieve an {@link FlowRunColumnPrefix} given a name, or null if there is + * no match. The following holds true: + * {@code columnFor(a,x) == columnFor(b,y)} if and only if + * {@code (x == y == null)} or {@code a.equals(b) && x.equals(y)} + * + * @param columnFamily + * The columnFamily for which to retrieve the column. + * @param columnPrefix + * Name of the column to retrieve + * @return the corresponding {@link FlowRunColumnPrefix} or null if both + * arguments don't match. + */ + public static final FlowRunColumnPrefix columnFor( + FlowRunColumnFamily columnFamily, String columnPrefix) { + + // TODO: needs unit test to confirm and need to update javadoc to explain + // null prefix case. + + for (FlowRunColumnPrefix frcp : FlowRunColumnPrefix.values()) { + // Find a match based on column family and name. 
+ if (frcp.columnFamily.equals(columnFamily) + && (((columnPrefix == null) && (frcp.getColumnPrefix() == null)) || (frcp + .getColumnPrefix().equals(columnPrefix)))) { + return frcp; + } + } + + // Default to null + return null; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java new file mode 100644 index 0000000..b2b423c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; + +/** + * Represents a rowkey for the flow run table. + */ +public class FlowRunRowKey { + // TODO: more methods are needed for this class, e.g. a parseRowKey method + + /** + * Constructs a row key for the flow run table as follows: + * {@code clusterId!userId!flowId!invertedFlowRunId} + * + * @param clusterId the cluster the flow ran on + * @param userId the user that ran the flow + * @param flowId the flow name + * @param flowRunId the run id of the flow + * @return byte array with the row key + */ + public static byte[] getRowKey(String clusterId, String userId, + String flowId, Long flowRunId) { + byte[] first = Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, + userId, flowId)); + // Note that flowRunId is a long, so it can't be encoded together with the + // string fields; it is inverted and appended separately. + byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId)); + return Separator.QUALIFIERS.join(first, second); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java new file mode 100644 index 0000000..f395965 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.coprocessor.FlowRunCoprocessor; + +/** + * The flow run table has column family info. + * Stores per flow run information + * aggregated across applications. + * + * Example flow run table record: + * + * <pre>
+ * flow_run table
+ * |-------------------------------------------|
+ * |  Row key   | Column Family                |
+ * |            | info                         |
+ * |-------------------------------------------|
+ * | clusterId! | flow_version:version7        |
+ * | userName!  |                              |
+ * | flowId!    | running_apps:1               |
+ * | flowRunId  |                              |
+ * |            | min_start_time:1392995080000 |
+ * |            | #0:""                        |
+ * |            |                              |
+ * |            | min_start_time:1392995081012 |
+ * |            | #0:appId2                    |
+ * |            |                              |
+ * |            | min_start_time:1392993083210 |
+ * |            | #0:appId3                    |
+ * |            |                              |
+ * |            |                              |
+ * |            | max_end_time:1392993084018   |
+ * |            | #0:""                        |
+ * |            |                              |
+ * |            |                              |
+ * |            | m!mapInputRecords:127        |
+ * |            | #0:""                        |
+ * |            |                              |
+ * |            | m!mapInputRecords:31         |
+ * |            | #2:appId2                    |
+ * |            |                              |
+ * |            | m!mapInputRecords:37         |
+ * |            | #1:appId3                    |
+ * |            |                              |
+ * |            |                              |
+ * |            | m!mapOutputRecords:181       |
+ * |            | #0:""                        |
+ * |            |                              |
+ * |            | m!mapOutputRecords:37        |
+ * |            | #1:appId3                    |
+ * |            |                              |
+ * |            |                              |
+ * |-------------------------------------------|
+ * </pre>
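+ *
+ * Here the #N:appId annotations are read as cell-level metadata identifying
+ * the application that contributed a given cell value (an inferred reading
+ * of the example above, in line with the application-id attributes written
+ * via AggregationCompactionDimension.APPLICATION_ID).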
+ */ +public class FlowRunTable extends BaseTable<FlowRunTable> { + /** flowrun prefix */ + private static final String PREFIX = + YarnConfiguration.TIMELINE_SERVICE_PREFIX + "flowrun"; + + /** config param name that specifies the flowrun table name */ + public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name"; + + /** default value for flowrun table name */ + public static final String DEFAULT_TABLE_NAME = "timelineservice.flowrun"; + + private static final Log LOG = LogFactory.getLog(FlowRunTable.class); + + /** default max number of versions */ + public static final int DEFAULT_METRICS_MAX_VERSIONS = Integer.MAX_VALUE; + + public FlowRunTable() { + super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTable#createTable + * (org.apache.hadoop.hbase.client.Admin, + * org.apache.hadoop.conf.Configuration) + */ + public void createTable(Admin admin, Configuration hbaseConf) + throws IOException { + + TableName table = getTableName(hbaseConf); + if (admin.tableExists(table)) { + // do not disable / delete existing table + // similar to the approach taken by map-reduce jobs when + // output directory exists + throw new IOException("Table " + table.getNameAsString() + + " already exists."); + } + + HTableDescriptor flowRunTableDescp = new HTableDescriptor(table); + HColumnDescriptor infoCF = + new HColumnDescriptor(FlowRunColumnFamily.INFO.getBytes()); + infoCF.setBloomFilterType(BloomType.ROWCOL); + flowRunTableDescp.addFamily(infoCF); + infoCF.setMinVersions(1); + infoCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS); + + // TODO: figure out the split policy + flowRunTableDescp.addCoprocessor(FlowRunCoprocessor.class + .getCanonicalName()); + admin.createTable(flowRunTableDescp); + LOG.info("Status of table creation for " + table.getNameAsString() + "=" + + admin.tableExists(table)); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseStorageFlowRunFlowActivity.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseStorageFlowRunFlowActivity.java new file mode 100644 index 0000000..12b7633 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseStorageFlowRunFlowActivity.java @@ -0,0 +1,654 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Tests the FlowRun and FlowActivity Tables + */ +public class TestHBaseStorageFlowRunFlowActivity { + + private static HBaseTestingUtility util; + private final String metric1 = "MAP_SLOT_MILLIS"; + private final String metric2 = "HDFS_BYTES_READ"; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + util = new HBaseTestingUtility(); + Configuration conf = util.getConfiguration(); + conf.setInt("hfile.format.version", 3); + util.startMiniCluster(); + createSchema(); + } + + private static void createSchema() throws IOException { + TimelineSchemaCreator.createAllTables(util.getConfiguration()); + } + + /** + * Writes 4 timeline entities belonging to one flow run through the + * {@link HBaseTimelineWriterImpl} + * + * Checks the flow run table contents + * + * The first entity has a created event, metrics and a finish event. 
+ * + * The second entity has a created event and this is the entity with the + * smallest start time. This should be the start time for the flow run. + * + * The third entity has a finish event and this is the entity with the max end + * time. This should be the end time for the flow run. + * + * The fourth entity has a created event which has a start time that is + * greater than the min start time. + * + * The test also checks in the flow activity table that one entry has been + * made for all of these 4 application entities since they belong to the same + * flow run. + */ + @Test + public void testWriteFlowRunMinMax() throws Exception { + + TimelineEntities te = new TimelineEntities(); + te.addEntity(getEntity1()); + + HBaseTimelineWriterImpl hbi = null; + Configuration c1 = util.getConfiguration(); + String cluster = "testWriteFlowRunMinMaxToHBase_cluster1"; + String user = "testWriteFlowRunMinMaxToHBase_user1"; + String flow = "testing_flowRun_flow_name"; + String flowVersion = "CF7022C10F1354"; + Long runid = 1002345678919L; + String appName = "application_100000000000_1111"; + long endTs = 1439750690000L; + TimelineEntity entityMinStartTime = getEntityMinStartTime(); + + try { + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + + // write another entity with the right min start time + te = new TimelineEntities(); + te.addEntity(entityMinStartTime); + appName = "application_100000000000_3333"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + + // write another entity for max end time + TimelineEntity entityMaxEndTime = getEntityMaxEndTime(endTs); + te = new TimelineEntities(); + te.addEntity(entityMaxEndTime); + appName = "application_100000000000_4444"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + + // write another entity with a greater start time + TimelineEntity entityGreaterStartTime = getEntityGreaterStartTime(); + te = new TimelineEntities(); + te.addEntity(entityGreaterStartTime); + appName = "application_1000000000000000_2222"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + + // flush everything to hbase + hbi.flush(); + } finally { + hbi.close(); + } + + Connection conn = ConnectionFactory.createConnection(c1); + // check in flow run table + Table table1 = conn.getTable(TableName + .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + // fetch the row and check that we get back the right min and max + // timestamps + byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid); + Get g = new Get(startRow); + g.addColumn(FlowRunColumnFamily.INFO.getBytes(), + FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()); + g.addColumn(FlowRunColumnFamily.INFO.getBytes(), + FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes()); + Result r1 = table1.get(g); + assertNotNull(r1); + assertTrue(!r1.isEmpty()); + Map<byte[], byte[]> values = r1.getFamilyMap(FlowRunColumnFamily.INFO + .getBytes()); + + assertEquals(2, r1.size()); + Long starttime = (Long) GenericObjectMapper.read(values + .get(FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes())); + Long expmin = entityMinStartTime.getCreatedTime(); + assertEquals(expmin, starttime); + assertEquals(endTs, GenericObjectMapper.read(values + .get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes()))); + + // check in flow activity table + table1 = conn.getTable(TableName + .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME)); + startRow = FlowActivityRowKey.getRowKey(cluster, user, flow); + g = new Get(startRow); + r1 =
table1.get(g); + assertNotNull(r1); + assertTrue(!r1.isEmpty()); + values = r1.getFamilyMap(FlowActivityColumnFamily.INFO.getBytes()); + assertEquals(1, values.size()); + byte[] row = r1.getRow(); + FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey.parseRowKey(row); + assertNotNull(flowActivityRowKey); + assertEquals(cluster, flowActivityRowKey.getClusterId()); + assertEquals(user, flowActivityRowKey.getUserId()); + assertEquals(flow, flowActivityRowKey.getFlowId()); + long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System + .currentTimeMillis()); + assertEquals(dayTs, flowActivityRowKey.getDayTimestamp()); + assertEquals(1, values.size()); + checkRunId(runid, flowVersion, values); + } + + boolean isFlowRunRowKeyCorrect(byte[] rowKey, String cluster, String user, + String flow, Long runid) { + byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, -1); + assertTrue(rowKeyComponents.length == 4); + assertEquals(cluster, Bytes.toString(rowKeyComponents[0])); + assertEquals(user, Bytes.toString(rowKeyComponents[1])); + assertEquals(flow, Bytes.toString(rowKeyComponents[2])); + assertEquals(TimelineWriterUtils.invert(runid), + Bytes.toLong(rowKeyComponents[3])); + return true; + } + + /** + * Writes two application entities of the same flow run. Each application has + * two metrics: slot millis and hdfs bytes read. Each metric has values at two + * timestamps. + * + * Checks the metric values of the flow in the flow run table. Flow metric + * values should be the sum of individual metric values that belong to the + * latest timestamp for that metric. + */ + @Test + public void testWriteFlowRunMetricsOneFlow() throws Exception { + String cluster = "testWriteFlowRunMetricsOneFlow_cluster1"; + String user = "testWriteFlowRunMetricsOneFlow_user1"; + String flow = "testing_flowRun_metrics_flow_name"; + String flowVersion = "CF7022C10F1354"; + Long runid = 1002345678919L; + + TimelineEntities te = new TimelineEntities(); + TimelineEntity entityApp1 = getEntityMetricsApp1(); + te.addEntity(entityApp1); + + HBaseTimelineWriterImpl hbi = null; + Configuration c1 = util.getConfiguration(); + try { + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + String appName = "application_11111111111111_1111"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + // write another application with the same metric to this flow + te = new TimelineEntities(); + TimelineEntity entityApp2 = getEntityMetricsApp2(); + te.addEntity(entityApp2); + appName = "application_11111111111111_2222"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.flush(); + } finally { + hbi.close(); + } + + // check flow run + checkFlowRunTable(cluster, user, flow, runid, c1); + } + + /** + * Writes one application entity and checks the record for today in the flow + * activity table + */ + @Test + public void testWriteFlowActivityOneFlow() throws Exception { + String cluster = "testWriteFlowActivityOneFlow_cluster1"; + String user = "testWriteFlowActivityOneFlow_user1"; + String flow = "flow_activity_test_flow_name"; + String flowVersion = "A122110F135BC4"; + Long runid = 1001111178919L; + + TimelineEntities te = new TimelineEntities(); + TimelineEntity entityApp1 = getFlowApp1(); + te.addEntity(entityApp1); + + HBaseTimelineWriterImpl hbi = null; + Configuration c1 = util.getConfiguration(); + try { + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + String appName = "application_1111999999_1234"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); +
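// flush so the buffered mutations reach HBase before the flow activity + // table is read back +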
hbi.flush(); + } finally { + hbi.close(); + } + // check flow activity + checkFlowActivityTable(cluster, user, flow, flowVersion, runid, c1); + } + + private void checkFlowActivityTable(String cluster, String user, String flow, + String flowVersion, Long runid, Configuration c1) throws IOException { + Scan s = new Scan(); + s.addFamily(FlowActivityColumnFamily.INFO.getBytes()); + byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow); + s.setStartRow(startRow); + String clusterStop = cluster + "1"; + byte[] stopRow = FlowActivityRowKey.getRowKey(clusterStop, user, flow); + s.setStopRow(stopRow); + Connection conn = ConnectionFactory.createConnection(c1); + Table table1 = conn.getTable(TableName + .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME)); + ResultScanner scanner = table1.getScanner(s); + int rowCount = 0; + for (Result result : scanner) { + assertNotNull(result); + assertTrue(!result.isEmpty()); + Map<byte[], byte[]> values = result + .getFamilyMap(FlowActivityColumnFamily.INFO.getBytes()); + rowCount++; + byte[] row = result.getRow(); + FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey + .parseRowKey(row); + assertNotNull(flowActivityRowKey); + assertEquals(cluster, flowActivityRowKey.getClusterId()); + assertEquals(user, flowActivityRowKey.getUserId()); + assertEquals(flow, flowActivityRowKey.getFlowId()); + long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System + .currentTimeMillis()); + assertEquals(dayTs, flowActivityRowKey.getDayTimestamp()); + assertEquals(1, values.size()); + checkRunId(runid, flowVersion, values); + } + assertEquals(1, rowCount); + } + + private void checkFlowRunTable(String cluster, String user, String flow, + long runid, Configuration c1) throws IOException { + Scan s = new Scan(); + s.addFamily(FlowRunColumnFamily.INFO.getBytes()); + byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid); + s.setStartRow(startRow); + String clusterStop = cluster + "1"; + byte[] stopRow = FlowRunRowKey.getRowKey(clusterStop, user, flow, runid); + s.setStopRow(stopRow); + Connection conn = ConnectionFactory.createConnection(c1); + Table table1 = conn.getTable(TableName + .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + ResultScanner scanner = table1.getScanner(s); + + int rowCount = 0; + for (Result result : scanner) { + assertNotNull(result); + assertTrue(!result.isEmpty()); + Map<byte[], byte[]> values = result.getFamilyMap(FlowRunColumnFamily.INFO + .getBytes()); + rowCount++; + // check metric1 + byte[] q = ColumnHelper.getColumnQualifier( + FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric1); + assertTrue(values.containsKey(q)); + assertEquals(141, GenericObjectMapper.read(values.get(q))); + + // check metric2 + assertEquals(2, values.size()); + q = ColumnHelper.getColumnQualifier( + FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric2); + assertTrue(values.containsKey(q)); + assertEquals(57, GenericObjectMapper.read(values.get(q))); + } + assertEquals(1, rowCount); + } + + private TimelineEntity getFlowApp1() { + TimelineEntity entity = new TimelineEntity(); + String id = "flowActivity_test"; + String type = TimelineEntityType.YARN_APPLICATION.toString(); + entity.setId(id); + entity.setType(type); + Long cTime = 1425016501000L; + entity.setCreatedTime(cTime); + + TimelineEvent event = new TimelineEvent(); + event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); + Long expTs = 1436512802000L; + event.setTimestamp(expTs); + String expKey = "foo_event"; + Object expVal = "test"; + event.addInfo(expKey, expVal); + entity.addEvent(event); +
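+ // the CREATED event above marks this application entity as newly created; + // it is this event that drives the write into the flow activity table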
+ return entity; + } + + private TimelineEntity getEntityMetricsApp1() { + TimelineEntity entity = new TimelineEntity(); + String id = "flowRunMetrics_test"; + String type = TimelineEntityType.YARN_APPLICATION.toString(); + entity.setId(id); + entity.setType(type); + Long cTime = 1425016501000L; + entity.setCreatedTime(cTime); + + // add metrics + Set<TimelineMetric> metrics = new HashSet<>(); + TimelineMetric m1 = new TimelineMetric(); + m1.setId(metric1); + Map<Long, Number> metricValues = new HashMap<Long, Number>(); + long ts = System.currentTimeMillis(); + metricValues.put(ts - 100000, 2); + metricValues.put(ts - 80000, 40); + m1.setType(Type.TIME_SERIES); + m1.setValues(metricValues); + metrics.add(m1); + + TimelineMetric m2 = new TimelineMetric(); + m2.setId(metric2); + metricValues = new HashMap<Long, Number>(); + ts = System.currentTimeMillis(); + metricValues.put(ts - 100000, 31); + metricValues.put(ts - 80000, 57); + m2.setType(Type.TIME_SERIES); + m2.setValues(metricValues); + metrics.add(m2); + + entity.addMetrics(metrics); + return entity; + } + + private TimelineEntity getEntityMetricsApp2() { + TimelineEntity entity = new TimelineEntity(); + String id = "flowRunMetrics_test"; + String type = TimelineEntityType.YARN_APPLICATION.toString(); + entity.setId(id); + entity.setType(type); + Long cTime = 1425016501000L; + entity.setCreatedTime(cTime); + // add metrics + Set<TimelineMetric> metrics = new HashSet<>(); + TimelineMetric m1 = new TimelineMetric(); + m1.setId(metric1); + Map<Long, Number> metricValues = new HashMap<Long, Number>(); + long ts = System.currentTimeMillis(); + metricValues.put(ts - 100000, 5L); + metricValues.put(ts - 80000, 101L); + m1.setType(Type.TIME_SERIES); + m1.setValues(metricValues); + metrics.add(m1); + entity.addMetrics(metrics); + return entity; + } + + /** + * Writes 3 applications each with a different run id and version for the same + * {cluster, user, flow} + * + * They should all be inserted into one record in the flow activity table + * with 3 columns, one per run id. + */ + @Test + public void testFlowActivityTableOneFlowMultipleRunIds() throws IOException { + String cluster = "testManyRunsFlowActivity_cluster1"; + String user = "testManyRunsFlowActivity_c_user1"; + String flow = "flow_activity_test_flow_name"; + String flowVersion1 = "A122110F135BC4"; + Long runid1 = 11111111111L; + + String flowVersion2 = "A12222222222C4"; + long runid2 = 2222222222222L; + + String flowVersion3 = "A1333333333C4"; + long runid3 = 3333333333333L; + + TimelineEntities te = new TimelineEntities(); + TimelineEntity entityApp1 = getFlowApp1(); + te.addEntity(entityApp1); + + HBaseTimelineWriterImpl hbi = null; + Configuration c1 = util.getConfiguration(); + try { + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + String appName = "application_11888888888_1111"; + hbi.write(cluster, user, flow, flowVersion1, runid1, appName, te); + + // write another application to this flow but with a different runid/version + te = new TimelineEntities(); + te.addEntity(entityApp1); + appName = "application_11888888888_2222"; + hbi.write(cluster, user, flow, flowVersion2, runid2, appName, te); + + // write another application to this flow but with a different runid/version + te = new TimelineEntities(); + te.addEntity(entityApp1); + appName = "application_11888888888_3333"; + hbi.write(cluster, user, flow, flowVersion3, runid3, appName, te); + + hbi.flush(); + } finally { + hbi.close(); + } + // check flow activity + checkFlowActivityTableSeveralRuns(cluster, user, flow, c1, flowVersion1, + runid1, flowVersion2, runid2, flowVersion3, runid3); + + } + + private void
checkFlowActivityTableSeveralRuns(String cluster, String user, + String flow, Configuration c1, String flowVersion1, Long runid1, + String flowVersion2, Long runid2, String flowVersion3, Long runid3) + throws IOException { + Scan s = new Scan(); + s.addFamily(FlowActivityColumnFamily.INFO.getBytes()); + byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow); + s.setStartRow(startRow); + String clusterStop = cluster + "1"; + byte[] stopRow = FlowActivityRowKey.getRowKey(clusterStop, user, flow); + s.setStopRow(stopRow); + Connection conn = ConnectionFactory.createConnection(c1); + Table table1 = conn.getTable(TableName + .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME)); + ResultScanner scanner = table1.getScanner(s); + int rowCount = 0; + for (Result result : scanner) { + assertNotNull(result); + assertTrue(!result.isEmpty()); + byte[] row = result.getRow(); + FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey + .parseRowKey(row); + assertNotNull(flowActivityRowKey); + assertEquals(cluster, flowActivityRowKey.getClusterId()); + assertEquals(user, flowActivityRowKey.getUserId()); + assertEquals(flow, flowActivityRowKey.getFlowId()); + long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System + .currentTimeMillis()); + assertEquals(dayTs, flowActivityRowKey.getDayTimestamp()); + + Map<byte[], byte[]> values = result + .getFamilyMap(FlowActivityColumnFamily.INFO.getBytes()); + rowCount++; + assertEquals(3, values.size()); + checkRunId(runid1, flowVersion1, values); + checkRunId(runid2, flowVersion2, values); + checkRunId(runid3, flowVersion3, values); + } + // the flow activity table inserts into the current day's record; + // hence, if this test runs across the midnight boundary, + // it may fail since it would insert into two records, + // one for each day + assertEquals(1, rowCount); + } + + private void checkRunId(Long runid, String flowVersion, + Map<byte[], byte[]> values) throws IOException { + byte[] rq = ColumnHelper.getColumnQualifier( + FlowActivityColumnPrefix.RUN_ID.getColumnPrefixBytes(), + GenericObjectMapper.write(runid)); + for (Map.Entry<byte[], byte[]> k : values.entrySet()) { + String actualQ = Bytes.toString(k.getKey()); + if (Bytes.toString(rq).equals(actualQ)) { + String actualV = (String) GenericObjectMapper.read(k.getValue()); + assertEquals(flowVersion, actualV); + } + } + } + + private TimelineEntity getEntity1() { + TimelineEntity entity = new TimelineEntity(); + String id = "flowRunHello"; + String type = TimelineEntityType.YARN_APPLICATION.toString(); + entity.setId(id); + entity.setType(type); + Long cTime = 20000000000000L; + Long mTime = 1425026901000L; + entity.setCreatedTime(cTime); + entity.setModifiedTime(mTime); + // add metrics + Set<TimelineMetric> metrics = new HashSet<>(); + TimelineMetric m1 = new TimelineMetric(); + m1.setId(metric1); + Map<Long, Number> metricValues = new HashMap<Long, Number>(); + long ts = System.currentTimeMillis(); + metricValues.put(ts - 120000, 100000000); + metricValues.put(ts - 100000, 200000000); + metricValues.put(ts - 80000, 300000000); + metricValues.put(ts - 60000, 400000000); + metricValues.put(ts - 40000, 50000000000L); + metricValues.put(ts - 20000, 60000000000L); + m1.setType(Type.TIME_SERIES); + m1.setValues(metricValues); + metrics.add(m1); + entity.addMetrics(metrics); + + TimelineEvent event = new TimelineEvent(); + event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); + Long expTs = 1436512802000L; + event.setTimestamp(expTs); + String expKey = "foo_event"; + Object expVal = "test"; + event.addInfo(expKey, expVal); + entity.addEvent(event); + + event
= new TimelineEvent(); + event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); + event.setTimestamp(1436512801000L); + event.addInfo(expKey, expVal); + entity.addEvent(event); + + return entity; + } + + private TimelineEntity getEntityGreaterStartTime() { + TimelineEntity entity = new TimelineEntity(); + entity.setCreatedTime(30000000000000L); + entity.setId("flowRunHello with greater start time"); + String type = TimelineEntityType.YARN_APPLICATION.toString(); + entity.setType(type); + TimelineEvent event = new TimelineEvent(); + event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); + long endTs = 1439379885000L; + event.setTimestamp(endTs); + String expKey = "foo_event_greater"; + String expVal = "test_app_greater"; + event.addInfo(expKey, expVal); + entity.addEvent(event); + return entity; + } + + private TimelineEntity getEntityMaxEndTime(long endTs) { + TimelineEntity entity = new TimelineEntity(); + entity.setId("flowRunHello Max End time"); + entity.setType(TimelineEntityType.YARN_APPLICATION.toString()); + TimelineEvent event = new TimelineEvent(); + event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE); + event.setTimestamp(endTs); + String expKey = "foo_event_max_finished"; + String expVal = "test_app_max_finished"; + event.addInfo(expKey, expVal); + entity.addEvent(event); + return entity; + } + + private TimelineEntity getEntityMinStartTime() { + TimelineEntity entity = new TimelineEntity(); + String id = "flowRunHelloMinStartTime"; + String type = TimelineEntityType.YARN_APPLICATION.toString(); + entity.setId(id); + entity.setType(type); + Long cTime = 10000000000000L; + entity.setCreatedTime(cTime); + TimelineEvent event = new TimelineEvent(); + event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE); + event.setTimestamp(System.currentTimeMillis()); + entity.addEvent(event); + return entity; + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + util.shutdownMiniCluster(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java index 2875e01..bfd6b56 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java @@ -53,7 +53,6 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; @@ -88,20 +87,15 @@ public static void setupBeforeClass() throws Exception { } private static void createSchema() throws IOException { - new
EntityTable() - .createTable(util.getHBaseAdmin(), util.getConfiguration()); - new AppToFlowTable() - .createTable(util.getHBaseAdmin(), util.getConfiguration()); - new ApplicationTable() - .createTable(util.getHBaseAdmin(), util.getConfiguration()); + TimelineSchemaCreator.createAllTables(util.getConfiguration()); } @Test public void testWriteApplicationToHBase() throws Exception { TimelineEntities te = new TimelineEntities(); ApplicationEntity entity = new ApplicationEntity(); - String id = "hello"; - entity.setId(id); + String appId = "application_1234555678_1023"; + entity.setId(appId); long cTime = 1425016501000L; long mTime = 1425026901000L; entity.setCreatedTime(cTime); @@ -173,12 +167,12 @@ public void testWriteApplicationToHBase() throws Exception { String flow = "some_flow_name"; String flowVersion = "AB7822C10F1111"; long runid = 1002345678919L; - hbi.write(cluster, user, flow, flowVersion, runid, id, te); + hbi.write(cluster, user, flow, flowVersion, runid, appId, te); hbi.stop(); // retrieve the row byte[] rowKey = - ApplicationRowKey.getRowKey(cluster, user, flow, runid, id); + ApplicationRowKey.getRowKey(cluster, user, flow, runid, appId); Get get = new Get(rowKey); get.setMaxVersions(Integer.MAX_VALUE); Connection conn = ConnectionFactory.createConnection(c1); @@ -190,11 +184,11 @@ public void testWriteApplicationToHBase() throws Exception { // check the row key byte[] row1 = result.getRow(); assertTrue(isApplicationRowKeyCorrect(row1, cluster, user, flow, runid, - id)); + appId)); // check info column family String id1 = ApplicationColumn.ID.readResult(result).toString(); - assertEquals(id, id1); + assertEquals(appId, id1); Number val = (Number) ApplicationColumn.CREATED_TIME.readResult(result); @@ -252,17 +246,17 @@ public void testWriteApplicationToHBase() throws Exception { assertEquals(metricValues, metricMap); // read the timeline entity using the reader this time - TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, id, + TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appId, entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL)); Set es1 = hbr.getEntities(user, cluster, flow, runid, - id, entity.getType(), null, null, null, null, null, null, null, + appId, entity.getType(), null, null, null, null, null, null, null, null, null, null, null, EnumSet.of(TimelineReader.Field.ALL)); assertNotNull(e1); assertEquals(1, es1.size()); // verify attributes - assertEquals(id, e1.getId()); + assertEquals(appId, e1.getId()); assertEquals(TimelineEntityType.YARN_APPLICATION.toString(), e1.getType()); assertEquals(cTime, e1.getCreatedTime()); @@ -576,13 +570,13 @@ public void testEvents() throws IOException { String flow = "other_flow_name"; String flowVersion = "1111F01C2287BA"; long runid = 1009876543218L; - String appName = "some app name"; - hbi.write(cluster, user, flow, flowVersion, runid, appName, entities); + String appId = "application_12366668822_9917"; + hbi.write(cluster, user, flow, flowVersion, runid, appId, entities); hbi.stop(); // retrieve the row byte[] rowKey = - ApplicationRowKey.getRowKey(cluster, user, flow, runid, appName); + ApplicationRowKey.getRowKey(cluster, user, flow, runid, appId); Get get = new Get(rowKey); get.setMaxVersions(Integer.MAX_VALUE); Connection conn = ConnectionFactory.createConnection(c1); @@ -593,7 +587,7 @@ public void testEvents() throws IOException { // check the row key byte[] row1 = result.getRow(); assertTrue(isApplicationRowKeyCorrect(row1, cluster, user, flow, runid, - appName)); + appId)); 
Map eventsResult = ApplicationColumnPrefix.EVENT. @@ -614,17 +608,17 @@ public void testEvents() throws IOException { } // read the timeline entity using the reader this time - TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName, + TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appId, entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL)); - TimelineEntity e2 = hbr.getEntity(user, cluster, null, null, appName, + TimelineEntity e2 = hbr.getEntity(user, cluster, null, null, appId, entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL)); Set es1 = hbr.getEntities(user, cluster, flow, runid, - appName, entity.getType(), null, null, null, null, null, null, null, + appId, entity.getType(), null, null, null, null, null, null, null, null, null, null, null, EnumSet.of(TimelineReader.Field.ALL)); Set es2 = hbr.getEntities(user, cluster, null, null, - appName, entity.getType(), null, null, null, null, null, null, null, + appId, entity.getType(), null, null, null, null, null, null, null, null, null, null, null, EnumSet.of(TimelineReader.Field.ALL)); assertNotNull(e1); assertNotNull(e2);