diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index 772002d..28fc7ad 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -33,11 +33,10 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; -import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; @@ -53,10 +52,21 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperations; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; /** - * This implements a hbase based backend for storing application timeline entity + * This implements an HBase-based backend for storing the timeline entity * information.
+ * It writes to multiple tables in the backend. */ @InterfaceAudience.Private @InterfaceStability.Unstable @@ -67,6 +77,8 @@ private TypedBufferedMutator<EntityTable> entityTable; private TypedBufferedMutator<AppToFlowTable> appToFlowTable; private TypedBufferedMutator<ApplicationTable> applicationTable; + private TypedBufferedMutator<FlowActivityTable> flowActivityTable; + private TypedBufferedMutator<FlowRunTable> flowRunTable; private static final Log LOG = LogFactory .getLog(HBaseTimelineWriterImpl.class); @@ -91,6 +103,8 @@ protected void serviceInit(Configuration conf) throws Exception { entityTable = new EntityTable().getTableMutator(hbaseConf, conn); appToFlowTable = new AppToFlowTable().getTableMutator(hbaseConf, conn); applicationTable = new ApplicationTable().getTableMutator(hbaseConf, conn); + flowRunTable = new FlowRunTable().getTableMutator(hbaseConf, conn); + flowActivityTable = new FlowActivityTable().getTableMutator(hbaseConf, conn); } /** @@ -111,7 +125,7 @@ public TimelineWriteResponse write(String clusterId, String userId, // if the entity is the application, the destination is the application // table - boolean isApplication = isApplicationEntity(te); + boolean isApplication = TimelineWriterUtils.isApplicationEntity(te); byte[] rowKey = isApplication ? ApplicationRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId) : @@ -124,37 +138,142 @@ public TimelineWriteResponse write(String clusterId, String userId, storeMetrics(rowKey, te.getMetrics(), isApplication); storeRelations(rowKey, te, isApplication); - if (isApplicationCreated(te)) { - onApplicationCreated( - clusterId, userId, flowName, flowVersion, flowRunId, appId, te); + if (TimelineWriterUtils.isApplicationCreated(te)) { + onApplicationCreated(clusterId, userId, flowName, flowVersion, + flowRunId, appId, te); + } + if (isApplication) { + // if it's an application entity, store metrics + storeFlowMetricsAppRunning(clusterId, userId, flowName, flowRunId, + appId, te); + // if application has finished, store its finish time and write the + // final values of all metrics + if (TimelineWriterUtils.isApplicationFinished(te)) { + onApplicationFinished(clusterId, userId, flowName, flowVersion, flowRunId, appId, + te); + } } } return putStatus; } - private static boolean isApplicationEntity(TimelineEntity te) { - return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString()); + private void onApplicationCreated(String clusterId, String userId, + String flowName, String flowVersion, long flowRunId, String appId, + TimelineEntity te) throws IOException { + // store in App to flow table + storeInAppToFlowTable(clusterId, userId, flowName, flowVersion, flowRunId, + appId, te); + // store in flow run table + storeAppCreatedInFlowRunTable(clusterId, userId, flowName, flowVersion, + flowRunId, appId, te); + // store in flow activity table + storeInFlowActivityTable(clusterId, userId, flowName, flowVersion, + flowRunId, appId, te); } - private static boolean isApplicationCreated(TimelineEntity te) { - if (isApplicationEntity(te)) { - for (TimelineEvent event : te.getEvents()) { - if (event.getId().equals( - ApplicationMetricsConstants.CREATED_EVENT_TYPE)) { - return true; - } - } - } - return false; + /* + * updates the {@link FlowActivityTable} with the Application TimelineEntity + * information + */ + private void storeInFlowActivityTable(String clusterId, String userId, + String flowName, String flowVersion, long flowRunId, String appId, + TimelineEntity te) throws IOException { + byte[] rowKey = FlowActivityRowKey.getRowKey(clusterId, userId, flowName);
+ byte[] qualifier = GenericObjectMapper.write(flowRunId); + FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier, + null, flowVersion, + AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); } - private void onApplicationCreated(String clusterId, String userId, + /* + * updates the {@link FlowRunTable} with Application Created information + */ + private void storeAppCreatedInFlowRunTable(String clusterId, String userId, + String flowName, String flowVersion, long flowRunId, String appId, + TimelineEntity te) throws IOException { + byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName, + flowRunId); + FlowRunColumn.MIN_START_TIME.store(rowKey, flowRunTable, null, + te.getCreatedTime(), + AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); + } + + private void storeInAppToFlowTable(String clusterId, String userId, String flowName, String flowVersion, long flowRunId, String appId, TimelineEntity te) throws IOException { byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId); AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTable, null, flowName); - AppToFlowColumn.FLOW_RUN_ID.store( - rowKey, appToFlowTable, null, flowRunId); + AppToFlowColumn.FLOW_RUN_ID.store(rowKey, appToFlowTable, null, flowRunId); + } + + /* + * updates the {@link FlowRunTable} and {@link FlowActivityTable} when an + * application has finished + */ + private void onApplicationFinished(String clusterId, String userId, + String flowName, String flowVersion, long flowRunId, String appId, + TimelineEntity te) throws IOException { + // store in flow run table + storeAppFinishedInFlowRunTable(clusterId, userId, flowName, flowRunId, + appId, te); + + // indicate in the flow activity table that the app has finished + storeInFlowActivityTable(clusterId, userId, flowName, flowVersion, + flowRunId, appId, te); + } + + /* + * Update the {@link FlowRunTable} with Application Finished information + */ + private void storeAppFinishedInFlowRunTable(String clusterId, String userId, + String flowName, long flowRunId, String appId, TimelineEntity te) + throws IOException { + byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName, + flowRunId); + Attribute attributeAppId = AggregationCompactionDimension.APPLICATION_ID + .getAttribute(appId); + FlowRunColumn.MAX_END_TIME.store(rowKey, flowRunTable, null, + TimelineWriterUtils.getApplicationFinishedTime(te), attributeAppId); + + // store the final value of metrics since application has finished + Set<TimelineMetric> metrics = te.getMetrics(); + if (metrics != null) { + storeFlowMetrics(rowKey, metrics, attributeAppId, + AggregationOperations.SUM_FINAL.getAttribute()); + } + } + + /* + * Updates the {@link FlowRunTable} with Application Metrics + */ + private void storeFlowMetricsAppRunning(String clusterId, String userId, + String flowName, long flowRunId, String appId, TimelineEntity te) + throws IOException { + Set<TimelineMetric> metrics = te.getMetrics(); + if (metrics != null) { + byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName, + flowRunId); + storeFlowMetrics(rowKey, metrics, + AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); + } + } + + private void storeFlowMetrics(byte[] rowKey, Set<TimelineMetric> metrics, + Attribute... attributes) throws IOException {
+ if (metrics == null) { + return; + } + for (TimelineMetric metric : metrics) { + String metricColumnQualifier = metric.getId(); + Map<Long, Number> timeseries = metric.getValues(); + for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) { + Long timestamp = timeseriesEntry.getKey(); + FlowRunColumnPrefix.METRIC.store(rowKey, flowRunTable, + metricColumnQualifier, timestamp, timeseriesEntry.getValue(), + attributes); + } + } } private void storeRelations(byte[] rowKey, TimelineEntity te, @@ -184,7 +303,6 @@ private void storeRelations(byte[] rowKey, TimelineEntity te, // id3?id4?id5 String compoundValue = Separator.VALUES.joinEncoded(connectedEntity.getValue()); - columnPrefix.store(rowKey, table, connectedEntity.getKey(), null, compoundValue); } @@ -342,6 +460,8 @@ public void flush() throws IOException { entityTable.flush(); appToFlowTable.flush(); applicationTable.flush(); + flowRunTable.flush(); + flowActivityTable.flush(); } /** @@ -364,6 +484,16 @@ protected void serviceStop() throws Exception { LOG.info("closing the application table"); applicationTable.close(); } + if (flowRunTable != null) { + LOG.info("closing the flow run table"); + // The close API performs flushing and releases any resources held + flowRunTable.close(); + } + if (flowActivityTable != null) { + LOG.info("closing the flow activity table"); + // The close API performs flushing and releases any resources held + flowActivityTable.close(); + } if (conn != null) { LOG.info("closing the hbase Connection"); conn.close(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java index 5120856..9ba9f0c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java @@ -42,6 +42,8 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; /** * This creates the schema for a hbase based backend for storing application @@ -172,7 +174,7 @@ private static CommandLine parseArgs(String[] args) throws ParseException { return commandLine; } - private static void createAllTables(Configuration hbaseConf) + static void createAllTables(Configuration hbaseConf) throws IOException { Connection conn = null; @@ -185,6 +187,8 @@ private static void createAllTables(Configuration hbaseConf) new EntityTable().createTable(admin, hbaseConf); new AppToFlowTable().createTable(admin, hbaseConf); new ApplicationTable().createTable(admin, hbaseConf); + new FlowRunTable().createTable(admin, hbaseConf); + new FlowActivityTable().createTable(admin, hbaseConf); } finally { if (conn != null) { conn.close(); diff --git
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java index c028386..802626d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; /** * Identifies fully qualified columns for the {@link ApplicationTable}. @@ -76,9 +77,9 @@ private String getColumnQualifier() { public void store(byte[] rowKey, TypedBufferedMutator tableMutator, Long timestamp, - Object inputValue) throws IOException { + Object inputValue, Attribute... attributes) throws IOException { column.store(rowKey, tableMutator, columnQualifierBytes, timestamp, - inputValue); + inputValue, attributes); } public Object readResult(Result result) throws IOException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java index ad1def6..d7b5773 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; /** * Identifies partially qualified columns for the application table. @@ -112,7 +113,8 @@ private String getColumnPrefix() { */ public void store(byte[] rowKey, TypedBufferedMutator tableMutator, byte[] qualifier, - Long timestamp, Object inputValue) throws IOException { + Long timestamp, Object inputValue, Attribute... 
attributes) + throws IOException { // Null check if (qualifier == null) { @@ -123,8 +125,9 @@ public void store(byte[] rowKey, byte[] columnQualifier = ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); - column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue); - } + column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, + attributes); + } /* * (non-Javadoc) @@ -137,7 +140,8 @@ public void store(byte[] rowKey, */ public void store(byte[] rowKey, TypedBufferedMutator tableMutator, String qualifier, - Long timestamp, Object inputValue) throws IOException { + Long timestamp, Object inputValue, Attribute...attributes) + throws IOException { // Null check if (qualifier == null) { @@ -148,7 +152,8 @@ public void store(byte[] rowKey, byte[] columnQualifier = ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); - column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue); + column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, + attributes); } /* diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java index 423037a..859fdca 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java @@ -25,8 +25,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; import java.io.IOException; +import java.util.Map; /** * Identifies fully qualified columns for the {@link AppToFlowTable}. @@ -67,9 +69,9 @@ private String getColumnQualifier() { public void store(byte[] rowKey, TypedBufferedMutator tableMutator, Long timestamp, - Object inputValue) throws IOException { + Object inputValue, Attribute... 
attributes) throws IOException { column.store(rowKey, tableMutator, columnQualifierBytes, timestamp, - inputValue); + inputValue, attributes); } public Object readResult(Result result) throws IOException { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java index 3397d62..8f68e21 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/Column.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; /** * A Column represents the way to store a fully qualified column in a specific @@ -38,12 +39,15 @@ * column. * @param timestamp version timestamp. When null the server timestamp will be * used. + * @param attributes Map of attributes for this mutation. used in the coprocessor + * to set/read the cell tags * @param inputValue the value to write to the rowKey and column qualifier. * Nothing gets written when null. * @throws IOException */ public void store(byte[] rowKey, TypedBufferedMutator tableMutator, - Long timestamp, Object inputValue) throws IOException; + Long timestamp, Object inputValue, Attribute... attributes) + throws IOException; /** * Get the latest version of this specified column. Note: this call clones the diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java index f1b7c58..9f218fa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnHelper.java @@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; - +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; /** * This class is meant to be used only by explicit Columns, and not directly to * write by clients. @@ -57,30 +57,36 @@ public ColumnHelper(ColumnFamily columnFamily) { /** * Sends a Mutation to the table. The mutations will be buffered and sent over * the wire as part of a batch. - * - * @param rowKey identifying the row to write. Nothing gets written when null. - * @param tableMutator used to modify the underlying HBase table - * @param columnQualifier column qualifier. Nothing gets written when null. - * @param timestamp version timestamp. When null the server timestamp will be - * used. 
- * @param inputValue the value to write to the rowKey and column qualifier. - * Nothing gets written when null. + * + * @param rowKey + * identifying the row to write. Nothing gets written when null. + * @param tableMutator + * used to modify the underlying HBase table + * @param columnQualifier + * column qualifier. Nothing gets written when null. + * @param timestamp + * version timestamp. When null the server timestamp will be used. + * @param inputValue + * the value to write to the rowKey and column qualifier. Nothing + * gets written when null. * @throws IOException */ public void store(byte[] rowKey, TypedBufferedMutator<?> tableMutator, - byte[] columnQualifier, Long timestamp, Object inputValue) - throws IOException { + byte[] columnQualifier, Long timestamp, Object inputValue, + Attribute... attributes) throws IOException { if ((rowKey == null) || (columnQualifier == null) || (inputValue == null)) { return; } Put p = new Put(rowKey); if (timestamp == null) { - p.addColumn(columnFamilyBytes, columnQualifier, - GenericObjectMapper.write(inputValue)); - } else { - p.addColumn(columnFamilyBytes, columnQualifier, timestamp, - GenericObjectMapper.write(inputValue)); + timestamp = System.currentTimeMillis(); } + p.addColumn(columnFamilyBytes, columnQualifier, timestamp, + GenericObjectMapper.write(inputValue)); + if ((attributes != null) && (attributes.length > 0)) { + for (Attribute attribute : attributes) { + p.setAttribute(attribute.getName(), attribute.getValue()); + } + } tableMutator.mutate(p); } @@ -315,6 +321,27 @@ public Object readResult(Result result, byte[] columnQualifierBytes) /** * @param columnPrefixBytes The byte representation for the column prefix. * Should not contain {@link Separator#QUALIFIERS}. + * @param qualifier for the remainder of the column. + * @return fully sanitized column qualifier that is a combination of prefix + * and qualifier. If prefix is null, the result is simply the encoded + * qualifier without any separator. + */ + public static byte[] getColumnQualifier(byte[] columnPrefixBytes, + long qualifier) { + + if (columnPrefixBytes == null) { + return Bytes.toBytes(qualifier); + } + + // strip out separators and tag on the column prefix + byte[] columnQualifier = + Separator.QUALIFIERS.join(columnPrefixBytes, Bytes.toBytes(qualifier)); + return columnQualifier; + } + /** + * @param columnPrefixBytes The byte representation for the column prefix. + * Should not contain {@link Separator#QUALIFIERS}. * @param qualifier the byte representation for the remainder of the column. * @return fully sanitized column qualifier that is a combination of prefix * and qualifier. If prefix is null, the result is simply the encoded
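The attributes introduced in ColumnHelper#store above travel on the Put itself rather than inside any cell, which is what lets FlowRunCoprocessor#prePut recover them on the region server and turn them into cell tags. A minimal sketch of that round trip using only stock HBase client calls; the row key, column family, qualifier, and attribute name below are made up for illustration:

```java
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PutAttributeDemo {
  public static void main(String[] args) {
    // a hypothetical flow run row key and metric column
    Put p = new Put(Bytes.toBytes("cluster!user!flow!run"));
    p.addColumn(Bytes.toBytes("i"), Bytes.toBytes("m!cpu"),
        System.currentTimeMillis(), Bytes.toBytes(17L));
    // the equivalent of what ColumnHelper#store does for each Attribute
    p.setAttribute("APPLICATION_ID", Bytes.toBytes("application_1436214591_0001"));
    // the coprocessor side reads it back from the same Put
    byte[] appId = p.getAttribute("APPLICATION_ID");
    System.out.println(Bytes.toString(appId)); // application_1436214591_0001
  }
}
```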
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java index 509ff49..db49098 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; /** * Used to represent a partially qualified column, where the actual column name * @@ -43,12 +44,36 @@ * @param qualifier column qualifier. Nothing gets written when null. * @param timestamp version timestamp. When null the server timestamp will be * used. + * @param attributes attributes for the mutation that are used by the coprocessor + * to set/read the cell tags * @param inputValue the value to write to the rowKey and column qualifier. * Nothing gets written when null. * @throws IOException */ public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator, - String qualifier, Long timestamp, Object inputValue) throws IOException; + byte[] qualifier, Long timestamp, Object inputValue, + Attribute... attributes) throws IOException; + + /** + * Sends a Mutation to the table. The mutations will be buffered and sent over + * the wire as part of a batch. + * + * @param rowKey identifying the row to write. Nothing gets written when null. + * @param tableMutator used to modify the underlying HBase table. Caller is + * responsible to pass a mutator for the table that actually has this + * column. + * @param qualifier column qualifier. Nothing gets written when null. + * @param timestamp version timestamp. When null the server timestamp will be + * used. + * @param attributes attributes for the mutation that are used by the coprocessor + * to set/read the cell tags + * @param inputValue the value to write to the rowKey and column qualifier. + * Nothing gets written when null. + * @throws IOException + */ + public void store(byte[] rowKey, TypedBufferedMutator<T> tableMutator, + String qualifier, Long timestamp, Object inputValue, + Attribute... attributes) throws IOException; /** * Get the latest version of this specified column.
Note: this call clones the @@ -81,4 +106,5 @@ public void store(byte[] rowKey, TypedBufferedMutator tableMutator, */ public NavigableMap> readResultsWithTimestamps(Result result) throws IOException; + } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java index 58bdedc7e..b0e5720 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineWriterUtils.java @@ -19,9 +19,16 @@ import java.util.ArrayList; import java.util.List; +import java.util.SortedSet; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperations; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; /** * bunch of utility functions used across TimelineWriter classes @@ -36,6 +43,9 @@ /** indicator for no limits for splitting */ public static final int NO_LIMIT_SPLIT = -1; + /** milliseconds in one day */ + public static final long MILLIS_ONE_DAY = 86400000L; + /** * Splits the source array into multiple array segments using the given * separator, up to a maximum of count items. This will naturally produce @@ -140,4 +150,131 @@ public static long invert(Long key) { return Long.MAX_VALUE - key; } + /** + * returns the timestamp of that day's start (which is midnight 00:00:00 AM) + * for a given input timestamp + * + * @param ts + * @return timestamp of that day's beginning (midnight) + */ + public static long getTopOfTheDayTimestamp(long ts) { + long dayTimestamp = ts - (ts % MILLIS_ONE_DAY); + return dayTimestamp; + } + + /** + * this method combines the input array of attributes and the input + * aggregation operation into a new array of attributes + * + * @param attributes + * @param aggOp + * @return array of combined attributes + */ + public static Attribute[] combineAttributes(Attribute[] attributes, + AggregationOperations aggOp) { + int newLength = getNewLengthCombinedAttributes(attributes, aggOp); + Attribute[] combinedAttributes = new Attribute[newLength]; + + if (attributes != null) { + System.arraycopy(attributes, 0, combinedAttributes, 0, attributes.length); + } + + if (aggOp != null) { + Attribute a2 = aggOp.getAttribute(); + combinedAttributes[newLength - 1] = a2; + } + return combinedAttributes; + } + + /** + * returns a number for the new array size. 
The new array is the combination + * of input array of attributes and the input aggregation operation + * + * @param attributes + * @param aggOp + * @return + */ + private static int getNewLengthCombinedAttributes(Attribute[] attributes, + AggregationOperations aggOp) { + int oldLength = getAttributesLength(attributes); + int aggLength = getAppOpLength(aggOp); + return oldLength + aggLength; + } + + private static int getAppOpLength(AggregationOperations aggOp) { + if (aggOp != null) { + return 1; + } + return 0; + } + + private static int getAttributesLength(Attribute[] attributes) { + if (attributes != null) { + return attributes.length; + } + return 0; + } + + /** + * checks if an application has finished + * + * @param te + * @return true if application has finished else false + */ + public static boolean isApplicationFinished(TimelineEntity te) { + SortedSet allEvents = te.getEvents(); + if ((allEvents != null) && (allEvents.size() > 0)) { + TimelineEvent event = allEvents.last(); + if (event.getId().equals(ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) { + return true; + } + } + return false; + } + + /** + * get the time at which an app finsihed + * + * @param te + * @return true if application has finished else false + */ + public static long getApplicationFinishedTime(TimelineEntity te) { + SortedSet allEvents = te.getEvents(); + if ((allEvents != null) && (allEvents.size() > 0)) { + TimelineEvent event = allEvents.last(); + if (event.getId().equals(ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) { + return event.getTimestamp(); + } + } + return 0l; + } + + /** + * checks if the input TimelineEntity object is an ApplicationEntity + * + * @param te + * @return true if input is an ApplicationEntity, false otherwise + */ + public static boolean isApplicationEntity(TimelineEntity te) { + return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString()); + } + + /** + * checks for the APPLICATION_CREATED event + * + * @param te + * @return true is application event exists, false otherwise + */ + public static boolean isApplicationCreated(TimelineEntity te) { + if (isApplicationEntity(te)) { + for (TimelineEvent event : te.getEvents()) { + if (event.getId() + .equals(ApplicationMetricsConstants.CREATED_EVENT_TYPE)) { + return true; + } + } + } + return false; + } + } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/coprocessor/FlowRunCoprocessor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/coprocessor/FlowRunCoprocessor.java new file mode 100644 index 0000000..d4ed5b4 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/coprocessor/FlowRunCoprocessor.java @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/coprocessor/FlowRunCoprocessor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/coprocessor/FlowRunCoprocessor.java new file mode 100644 index 0000000..d4ed5b4 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/coprocessor/FlowRunCoprocessor.java @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.coprocessor; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperations; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; + +public class FlowRunCoprocessor extends BaseRegionObserver { + private HRegion region; + + private TimestampGenerator timestampGenerator = new TimestampGenerator(); + + private static final Log LOG = LogFactory.getLog(FlowRunCoprocessor.class); + + @Override + public void start(CoprocessorEnvironment e) throws IOException { + if (e instanceof RegionCoprocessorEnvironment) { + RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e; + this.region = env.getRegion(); + } + } + + /** + * checks if the input attribute is part of the cell tag list + * + * @param tags + * @param aggOp + * @return true if the attribute is part of the cell tag list, false otherwise + */ + static boolean tagsListContains(List<Tag> tags, AggregationOperations aggOp) { + boolean exists = false; + for (Tag tag : tags) { + if (tag.getType() == aggOp.getTagType()) { + exists = true; + } + } + return exists; + } + + /* + * (non-Javadoc) + * + * This method adds the tags onto the cells in the Put. It is presumed that + * all the cells in one Put have the same set of Tags. The existing cell + * timestamp is overwritten for non-metric cells and each such cell gets a new + * unique timestamp generated by {@link TimestampGenerator} + * + * @see + * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#prePut(org.apache + * .hadoop.hbase.coprocessor.ObserverContext, + * org.apache.hadoop.hbase.client.Put, + * org.apache.hadoop.hbase.regionserver.wal.WALEdit, + * org.apache.hadoop.hbase.client.Durability) + */ + @Override + public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, + WALEdit edit, Durability durability) throws IOException { + Map<String, byte[]> attributes = put.getAttributesMap(); + + // Assumption is that all the cells in a put are the same operation.
+ List<Tag> tags = new ArrayList<>(); + if ((attributes != null) && (attributes.size() > 0)) { + for (Map.Entry<String, byte[]> attribute : attributes.entrySet()) { + Tag t = getTagFromAttribute(attribute); + tags.add(t); + } + byte[] tagByteArray = Tag.fromList(tags); + NavigableMap<byte[], List<Cell>> newFamilyMap = new TreeMap<>( + Bytes.BYTES_COMPARATOR); + for (Map.Entry<byte[], List<Cell>> entry : put.getFamilyCellMap() + .entrySet()) { + List<Cell> newCells = new ArrayList<>(entry.getValue().size()); + for (Cell cell : entry.getValue()) { + // for each cell in the put add the tags + // Assumption is that all the cells in + // one put are the same operation + // also, get a unique cell timestamp for non-metric cells + // this way we don't inadvertently overwrite cell versions + long cellTimestamp = getCellTimestamp(cell.getTimestamp(), tags); + newCells.add(CellUtil.createCell(CellUtil.cloneRow(cell), + CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), + cellTimestamp, KeyValue.Type.Put, CellUtil.cloneValue(cell), + tagByteArray)); + } + newFamilyMap.put(entry.getKey(), newCells); + } // for each entry + // Update the family map for the Put + put.setFamilyCellMap(newFamilyMap); + } + } + + /** + * this method determines if the current cell's timestamp is to be used or if + * a new unique cell timestamp is to be used. This is done so that we don't + * inadvertently overwrite cells when writes come in very fast. But for metric + * cells, the cell timestamp signifies the metric timestamp. Hence we don't + * want to overwrite it + * + * @param timestamp + * @param tags + * @return cell timestamp + */ + private long getCellTimestamp(long timestamp, List<Tag> tags) { + if ((tagsListContains(tags, AggregationOperations.SUM)) + || (tagsListContains(tags, AggregationOperations.SUM_FINAL))) { + return timestamp; + } + return timestampGenerator.getUniqueTimestamp(); + } + + /** + * creates a {@link Tag} from the input attribute + * + * @param attribute + * @return Tag + */ + private Tag getTagFromAttribute(Entry<String, byte[]> attribute) { + // attribute could be either an Aggregation Operation or + // an Aggregation Dimension + // Get the Tag type from either + try { + AggregationOperations aggOp = AggregationOperations.valueOf(attribute + .getKey()); + Tag t = new Tag(aggOp.getTagType(), attribute.getValue()); + return t; + } catch (IllegalArgumentException iae) { + // ignore exception, this may be part of another enum + } + try { + AggregationCompactionDimension aggCompactDim = AggregationCompactionDimension + .valueOf(attribute.getKey()); + Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue()); + return t; + } catch (IllegalArgumentException iae) { + // ignore exception, this may be part of another enum + } + return null; + } + + /* + * (non-Javadoc) + * + * creates a {@link FlowScanner} Scan so that it can correctly process the + * contents of {@link FlowRunTable} + * + * @see + * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preGetOp(org.apache + * .hadoop.hbase.coprocessor.ObserverContext, + * org.apache.hadoop.hbase.client.Get, java.util.List) + */ + @Override + public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, + Get get, List<Cell> results) throws IOException { + Scan scan = new Scan(get); + scan.setMaxVersions(); + RegionScanner scanner = null; + try { + scanner = new FlowScanner(region, scan.getBatch(), + region.getScanner(scan)); + scanner.next(results); + e.bypass(); + } finally { + if (scanner != null) { + scanner.close(); + } + } + } + + /* + * (non-Javadoc) + * + * ensures that max versions are set for the Scan so that metrics can be + * correctly aggregated and min/max can be correctly determined + * + * @see + * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preScannerOpen(org + * .apache.hadoop.hbase.coprocessor.ObserverContext, + * org.apache.hadoop.hbase.client.Scan, + * org.apache.hadoop.hbase.regionserver.RegionScanner) + */ + @Override + public RegionScanner preScannerOpen( + ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, + RegionScanner s) throws IOException { + // set max versions for scan to see all + // versions to aggregate for metrics + scan.setMaxVersions(); + return s; + } + + /* + * (non-Javadoc) + * + * creates a {@link FlowScanner} so that it can correctly process the + * contents of {@link FlowRunTable} + * + * @see + * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#postScannerOpen( + * org.apache.hadoop.hbase.coprocessor.ObserverContext, + * org.apache.hadoop.hbase.client.Scan, + * org.apache.hadoop.hbase.regionserver.RegionScanner) + */ + @Override + public RegionScanner postScannerOpen( + ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, + RegionScanner scanner) throws IOException { + return new FlowScanner(region, scan.getBatch(), scanner); + } +}
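The Tag plumbing that prePut() relies on can be exercised in isolation: a Tag is a (type byte, value) pair, Tag.fromList serializes a list of tags into the byte array that CellUtil.createCell accepts, and Tag.asList recovers them, which is how FlowScanner later classifies each cell. A small sketch against the HBase 1.x Tag API; the tag type byte is made up here (the real one comes from AggregationOperations#getTagType):

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.util.Bytes;

public class TagRoundTripDemo {
  public static void main(String[] args) {
    byte sumTagType = (byte) 71; // hypothetical tag type byte
    Tag sum = new Tag(sumTagType, Bytes.toBytes("SUM"));
    List<Tag> tags = new ArrayList<>();
    tags.add(sum);
    byte[] packed = Tag.fromList(tags);            // what gets attached to the cell
    List<Tag> unpacked = Tag.asList(packed, 0, packed.length);
    System.out.println(unpacked.get(0).getType() == sumTagType); // true
  }
}
```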
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/coprocessor/FlowScanner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/coprocessor/FlowScanner.java new file mode 100644 index 0000000..9563923 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/coprocessor/FlowScanner.java @@ -0,0 +1,405 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.coprocessor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperations; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +/** + * Invoked via the coprocessor when a Get or a Scan is issued for the flow run + * table. Looks through the list of cells per row, checks their tags, and + * processes those cells as per the cell tags. Transforms reads of the stored + * metrics into calculated sums for each column. Also finds the min and max for + * start and end times in a flow run. + */ +class FlowScanner implements RegionScanner, Closeable { + + private final HRegion region; + private final InternalScanner flowRunScanner; + private RegionScanner regionScanner; + private final int limit; + private boolean hasMore; + private byte[] currentRow; + private List<Cell> availableCells = new ArrayList<>(); + private int currentIndex; + + private static final Log LOG = LogFactory.getLog(FlowScanner.class); + + FlowScanner(HRegion region, int limit, InternalScanner internalScanner) { + this.region = region; + this.limit = limit; + this.flowRunScanner = internalScanner; + if (internalScanner instanceof RegionScanner) { + this.regionScanner = (RegionScanner) internalScanner; + } + // TODO: note if it's compaction/flush + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getRegionInfo() + */ + @Override + public HRegionInfo getRegionInfo() { + return region.getRegionInfo(); + } + + @Override + public boolean nextRaw(List<Cell> cells) throws IOException { + return nextRaw(cells, limit); + } + + @Override + public boolean nextRaw(List<Cell> cells, int limit) throws IOException { + return nextInternal(cells, limit); + } + + @Override + public boolean next(List<Cell> cells) throws IOException { + return next(cells, limit); + } + + @Override + public boolean next(List<Cell> cells, int limit) throws IOException { + return nextInternal(cells, limit); + } + + private String getAggregationCompactionDimension(List<Tag> tags) { + String appId = null; + for (Tag t : tags) { + if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t + .getType()) { + appId = Bytes.toString(t.getValue()); + } + } + return appId; + } + + /** + * This method loops through the cells in a given row of the + * {@link FlowRunTable}. It looks at the tags of each cell to figure out how + * to process the contents. It then calculates the sum or min or max for each + * column.
It then calculates the sum or min or max for each + * column + * + * @param cells + * @param limit + * @return + * @throws IOException + */ + private boolean nextInternal(List cells, int limit) throws IOException { + Map runningSum = new HashMap(); + Cell cell = null; + + Map metricCell = new HashMap(); + Set> seenAppsForMetrics = new HashSet>(); + + // cells must be added in the right key-value order for them to be fetched + // correctly + Set sortedCells = new TreeSet<>(KeyValue.COMPARATOR); + Cell currentMinCell = null; + Cell currentMaxCell = null; + + startNext(); + // loop through all the cells in this row + // for min/max/metrics we do need to scan the entire set of cells to get + // the right one + // but with flush/compaction, the number of cells being scanned will go + // down + while ((cell = peekAtNextCell(limit)) != null) { + List tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), + cell.getTagsLength()); + + if (FlowRunCoprocessor.tagsListContains(tags, AggregationOperations.MIN)) { + currentMinCell = findMinCell(currentMinCell, cell); + } else if (FlowRunCoprocessor.tagsListContains(tags, + AggregationOperations.MAX)) { + currentMaxCell = findMaxCell(currentMaxCell, cell); + } else if (FlowRunCoprocessor.tagsListContains(tags, + AggregationOperations.SUM) + || (FlowRunCoprocessor.tagsListContains(tags, + AggregationOperations.SUM_FINAL))) { + processMetricCell(cell, seenAppsForMetrics, runningSum, metricCell, + getAggregationCompactionDimension(tags)); + } else { + // not a min/max/metric cell, so just return it as is + sortedCells.add(cell); + } + nextCell(limit); + } + + if (runningSum.size() > 0) { + for (Map.Entry newCellSum : runningSum.entrySet()) { + // create a new cell that represents the flow metric + Cell c = createNewCell(metricCell.get(newCellSum.getKey()), + newCellSum.getValue()); + sortedCells.add(c); + } + } + if (currentMinCell != null) { + sortedCells.add(currentMinCell); + } + if (currentMaxCell != null) { + sortedCells.add(currentMaxCell); + } + // add the cells in the right order + cells.addAll(sortedCells); + return hasMore(); + } + + private void processMetricCell(Cell cell, + Set> seenAppsForMetrics, + Map runningSum, Map metricCell, String appId) + throws IOException { + String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell)); + Pair pair = new Pair(qualifier, appId); + if (!seenAppsForMetrics.contains(pair)) { + Number current = (Number) GenericObjectMapper.read(CellUtil + .cloneValue(cell)); + Number newSum = addToRunningSum(runningSum, qualifier, current); + runningSum.put(qualifier, newSum); + seenAppsForMetrics.add(new Pair(qualifier, appId)); + metricCell.put(qualifier, cell); + } + } + + private Cell findMinCell(Cell currentMinCell, Cell cell) throws IOException { + if (currentMinCell == null) { + return cell; + } + try { + long currentMinValue = (Long) GenericObjectMapper.read(CellUtil + .cloneValue(currentMinCell)); + long currentCellValue = (Long) GenericObjectMapper.read(CellUtil + .cloneValue(cell)); + if (currentCellValue < currentMinValue) { + // new value is minimum, hence return this cell + return cell; + } else { + // exising min value is miniumum, hence return existing min cell + return currentMinCell; + } + } catch (IllegalArgumentException iae) { + LOG.error("caught iae during conversion to long ", iae); + return cell; + } + } + + private Cell findMaxCell(Cell currentMaxCell, Cell cell) throws IOException { + + if (currentMaxCell == null) { + return cell; + } + try { + long currentMaxCellValue = (Long) 
+ long currentCellValue = (Long) GenericObjectMapper.read((CellUtil + .cloneValue(cell))); + if (currentCellValue > currentMaxCellValue) { + // new value is max, hence return this cell + return cell; + } else { + // existing max value is maximum, hence return existing max cell + return currentMaxCell; + } + } catch (IllegalArgumentException e) { + LOG.error("Caught iae during long conversion ", e); + return cell; + } + } + + private Number addToRunningSum(Map<String, Number> runningSum, + String qualifier, Number currentCellValue) { + Number existingSum = runningSum.get(qualifier); + if (existingSum == null) { + return currentCellValue; + } else { + return existingSum.longValue() + currentCellValue.longValue(); + } + } + + private Cell createNewCell(Cell origCell, Number number) throws IOException { + byte[] newValue = GenericObjectMapper.write(number); + return CellUtil.createCell(CellUtil.cloneRow(origCell), + CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell), + origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue); + } + + @Override + public void close() throws IOException { + flowRunScanner.close(); + } + + /** + * Wraps the underlying store or region scanner in an API that hides the + * details of calling and managing the buffered batch of results. + */ + + /** + * Called to signal the start of the next() call by the scanner. + */ + public void startNext() { + currentRow = null; + } + + /** + * Returns whether or not the underlying scanner has more rows. + */ + public boolean hasMore() { + return currentIndex < availableCells.size() ? true : hasMore; + } + + /** + * Returns the next available cell for the current row and advances the + * pointer to the next cell. This method can be called multiple times in a row + * to advance through all the available cells. + * + * @param limit + * the limit of number of cells to return if the next batch must be + * fetched by the wrapped scanner + * @return the next available cell or null if no more cells are available for + * the current row + * @throws IOException + */ + public Cell nextCell(int limit) throws IOException { + Cell cell = peekAtNextCell(limit); + if (cell != null) { + currentIndex++; + } + return cell; + } + + /** + * Returns the next available cell for the current row, without advancing the + * pointer. Calling this method multiple times in a row will continue to + * return the same cell.
+ * + * @param limit + * the limit of number of cells to return if the next batch must be + * fetched by the wrapped scanner + * @return the next available cell or null if no more cells are available for + * the current row + * @throws IOException + */ + public Cell peekAtNextCell(int limit) throws IOException { + if (currentIndex >= availableCells.size()) { + // done with current batch + availableCells.clear(); + currentIndex = 0; + hasMore = flowRunScanner.next(availableCells, limit); + } + Cell cell = null; + if (currentIndex < availableCells.size()) { + cell = availableCells.get(currentIndex); + if (currentRow == null) { + currentRow = CellUtil.cloneRow(cell); + } else if (!CellUtil.matchingRow(cell, currentRow)) { + // moved on to the next row + // don't use the current cell + // also signal no more cells for this row + return null; + } + } + return cell; + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMaxResultSize() + */ + @Override + public long getMaxResultSize() { + if (regionScanner == null) { + throw new IllegalStateException( + "RegionScanner.getMaxResultSize() called when the flow " + + "scanner's internal scanner is not a RegionScanner"); + } + return regionScanner.getMaxResultSize(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.regionserver.RegionScanner#getMvccReadPoint() + */ + @Override + public long getMvccReadPoint() { + if (regionScanner == null) { + throw new IllegalStateException( + "RegionScanner.getMvccReadPoint() called when the flow " + + "scanner's internal scanner is not a RegionScanner"); + } + return regionScanner.getMvccReadPoint(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.regionserver.RegionScanner#isFilterDone() + */ + @Override + public boolean isFilterDone() throws IOException { + if (regionScanner == null) { + throw new IllegalStateException( + "RegionScanner.isFilterDone() called when the flow " + + "scanner's internal scanner is not a RegionScanner"); + } + return regionScanner.isFilterDone(); + + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.regionserver.RegionScanner#reseek(byte[]) + */ + @Override + public boolean reseek(byte[] bytes) throws IOException { + if (regionScanner == null) { + throw new IllegalStateException( + "RegionScanner.reseek() called when the flow " + + "scanner's internal scanner is not a RegionScanner"); + } + return regionScanner.reseek(bytes); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/coprocessor/TimestampGenerator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/coprocessor/TimestampGenerator.java new file mode 100644 index 0000000..32d1825 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/coprocessor/TimestampGenerator.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/coprocessor/TimestampGenerator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/coprocessor/TimestampGenerator.java
new file mode 100644
index 0000000..32d1825
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/coprocessor/TimestampGenerator.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage.coprocessor;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Utility class that allows HBase coprocessors to interact with unique
+ * timestamps.
+ */
+public class TimestampGenerator {
+
+  private final AtomicLong lastTimestamp = new AtomicLong();
+  public static final long MAX_TS_PER_MS = 1000000;
+
+  /**
+   * Returns the current wall clock time in milliseconds, multiplied by the
+   * required precision.
+   */
+  public long currentTime() {
+    // We want to align cell timestamps with current time.
+    // Cell timestamps will not be less than
+    // System.currentTimeMillis() * MAX_TS_PER_MS.
+    return System.currentTimeMillis() * MAX_TS_PER_MS;
+  }
+
+  /**
+   * Returns a timestamp value unique within the scope of this
+   * {@code TimestampGenerator} instance. For usage by HBase
+   * {@code RegionObserver} coprocessors, this normally means unique within a
+   * given region.
+   */
+  public long getUniqueTimestamp() {
+    long lastTs;
+    long nextTs;
+    do {
+      lastTs = lastTimestamp.get();
+      nextTs = Math.max(lastTs + 1, currentTime());
+    } while (!lastTimestamp.compareAndSet(lastTs, nextTs));
+    return nextTs;
+  }
+}
\ No newline at end of file
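A quick illustration (not part of the patch) of the guarantee this class provides:

    TimestampGenerator gen = new TimestampGenerator();
    long t1 = gen.getUniqueTimestamp();
    long t2 = gen.getUniqueTimestamp();
    // t2 > t1 even when both calls land in the same millisecond, because the
    // compare-and-set loop advances lastTimestamp by at least 1.
    // Both values are >= System.currentTimeMillis() * MAX_TS_PER_MS at call
    // time, so cell timestamps stay aligned with scaled wall-clock time.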
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java index 26e7748..8ae19b8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.entity; import java.io.IOException; +import java.util.Map; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; @@ -26,6 +27,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; /** * Identifies fully qualified columns for the {@link EntityTable}. @@ -81,9 +83,9 @@ private String getColumnQualifier() { public void store(byte[] rowKey, TypedBufferedMutator tableMutator, Long timestamp, - Object inputValue) throws IOException { + Object inputValue, Attribute... attributes) throws IOException { column.store(rowKey, tableMutator, columnQualifierBytes, timestamp, - inputValue); + inputValue, attributes); } public Object readResult(Result result) throws IOException {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java index 75ff742..781746a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; /** * Identifies partially qualified columns for the entity table. @@ -103,16 +104,18 @@ public String getColumnPrefix() { /* * (non-Javadoc) - * + * * @see * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix * #store(byte[], * org.apache.hadoop.yarn.server.timelineservice.storage.common. - * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object) + * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object, + * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[]) */ public void store(byte[] rowKey, TypedBufferedMutator tableMutator, String qualifier, - Long timestamp, Object inputValue) throws IOException { + Long timestamp, Object inputValue, Attribute... attributes) + throws IOException { // Null check if (qualifier == null) { @@ -123,8 +126,9 @@ public void store(byte[] rowKey, byte[] columnQualifier = ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); - column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue); - } + column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, + attributes); + } /* * (non-Javadoc) @@ -137,7 +141,8 @@ */ public void store(byte[] rowKey, TypedBufferedMutator tableMutator, byte[] qualifier, - Long timestamp, Object inputValue) throws IOException { + Long timestamp, Object inputValue, Attribute... 
attributes) + throws IOException { // Null check if (qualifier == null) { @@ -148,8 +153,9 @@ public void store(byte[] rowKey, byte[] columnQualifier = ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); - column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue); - } + column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, + attributes); + } /* * (non-Javadoc) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java new file mode 100644 index 0000000..374d8bc --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationCompactionDimension.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Identifies the compaction dimensions for the data in the {@link FlowRunTable} + * . + */ +public enum AggregationCompactionDimension { + + /** + * the application id + */ + APPLICATION_ID((byte) 101); + + private byte tagType; + private byte[] inBytes; + + private AggregationCompactionDimension(byte tagType) { + this.tagType = tagType; + this.inBytes = Bytes.toBytes(this.name()); + } + + public Attribute getAttribute(String attributeValue) { + return new Attribute(this.name(), Bytes.toBytes(attributeValue)); + } + + public byte getTagType() { + return tagType; + } + + public byte[] getInBytes() { + return this.inBytes; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperations.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperations.java new file mode 100644 index 0000000..8a34308 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/AggregationOperations.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Identifies the attributes to be set for puts into the {@link FlowRunTable}.
+ * The numbers used for tagType are prime numbers.
+ */
+public enum AggregationOperations {
+
+  /**
+   * Keep the minimum value seen, e.g. the earliest application start time.
+   */
+  MIN((byte) 71),
+
+  /**
+   * Keep the maximum value seen, e.g. the latest application end time.
+   */
+  MAX((byte) 73),
+
+  /**
+   * Sum the values, e.g. the metrics of the applications in the flow.
+   */
+  SUM((byte) 79),
+
+  /**
+   * Sum only the final values written for completed applications.
+   */
+  SUM_FINAL((byte) 83),
+
+  /**
+   * Marks cells to be aggregated at compaction time.
+   */
+  COMPACT((byte) 89);
+
+  private byte tagType;
+  private byte[] inBytes;
+
+  private AggregationOperations(byte tagType) {
+    this.tagType = tagType;
+    this.inBytes = Bytes.toBytes(this.name());
+  }
+
+  public Attribute getAttribute() {
+    return new Attribute(this.name(), this.inBytes);
+  }
+
+  public byte getTagType() {
+    return tagType;
+  }
+
+  public byte[] getInBytes() {
+    return this.inBytes;
+  }
+
+}
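To show how the aggregation enums cooperate, here is an illustrative sketch (not part of the patch); the application id value is made up:

    // A writer tags a metric put with SUM so the flow run coprocessor knows
    // how to aggregate it, and adds the application id as the compaction
    // dimension.
    Attribute sumOp = AggregationOperations.SUM.getAttribute();
    Attribute appIdDim = AggregationCompactionDimension.APPLICATION_ID
        .getAttribute("application_1423589467996_0001");
    // Column stores turn each Attribute into an HBase cell tag; the distinct
    // tagType bytes keep the two kinds of tags apart when reading them back.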
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java
new file mode 100644
index 0000000..5fa4253
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/Attribute.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+/**
+ * Defines the attribute tuple to be set for puts into the {@link FlowRunTable}.
+ */
+public class Attribute {
+  private final String name;
+  private final byte[] value;
+
+  public Attribute(String name, byte[] value) {
+    this.name = name;
+    this.value = value;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public byte[] getValue() {
+    return value;
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java
new file mode 100644
index 0000000..d991b42
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnFamily.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Represents the flow activity table column families.
+ */
+public enum FlowActivityColumnFamily implements ColumnFamily {
+
+  /**
+   * Info column family houses known columns, specifically ones included in
+   * columnfamily filters.
+   */
+  INFO("i");
+
+  /**
+   * Byte representation of this column family.
+   */
+  private final byte[] bytes;
+
+  /**
+   * @param value
+   *          create a column family with this name. Must be lower case and
+   *          without spaces.
+   */
+  private FlowActivityColumnFamily(String value) {
+    // column families should be lower case and not contain any spaces.
+    this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
+  }
+
+  public byte[] getBytes() {
+    return Bytes.copy(bytes);
+  }
+
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
new file mode 100644
index 0000000..974dd50
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import java.io.IOException; +import java.util.Map; +import java.util.NavigableMap; + +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; + +/** + * Identifies partially qualified columns for the {@link FlowActivityTable} + */ +public enum FlowActivityColumnPrefix implements ColumnPrefix { + + /** + * To store run ids of the flows + */ + RUN_ID(FlowActivityColumnFamily.INFO, "r", null); + + private final ColumnHelper column; + private final ColumnFamily columnFamily; + + /** + * Can be null for those cases where the provided column qualifier is the + * entire column name. + */ + private final String columnPrefix; + private final byte[] columnPrefixBytes; + + private final AggregationOperations aggOp; + + /** + * Private constructor, meant to be used by the enum definition. + * + * @param columnFamily + * that this column is stored in. + * @param columnPrefix + * for this column. + */ + private FlowActivityColumnPrefix( + ColumnFamily columnFamily, String columnPrefix, + AggregationOperations aggOp) { + column = new ColumnHelper(columnFamily); + this.columnFamily = columnFamily; + this.columnPrefix = columnPrefix; + if (columnPrefix == null) { + this.columnPrefixBytes = null; + } else { + // Future-proof by ensuring the right column prefix hygiene. + this.columnPrefixBytes = Bytes.toBytes(Separator.SPACE + .encode(columnPrefix)); + } + this.aggOp = aggOp; + } + + /** + * @return the column name value + */ + public String getColumnPrefix() { + return columnPrefix; + } + + public byte[] getColumnPrefixBytes() { + return columnPrefixBytes; + } + + public AggregationOperations getAttribute() { + return aggOp; + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #store(byte[], + * org.apache.hadoop.yarn.server.timelineservice.storage.common. + * TypedBufferedMutator, byte[], java.lang.Long, java.lang.Object, + * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[]) + */ + @Override + public void store(byte[] rowKey, + TypedBufferedMutator tableMutator, byte[] qualifier, + Long timestamp, Object inputValue, Attribute... 
attributes) + throws IOException { + // Null check + if (qualifier == null) { + throw new IOException("Cannot store column with null qualifier in " + + tableMutator.getName().getNameAsString()); + } + + byte[] columnQualifier = ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifier); + Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes( + attributes, this.aggOp); + column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, + combinedAttributes); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String) + */ + public Object readResult(Result result, String qualifier) throws IOException { + byte[] columnQualifier = ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifier); + return column.readResult(result, columnQualifier); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #readResults(org.apache.hadoop.hbase.client.Result) + */ + public Map readResults(Result result) throws IOException { + return column.readResults(result, columnPrefixBytes); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result) + */ + public NavigableMap> readResultsWithTimestamps( + Result result) throws IOException { + return column.readResultsWithTimestamps(result, columnPrefixBytes); + } + + /** + * Retrieve an {@link FlowActivityColumnPrefix} given a name, or null if there + * is no match. The following holds true: {@code columnFor(x) == columnFor(y)} + * if and only if {@code x.equals(y)} or {@code (x == y == null)} + * + * @param columnPrefix + * Name of the column to retrieve + * @return the corresponding {@link FlowActivityColumnPrefix} or null + */ + public static final FlowActivityColumnPrefix columnFor(String columnPrefix) { + + // Match column based on value, assume column family matches. + for (FlowActivityColumnPrefix flowActivityColPrefix : FlowActivityColumnPrefix + .values()) { + // Find a match based only on name. + if (flowActivityColPrefix.getColumnPrefix().equals(columnPrefix)) { + return flowActivityColPrefix; + } + } + // Default to null + return null; + } + + /** + * Retrieve an {@link FlowActivityColumnPrefix} given a name, or null if there + * is no match. The following holds true: + * {@code columnFor(a,x) == columnFor(b,y)} if and only if + * {@code (x == y == null)} or {@code a.equals(b) & x.equals(y)} + * + * @param columnFamily + * The columnFamily for which to retrieve the column. + * @param columnPrefix + * Name of the column to retrieve + * @return the corresponding {@link FlowActivityColumnPrefix} or null if both + * arguments don't match. + */ + public static final FlowActivityColumnPrefix columnFor( + FlowActivityColumnFamily columnFamily, String columnPrefix) { + + // TODO: needs unit test to confirm and need to update javadoc to explain + // null prefix case. + + for (FlowActivityColumnPrefix flowActivityColumnPrefix : FlowActivityColumnPrefix + .values()) { + // Find a match based column family and on name. 
+ if (flowActivityColumnPrefix.columnFamily.equals(columnFamily) + && (((columnPrefix == null) && (flowActivityColumnPrefix + .getColumnPrefix() == null)) || (flowActivityColumnPrefix + .getColumnPrefix().equals(columnPrefix)))) { + return flowActivityColumnPrefix; + } + } + // Default to null + return null; + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #store(byte[], + * org.apache.hadoop.yarn.server.timelineservice.storage.common. + * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object, + * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[]) + */ + @Override + public void store(byte[] rowKey, + TypedBufferedMutator tableMutator, String qualifier, + Long timestamp, Object inputValue, Attribute... attributes) + throws IOException { + // Null check + if (qualifier == null) { + throw new IOException("Cannot store column with null qualifier in " + + tableMutator.getName().getNameAsString()); + } + + byte[] columnQualifier = ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifier); + Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes( + attributes, this.aggOp); + column.store(rowKey, tableMutator, columnQualifier, null, inputValue, + combinedAttributes); + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java new file mode 100644 index 0000000..e8509b2 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; + +/** + * Represents a rowkey for the flow activity table. 
+ */
+public class FlowActivityRowKey {
+
+  private final String clusterId;
+  private final long dayTs;
+  private final String userId;
+  private final String flowId;
+
+  public FlowActivityRowKey(String clusterId, long dayTs, String userId,
+      String flowId) {
+    this.clusterId = clusterId;
+    this.dayTs = dayTs;
+    this.userId = userId;
+    this.flowId = flowId;
+  }
+
+  public String getClusterId() {
+    return clusterId;
+  }
+
+  public long getDayTimestamp() {
+    return dayTs;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public String getFlowId() {
+    return flowId;
+  }
+
+  /**
+   * Constructs a row key for the flow activity table as follows:
+   * {@code clusterId!dayTimestamp!user!flowId}
+   *
+   * The key points at the current day's record for this flow.
+   * @param clusterId
+   * @param userId
+   * @param flowId
+   * @return byte array with the row key
+   */
+  public static byte[] getRowKey(String clusterId, String userId, String flowId) {
+    long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+        .currentTimeMillis());
+    return getRowKey(clusterId, dayTs, userId, flowId);
+  }
+
+  /**
+   * Constructs a row key for the flow activity table as follows:
+   * {@code clusterId!dayTimestamp!user!flowId}
+   *
+   * @param clusterId
+   * @param dayTs
+   * @param userId
+   * @param flowId
+   * @return byte array with the row key
+   */
+  public static byte[] getRowKey(String clusterId, long dayTs, String userId,
+      String flowId) {
+    return Separator.QUALIFIERS.join(
+        Bytes.toBytes(Separator.QUALIFIERS.encode(clusterId)),
+        Bytes.toBytes(TimelineWriterUtils.invert(dayTs)),
+        Bytes.toBytes(Separator.QUALIFIERS.encode(userId)),
+        Bytes.toBytes(Separator.QUALIFIERS.encode(flowId)));
+  }
+
+  /**
+   * Given the raw row key as bytes, returns the row key as an object.
+   */
+  public static FlowActivityRowKey parseRowKey(byte[] rowKey) {
+    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey);
+
+    if (rowKeyComponents.length < 4) {
+      throw new IllegalArgumentException("the row key is not valid for "
+          + "a flow activity");
+    }
+
+    String clusterId = Separator.QUALIFIERS.decode(Bytes
+        .toString(rowKeyComponents[0]));
+    long dayTs = TimelineWriterUtils.invert(Bytes.toLong(rowKeyComponents[1]));
+    String userId = Separator.QUALIFIERS.decode(Bytes
+        .toString(rowKeyComponents[2]));
+    String flowId = Separator.QUALIFIERS.decode(Bytes
+        .toString(rowKeyComponents[3]));
+    return new FlowActivityRowKey(clusterId, dayTs, userId, flowId);
+  }
+}
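An illustrative round trip (not part of the patch) through the helpers above; the cluster, user, and flow names are made up:

    byte[] rowKey = FlowActivityRowKey.getRowKey("cluster1", "user1", "flow_sls");
    FlowActivityRowKey parsed = FlowActivityRowKey.parseRowKey(rowKey);
    // parsed.getClusterId()    -> "cluster1"
    // parsed.getDayTimestamp() -> top of the current day in ms, un-inverted
    // Because the day timestamp is stored inverted, a scan over this table
    // returns the most recent day's flow activity first.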
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTable.java
new file mode 100644
index 0000000..af8df99
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityTable.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+
+/**
+ * The flow activity table has column family: info.
+ * Stores the daily activity record for flows.
+ * Useful as a quick lookup of what flows were running on a given day.
+ *
+ * Example flow activity table record:
+ *
+ * <pre>
+ * |-------------------------------------------|
+ * |  Row key   | Column Family                |
+ * |            | info                         |
+ * |-------------------------------------------|
+ * | clusterId! | r!runid1:version1            |
+ * | inv Top of |                              |
+ * | Day!       | r!runid2:version7            |
+ * | userName!  |                              |
+ * | flowId     |                              |
+ * |-------------------------------------------|
+ * </pre>
+ */
+public class FlowActivityTable extends BaseTable {
+  /** flow activity table prefix */
+  private static final String PREFIX =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".flowactivity";
+
+  /** config param name that specifies the flowactivity table name */
+  public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+  /** default value for flowactivity table name */
+  public static final String DEFAULT_TABLE_NAME = "timelineservice.flowactivity";
+
+  private static final Log LOG = LogFactory.getLog(FlowActivityTable.class);
+
+  /** default max number of versions */
+  public static final int DEFAULT_METRICS_MAX_VERSIONS = Integer.MAX_VALUE;
+
+  public FlowActivityTable() {
+    super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTable#createTable
+   * (org.apache.hadoop.hbase.client.Admin,
+   * org.apache.hadoop.conf.Configuration)
+   */
+  public void createTable(Admin admin, Configuration hbaseConf)
+      throws IOException {
+
+    TableName table = getTableName(hbaseConf);
+    if (admin.tableExists(table)) {
+      // do not disable / delete existing table
+      // similar to the approach taken by map-reduce jobs when
+      // output directory exists
+      throw new IOException("Table " + table.getNameAsString()
+          + " already exists.");
+    }
+
+    HTableDescriptor flowActivityTableDescp = new HTableDescriptor(table);
+    HColumnDescriptor infoCF =
+        new HColumnDescriptor(FlowActivityColumnFamily.INFO.getBytes());
+    infoCF.setBloomFilterType(BloomType.ROWCOL);
+    flowActivityTableDescp.addFamily(infoCF);
+    infoCF.setMinVersions(1);
+    infoCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS);
+
+    // TODO: figure the split policy before running in production
+    admin.createTable(flowActivityTableDescp);
+    LOG.info("Status of table creation for " + table.getNameAsString() + "="
+        + admin.tableExists(table));
+  }
+}
\ No newline at end of file
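A minimal schema-creation sketch (not part of the patch); obtaining the HBase Connection and Configuration is assumed to happen elsewhere:

    Configuration hbaseConf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(hbaseConf);
        Admin admin = conn.getAdmin()) {
      new FlowActivityTable().createTable(admin, hbaseConf);
      // deliberately throws IOException, rather than disabling or deleting,
      // if the table already exists
    }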
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
new file mode 100644
index 0000000..e33df95
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
+
+/**
+ * Identifies fully qualified columns for the {@link FlowRunTable}.
+ */
+public enum FlowRunColumn implements Column {
+
+  /**
+   * When the flow was started. This is the minimum of currently known
+   * application start times.
+   */
+  MIN_START_TIME(FlowRunColumnFamily.INFO, "min_start_time",
+      AggregationOperations.MIN),
+
+  /**
+   * When the flow ended. This is the maximum of currently known application
+   * end times.
+   */
+  MAX_END_TIME(FlowRunColumnFamily.INFO, "max_end_time",
+      AggregationOperations.MAX),
+
+  /**
+   * The version of the flow to which this flow run belongs.
+   */
+  FLOW_VERSION(FlowRunColumnFamily.INFO, "flow_version", null);
+
+  private final ColumnHelper column;
+  private final ColumnFamily columnFamily;
+  private final String columnQualifier;
+  private final byte[] columnQualifierBytes;
+  private final AggregationOperations aggOp;
+
+  private FlowRunColumn(ColumnFamily columnFamily,
+      String columnQualifier, AggregationOperations aggOp) {
+    this.columnFamily = columnFamily;
+    this.columnQualifier = columnQualifier;
+    this.aggOp = aggOp;
+    // Future-proof by ensuring the right column prefix hygiene.
+ this.columnQualifierBytes = Bytes.toBytes(Separator.SPACE + .encode(columnQualifier)); + this.column = new ColumnHelper(columnFamily); + } + + /** + * @return the column name value + */ + private String getColumnQualifier() { + return columnQualifier; + } + + public byte[] getColumnQualifierBytes() { + return columnQualifierBytes; + } + + public AggregationOperations getAggregationOperation() { + return aggOp; + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.Column#store + * (byte[], org.apache.hadoop.yarn.server.timelineservice.storage.common. + * TypedBufferedMutator, java.lang.Long, java.lang.Object, + * org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute[]) + */ + public void store(byte[] rowKey, + TypedBufferedMutator tableMutator, Long timestamp, + Object inputValue, Attribute... attributes) throws IOException { + + Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes( + attributes, aggOp); + column.store(rowKey, tableMutator, columnQualifierBytes, timestamp, + inputValue, combinedAttributes); + } + + public Object readResult(Result result) throws IOException { + return column.readResult(result, columnQualifierBytes); + } + + /** + * Retrieve an {@link FlowRunColumn} given a name, or null if there is no + * match. The following holds true: {@code columnFor(x) == columnFor(y)} if + * and only if {@code x.equals(y)} or {@code (x == y == null)} + * + * @param columnQualifier + * Name of the column to retrieve + * @return the corresponding {@link FlowRunColumn} or null + */ + public static final FlowRunColumn columnFor(String columnQualifier) { + + // Match column based on value, assume column family matches. + for (FlowRunColumn ec : FlowRunColumn.values()) { + // Find a match based only on name. + if (ec.getColumnQualifier().equals(columnQualifier)) { + return ec; + } + } + + // Default to null + return null; + } + + /** + * Retrieve an {@link FlowRunColumn} given a name, or null if there is no + * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)} + * if and only if {@code a.equals(b) & x.equals(y)} or + * {@code (x == y == null)} + * + * @param columnFamily + * The columnFamily for which to retrieve the column. + * @param name + * Name of the column to retrieve + * @return the corresponding {@link FlowRunColumn} or null if both arguments + * don't match. + */ + public static final FlowRunColumn columnFor(FlowRunColumnFamily columnFamily, + String name) { + + for (FlowRunColumn ec : FlowRunColumn.values()) { + // Find a match based column family and on name. + if (ec.columnFamily.equals(columnFamily) + && ec.getColumnQualifier().equals(name)) { + return ec; + } + } + + // Default to null + return null; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnFamily.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnFamily.java new file mode 100644 index 0000000..8faf5f8 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnFamily.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; + +/** + * Represents the flow run table column families. + */ +public enum FlowRunColumnFamily implements ColumnFamily { + + /** + * Info column family houses known columns, specifically ones included in + * columnfamily filters. + */ + INFO("i"); + + /** + * Byte representation of this column family. + */ + private final byte[] bytes; + + /** + * @param value + * create a column family with this name. Must be lower case and + * without spaces. + */ + private FlowRunColumnFamily(String value) { + // column families should be lower case and not contain any spaces. + this.bytes = Bytes.toBytes(Separator.SPACE.encode(value)); + } + + public byte[] getBytes() { + return Bytes.copy(bytes); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java new file mode 100644 index 0000000..73c79d5 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import java.io.IOException; +import java.util.Map; +import java.util.NavigableMap; + +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; + +/** + * Identifies partially qualified columns for the {@link FlowRunTable}. + */ +public enum FlowRunColumnPrefix implements ColumnPrefix { + + /** + * To store flow run info values. + */ + METRIC(FlowRunColumnFamily.INFO, "m", AggregationOperations.SUM); + + private final ColumnHelper column; + private final ColumnFamily columnFamily; + + /** + * Can be null for those cases where the provided column qualifier is the + * entire column name. + */ + private final String columnPrefix; + private final byte[] columnPrefixBytes; + + private final AggregationOperations aggOp; + + /** + * Private constructor, meant to be used by the enum definition. + * + * @param columnFamily + * that this column is stored in. + * @param columnPrefix + * for this column. + */ + private FlowRunColumnPrefix(ColumnFamily columnFamily, + String columnPrefix, AggregationOperations fra) { + column = new ColumnHelper(columnFamily); + this.columnFamily = columnFamily; + this.columnPrefix = columnPrefix; + if (columnPrefix == null) { + this.columnPrefixBytes = null; + } else { + // Future-proof by ensuring the right column prefix hygiene. + this.columnPrefixBytes = Bytes.toBytes(Separator.SPACE + .encode(columnPrefix)); + } + this.aggOp = fra; + } + + /** + * @return the column name value + */ + public String getColumnPrefix() { + return columnPrefix; + } + + public byte[] getColumnPrefixBytes() { + return columnPrefixBytes; + } + + public AggregationOperations getAttribute() { + return aggOp; + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #store(byte[], + * org.apache.hadoop.yarn.server.timelineservice.storage.common. + * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object) + */ + public void store(byte[] rowKey, + TypedBufferedMutator tableMutator, String qualifier, + Long timestamp, Object inputValue, Attribute... attributes) + throws IOException { + + // Null check + if (qualifier == null) { + throw new IOException("Cannot store column with null qualifier in " + + tableMutator.getName().getNameAsString()); + } + + byte[] columnQualifier = ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifier); + Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes( + attributes, this.aggOp); + column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, + combinedAttributes); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #store(byte[], + * org.apache.hadoop.yarn.server.timelineservice.storage.common. 
+ * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object) + */ + public void store(byte[] rowKey, + TypedBufferedMutator tableMutator, byte[] qualifier, + Long timestamp, Object inputValue, Attribute... attributes) + throws IOException { + + // Null check + if (qualifier == null) { + throw new IOException("Cannot store column with null qualifier in " + + tableMutator.getName().getNameAsString()); + } + + byte[] columnQualifier = ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifier); + Attribute[] combinedAttributes = TimelineWriterUtils.combineAttributes( + attributes, this.aggOp); + column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, + combinedAttributes); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String) + */ + public Object readResult(Result result, String qualifier) throws IOException { + byte[] columnQualifier = ColumnHelper.getColumnQualifier( + this.columnPrefixBytes, qualifier); + return column.readResult(result, columnQualifier); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #readResults(org.apache.hadoop.hbase.client.Result) + */ + public Map readResults(Result result) throws IOException { + return column.readResults(result, columnPrefixBytes); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result) + */ + public NavigableMap> readResultsWithTimestamps( + Result result) throws IOException { + return column.readResultsWithTimestamps(result, columnPrefixBytes); + } + + /** + * Retrieve an {@link FlowRunColumnPrefix} given a name, or null if there is + * no match. The following holds true: {@code columnFor(x) == columnFor(y)} if + * and only if {@code x.equals(y)} or {@code (x == y == null)} + * + * @param columnPrefix + * Name of the column to retrieve + * @return the corresponding {@link FlowRunColumnPrefix} or null + */ + public static final FlowRunColumnPrefix columnFor(String columnPrefix) { + + // Match column based on value, assume column family matches. + for (FlowRunColumnPrefix frcp : FlowRunColumnPrefix.values()) { + // Find a match based only on name. + if (frcp.getColumnPrefix().equals(columnPrefix)) { + return frcp; + } + } + + // Default to null + return null; + } + + /** + * Retrieve an {@link FlowRunColumnPrefix} given a name, or null if there is + * no match. The following holds true: + * {@code columnFor(a,x) == columnFor(b,y)} if and only if + * {@code (x == y == null)} or {@code a.equals(b) & x.equals(y)} + * + * @param columnFamily + * The columnFamily for which to retrieve the column. + * @param columnPrefix + * Name of the column to retrieve + * @return the corresponding {@link FlowRunColumnPrefix} or null if both + * arguments don't match. + */ + public static final FlowRunColumnPrefix columnFor( + FlowRunColumnFamily columnFamily, String columnPrefix) { + + // TODO: needs unit test to confirm and need to update javadoc to explain + // null prefix case. + + for (FlowRunColumnPrefix frcp : FlowRunColumnPrefix.values()) { + // Find a match based column family and on name. 
+      if (frcp.columnFamily.equals(columnFamily)
+          && (((columnPrefix == null) && (frcp.getColumnPrefix() == null))
+              || (frcp.getColumnPrefix().equals(columnPrefix)))) {
+        return frcp;
+      }
+    }
+
+    // Default to null
+    return null;
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
new file mode 100644
index 0000000..b2b423c
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+
+/**
+ * Represents a rowkey for the flow run table.
+ */
+public class FlowRunRowKey {
+  // TODO: more methods are needed for this class like parse row key
+
+  /**
+   * Constructs a row key for the flow run table as follows:
+   * {@code clusterId!userId!flowId!invertedFlowRunId}
+   *
+   * @param clusterId
+   * @param userId
+   * @param flowId
+   * @param flowRunId
+   * @return byte array with the row key
+   */
+  public static byte[] getRowKey(String clusterId, String userId,
+      String flowId, Long flowRunId) {
+    byte[] first = Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId,
+        userId, flowId));
+    // Note that flowRunId is a long, so we can't encode them all at the same
+    // time.
+    byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
+    return Separator.QUALIFIERS.join(first, second);
+  }
+
+}
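An illustrative note (not part of the patch) on why the run id is inverted; the ids are made up:

    byte[] olderRun = FlowRunRowKey.getRowKey("c1", "user1", "flow_sls", 100L);
    byte[] newerRun = FlowRunRowKey.getRowKey("c1", "user1", "flow_sls", 200L);
    // TimelineWriterUtils.invert() flips the long (roughly
    // Long.MAX_VALUE - flowRunId), so newerRun sorts before olderRun and a
    // prefix scan on cluster!user!flow yields the latest flow run first.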
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java
new file mode 100644
index 0000000..f395965
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.coprocessor.FlowRunCoprocessor;
+
+/**
+ * The flow run table has column family: info.
+ * Stores per flow run information
+ * aggregated across applications.
+ *
+ * Example flow run table record:
+ *
+ * <pre>
+ * flow_run table
+ * |-------------------------------------------|
+ * |  Row key   | Column Family                |
+ * |            | info                         |
+ * |-------------------------------------------|
+ * | clusterId! | flow_version:version7        |
+ * | userName!  |                              |
+ * | flowId!    | running_apps:1               |
+ * | flowRunId  |                              |
+ * |            | min_start_time:1392995080000 |
+ * |            | #0:""                        |
+ * |            |                              |
+ * |            | min_start_time:1392995081012 |
+ * |            | #0:appId2                    |
+ * |            |                              |
+ * |            | min_start_time:1392993083210 |
+ * |            | #0:appId3                    |
+ * |            |                              |
+ * |            |                              |
+ * |            | max_end_time:1392993084018   |
+ * |            | #0:""                        |
+ * |            |                              |
+ * |            |                              |
+ * |            | m!mapInputRecords:127        |
+ * |            | #0:""                        |
+ * |            |                              |
+ * |            | m!mapInputRecords:31         |
+ * |            | #2:appId2                    |
+ * |            |                              |
+ * |            | m!mapInputRecords:37         |
+ * |            | #1:appId3                    |
+ * |            |                              |
+ * |            |                              |
+ * |            | m!mapOutputRecords:181       |
+ * |            | #0:""                        |
+ * |            |                              |
+ * |            | m!mapOutputRecords:37        |
+ * |            | #1:appId3                    |
+ * |            |                              |
+ * |            |                              |
+ * |-------------------------------------------|
+ * 
+ * </pre>
+ */
+public class FlowRunTable extends BaseTable<FlowRunTable> {
+  /** flow run prefix */
+  private static final String PREFIX =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".flowrun";
+
+  /** config param name that specifies the flowrun table name */
+  public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+  /** default value for flowrun table name */
+  public static final String DEFAULT_TABLE_NAME = "timelineservice.flowrun";
+
+  private static final Log LOG = LogFactory.getLog(FlowRunTable.class);
+
+  /** default max number of versions */
+  public static final int DEFAULT_METRICS_MAX_VERSIONS = Integer.MAX_VALUE;
+
+  public FlowRunTable() {
+    super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTable#createTable
+   * (org.apache.hadoop.hbase.client.Admin,
+   * org.apache.hadoop.conf.Configuration)
+   */
+  public void createTable(Admin admin, Configuration hbaseConf)
+      throws IOException {
+
+    TableName table = getTableName(hbaseConf);
+    if (admin.tableExists(table)) {
+      // do not disable / delete the existing table
+      // similar to the approach taken by map-reduce jobs when
+      // the output directory exists
+      throw new IOException("Table " + table.getNameAsString()
+          + " already exists.");
+    }
+
+    HTableDescriptor flowRunTableDescp = new HTableDescriptor(table);
+    HColumnDescriptor infoCF =
+        new HColumnDescriptor(FlowRunColumnFamily.INFO.getBytes());
+    infoCF.setBloomFilterType(BloomType.ROWCOL);
+    // keep all cell versions so that per-application cells remain available
+    // to the coprocessor for aggregation
+    infoCF.setMinVersions(1);
+    infoCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS);
+    flowRunTableDescp.addFamily(infoCF);
+
+    // TODO: figure out the split policy
+    flowRunTableDescp.addCoprocessor(FlowRunCoprocessor.class
+        .getCanonicalName());
+    admin.createTable(flowRunTableDescp);
+    LOG.info("Status of table creation for " + table.getNameAsString() + "="
+        + admin.tableExists(table));
+  }
+}
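For orientation before the test changes below: the tests in this patch read the flow run table back with single-row Gets against the info family, using the row key and column helpers introduced above. A condensed sketch of that read path follows; the class name, method name, and the conf parameter are illustrative, error handling is trimmed, and only calls that appear in this patch or in standard HBase client API are used:

// Sketch (illustrative, not part of the patch): reading one flow run's
// MIN_START_TIME back with a Get, mirroring testWriteFlowRunMinMaxToHBase.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;

public class FlowRunReadSketch {
  public static Long readMinStartTime(Configuration conf, String cluster,
      String user, String flow, long runId) throws IOException {
    Connection conn = ConnectionFactory.createConnection(conf);
    try {
      Table table =
          conn.getTable(TableName.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
      Get g = new Get(FlowRunRowKey.getRowKey(cluster, user, flow, runId));
      g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
          FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes());
      Result r = table.get(g);
      // values are serialized with GenericObjectMapper, as in the tests
      return (Long) GenericObjectMapper.read(r.getValue(
          FlowRunColumnFamily.INFO.getBytes(),
          FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()));
    } finally {
      conn.close();
    }
  }
}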
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
index 2875e01..d1ecd7d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
@@ -53,7 +53,6 @@
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
@@ -88,12 +87,7 @@ public static void setupBeforeClass() throws Exception {
   }
 
   private static void createSchema() throws IOException {
-    new EntityTable()
-        .createTable(util.getHBaseAdmin(), util.getConfiguration());
-    new AppToFlowTable()
-        .createTable(util.getHBaseAdmin(), util.getConfiguration());
-    new ApplicationTable()
-        .createTable(util.getHBaseAdmin(), util.getConfiguration());
+    TimelineSchemaCreator.createAllTables(util.getConfiguration());
   }
 
   @Test
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImplFlowRun.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImplFlowRun.java
new file mode 100644
index 0000000..59e5f05
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImplFlowRun.java
@@ -0,0 +1,614 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests writes to the flow run and flow activity tables.
+ */
+public class TestHBaseTimelineWriterImplFlowRun {
+
+  private static HBaseTestingUtility util;
+  private final String metric1 = "MAP_SLOT_MILLIS";
+  private final String metric2 = "HDFS_BYTES_READ";
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    util = new HBaseTestingUtility();
+    Configuration conf = util.getConfiguration();
+    conf.setInt("hfile.format.version", 3);
+    util.startMiniCluster();
+    createSchema();
+  }
+
+  private static void createSchema() throws IOException {
+    TimelineSchemaCreator.createAllTables(util.getConfiguration());
+  }
+
+  private TimelineEntity getEntity1() {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowRunHello";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    Long cTime = 20000000000000L;
+    Long mTime = 1425026901000L;
+    entity.setCreatedTime(cTime);
+    entity.setModifiedTime(mTime);
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId(metric1);
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    long ts = System.currentTimeMillis();
+    metricValues.put(ts - 120000, 100000000);
+    metricValues.put(ts - 100000, 200000000);
+    metricValues.put(ts - 80000, 300000000);
+    metricValues.put(ts - 60000, 400000000);
+    metricValues.put(ts - 40000, 50000000000L);
+    metricValues.put(ts - 20000, 60000000000L);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+    entity.addMetrics(metrics);
+
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    Long expTs = 1436512802000L;
+    event.setTimestamp(expTs);
+    String expKey = "foo_event";
+    Object expVal = "test";
+    event.addInfo(expKey, expVal);
+    entity.addEvent(event);
+
+    event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+    event.setTimestamp(1436512801000L);
+    event.addInfo(expKey, expVal);
+    entity.addEvent(event);
+
+    return entity;
+  }
+
+  @Test
+  public void testWriteFlowRunMinMaxToHBase() throws Exception {
+
+    TimelineEntities te = new TimelineEntities();
+    te.addEntity(getEntity1());
+
+    HBaseTimelineWriterImpl hbi = null;
+    Configuration c1 = util.getConfiguration();
+    String cluster = "testWriteFlowRunMinMaxToHBase_cluster1";
+    String user = "testWriteFlowRunMinMaxToHBase_user1";
+    String flow = "testing_flowRun_flow_name";
+    String flowVersion = "CF7022C10F1354";
+    Long runid = 1002345678919L;
+    String appName = "application_id_100000000000_1111";
+    long endTs = 1439750690000L;
+    TimelineEntity entityMinStartTime = getEntityMinStartTime();
+
+    try {
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+      // write another entity with the right min start time
+      te = new TimelineEntities();
+      te.addEntity(entityMinStartTime);
+      appName = "application_id_100000000000_3333";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+      // write another entity for max end time
+      TimelineEntity entityMaxEndTime = getEntityMaxEndTime(endTs);
+      te = new TimelineEntities();
+      te.addEntity(entityMaxEndTime);
+      appName = "application_id_100000000000_4444";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+      // write another entity with a greater start time
+      TimelineEntity entityGreaterStartTime = getEntityGreaterStartTime();
+      te = new TimelineEntities();
+      te.addEntity(entityGreaterStartTime);
+      appName = "application_id_1000000000000000_2222";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+      // flush everything to hbase
+      hbi.flush();
+    } finally {
+      if (hbi != null) {
+        hbi.close();
+      }
+    }
+
+    Connection conn = ConnectionFactory.createConnection(c1);
+    // check in the flow run table
+    Table table1 = conn.getTable(TableName
+        .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+    // read the row back and check that we get the right min and max
+    // timestamps
+    byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
+    Get g = new Get(startRow);
+    g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
+        FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes());
+    g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
+        FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes());
+    Result r1 = table1.get(g);
+    assertNotNull(r1);
+    assertTrue(!r1.isEmpty());
+    Map<byte[], byte[]> values = r1.getFamilyMap(FlowRunColumnFamily.INFO
+        .getBytes());
+    assertEquals(2, r1.size());
+    Long starttime = (Long) GenericObjectMapper.read(values
+        .get(FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()));
+    Long expmin = entityMinStartTime.getCreatedTime();
+    assertEquals(expmin, starttime);
+    assertEquals(endTs, GenericObjectMapper.read(values
+        .get(FlowRunColumn.MAX_END_TIME.getColumnQualifierBytes())));
+
+    // check in the flow activity table
+    table1 = conn.getTable(TableName
+        .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
+    startRow = FlowActivityRowKey.getRowKey(cluster, user, flow);
+    g = new Get(startRow);
+    r1 = table1.get(g);
+    assertNotNull(r1);
+    assertTrue(!r1.isEmpty());
+    values = r1.getFamilyMap(FlowActivityColumnFamily.INFO.getBytes());
+    assertEquals(1, values.size());
+  }
+
+  private TimelineEntity getEntityGreaterStartTime() {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setCreatedTime(30000000000000L);
+    entity.setId("flowRunHello with greater start time");
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setType(type);
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    long endTs = 1439379885000L;
+    event.setTimestamp(endTs);
+    String expKey = "foo_event_greater";
+    String expVal = "test_app_greater";
+    event.addInfo(expKey, expVal);
+    entity.addEvent(event);
+    return entity;
+  }
+
+  private TimelineEntity getEntityMaxEndTime(long endTs) {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setId("flowRunHello Max End time");
+    entity.setType(TimelineEntityType.YARN_APPLICATION.toString());
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
+    event.setTimestamp(endTs);
+    String expKey = "foo_event_max_finished";
+    String expVal = "test_app_max_finished";
+    event.addInfo(expKey, expVal);
+    entity.addEvent(event);
+    return entity;
+  }
+
+  private TimelineEntity getEntityMinStartTime() {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowRunHelloMinStartTime";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    Long cTime = 10000000000000L;
+    entity.setCreatedTime(cTime);
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    event.setTimestamp(System.currentTimeMillis());
+    entity.addEvent(event);
+    return entity;
+  }
+
+  boolean isFlowRunRowKeyCorrect(byte[] rowKey, String cluster, String user,
+      String flow, Long runid) {
+    byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, -1);
+    assertTrue(rowKeyComponents.length == 4);
+    assertEquals(cluster, Bytes.toString(rowKeyComponents[0]));
+    assertEquals(user, Bytes.toString(rowKeyComponents[1]));
+    assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
+    assertEquals(TimelineWriterUtils.invert(runid),
+        Bytes.toLong(rowKeyComponents[3]));
+    return true;
+  }
+
+  @Test
+  public void testWriteFlowRunMetricsOneFlow() throws Exception {
+    String cluster = "testWriteFlowRunMetricsOneFlow_cluster1";
+    String user = "testWriteFlowRunMetricsOneFlow_user1";
+    String flow = "testing_flowRun_metrics_flow_name";
+    String flowVersion = "CF7022C10F1354";
+    Long runid = 1002345678919L;
+
+    TimelineEntities te = new TimelineEntities();
+    TimelineEntity entityApp1 = getEntityMetricsApp1();
+    te.addEntity(entityApp1);
+
+    HBaseTimelineWriterImpl hbi = null;
+    Configuration c1 = util.getConfiguration();
+    try {
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      String appName = "application_11111111111111_1111";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      // write another application with the same metrics to this flow
+      te = new TimelineEntities();
+      TimelineEntity entityApp2 = getEntityMetricsApp2();
+      te.addEntity(entityApp2);
+      appName = "application_11111111111111_2222";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      hbi.flush();
+    } finally {
+      if (hbi != null) {
+        hbi.close();
+      }
+    }
+
+    // check flow run
+    checkFlowRunTable(cluster, user, flow, runid, c1);
+  }
+
+  @Test
+  public void testWriteFlowActivityOneFlow() throws Exception {
+    String cluster = "testWriteFlowActivityOneFlow_cluster1";
+    String user = "testWriteFlowActivityOneFlow_user1";
+    String flow = "flow_activity_test_flow_name";
+    String flowVersion = "A122110F135BC4";
+    Long runid = 1001111178919L;
+
+    TimelineEntities te = new TimelineEntities();
+    TimelineEntity entityApp1 = getFlowApp1();
+    te.addEntity(entityApp1);
+
+    HBaseTimelineWriterImpl hbi = null;
+    Configuration c1 = util.getConfiguration();
+    try {
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      String appName = "application_1111999999_1234";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+      // flush everything to hbase
+      hbi.flush();
+    } finally {
+      if (hbi != null) {
+        hbi.close();
+      }
+    }
+    // check flow activity
+    checkFlowActivityTable(cluster, user, flow, flowVersion, runid, c1);
+  }
+
+  private void checkFlowActivityTable(String cluster, String user, String flow,
+      String flowVersion, Long runid, Configuration c1) throws IOException {
+    Scan s = new Scan();
+    s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
+    byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow);
+    s.setStartRow(startRow);
+    String clusterStop = cluster + "1";
+    byte[] stopRow = FlowActivityRowKey.getRowKey(clusterStop, user, flow);
+    s.setStopRow(stopRow);
+    Connection conn = ConnectionFactory.createConnection(c1);
+    Table table1 = conn.getTable(TableName
+        .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
+    ResultScanner scanner = table1.getScanner(s);
+    int rowCount = 0;
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      Map<byte[], byte[]> values = result
+          .getFamilyMap(FlowActivityColumnFamily.INFO.getBytes());
+      rowCount++;
+      byte[] row = result.getRow();
+      FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey
+          .parseRowKey(row);
+      assertNotNull(flowActivityRowKey);
+      assertEquals(cluster, flowActivityRowKey.getClusterId());
+      assertEquals(user, flowActivityRowKey.getUserId());
+      assertEquals(flow, flowActivityRowKey.getFlowId());
+      long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+          .currentTimeMillis());
+      assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
+      assertEquals(1, values.size());
+      checkRunId(runid, flowVersion, values);
+    }
+    assertEquals(1, rowCount);
+  }
+
+  private void checkFlowRunTable(String cluster, String user, String flow,
+      long runid, Configuration c1) throws IOException {
+    Scan s = new Scan();
+    s.addFamily(FlowRunColumnFamily.INFO.getBytes());
+    byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
+    s.setStartRow(startRow);
+    String clusterStop = cluster + "1";
+    byte[] stopRow = FlowRunRowKey.getRowKey(clusterStop, user, flow, runid);
+    s.setStopRow(stopRow);
+    Connection conn = ConnectionFactory.createConnection(c1);
+    Table table1 = conn.getTable(TableName
+        .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
+    ResultScanner scanner = table1.getScanner(s);
+
+    int rowCount = 0;
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      Map<byte[], byte[]> values = result.getFamilyMap(FlowRunColumnFamily.INFO
+          .getBytes());
+      rowCount++;
+      // check metric1
+      byte[] q = ColumnHelper.getColumnQualifier(
+          FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric1);
+      assertTrue(values.containsKey(q));
+      assertEquals(141, GenericObjectMapper.read(values.get(q)));
+
+      // check metric2
+      assertEquals(2, values.size());
+      q = ColumnHelper.getColumnQualifier(
+          FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric2);
+      assertTrue(values.containsKey(q));
+      assertEquals(57, GenericObjectMapper.read(values.get(q)));
+    }
+    assertEquals(1, rowCount);
+  }
+
+  private TimelineEntity getFlowApp1() {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowActivity_test";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    Long cTime = 1425016501000L;
+    entity.setCreatedTime(cTime);
+
+    TimelineEvent event = new TimelineEvent();
+    event.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
+    Long expTs = 1436512802000L;
+    event.setTimestamp(expTs);
+    String expKey = "foo_event";
+    Object expVal = "test";
+    event.addInfo(expKey, expVal);
+    entity.addEvent(event);
+
+    return entity;
+  }
+  private TimelineEntity getEntityMetricsApp1() {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowRunMetrics_test";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    Long cTime = 1425016501000L;
+    entity.setCreatedTime(cTime);
+
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId(metric1);
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    long ts = System.currentTimeMillis();
+    metricValues.put(ts - 100000, 2);
+    metricValues.put(ts - 80000, 40);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+
+    m1 = new TimelineMetric();
+    m1.setId(metric2);
+    metricValues = new HashMap<Long, Number>();
+    ts = System.currentTimeMillis();
+    metricValues.put(ts - 100000, 31);
+    metricValues.put(ts - 80000, 57);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+
+    entity.addMetrics(metrics);
+    return entity;
+  }
+
+  private TimelineEntity getEntityMetricsApp2() {
+    TimelineEntity entity = new TimelineEntity();
+    String id = "flowRunMetrics_test";
+    String type = TimelineEntityType.YARN_APPLICATION.toString();
+    entity.setId(id);
+    entity.setType(type);
+    Long cTime = 1425016501000L;
+    entity.setCreatedTime(cTime);
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId(metric1);
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    long ts = System.currentTimeMillis();
+    metricValues.put(ts - 100000, 5L);
+    metricValues.put(ts - 80000, 101L);
+    m1.setType(Type.TIME_SERIES);
+    m1.setValues(metricValues);
+    metrics.add(m1);
+    entity.addMetrics(metrics);
+    return entity;
+  }
+
+  @Test
+  public void testFlowActivityTable() throws IOException {
+    String cluster = "testManyDaysFlowActivity_cluster1";
+    String user = "testManyDaysFlowActivity_c_user1";
+    String flow = "flow_activity_test_flow_name";
+    String flowVersion1 = "A122110F135BC4";
+    Long runid1 = 11111111111L;
+
+    String flowVersion2 = "A12222222222C4";
+    long runid2 = 2222222222222L;
+
+    String flowVersion3 = "A1333333333C4";
+    long runid3 = 3333333333333L;
+
+    TimelineEntities te = new TimelineEntities();
+    TimelineEntity entityApp1 = getFlowApp1();
+    te.addEntity(entityApp1);
+
+    HBaseTimelineWriterImpl hbi = null;
+    Configuration c1 = util.getConfiguration();
+    try {
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.init(c1);
+      String appName = "application_11888888888_1111";
+      hbi.write(cluster, user, flow, flowVersion1, runid1, appName, te);
+
+      // write another application to this flow with a different run id
+      // and version
+      te = new TimelineEntities();
+      te.addEntity(entityApp1);
+      appName = "application_11888888888_2222";
+      hbi.write(cluster, user, flow, flowVersion2, runid2, appName, te);
+
+      // write another application to this flow with yet another run id
+      // and version
+      te = new TimelineEntities();
+      te.addEntity(entityApp1);
+      appName = "application_11888888888_3333";
+      hbi.write(cluster, user, flow, flowVersion3, runid3, appName, te);
+
+      hbi.flush();
+    } finally {
+      if (hbi != null) {
+        hbi.close();
+      }
+    }
+    // check flow activity
+    checkFlowActivityTableSeveralRuns(cluster, user, flow, c1, flowVersion1,
+        runid1, flowVersion2, runid2, flowVersion3, runid3);
+  }
+
+  private void checkFlowActivityTableSeveralRuns(String cluster, String user,
+      String flow, Configuration c1, String flowVersion1, Long runid1,
+      String flowVersion2, Long runid2, String flowVersion3, Long runid3)
+      throws IOException {
+    Scan s = new Scan();
+    s.addFamily(FlowActivityColumnFamily.INFO.getBytes());
+    byte[] startRow = FlowActivityRowKey.getRowKey(cluster, user, flow);
+    s.setStartRow(startRow);
+    String clusterStop = cluster + "1";
+    byte[] stopRow = FlowActivityRowKey.getRowKey(clusterStop, user, flow);
+    s.setStopRow(stopRow);
+    Connection conn = ConnectionFactory.createConnection(c1);
+    Table table1 = conn.getTable(TableName
+        .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME));
+    ResultScanner scanner = table1.getScanner(s);
+    int rowCount = 0;
+    for (Result result : scanner) {
+      assertNotNull(result);
+      assertTrue(!result.isEmpty());
+      byte[] row = result.getRow();
+      FlowActivityRowKey flowActivityRowKey = FlowActivityRowKey
+          .parseRowKey(row);
+      assertNotNull(flowActivityRowKey);
+      assertEquals(cluster, flowActivityRowKey.getClusterId());
+      assertEquals(user, flowActivityRowKey.getUserId());
+      assertEquals(flow, flowActivityRowKey.getFlowId());
+      long dayTs = TimelineWriterUtils.getTopOfTheDayTimestamp(System
+          .currentTimeMillis());
+      assertEquals(dayTs, flowActivityRowKey.getDayTimestamp());
+
+      Map<byte[], byte[]> values = result
+          .getFamilyMap(FlowActivityColumnFamily.INFO.getBytes());
+      rowCount++;
+      assertEquals(3, values.size());
+      checkRunId(runid1, flowVersion1, values);
+      checkRunId(runid2, flowVersion2, values);
+      checkRunId(runid3, flowVersion3, values);
+    }
+    // the flow activity table inserts into the current day's record;
+    // hence, if this test runs across the midnight boundary, it may fail
+    // since it would insert into two records, one for each day
+    assertEquals(1, rowCount);
+  }
+
+  private void checkRunId(Long runid, String flowVersion,
+      Map<byte[], byte[]> values) throws IOException {
+    byte[] rq = ColumnHelper.getColumnQualifier(
+        FlowActivityColumnPrefix.RUN_ID.getColumnPrefixBytes(),
+        GenericObjectMapper.write(runid));
+    boolean found = false;
+    for (Map.Entry<byte[], byte[]> k : values.entrySet()) {
+      String actualQ = Bytes.toString(k.getKey());
+      if (Bytes.toString(rq).equals(actualQ)) {
+        found = true;
+        String actualV = (String) GenericObjectMapper.read(k.getValue());
+        assertEquals(flowVersion, actualV);
+      }
+    }
+    assertTrue("run id " + runid + " not found in flow activity row", found);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    util.shutdownMiniCluster();
+  }
+}
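A note on where the expected values in checkFlowRunTable come from. App1 writes MAP_SLOT_MILLIS values 2 then 40 and HDFS_BYTES_READ values 31 then 57; app2 writes MAP_SLOT_MILLIS values 5L then 101L. The asserted cells (141 and 57) are consistent with the flow run table holding, per metric, the sum over applications of each application's most recent value; the aggregation itself is performed by FlowRunCoprocessor, which is not shown in this section, so this reading of its semantics is an inference from the test data. A small self-contained sketch of that arithmetic (the class name is illustrative):

// Sketch (illustrative): the flow-run metric cell is expected to equal the
// sum, across applications, of each application's latest time-series value.
import java.util.Arrays;
import java.util.TreeMap;

public class FlowRunAggregateSketch {
  public static void main(String[] args) {
    long ts = System.currentTimeMillis();
    // MAP_SLOT_MILLIS time series per application, as written by the test
    TreeMap<Long, Number> app1 = new TreeMap<Long, Number>();
    app1.put(ts - 100000, 2);
    app1.put(ts - 80000, 40);    // latest value for app1
    TreeMap<Long, Number> app2 = new TreeMap<Long, Number>();
    app2.put(ts - 100000, 5L);
    app2.put(ts - 80000, 101L);  // latest value for app2
    long sum = 0;
    for (TreeMap<Long, Number> series : Arrays.asList(app1, app2)) {
      // take the value at the newest timestamp of each application
      sum += series.get(series.lastKey()).longValue();
    }
    System.out.println(sum);     // prints 141, as asserted in the test
  }
}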