diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index cd2e76e..bb16b9b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -33,9 +33,15 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; @@ -54,6 +60,7 @@ private Connection conn; private TypedBufferedMutator entityTable; + private TypedBufferedMutator applicationTable; private static final Log LOG = LogFactory .getLog(HBaseTimelineWriterImpl.class); @@ -76,6 +83,7 @@ protected void serviceInit(Configuration conf) throws Exception { Configuration hbaseConf = HBaseConfiguration.create(conf); conn = ConnectionFactory.createConnection(hbaseConf); entityTable = new EntityTable().getTableMutator(hbaseConf, conn); + applicationTable = new ApplicationTable().getTableMutator(hbaseConf, conn); } /** @@ -94,58 +102,94 @@ public TimelineWriteResponse write(String clusterId, String userId, continue; } - byte[] rowKey = - EntityRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId, - te); + // if the entity is the application, the destination is the application + // table + boolean isApplication = + te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString()); + byte[] rowKey = isApplication ? + ApplicationRowKey.getRowKey(clusterId, userId, flowName, flowRunId, + appId) : + EntityRowKey.getRowKey(clusterId, userId, flowName, flowRunId, + appId, te); - storeInfo(rowKey, te, flowVersion); - storeEvents(rowKey, te.getEvents()); - storeConfig(rowKey, te.getConfigs()); - storeMetrics(rowKey, te.getMetrics()); - storeRelations(rowKey, te.getIsRelatedToEntities(), - EntityColumnPrefix.IS_RELATED_TO); - storeRelations(rowKey, te.getRelatesToEntities(), - EntityColumnPrefix.RELATES_TO); + storeInfo(rowKey, te, flowVersion, isApplication); + storeEvents(rowKey, te.getEvents(), isApplication); + storeConfig(rowKey, te.getConfigs(), isApplication); + storeMetrics(rowKey, te.getMetrics(), isApplication); + storeRelations(rowKey, te, isApplication); } return putStatus; } + private void storeRelations(byte[] rowKey, TimelineEntity te, + boolean isApplication) throws IOException { + if (isApplication) { + storeRelations(rowKey, te.getIsRelatedToEntities(), + ApplicationColumnPrefix.IS_RELATED_TO, applicationTable); + storeRelations(rowKey, te.getRelatesToEntities(), + ApplicationColumnPrefix.RELATES_TO, applicationTable); + } else { + storeRelations(rowKey, te.getIsRelatedToEntities(), + EntityColumnPrefix.IS_RELATED_TO, entityTable); + storeRelations(rowKey, te.getRelatesToEntities(), + EntityColumnPrefix.RELATES_TO, entityTable); + } + } + /** * Stores the Relations from the {@linkplain TimelineEntity} object */ - private void storeRelations(byte[] rowKey, + private void storeRelations(byte[] rowKey, Map> connectedEntities, - EntityColumnPrefix entityColumnPrefix) throws IOException { + ColumnPrefix columnPrefix, TypedBufferedMutator table) + throws IOException { for (Map.Entry> connectedEntity : connectedEntities .entrySet()) { // id3?id4?id5 String compoundValue = Separator.VALUES.joinEncoded(connectedEntity.getValue()); - entityColumnPrefix.store(rowKey, entityTable, connectedEntity.getKey(), - null, compoundValue); + columnPrefix.store(rowKey, table, connectedEntity.getKey(), null, + compoundValue); } } /** * Stores information from the {@linkplain TimelineEntity} object */ - private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion) - throws IOException { - - EntityColumn.ID.store(rowKey, entityTable, null, te.getId()); - EntityColumn.TYPE.store(rowKey, entityTable, null, te.getType()); - EntityColumn.CREATED_TIME.store(rowKey, entityTable, null, - te.getCreatedTime()); - EntityColumn.MODIFIED_TIME.store(rowKey, entityTable, null, - te.getModifiedTime()); - EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion); - Map info = te.getInfo(); - if (info != null) { - for (Map.Entry entry : info.entrySet()) { - EntityColumnPrefix.INFO.store(rowKey, entityTable, entry.getKey(), - null, entry.getValue()); + private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion, + boolean isApplication) throws IOException { + + if (isApplication) { + ApplicationColumn.ID.store(rowKey, applicationTable, null, te.getId()); + ApplicationColumn.CREATED_TIME.store(rowKey, applicationTable, null, + te.getCreatedTime()); + ApplicationColumn.MODIFIED_TIME.store(rowKey, applicationTable, null, + te.getModifiedTime()); + ApplicationColumn.FLOW_VERSION.store(rowKey, applicationTable, null, + flowVersion); + Map info = te.getInfo(); + if (info != null) { + for (Map.Entry entry : info.entrySet()) { + ApplicationColumnPrefix.INFO.store(rowKey, applicationTable, + entry.getKey(), null, entry.getValue()); + } + } + } else { + EntityColumn.ID.store(rowKey, entityTable, null, te.getId()); + EntityColumn.TYPE.store(rowKey, entityTable, null, te.getType()); + EntityColumn.CREATED_TIME.store(rowKey, entityTable, null, + te.getCreatedTime()); + EntityColumn.MODIFIED_TIME.store(rowKey, entityTable, null, + te.getModifiedTime()); + EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion); + Map info = te.getInfo(); + if (info != null) { + for (Map.Entry entry : info.entrySet()) { + EntityColumnPrefix.INFO.store(rowKey, entityTable, entry.getKey(), + null, entry.getValue()); + } } } } @@ -153,14 +197,19 @@ private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion) /** * stores the config information from {@linkplain TimelineEntity} */ - private void storeConfig(byte[] rowKey, Map config) - throws IOException { + private void storeConfig(byte[] rowKey, Map config, + boolean isApplication) throws IOException { if (config == null) { return; } for (Map.Entry entry : config.entrySet()) { - EntityColumnPrefix.CONFIG.store(rowKey, entityTable, entry.getKey(), + if (isApplication) { + ApplicationColumnPrefix.CONFIG.store(rowKey, applicationTable, + entry.getKey(), null, entry.getValue()); + } else { + EntityColumnPrefix.CONFIG.store(rowKey, entityTable, entry.getKey(), null, entry.getValue()); + } } } @@ -168,16 +217,21 @@ private void storeConfig(byte[] rowKey, Map config) * stores the {@linkplain TimelineMetric} information from the * {@linkplain TimelineEvent} object */ - private void storeMetrics(byte[] rowKey, Set metrics) - throws IOException { + private void storeMetrics(byte[] rowKey, Set metrics, + boolean isApplication) throws IOException { if (metrics != null) { for (TimelineMetric metric : metrics) { String metricColumnQualifier = metric.getId(); Map timeseries = metric.getValues(); for (Map.Entry timeseriesEntry : timeseries.entrySet()) { Long timestamp = timeseriesEntry.getKey(); - EntityColumnPrefix.METRIC.store(rowKey, entityTable, + if (isApplication) { + ApplicationColumnPrefix.METRIC.store(rowKey, applicationTable, metricColumnQualifier, timestamp, timeseriesEntry.getValue()); + } else { + EntityColumnPrefix.METRIC.store(rowKey, entityTable, + metricColumnQualifier, timestamp, timeseriesEntry.getValue()); + } } } } @@ -186,8 +240,8 @@ private void storeMetrics(byte[] rowKey, Set metrics) /** * Stores the events from the {@linkplain TimelineEvent} object */ - private void storeEvents(byte[] rowKey, Set events) - throws IOException { + private void storeEvents(byte[] rowKey, Set events, + boolean isApplication) throws IOException { if (events != null) { for (TimelineEvent event : events) { if (event != null) { @@ -212,8 +266,13 @@ private void storeEvents(byte[] rowKey, Set events) // convert back to string to avoid additional API on store. String compoundColumnQualifier = Bytes.toString(compoundColumnQualifierBytes); - EntityColumnPrefix.EVENT.store(rowKey, entityTable, + if (isApplication) { + ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable, compoundColumnQualifier, eventTimestamp, info.getValue()); + } else { + EntityColumnPrefix.EVENT.store(rowKey, entityTable, + compoundColumnQualifier, eventTimestamp, info.getValue()); + } } // for info: eventInfo } } @@ -232,6 +291,7 @@ public TimelineWriteResponse aggregate(TimelineEntity data, public void flush() throws IOException { // flush all buffered mutators entityTable.flush(); + applicationTable.flush(); } /** @@ -241,10 +301,14 @@ public void flush() throws IOException { @Override protected void serviceStop() throws Exception { if (entityTable != null) { - LOG.info("closing entity table"); + LOG.info("closing the entity table"); // The close API performs flushing and releases any resources held entityTable.close(); } + if (applicationTable != null) { + LOG.info("closing the application table"); + applicationTable.close(); + } if (conn != null) { LOG.info("closing the hbase Connection"); conn.close(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java new file mode 100644 index 0000000..c028386 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.application; + +import java.io.IOException; + +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; + +/** + * Identifies fully qualified columns for the {@link ApplicationTable}. + */ +public enum ApplicationColumn implements Column { + + /** + * App id + */ + ID(ApplicationColumnFamily.INFO, "id"), + + /** + * When the application was created. + */ + CREATED_TIME(ApplicationColumnFamily.INFO, "created_time"), + + /** + * When it was modified. + */ + MODIFIED_TIME(ApplicationColumnFamily.INFO, "modified_time"), + + /** + * The version of the flow that this app belongs to. + */ + FLOW_VERSION(ApplicationColumnFamily.INFO, "flow_version"); + + private final ColumnHelper column; + private final ColumnFamily columnFamily; + private final String columnQualifier; + private final byte[] columnQualifierBytes; + + private ApplicationColumn(ColumnFamily columnFamily, + String columnQualifier) { + this.columnFamily = columnFamily; + this.columnQualifier = columnQualifier; + // Future-proof by ensuring the right column prefix hygiene. + this.columnQualifierBytes = + Bytes.toBytes(Separator.SPACE.encode(columnQualifier)); + this.column = new ColumnHelper(columnFamily); + } + + /** + * @return the column name value + */ + private String getColumnQualifier() { + return columnQualifier; + } + + public void store(byte[] rowKey, + TypedBufferedMutator tableMutator, Long timestamp, + Object inputValue) throws IOException { + column.store(rowKey, tableMutator, columnQualifierBytes, timestamp, + inputValue); + } + + public Object readResult(Result result) throws IOException { + return column.readResult(result, columnQualifierBytes); + } + + /** + * Retrieve an {@link ApplicationColumn} given a name, or null if there is no + * match. The following holds true: {@code columnFor(x) == columnFor(y)} if + * and only if {@code x.equals(y)} or {@code (x == y == null)} + * + * @param columnQualifier Name of the column to retrieve + * @return the corresponding {@link ApplicationColumn} or null + */ + public static final ApplicationColumn columnFor(String columnQualifier) { + + // Match column based on value, assume column family matches. + for (ApplicationColumn ac : ApplicationColumn.values()) { + // Find a match based only on name. + if (ac.getColumnQualifier().equals(columnQualifier)) { + return ac; + } + } + + // Default to null + return null; + } + + /** + * Retrieve an {@link ApplicationColumn} given a name, or null if there is no + * match. The following holds true: {@code columnFor(a,x) == columnFor(b,y)} + * if and only if {@code a.equals(b) & x.equals(y)} or + * {@code (x == y == null)} + * + * @param columnFamily The columnFamily for which to retrieve the column. + * @param name Name of the column to retrieve + * @return the corresponding {@link ApplicationColumn} or null if both + * arguments don't match. + */ + public static final ApplicationColumn columnFor( + ApplicationColumnFamily columnFamily, String name) { + + for (ApplicationColumn ac : ApplicationColumn.values()) { + // Find a match based column family and on name. + if (ac.columnFamily.equals(columnFamily) + && ac.getColumnQualifier().equals(name)) { + return ac; + } + } + + // Default to null + return null; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnFamily.java new file mode 100644 index 0000000..97e5f7b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnFamily.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.application; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; + +/** + * Represents the application table column families. + */ +public enum ApplicationColumnFamily implements ColumnFamily { + + /** + * Info column family houses known columns, specifically ones included in + * columnfamily filters. + */ + INFO("i"), + + /** + * Configurations are in a separate column family for two reasons: a) the size + * of the config values can be very large and b) we expect that config values + * are often separately accessed from other metrics and info columns. + */ + CONFIGS("c"), + + /** + * Metrics have a separate column family, because they have a separate TTL. + */ + METRICS("m"); + + /** + * Byte representation of this column family. + */ + private final byte[] bytes; + + /** + * @param value create a column family with this name. Must be lower case and + * without spaces. + */ + private ApplicationColumnFamily(String value) { + // column families should be lower case and not contain any spaces. + this.bytes = Bytes.toBytes(Separator.SPACE.encode(value)); + } + + public byte[] getBytes() { + return Bytes.copy(bytes); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java new file mode 100644 index 0000000..cd9e845 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.application; + +import java.io.IOException; +import java.util.Map; +import java.util.NavigableMap; + +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; + +/** + * Identifies partially qualified columns for the application table. + */ +public enum ApplicationColumnPrefix implements ColumnPrefix { + + /** + * To store TimelineEntity getIsRelatedToEntities values. + */ + IS_RELATED_TO(ApplicationColumnFamily.INFO, "s"), + + /** + * To store TimelineEntity getRelatesToEntities values. + */ + RELATES_TO(ApplicationColumnFamily.INFO, "r"), + + /** + * To store TimelineEntity info values. + */ + INFO(ApplicationColumnFamily.INFO, "i"), + + /** + * Lifecycle events for an application + */ + EVENT(ApplicationColumnFamily.INFO, "e"), + + /** + * Config column stores configuration with config key as the column name. + */ + CONFIG(ApplicationColumnFamily.CONFIGS, null), + + /** + * Metrics are stored with the metric name as the column name. + */ + METRIC(ApplicationColumnFamily.METRICS, null); + + private final ColumnHelper column; + private final ColumnFamily columnFamily; + + /** + * Can be null for those cases where the provided column qualifier is the + * entire column name. + */ + private final String columnPrefix; + private final byte[] columnPrefixBytes; + + /** + * Private constructor, meant to be used by the enum definition. + * + * @param columnFamily that this column is stored in. + * @param columnPrefix for this column. + */ + private ApplicationColumnPrefix(ColumnFamily columnFamily, + String columnPrefix) { + column = new ColumnHelper(columnFamily); + this.columnFamily = columnFamily; + this.columnPrefix = columnPrefix; + if (columnPrefix == null) { + this.columnPrefixBytes = null; + } else { + // Future-proof by ensuring the right column prefix hygiene. + this.columnPrefixBytes = + Bytes.toBytes(Separator.SPACE.encode(columnPrefix)); + } + } + + /** + * @return the column name value + */ + private String getColumnPrefix() { + return columnPrefix; + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #store(byte[], + * org.apache.hadoop.yarn.server.timelineservice.storage.common. + * TypedBufferedMutator, java.lang.String, java.lang.Long, java.lang.Object) + */ + public void store(byte[] rowKey, + TypedBufferedMutator tableMutator, String qualifier, + Long timestamp, Object inputValue) throws IOException { + + // Null check + if (qualifier == null) { + throw new IOException("Cannot store column with null qualifier in " + + tableMutator.getName().getNameAsString()); + } + + byte[] columnQualifier = + ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); + + column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #readResult(org.apache.hadoop.hbase.client.Result, java.lang.String) + */ + public Object readResult(Result result, String qualifier) throws IOException { + byte[] columnQualifier = + ColumnHelper.getColumnQualifier(this.columnPrefixBytes, qualifier); + return column.readResult(result, columnQualifier); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #readResults(org.apache.hadoop.hbase.client.Result) + */ + public Map readResults(Result result) throws IOException { + return column.readResults(result, columnPrefixBytes); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix + * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result) + */ + public NavigableMap> + readResultsWithTimestamps(Result result) throws IOException { + return column.readResultsWithTimestamps(result, columnPrefixBytes); + } + + /** + * Retrieve an {@link ApplicationColumnPrefix} given a name, or null if there + * is no match. The following holds true: {@code columnFor(x) == columnFor(y)} + * if and only if {@code x.equals(y)} or {@code (x == y == null)} + * + * @param columnPrefix Name of the column to retrieve + * @return the corresponding {@link ApplicationColumnPrefix} or null + */ + public static final ApplicationColumnPrefix columnFor(String columnPrefix) { + + // Match column based on value, assume column family matches. + for (ApplicationColumnPrefix acp : ApplicationColumnPrefix.values()) { + // Find a match based only on name. + if (acp.getColumnPrefix().equals(columnPrefix)) { + return acp; + } + } + + // Default to null + return null; + } + + /** + * Retrieve an {@link ApplicationColumnPrefix} given a name, or null if there + * is no match. The following holds true: + * {@code columnFor(a,x) == columnFor(b,y)} if and only if + * {@code (x == y == null)} or {@code a.equals(b) & x.equals(y)} + * + * @param columnFamily The columnFamily for which to retrieve the column. + * @param columnPrefix Name of the column to retrieve + * @return the corresponding {@link ApplicationColumnPrefix} or null if both + * arguments don't match. + */ + public static final ApplicationColumnPrefix columnFor( + ApplicationColumnFamily columnFamily, String columnPrefix) { + + // TODO: needs unit test to confirm and need to update javadoc to explain + // null prefix case. + + for (ApplicationColumnPrefix acp : ApplicationColumnPrefix.values()) { + // Find a match based column family and on name. + if (acp.columnFamily.equals(columnFamily) + && (((columnPrefix == null) && (acp.getColumnPrefix() == null)) || (acp + .getColumnPrefix().equals(columnPrefix)))) { + return acp; + } + } + + // Default to null + return null; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java new file mode 100644 index 0000000..5f3868b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.application; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; + +/** + * Represents a rowkey for the application table. + */ +public class ApplicationRowKey { + // TODO: more methods are needed for this class. + + // TODO: API needs to be cleaned up. + + /** + * Constructs a row key for the application table as follows: + * {@code clusterId!userName!flowId!flowRunId!AppId} + * + * @param clusterId + * @param userId + * @param flowId + * @param flowRunId + * @param appId + * @return byte array with the row key + */ + public static byte[] getRowKey(String clusterId, String userId, + String flowId, Long flowRunId, String appId) { + byte[] first = + Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, userId, + flowId)); + // Note that flowRunId is a long, so we can't encode them all at the same + // time. + byte[] second = Bytes.toBytes(ApplicationRowKey.invert(flowRunId)); + byte[] third = Bytes.toBytes(appId); + return Separator.QUALIFIERS.join(first, second, third); + } + + /** + * Converts a timestamp into its inverse timestamp to be used in (row) keys + * where we want to have the most recent timestamp in the top of the table + * (scans start at the most recent timestamp first). + * + * @param key value to be inverted so that the latest version will be first in + * a scan. + * @return inverted long + */ + public static long invert(Long key) { + return Long.MAX_VALUE - key; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTable.java new file mode 100644 index 0000000..9a798d8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTable.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.application; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineEntitySchemaConstants; + +/** + * The application table as column families info, config and metrics. Info + * stores information about a YARN application entity, config stores + * configuration data of a YARN application, metrics stores the metrics of a + * YARN application. This table is entirely analogous to the entity table but + * created for better performance. + * + * Example application table record: + * + *
+ * |--------------------------------------------------------------------|
+ * |  Row       | Column Family           | Column Family| Column Family|
+ * |  key       | info                    | metrics      | config       |
+ * |--------------------------------------------------------------------|
+ * | clusterId! | id:appId                | metricId1:   | configKey1:  |
+ * | userName!  |                         | metricValue1 | configValue1 |
+ * | flowId!    | created_time:           | @timestamp1  |              |
+ * | flowRunId! | 1392993084018           |              | configKey2:  |
+ * | AppId      |                         | metriciD1:   | configValue2 |
+ * |            | modified_time:          | metricValue2 |              |
+ * |            | 1392995081012           | @timestamp2  |              |
+ * |            |                         |              |              |
+ * |            | i!infoKey:              | metricId2:   |              |
+ * |            | infoValue               | metricValue1 |              |
+ * |            |                         | @timestamp2  |              |
+ * |            | r!relatesToKey:         |              |              |
+ * |            | id3?id4?id5             |              |              |
+ * |            |                         |              |              |
+ * |            | s!isRelatedToKey:       |              |              |
+ * |            | id7?id9?id6             |              |              |
+ * |            |                         |              |              |
+ * |            | e!eventId?eventInfoKey: |              |              |
+ * |            | eventInfoValue          |              |              |
+ * |            | @timestamp              |              |              |
+ * |            |                         |              |              |
+ * |            | flowVersion:            |              |              |
+ * |            | versionValue            |              |              |
+ * |--------------------------------------------------------------------|
+ * 
+ */ +public class ApplicationTable extends BaseTable { + /** application prefix */ + private static final String PREFIX = + YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".application"; + + /** config param name that specifies the application table name */ + public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name"; + + /** + * config param name that specifies the TTL for metrics column family in + * application table + */ + private static final String METRICS_TTL_CONF_NAME = PREFIX + + ".table.metrics.ttl"; + + /** default value for application table name */ + private static final String DEFAULT_TABLE_NAME = + "timelineservice.application"; + + /** default TTL is 30 days for metrics timeseries */ + private static final int DEFAULT_METRICS_TTL = 2592000; + + /** default max number of versions */ + private static final int DEFAULT_METRICS_MAX_VERSIONS = 1000; + + private static final Log LOG = LogFactory.getLog(ApplicationTable.class); + + public ApplicationTable() { + super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTable#createTable + * (org.apache.hadoop.hbase.client.Admin, + * org.apache.hadoop.conf.Configuration) + */ + public void createTable(Admin admin, Configuration hbaseConf) + throws IOException { + + TableName table = getTableName(hbaseConf); + if (admin.tableExists(table)) { + // do not disable / delete existing table + // similar to the approach taken by map-reduce jobs when + // output directory exists + throw new IOException("Table " + table.getNameAsString() + + " already exists."); + } + + HTableDescriptor applicationTableDescp = new HTableDescriptor(table); + HColumnDescriptor infoCF = + new HColumnDescriptor(ApplicationColumnFamily.INFO.getBytes()); + infoCF.setBloomFilterType(BloomType.ROWCOL); + applicationTableDescp.addFamily(infoCF); + + HColumnDescriptor configCF = + new HColumnDescriptor(ApplicationColumnFamily.CONFIGS.getBytes()); + configCF.setBloomFilterType(BloomType.ROWCOL); + configCF.setBlockCacheEnabled(true); + applicationTableDescp.addFamily(configCF); + + HColumnDescriptor metricsCF = + new HColumnDescriptor(ApplicationColumnFamily.METRICS.getBytes()); + applicationTableDescp.addFamily(metricsCF); + metricsCF.setBlockCacheEnabled(true); + // always keep 1 version (the latest) + metricsCF.setMinVersions(1); + metricsCF.setMaxVersions(DEFAULT_METRICS_MAX_VERSIONS); + metricsCF.setTimeToLive(hbaseConf.getInt(METRICS_TTL_CONF_NAME, + DEFAULT_METRICS_TTL)); + applicationTableDescp + .setRegionSplitPolicyClassName("org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy"); + applicationTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length", + TimelineEntitySchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH); + admin.createTable(applicationTableDescp, + TimelineEntitySchemaConstants.getUsernameSplits()); + LOG.info("Status of table creation for " + table.getNameAsString() + "=" + + admin.tableExists(table)); + } + + /** + * @param metricsTTL time to live parameter for the metrics in this table. + * @param hbaseConf configuration in which to set the metrics TTL config + * variable. + */ + public void setMetricsTTL(int metricsTTL, Configuration hbaseConf) { + hbaseConf.setInt(METRICS_TTL_CONF_NAME, metricsTTL); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/package-info.java new file mode 100644 index 0000000..c60e6f5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/package-info.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.timelineservice.storage.application; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java index 8b7bc3e..bdd4568 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java @@ -157,7 +157,7 @@ public Object readResult(Result result, String qualifier) throws IOException { * org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix * #readResultsWithTimestamps(org.apache.hadoop.hbase.client.Result) */ - public NavigableMap> + public NavigableMap> readResultsWithTimestamps(Result result) throws IOException { return column.readResultsWithTimestamps(result, columnPrefixBytes); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java index 61958c2..efc2783 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java @@ -53,15 +53,16 @@ } /** - * Constructs a row key prefix for the entity table as follows: - * {@code userName!clusterId!flowId!flowRunId!AppId} + * Constructs a row key for the entity table as follows: + * {@code userName!clusterId!flowId!flowRunId!AppId!entityType!entityId} * * @param clusterId * @param userId * @param flowId * @param flowRunId * @param appId - * @return byte array with the row key prefix + * @param te + * @return byte array with the row key */ public static byte[] getRowKey(String clusterId, String userId, String flowId, Long flowRunId, String appId, TimelineEntity te) { @@ -78,7 +79,7 @@ } /** - * Converts a timestamp into it's inverse timestamp to be used in (row) keys + * Converts a timestamp into its inverse timestamp to be used in (row) keys * where we want to have the most recent timestamp in the top of the table * (scans start at the most recent timestamp first). * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java index 31cb5d2..cd000ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java @@ -38,15 +38,18 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; -import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; @@ -71,6 +74,192 @@ public static void setupBeforeClass() throws Exception { private static void createSchema() throws IOException { new EntityTable() .createTable(util.getHBaseAdmin(), util.getConfiguration()); + new ApplicationTable() + .createTable(util.getHBaseAdmin(), util.getConfiguration()); + } + + @Test + public void testWriteApplicationToHBase() throws Exception { + TimelineEntities te = new TimelineEntities(); + ApplicationEntity entity = new ApplicationEntity(); + String id = "hello"; + entity.setId(id); + Long cTime = 1425016501000L; + Long mTime = 1425026901000L; + entity.setCreatedTime(cTime); + entity.setModifiedTime(mTime); + + // add the info map in Timeline Entity + Map infoMap = new HashMap(); + infoMap.put("infoMapKey1", "infoMapValue1"); + infoMap.put("infoMapKey2", 10); + entity.addInfo(infoMap); + + // add the isRelatedToEntity info + String key = "task"; + String value = "is_related_to_entity_id_here"; + Set isRelatedToSet = new HashSet(); + isRelatedToSet.add(value); + Map> isRelatedTo = new HashMap>(); + isRelatedTo.put(key, isRelatedToSet); + entity.setIsRelatedToEntities(isRelatedTo); + + // add the relatesTo info + key = "container"; + value = "relates_to_entity_id_here"; + Set relatesToSet = new HashSet(); + relatesToSet.add(value); + value = "relates_to_entity_id_here_Second"; + relatesToSet.add(value); + Map> relatesTo = new HashMap>(); + relatesTo.put(key, relatesToSet); + entity.setRelatesToEntities(relatesTo); + + // add some config entries + Map conf = new HashMap(); + conf.put("config_param1", "value1"); + conf.put("config_param2", "value2"); + entity.addConfigs(conf); + + // add metrics + Set metrics = new HashSet<>(); + TimelineMetric m1 = new TimelineMetric(); + m1.setId("MAP_SLOT_MILLIS"); + Map metricValues = new HashMap(); + long ts = System.currentTimeMillis(); + metricValues.put(ts - 120000, 100000000); + metricValues.put(ts - 100000, 200000000); + metricValues.put(ts - 80000, 300000000); + metricValues.put(ts - 60000, 400000000); + metricValues.put(ts - 40000, 50000000000L); + metricValues.put(ts - 20000, 60000000000L); + m1.setType(Type.TIME_SERIES); + m1.setValues(metricValues); + metrics.add(m1); + entity.addMetrics(metrics); + + te.addEntity(entity); + + HBaseTimelineWriterImpl hbi = null; + try { + Configuration c1 = util.getConfiguration(); + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + String cluster = "cluster1"; + String user = "user1"; + String flow = "some_flow_name"; + String flowVersion = "AB7822C10F1111"; + long runid = 1002345678919L; + hbi.write(cluster, user, flow, flowVersion, runid, id, te); + hbi.stop(); + + // scan the table and see that entity exists + Scan s = new Scan(); + byte[] startRow = + ApplicationRowKey.getRowKey(cluster, user, flow, runid, id); + s.setStartRow(startRow); + s.setMaxVersions(Integer.MAX_VALUE); + Connection conn = ConnectionFactory.createConnection(c1); + ResultScanner scanner = + new ApplicationTable().getResultScanner(c1, conn, s); + + int rowCount = 0; + int colCount = 0; + for (Result result : scanner) { + if (result != null && !result.isEmpty()) { + rowCount++; + colCount += result.size(); + byte[] row1 = result.getRow(); + assertTrue(isApplicationRowKeyCorrect(row1, cluster, user, flow, + runid, id)); + + // check info column family + String id1 = ApplicationColumn.ID.readResult(result).toString(); + assertEquals(id, id1); + + Number val = + (Number) ApplicationColumn.CREATED_TIME.readResult(result); + Long cTime1 = val.longValue(); + assertEquals(cTime1, cTime); + + val = (Number) ApplicationColumn.MODIFIED_TIME.readResult(result); + Long mTime1 = val.longValue(); + assertEquals(mTime1, mTime); + + Map infoColumns = + ApplicationColumnPrefix.INFO.readResults(result); + assertEquals(infoMap.size(), infoColumns.size()); + for (String infoItem : infoMap.keySet()) { + assertEquals(infoMap.get(infoItem), + infoColumns.get(infoItem)); + } + + // Remember isRelatedTo is of type Map> + for (String isRelatedToKey : isRelatedTo.keySet()) { + Object isRelatedToValue = + ApplicationColumnPrefix.IS_RELATED_TO.readResult(result, + isRelatedToKey); + String compoundValue = isRelatedToValue.toString(); + // id7?id9?id6 + Set isRelatedToValues = + new HashSet( + Separator.VALUES.splitEncoded(compoundValue)); + assertEquals(isRelatedTo.get(isRelatedToKey).size(), + isRelatedToValues.size()); + for (String v : isRelatedTo.get(isRelatedToKey)) { + assertTrue(isRelatedToValues.contains(v)); + } + } + + // RelatesTo + for (String relatesToKey : relatesTo.keySet()) { + String compoundValue = + ApplicationColumnPrefix.RELATES_TO.readResult(result, + relatesToKey).toString(); + // id3?id4?id5 + Set relatesToValues = + new HashSet( + Separator.VALUES.splitEncoded(compoundValue)); + assertEquals(relatesTo.get(relatesToKey).size(), + relatesToValues.size()); + for (String v : relatesTo.get(relatesToKey)) { + assertTrue(relatesToValues.contains(v)); + } + } + + // Configuration + Map configColumns = + ApplicationColumnPrefix.CONFIG.readResults(result); + assertEquals(conf.size(), configColumns.size()); + for (String configItem : conf.keySet()) { + assertEquals(conf.get(configItem), configColumns.get(configItem)); + } + + NavigableMap> metricsResult = + ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result); + + NavigableMap metricMap = metricsResult.get(m1.getId()); + // We got metrics back + assertNotNull(metricMap); + // Same number of metrics as we wrote + assertEquals(metricValues.entrySet().size(), metricMap.entrySet() + .size()); + + // Iterate over original metrics and confirm that they are present + // here. + for (Entry metricEntry : metricValues.entrySet()) { + assertEquals(metricEntry.getValue(), + metricMap.get(metricEntry.getKey())); + } + } + } + assertEquals(1, rowCount); + assertEquals(16, colCount); + + } finally { + hbi.stop(); + hbi.close(); + } } @Test @@ -281,6 +470,20 @@ private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user, return true; } + private boolean isApplicationRowKeyCorrect(byte[] rowKey, String cluster, + String user, String flow, Long runid, String appName) { + + byte[][] rowKeyComponents = Separator.QUALIFIERS.split(rowKey, -1); + + assertTrue(rowKeyComponents.length == 5); + assertEquals(cluster, Bytes.toString(rowKeyComponents[0])); + assertEquals(user, Bytes.toString(rowKeyComponents[1])); + assertEquals(flow, Bytes.toString(rowKeyComponents[2])); + assertEquals(EntityRowKey.invert(runid), Bytes.toLong(rowKeyComponents[3])); + assertEquals(appName, Bytes.toString(rowKeyComponents[4])); + return true; + } + private void testAdditionalEntity() throws IOException { TimelineEvent event = new TimelineEvent(); String eventId = "foo_event_id";