{
+ /**
+ * Mapping column family houses known columns such as flowId and flowRunId
+ */
+ MAPPING("m");
+
+ /**
+ * Byte representation of this column family.
+ */
+ private final byte[] bytes;
+
+ /**
+ * @param value create a column family with this name. Must be lower case and
+ * without spaces.
+ */
+ AppToFlowColumnFamily(String value) {
+ // column families should be lower case and not contain any spaces.
+ this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
+ }
+
+ public byte[] getBytes() {
+ return Bytes.copy(bytes);
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
new file mode 100644
index 0000000..ad4fec6
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
+
+/**
+ * Represents a rowkey for the app_flow table.
+ */
+public class AppToFlowRowKey {
+ /**
+ * Constructs a row key prefix for the app_flow table as follows:
+ * {@code clusterId!AppId}
+ *
+   * @param clusterId identifies the cluster
+   * @param appId identifies the application
+ * @return byte array with the row key
+ */
+ public static byte[] getRowKey(String clusterId, String appId) {
+ return Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(clusterId, appId));
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java
new file mode 100644
index 0000000..2467856
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowTable.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants;
+
+import java.io.IOException;
+
+/**
+ * The app_flow table has one column family, mapping, which stores the
+ * appId to flowId and flowRunId mapping information.
+ *
+ * Example app_flow table record:
+ *
+ *
+ * |--------------------------------------|
+ * | Row | Column Family |
+ * | key | info |
+ * |--------------------------------------|
+ * | clusterId! | flowId: |
+ * | AppId | foo@daily_hive_report |
+ * | | |
+ * | | flowRunId: |
+ * | | 1452828720457 |
+ * | | |
+ * | | |
+ * | | |
+ * |--------------------------------------|
+ *
+ */
+public class AppToFlowTable extends BaseTable<AppToFlowTable> {
+ /** app_flow prefix */
+ private static final String PREFIX =
+ YarnConfiguration.TIMELINE_SERVICE_PREFIX + "app-flow";
+
+ /** config param name that specifies the app_flow table name */
+ public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
+
+ /** default value for app_flow table name */
+ private static final String DEFAULT_TABLE_NAME = "timelineservice.app_flow";
+
+ private static final Log LOG = LogFactory.getLog(AppToFlowTable.class);
+
+ public AppToFlowTable() {
+ super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTable#createTable
+ * (org.apache.hadoop.hbase.client.Admin,
+ * org.apache.hadoop.conf.Configuration)
+ */
+ public void createTable(Admin admin, Configuration hbaseConf)
+ throws IOException {
+
+ TableName table = getTableName(hbaseConf);
+ if (admin.tableExists(table)) {
+ // do not disable / delete existing table
+ // similar to the approach taken by map-reduce jobs when
+ // output directory exists
+ throw new IOException("Table " + table.getNameAsString()
+ + " already exists.");
+ }
+
+ HTableDescriptor appToFlowTableDescp = new HTableDescriptor(table);
+ HColumnDescriptor mappCF =
+ new HColumnDescriptor(AppToFlowColumnFamily.MAPPING.getBytes());
+ mappCF.setBloomFilterType(BloomType.ROWCOL);
+ appToFlowTableDescp.addFamily(mappCF);
+
+ appToFlowTableDescp
+ .setRegionSplitPolicyClassName(
+ "org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
+ appToFlowTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
+ TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
+ admin.createTable(appToFlowTableDescp,
+ TimelineHBaseSchemaConstants.getUsernameSplits());
+ LOG.info("Status of table creation for " + table.getNameAsString() + "="
+ + admin.tableExists(table));
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/package-info.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/package-info.java
new file mode 100644
index 0000000..df7ffc1
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
index e8d8b5c..abba79a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java
@@ -24,6 +24,8 @@
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
@@ -94,6 +96,20 @@ public ResultScanner getResultScanner(Configuration hbaseConf,
}
/**
+   * Retrieves a single row from this table for the given Get.
+ * @param hbaseConf used to read settings that override defaults
+ * @param conn used to create table from
+ * @param get that specifies what single row you want to get from this table
+ * @return result of get operation
+   * @throws IOException if the get operation fails
+ */
+ public Result getResult(Configuration hbaseConf, Connection conn, Get get)
+ throws IOException {
+ Table table = conn.getTable(getTableName(hbaseConf));
+ return table.get(get);
+ }
+
+ /**
* Get the table name for this table.
*
* @param hbaseConf
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
index 671c824..509ff49 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/ColumnPrefix.java
@@ -64,7 +64,7 @@ public void store(byte[] rowKey, TypedBufferedMutator tableMutator,
public Object readResult(Result result, String qualifier) throws IOException;
/**
- * @param resultfrom which to read columns
+ * @param result from which to read columns
* @return the latest values of columns in the column family with this prefix
* (or all of them if the prefix value is null).
* @throws IOException
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineEntitySchemaConstants.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineEntitySchemaConstants.java
deleted file mode 100644
index 5518a27..0000000
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineEntitySchemaConstants.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.timelineservice.storage.common;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * contains the constants used in the context of schema accesses for
- * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
- * information
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class TimelineEntitySchemaConstants {
-
- /**
- * Used to create a pre-split for tables starting with a username in the
- * prefix. TODO: this may have to become a config variable (string with
- * separators) so that different installations can presplit based on their own
- * commonly occurring names.
- */
- private final static byte[][] USERNAME_SPLITS = { Bytes.toBytes("a"),
- Bytes.toBytes("ad"), Bytes.toBytes("an"), Bytes.toBytes("b"),
- Bytes.toBytes("ca"), Bytes.toBytes("cl"), Bytes.toBytes("d"),
- Bytes.toBytes("e"), Bytes.toBytes("f"), Bytes.toBytes("g"),
- Bytes.toBytes("h"), Bytes.toBytes("i"), Bytes.toBytes("j"),
- Bytes.toBytes("k"), Bytes.toBytes("l"), Bytes.toBytes("m"),
- Bytes.toBytes("n"), Bytes.toBytes("o"), Bytes.toBytes("q"),
- Bytes.toBytes("r"), Bytes.toBytes("s"), Bytes.toBytes("se"),
- Bytes.toBytes("t"), Bytes.toBytes("u"), Bytes.toBytes("v"),
- Bytes.toBytes("w"), Bytes.toBytes("x"), Bytes.toBytes("y"),
- Bytes.toBytes("z") };
-
- /**
- * The length at which keys auto-split
- */
- public static final String USERNAME_SPLIT_KEY_PREFIX_LENGTH = "4";
-
- /**
- * @return splits for splits where a user is a prefix.
- */
- public final static byte[][] getUsernameSplits() {
- byte[][] kloon = USERNAME_SPLITS.clone();
- // Deep copy.
- for (int row = 0; row < USERNAME_SPLITS.length; row++) {
- kloon[row] = Bytes.copy(USERNAME_SPLITS[row]);
- }
- return kloon;
- }
-
-}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java
new file mode 100644
index 0000000..bbf498a
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineHBaseSchemaConstants.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Contains the constants used in the context of schema accesses for
+ * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
+ * information.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TimelineHBaseSchemaConstants {
+
+ /**
+ * Used to create a pre-split for tables starting with a username in the
+ * prefix. TODO: this may have to become a config variable (string with
+ * separators) so that different installations can presplit based on their own
+ * commonly occurring names.
+ */
+ private final static byte[][] USERNAME_SPLITS = { Bytes.toBytes("a"),
+ Bytes.toBytes("ad"), Bytes.toBytes("an"), Bytes.toBytes("b"),
+ Bytes.toBytes("ca"), Bytes.toBytes("cl"), Bytes.toBytes("d"),
+ Bytes.toBytes("e"), Bytes.toBytes("f"), Bytes.toBytes("g"),
+ Bytes.toBytes("h"), Bytes.toBytes("i"), Bytes.toBytes("j"),
+ Bytes.toBytes("k"), Bytes.toBytes("l"), Bytes.toBytes("m"),
+ Bytes.toBytes("n"), Bytes.toBytes("o"), Bytes.toBytes("q"),
+ Bytes.toBytes("r"), Bytes.toBytes("s"), Bytes.toBytes("se"),
+ Bytes.toBytes("t"), Bytes.toBytes("u"), Bytes.toBytes("v"),
+ Bytes.toBytes("w"), Bytes.toBytes("x"), Bytes.toBytes("y"),
+ Bytes.toBytes("z") };
+
+ /**
+ * The length at which keys auto-split
+ */
+ public static final String USERNAME_SPLIT_KEY_PREFIX_LENGTH = "4";
+
+ /**
+ * @return splits for splits where a user is a prefix.
+ */
+ public final static byte[][] getUsernameSplits() {
+ byte[][] kloon = USERNAME_SPLITS.clone();
+ // Deep copy.
+ for (int row = 0; row < USERNAME_SPLITS.length; row++) {
+ kloon[row] = Bytes.copy(USERNAME_SPLITS[row]);
+ }
+ return kloon;
+ }
+
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java
new file mode 100644
index 0000000..91d7ba4
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineReaderUtils.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.common;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+
+public class TimelineReaderUtils {
+ /**
+   * Checks whether the entity relations include all of the filter relations.
+ * @param entityRelations the relations of an entity
+ * @param relationFilters the relations for filtering
+ * @return a boolean flag to indicate if both match
+ */
+ public static boolean matchRelations(
+      Map<String, Set<String>> entityRelations,
+      Map<String, Set<String>> relationFilters) {
+    for (Map.Entry<String, Set<String>> relation : relationFilters.entrySet()) {
+      Set<String> ids = entityRelations.get(relation.getKey());
+ if (ids == null) {
+ return false;
+ }
+ for (String id : relation.getValue()) {
+ if (!ids.contains(id)) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ /**
+   * Checks whether the key/value map contains every filter key/value pair.
+ * @param map the map of key/value pairs in an entity
+ * @param filters the map of key/value pairs for filtering
+ * @return a boolean flag to indicate if both match
+ */
+  public static boolean matchFilters(Map<String, Object> map,
+      Map<String, Object> filters) {
+    for (Map.Entry<String, Object> filter : filters.entrySet()) {
+ Object value = map.get(filter.getKey());
+ if (value == null) {
+ return false;
+ }
+ if (!value.equals(filter.getValue())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+   * Checks whether the entity's events contain every filter event id.
+ * @param entityEvents the set of event objects in an entity
+ * @param eventFilters the set of event Ids for filtering
+ * @return a boolean flag to indicate if both match
+ */
+  public static boolean matchEventFilters(Set<TimelineEvent> entityEvents,
+      Set<String> eventFilters) {
+    Set<String> eventIds = new HashSet<String>();
+ for (TimelineEvent event : entityEvents) {
+ eventIds.add(event.getId());
+ }
+ for (String eventFilter : eventFilters) {
+ if (!eventIds.contains(eventFilter)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+   * Checks whether the entity's metrics contain every filter metric id.
+ * @param metrics the set of metric objects in an entity
+ * @param metricFilters the set of metric Ids for filtering
+ * @return a boolean flag to indicate if both match
+ */
+  public static boolean matchMetricFilters(Set<TimelineMetric> metrics,
+      Set<String> metricFilters) {
+    Set<String> metricIds = new HashSet<String>();
+ for (TimelineMetric metric : metrics) {
+ metricIds.add(metric.getId());
+ }
+
+ for (String metricFilter : metricFilters) {
+ if (!metricIds.contains(metricFilter)) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
index 90da966..26e7748 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
@@ -62,7 +62,7 @@
private final String columnQualifier;
private final byte[] columnQualifierBytes;
-  private EntityColumn(ColumnFamily<EntityTable> columnFamily,
+  EntityColumn(ColumnFamily<EntityTable> columnFamily,
String columnQualifier) {
this.columnFamily = columnFamily;
this.columnQualifier = columnQualifier;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java
index 8a95d12..7c63727 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnFamily.java
@@ -53,7 +53,7 @@
* @param value create a column family with this name. Must be lower case and
* without spaces.
*/
- private EntityColumnFamily(String value) {
+ EntityColumnFamily(String value) {
// column families should be lower case and not contain any spaces.
this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
index 8b7bc3e..58272ab 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
@@ -80,7 +80,7 @@
* @param columnFamily that this column is stored in.
* @param columnPrefix for this column.
*/
-  private EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily,
+  EntityColumnPrefix(ColumnFamily<EntityTable> columnFamily,
String columnPrefix) {
column = new ColumnHelper(columnFamily);
this.columnFamily = columnFamily;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
index 3e17ad0..9a72be0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
@@ -55,17 +55,45 @@
/**
* Constructs a row key prefix for the entity table as follows:
- * {@code userName!clusterId!flowId!flowRunId!AppId}
+ * {@code userName!clusterId!flowId!flowRunId!AppId!entityType!}
*
* @param clusterId
* @param userId
* @param flowId
* @param flowRunId
* @param appId
+ * @param entityType
* @return byte array with the row key prefix
*/
+ public static byte[] getRowKeyPrefix(String clusterId, String userId,
+ String flowId, Long flowRunId, String appId, String entityType) {
+ byte[] first =
+ Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId,
+ flowId));
+ // Note that flowRunId is a long, so we can't encode them all at the same
+ // time.
+ byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
+ byte[] third =
+ Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, entityType, ""));
+ return Separator.QUALIFIERS.join(first, second, third);
+ }
+
+ /**
+ * Constructs a row key for the entity table as follows:
+ * {@code userName!clusterId!flowId!flowRunId!AppId!entityType!entityId}
+ *
+ * @param clusterId
+ * @param userId
+ * @param flowId
+ * @param flowRunId
+ * @param appId
+ * @param entityType
+ * @param entityId
+ * @return byte array with the row key
+ */
public static byte[] getRowKey(String clusterId, String userId,
- String flowId, Long flowRunId, String appId, TimelineEntity te) {
+ String flowId, Long flowRunId, String appId, String entityType,
+ String entityId) {
byte[] first =
Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(userId, clusterId,
flowId));
@@ -73,8 +101,8 @@
// time.
byte[] second = Bytes.toBytes(TimelineWriterUtils.invert(flowRunId));
byte[] third =
- Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, te.getType(),
- te.getId()));
+ Bytes.toBytes(Separator.QUALIFIERS.joinEncoded(appId, entityType,
+ entityId));
return Separator.QUALIFIERS.join(first, second, third);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
index 2ae7d39..f657a14 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java
@@ -29,7 +29,7 @@
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineEntitySchemaConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants;
/**
* The entity table as column families info, config and metrics. Info stores
@@ -75,7 +75,7 @@
public class EntityTable extends BaseTable {
/** entity prefix */
private static final String PREFIX =
- YarnConfiguration.TIMELINE_SERVICE_PREFIX + ".entity";
+ YarnConfiguration.TIMELINE_SERVICE_PREFIX + "entity";
/** config param name that specifies the entity table name */
public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name";
@@ -146,9 +146,9 @@ public void createTable(Admin admin, Configuration hbaseConf)
entityTableDescp
.setRegionSplitPolicyClassName("org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
entityTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length",
- TimelineEntitySchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
+ TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH);
admin.createTable(entityTableDescp,
- TimelineEntitySchemaConstants.getUsernameSplits());
+ TimelineHBaseSchemaConstants.getUsernameSplits());
LOG.info("Status of table creation for " + table.getNameAsString() + "="
+ admin.tableExists(table));
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
index fd5643d..ab02779 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
@@ -23,6 +23,7 @@
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -38,11 +39,15 @@
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineWriterUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn;
@@ -71,6 +76,8 @@ public static void setupBeforeClass() throws Exception {
private static void createSchema() throws IOException {
new EntityTable()
.createTable(util.getHBaseAdmin(), util.getConfiguration());
+ new AppToFlowTable()
+ .createTable(util.getHBaseAdmin(), util.getConfiguration());
}
@Test
@@ -138,10 +145,15 @@ public void testWriteEntityToHBase() throws Exception {
te.addEntity(entity);
HBaseTimelineWriterImpl hbi = null;
+ HBaseTimelineReaderImpl hbr = null;
try {
Configuration c1 = util.getConfiguration();
hbi = new HBaseTimelineWriterImpl(c1);
hbi.init(c1);
+ hbi.start();
+ hbr = new HBaseTimelineReaderImpl();
+ hbr.init(c1);
+ hbr.start();
String cluster = "cluster1";
String user = "user1";
String flow = "some_flow_name";
@@ -255,9 +267,22 @@ public void testWriteEntityToHBase() throws Exception {
assertEquals(1, rowCount);
assertEquals(17, colCount);
+ TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
+ entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL));
+ Set es1 = hbr.getEntities(user, cluster, flow, runid,
+ appName, entity.getType(), null, null, null, null, null, null, null,
+ null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
+ assertNotNull(e1);
+ assertEquals(1, es1.size());
} finally {
- hbi.stop();
- hbi.close();
+ if (hbi != null) {
+ hbi.stop();
+ hbi.close();
+ }
+ if (hbr != null) {
+ hbr.stop();
+ hbr.close();
+ }
}
// Somewhat of a hack, not a separate test in order not to have to deal with
@@ -283,7 +308,7 @@ private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user,
private void testAdditionalEntity() throws IOException {
TimelineEvent event = new TimelineEvent();
- String eventId = "foo_event_id";
+ String eventId = ApplicationMetricsConstants.CREATED_EVENT_TYPE;
event.setId(eventId);
Long expTs = 1436512802000L;
event.setTimestamp(expTs);
@@ -291,19 +316,23 @@ private void testAdditionalEntity() throws IOException {
Object expVal = "test";
event.addInfo(expKey, expVal);
- final TimelineEntity entity = new TimelineEntity();
- entity.setId("attempt_1329348432655_0001_m_000008_18");
- entity.setType("FOO_ATTEMPT");
+ final TimelineEntity entity = new ApplicationEntity();
+ entity.setId(ApplicationId.newInstance(0, 1).toString());
entity.addEvent(event);
TimelineEntities entities = new TimelineEntities();
entities.addEntity(entity);
HBaseTimelineWriterImpl hbi = null;
+ HBaseTimelineReaderImpl hbr = null;
try {
Configuration c1 = util.getConfiguration();
hbi = new HBaseTimelineWriterImpl(c1);
hbi.init(c1);
+ hbi.start();
+ hbr = new HBaseTimelineReaderImpl();
+ hbr.init(c1);
+ hbr.start();
String cluster = "cluster2";
String user = "user2";
String flow = "other_flow_name";
@@ -352,9 +381,31 @@ private void testAdditionalEntity() throws IOException {
}
assertEquals(1, rowCount);
+ TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
+ entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL));
+ TimelineEntity e2 = hbr.getEntity(user, cluster, null, null, appName,
+ entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL));
+ Set es1 = hbr.getEntities(user, cluster, flow, runid,
+ appName, entity.getType(), null, null, null, null, null, null, null,
+ null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
+ Set es2 = hbr.getEntities(user, cluster, null, null,
+ appName, entity.getType(), null, null, null, null, null, null, null,
+ null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
+ assertNotNull(e1);
+ assertNotNull(e2);
+ assertEquals(e1, e2);
+ assertEquals(1, es1.size());
+ assertEquals(1, es2.size());
+ assertEquals(es1, es2);
} finally {
- hbi.stop();
- hbi.close();
+ if (hbi != null) {
+ hbi.stop();
+ hbi.close();
+ }
+ if (hbr != null) {
+ hbr.stop();
+ hbr.close();
+ }
}
}
@@ -375,10 +426,15 @@ public void testAdditionalEntityEmptyEventInfo() throws IOException {
entities.addEntity(entity);
HBaseTimelineWriterImpl hbi = null;
+ HBaseTimelineReaderImpl hbr = null;
try {
Configuration c1 = util.getConfiguration();
hbi = new HBaseTimelineWriterImpl(c1);
hbi.init(c1);
+ hbi.start();
+ hbr = new HBaseTimelineReaderImpl();
+ hbr.init(c1);
+ hbr.start();
String cluster = "cluster_emptyeventkey";
String user = "user_emptyeventkey";
String flow = "other_flow_name";
@@ -430,13 +486,21 @@ public void testAdditionalEntityEmptyEventInfo() throws IOException {
}
assertEquals(1, rowCount);
+ TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appName,
+ entity.getType(), entity.getId(), EnumSet.of(TimelineReader.Field.ALL));
+ Set es1 = hbr.getEntities(user, cluster, flow, runid,
+ appName, entity.getType(), null, null, null, null, null, null, null,
+ null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
+ assertNotNull(e1);
+ assertEquals(1, es1.size());
} finally {
hbi.stop();
hbi.close();
+      hbr.stop();
+ hbr.close();
}
}
-
@AfterClass
public static void tearDownAfterClass() throws Exception {
util.shutdownMiniCluster();