diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java index 61fa1d7..f156665 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java @@ -151,4 +151,12 @@ public boolean containsTimelineCollector(ApplicationId appId) { return collectors.containsKey(appId); } + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + if (writer != null) { + writer.close(); + } + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnFamilyDetails.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnFamilyDetails.java new file mode 100644 index 0000000..915acba --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnFamilyDetails.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Contains the Column family names and byte representations for + * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity} + * object that is stored in hbase + * + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public enum EntityColumnFamilyDetails { + INFO("i"), + CONFIG("c"), + METRICS("m"); + + private final String value; + private final byte[] inBytes; + + private EntityColumnFamilyDetails(String value) { + this.value = value; + this.inBytes = Bytes.toBytes(this.value.toLowerCase()); + } + + byte[] getInBytes() { + return inBytes; + } + + public String getValue() { + return value; + } + + // TODO add a method that accepts a byte array, + // iterates over the enum and returns an enum from those bytes +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityInfoColumnDetails.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityInfoColumnDetails.java new file mode 100644 index 
0000000..ab30790 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityInfoColumnDetails.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Contains the Info column family details such as + * Column names, types and byte representations for + * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity} + * object that is stored in hbase + * + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public enum EntityInfoColumnDetails { + ID("id", String.class), + TYPE("type", String.class), + CREATED_TIME("created_time", Long.class), + MODIFIED_TIME("modified_time", Long.class), + FLOW_VERSION("flow_version", String.class), + PREFIX_IS_RELATED_TO("r", null), + PREFIX_RELATES_TO("s", null), + PREFIX_EVENTS("e", null); + + private final String value; + private final Class className; + private final byte[] inBytes; + + private EntityInfoColumnDetails(String value, Class className) { + this.value = value; + this.className = className; + this.inBytes = Bytes.toBytes(this.value.toLowerCase()); + } + + public String getValue() { + return value; + } + + public Class getClassName() { + return className; + } + + byte[] getInBytes() { + return inBytes; + } + + // TODO add a method that accepts a byte array, + // iterates over the enum and returns an enum from those bytes +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java new file mode 100644 index 0000000..b93919a --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -0,0 +1,309 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineEntitySchemaConstants; +import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; + +/** + * This implements a hbase based backend for storing application timeline entity + * information. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class HBaseTimelineWriterImpl extends AbstractService implements + TimelineWriter { + + private final TableName entityTableName; + private Connection conn; + private BufferedMutator entityTable; + + private static final Log LOG = LogFactory + .getLog(HBaseTimelineWriterImpl.class); + + public HBaseTimelineWriterImpl() { + super("HBaseTimelineWriterImpl"); + entityTableName = TableName + .valueOf(TimelineEntitySchemaConstants.DEFAULT_ENTITY_TABLE_NAME); + } + + public HBaseTimelineWriterImpl(Configuration conf) throws IOException { + super(conf.get("yarn.application.id", "1")); + conf.addResource("hbase-site.xml"); + entityTableName = TableName.valueOf(conf.get( + TimelineEntitySchemaConstants.ENTITY_TABLE_NAME, + TimelineEntitySchemaConstants.DEFAULT_ENTITY_TABLE_NAME)); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + conn = ConnectionFactory.createConnection(conf); + entityTable = conn.getBufferedMutator(entityTableName); + } + + @Override + public TimelineWriteResponse write(String clusterId, String userId, + String flowName, String flowVersion, long flowRunId, String appId, + TimelineEntities data) throws IOException { + + byte[] rowKeyPrefix = TimelineWriterUtils.getRowKeyPrefix(clusterId, + userId, flowName, flowRunId, appId); + + TimelineWriteResponse putStatus = new TimelineWriteResponse(); + for (TimelineEntity te : data.getEntities()) { + // get row key + byte[] row = TimelineWriterUtils.join( + TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES, rowKeyPrefix, + Bytes.toBytes(te.getType()), Bytes.toBytes(te.getId())); + + doInfoPut(row, te, flowVersion); + doEventPut(row, te.getEvents()); + doConfigPut(row, te.getConfigs()); + doMetricsPut(row, te.getMetrics()); + doRelationPuts(row, te.getIsRelatedToEntities(), + EntityInfoColumnDetails.PREFIX_IS_RELATED_TO.getInBytes()); + doRelationPuts(row, 
te.getRelatesToEntities(), + EntityInfoColumnDetails.PREFIX_RELATES_TO.getInBytes()); + } + + return putStatus; + } + + /** + * executes Put(s) for relatedTo or isRelatedTo entities + * + * @param row + * @param connectedEntities + * @param columnNamePrefix + * @return void + * @throws IOException + */ + private void doRelationPuts(byte[] row, + Map> connectedEntities, byte[] columnNamePrefix) + throws IOException { + List allPuts = new ArrayList(); + for (Map.Entry> entry : connectedEntities.entrySet()) { + String key = entry.getKey(); + Set value = entry.getValue(); + if ((value != null) && (value.size() > 0)) { + // create the columnName as prefix!key + byte[] columnName = TimelineWriterUtils.join( + TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES, + columnNamePrefix, Bytes.toBytes(key)); + Put relatesToEntitiesPut = new Put(row); + relatesToEntitiesPut.addColumn(EntityColumnFamilyDetails.INFO + .getInBytes(), columnName, Bytes.toBytes(TimelineWriterUtils + .getValueAsString(TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR, + value))); + allPuts.add(relatesToEntitiesPut); + } + } + if (allPuts.size() > 0) { + entityTable.mutate(allPuts); + } + } + + /** + * executes the Put(s) for storing {@linkplain TimelineEntity} info to the + * backend + * + * @param row + * @param te + * @return void + * @throws IOException + */ + private void doInfoPut(byte[] row, TimelineEntity te, String flowVersion) + throws IOException { + Put infoPut = new Put(row); + infoPut.addColumn(EntityColumnFamilyDetails.INFO.getInBytes(), + EntityInfoColumnDetails.ID.getInBytes(), Bytes.toBytes(te.getId())); + infoPut.addColumn(EntityColumnFamilyDetails.INFO.getInBytes(), + EntityInfoColumnDetails.TYPE.getInBytes(), Bytes.toBytes(te.getType())); + infoPut.addColumn(EntityColumnFamilyDetails.INFO.getInBytes(), + EntityInfoColumnDetails.CREATED_TIME.getInBytes(), + Bytes.toBytes(te.getCreatedTime())); + infoPut.addColumn(EntityColumnFamilyDetails.INFO.getInBytes(), + 
EntityInfoColumnDetails.MODIFIED_TIME.getInBytes(), + Bytes.toBytes(te.getModifiedTime())); + infoPut.addColumn(EntityColumnFamilyDetails.INFO.getInBytes(), + EntityInfoColumnDetails.FLOW_VERSION.getInBytes(), + Bytes.toBytes(flowVersion)); + + entityTable.mutate(infoPut); + } + + /** + * executes Put(s) for the config object in the {@linkplain TimelineEntity} + * + * @param rowKey + * @param map + * @return {@link Put} + * @throws IOException + */ + private void doConfigPut(byte[] rowKey, Map map) + throws IOException { + List allPuts = new ArrayList(); + for (Map.Entry entry : map.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + if (value != null) { + Put configPut = new Put(rowKey); + configPut.addColumn(EntityColumnFamilyDetails.CONFIG.getInBytes(), + Bytes.toBytes(key), GenericObjectMapper.write(value)); + allPuts.add(configPut); + } + } + if (allPuts.size() > 0) { + entityTable.mutate(allPuts); + } + } + + /** + * executes Put(s) for the {@linkplain TimelineMetric} object + * + * @param row + * @param metrics + * @return {@linkplain Put} + * @throws IOException + */ + private void doMetricsPut(byte[] row, Set metrics) + throws IOException { + List allPuts = new ArrayList(); + for (TimelineMetric metric : metrics) { + String key = metric.getId(); + byte[] keyBytes = Bytes.toBytes(key); + Map timeseries = metric.getValues(); + for (Map.Entry entry : timeseries.entrySet()) { + Long timestamp = entry.getKey(); + Number value = entry.getValue(); + if (value != null) { + Cell cell = CellUtil.createCell(row, + EntityColumnFamilyDetails.METRICS.getInBytes(), + keyBytes, + // set the cell timestamp + timestamp, + // KeyValue Type minimum + TimelineEntitySchemaConstants.ZERO_BYTES, + GenericObjectMapper.write(value)); + try { + Put metricsPut = new Put(row); + metricsPut.add(cell); + allPuts.add(metricsPut); + } catch (IOException e) { + LOG.error("Caught exception while adding cell to Put ", e); + } + } + } + } + if (allPuts.size() > 
0) { + entityTable.mutate(allPuts); + } + } + + /** + * executes put(s) for the {@linkplain TimelineEvent} object + * + * @param row + * @param metrics + * @return {@linkplain Put} + * @throws IOException + */ + private void doEventPut(byte[] row, Set events) + throws IOException { + List allPuts = new ArrayList(); + for (TimelineEvent event : events) { + String id = event.getId(); + if (id != null) { + byte[] idBytes = Bytes.toBytes(id); + + Map eventInfo = event.getInfo(); + if (eventInfo != null) { + for (Map.Entry info : eventInfo.entrySet()) { + String key = info.getKey(); + if ((info.getValue() != null) && (id != null)) { + Put eventsPut = new Put(row); + eventsPut.addColumn(EntityColumnFamilyDetails.INFO.getInBytes(), + TimelineWriterUtils.join( + TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES, + EntityInfoColumnDetails.PREFIX_EVENTS.getInBytes(), + idBytes, Bytes.toBytes(key)), + GenericObjectMapper.write(info.getValue())); + allPuts.add(eventsPut); + } + } + } + } + } + if (allPuts.size() > 0) { + entityTable.mutate(allPuts); + } + } + + @Override + public TimelineWriteResponse aggregate(TimelineEntity data, + TimelineAggregationTrack track) throws IOException { + return null; + } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + try { + LOG.info("closing entity table"); + entityTable.close(); + } catch (IOException e) { + LOG.error( + "Caught exception trying to close " + + entityTableName.getNameAsString() + "'s buffered mutator ", e); + } + try { + conn.close(); + } catch (IOException e) { + LOG.error("Caught exception trying to close conn ", e); + } + + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/Range.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/Range.java new file 
mode 100644 index 0000000..2a2db81 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/Range.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class Range { + private final int startIdx; + private final int endIdx; + + /** + * Defines a range from start index (inclusive) to end index (exclusive). 
+ * + * @param start + * Starting index position + * @param end + * Ending index position (exclusive) + */ + public Range(int start, int end) { + if (start < 0 || end < start) { + throw new IllegalArgumentException( + "Invalid range, required that: 0 <= start <= end; start=" + start + + ", end=" + end); + } + + this.startIdx = start; + this.endIdx = end; + } + + public int start() { + return startIdx; + } + + public int end() { + return endIdx; + } + + public int length() { + return endIdx - startIdx; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntitySchemaConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntitySchemaConstants.java new file mode 100644 index 0000000..d259965 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineEntitySchemaConstants.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * contains the constants used in the context of schema accesses for + * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity} + * information + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class TimelineEntitySchemaConstants { + /** default value for entity table name */ + public static final String DEFAULT_ENTITY_TABLE_NAME = "timelineservice.entity"; + + /** in bytes default value for entity table name */ + static final byte[] DEFAULT_ENTITY_TABLE_NAME_BYTES = + Bytes.toBytes(DEFAULT_ENTITY_TABLE_NAME); + + /** separator in row key */ + public static final String ROW_KEY_SEPARATOR = "!"; + + /** byte representation of the separator in row key */ + static final byte[] ROW_KEY_SEPARATOR_BYTES = Bytes + .toBytes(ROW_KEY_SEPARATOR); + + public static final byte ZERO_BYTES = 0; + + /** config param name that specifies the entity table name */ + public static final String ENTITY_TABLE_NAME = "entity.table.name"; + + /** config param name that specifies the TTL for metrics column family in entity table */ + public static final String ENTITY_TABLE_METRICS_TTL = "entity.table.metrics.ttl"; + + /** default TTL is 30 days for metrics timeseries */ + public static final int ENTITY_TABLE_METRICS_TTL_DEFAULT = 2592000; + + /** default max number of versions */ + public static final int ENTITY_TABLE_METRICS_MAX_VERSIONS_DEFAULT = 1000; +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java new file mode 100644 index 0000000..f98e62e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.PosixParser; +import org.apache.hadoop.util.GenericOptionsParser; + +/** + * This creates the schema for a hbase based backend for storing application + * timeline information. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class TimelineSchemaCreator { + + final static String NAME = TimelineSchemaCreator.class.getSimpleName(); + private static final Log LOG = LogFactory.getLog(TimelineSchemaCreator.class); + final static byte[][] splits = { Bytes.toBytes("a"), Bytes.toBytes("ad"), + Bytes.toBytes("an"), Bytes.toBytes("b"), Bytes.toBytes("ca"), + Bytes.toBytes("cl"), Bytes.toBytes("d"), Bytes.toBytes("e"), + Bytes.toBytes("f"), Bytes.toBytes("g"), Bytes.toBytes("h"), + Bytes.toBytes("i"), Bytes.toBytes("j"), Bytes.toBytes("k"), + Bytes.toBytes("l"), Bytes.toBytes("m"), Bytes.toBytes("n"), + Bytes.toBytes("o"), Bytes.toBytes("q"), Bytes.toBytes("r"), + Bytes.toBytes("s"), Bytes.toBytes("se"), Bytes.toBytes("t"), + Bytes.toBytes("u"), Bytes.toBytes("v"), Bytes.toBytes("w"), + Bytes.toBytes("x"), Bytes.toBytes("y"), Bytes.toBytes("z") }; + + public static final String SPLIT_KEY_PREFIX_LENGTH = "4"; + + public static void main(String[] args) throws Exception { + + Configuration hbaseConf = HBaseConfiguration.create(); + // Grab input args and allow for -Dxyz style arguments + String[] otherArgs = new GenericOptionsParser(hbaseConf, args) + .getRemainingArgs(); + + // Grab the arguments we're looking for. + CommandLine commandLine = parseArgs(otherArgs); + + // Grab the entityTableName argument + String entityTableName = commandLine.getOptionValue("e"); + if (StringUtils.isNotBlank(entityTableName)) { + hbaseConf.set(TimelineEntitySchemaConstants.ENTITY_TABLE_NAME, + entityTableName); + } + String entityTable_TTL_Metrics = commandLine.getOptionValue("m"); + if (StringUtils.isNotBlank(entityTable_TTL_Metrics)) { + hbaseConf.set(TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_TTL, + entityTable_TTL_Metrics); + } + createAllTables(hbaseConf); + } + + /** + * Parse command-line arguments. + * + * @param args + * command line arguments passed to program. + * @return parsed command line. 
+ * @throws ParseException + */ + private static CommandLine parseArgs(String[] args) throws ParseException { + Options options = new Options(); + + // Input + Option o = new Option("e", "entityTableName", true, "entity table name"); + o.setArgName("entityTableName"); + o.setRequired(false); + options.addOption(o); + + o = new Option("m", "metricsTTL", true, "TTL for metrics column family"); + o.setArgName("metricsTTL"); + o.setRequired(false); + options.addOption(o); + + CommandLineParser parser = new PosixParser(); + CommandLine commandLine = null; + try { + commandLine = parser.parse(options, args); + } catch (Exception e) { + LOG.error("ERROR: " + e.getMessage() + "\n"); + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(NAME + " ", options, true); + System.exit(-1); + } + + return commandLine; + } + + private static void createAllTables(Configuration hbaseConf) + throws IOException { + + Connection conn = null; + try { + conn = ConnectionFactory.createConnection(hbaseConf); + Admin admin = conn.getAdmin(); + if (admin == null) { + throw new IOException("Cannot create table since admin is null"); + } + createTimelineEntityTable(admin, hbaseConf); + } finally { + if (conn != null) { + conn.close(); + } + } + } + + /** + * Creates a table with column families info, config and metrics + * info stores information about a timeline entity object + * config stores configuration data of a timeline entity object + * metrics stores the metrics of a timeline entity object + * + * Example entity table record: + *
+   *|------------------------------------------------------------|
+   *|  Row       | Column Family  | Column Family | Column Family|
+   *|  key       | info           | metrics       | config       |
+   *|------------------------------------------------------------|
+   *| userName!  | id:entityId    | metricName1:  | configKey1:  |
+   *| clusterId! |                | metricValue1  | configValue1 |
+   *| flowName!  | type:entityType| @timestamp1   |              |
+   *| flowRunId! |                |               | configKey2:  |
+   *| AppId!     | created_time:  | metricName1:  | configValue2 |
+   *| entityType!| 1392993084018  | metricValue2  |              |
+   *| entityId   |                | @timestamp2   |              |
+   *|            | modified_time: |               |              |
+   *|            | 1392995081012  | metricName2:  |              |
+   *|            |                | metricValue1  |              |
+   *|            | r!isRelatedToKey| @timestamp2  |              |
+   *|            | id3!id4!id5    |               |              |
+   *|            |                |               |              |
+   *|            | s!relatesToKey:|              |              |
+   *|            | id7!id9!id5    |               |              |
+   *|            |                |               |              |
+   *|            | e!eventKey:    |               |              |
+   *|            | eventValue     |               |              |
+   *|            |                |               |              |
+   *|            | flowVersion:   |               |              |
+   *|            | versionValue   |               |              |
+   *|------------------------------------------------------------|
+   *
+ * @param {@link org.apache.hadoop.hbase.client.Admin} + * @param {@link org.apache.hadoop.conf.Configuration} + * @throws IOException + */ + public static void createTimelineEntityTable(Admin admin, + Configuration hbaseConf) throws IOException { + + TableName table = TableName.valueOf(hbaseConf.get( + TimelineEntitySchemaConstants.ENTITY_TABLE_NAME, + TimelineEntitySchemaConstants.DEFAULT_ENTITY_TABLE_NAME)); + if (admin.tableExists(table)) { + // do not disable / delete existing table + // similar to the approach taken by map-reduce jobs when + // output directory exists + throw new IOException("Table " + table.getNameAsString() + + " already exists."); + } + + HTableDescriptor entityTableDescp = new HTableDescriptor(table); + HColumnDescriptor cf1 = new HColumnDescriptor( + EntityColumnFamilyDetails.INFO.getInBytes()); + cf1.setBloomFilterType(BloomType.ROWCOL); + entityTableDescp.addFamily(cf1); + + HColumnDescriptor cf2 = new HColumnDescriptor( + EntityColumnFamilyDetails.CONFIG.getInBytes()); + cf2.setBloomFilterType(BloomType.ROWCOL); + cf2.setBlockCacheEnabled(true); + entityTableDescp.addFamily(cf2); + + HColumnDescriptor cf3 = new HColumnDescriptor( + EntityColumnFamilyDetails.METRICS.getInBytes()); + entityTableDescp.addFamily(cf3); + cf3.setBlockCacheEnabled(true); + // always keep 1 version (the latest) + cf3.setMinVersions(1); + cf3.setMaxVersions(TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_MAX_VERSIONS_DEFAULT); + cf3.setTimeToLive(hbaseConf.getInt( + TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_TTL, + TimelineEntitySchemaConstants.ENTITY_TABLE_METRICS_TTL_DEFAULT)); + entityTableDescp + .setRegionSplitPolicyClassName("org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy"); + entityTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length", + SPLIT_KEY_PREFIX_LENGTH); + admin.createTable(entityTableDescp, splits); + LOG.info("Status of table creation for " + table.getNameAsString() + "=" + + admin.tableExists(table)); + 
+ } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java new file mode 100644 index 0000000..8b04207 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.Range; + +/** + * bunch of utility functions used across TimelineWriter classes + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class TimelineWriterUtils { + + /** empty bytes */ + public static final byte[] EMPTY_BYTES = new byte[0]; + private static final String SPACE = " "; + private static final String UNDERSCORE = "_"; + + private static final Log LOG = LogFactory + .getLog(TimelineWriterUtils.class); + + /** + * Returns a single byte array containing all of the individual component + * arrays separated by the separator array. + * + * @param separator + * @param components + * @return byte array after joining the components + */ + public static byte[] join(byte[] separator, byte[]... 
components) { + if (components == null || components.length == 0) { + return EMPTY_BYTES; + } + + int finalSize = 0; + if (separator != null) { + finalSize = separator.length * (components.length - 1); + } + for (byte[] comp : components) { + finalSize += comp.length; + } + + byte[] buf = new byte[finalSize]; + int offset = 0; + for (int i = 0; i < components.length; i++) { + System.arraycopy(components[i], 0, buf, offset, components[i].length); + offset += components[i].length; + if (i < (components.length - 1) && separator != null + && separator.length > 0) { + System.arraycopy(separator, 0, buf, offset, separator.length); + offset += separator.length; + } + } + return buf; + } + + /** + * Splits the source array into multiple array segments using the given + * separator, up to a maximum of count items. This will naturally produce + * copied byte arrays for each of the split segments. To identify the split + * ranges without the array copies, see + * {@link TimelineWriterUtils#splitRanges(byte[], byte[])}. + * + * @param source + * @param separator + * @return byte[] array after splitting the source + */ + public static byte[][] split(byte[] source, byte[] separator) { + return split(source, separator, -1); + } + + /** + * Splits the source array into multiple array segments using the given + * separator, up to a maximum of count items. This will naturally produce + * copied byte arrays for each of the split segments. To identify the split + * ranges without the array copies, see + * {@link TimelineWriterUtils#splitRanges(byte[], byte[])}. 
+ * + * @param source + * @param separator + * @return byte[][] after splitting the input source + */ + public static byte[][] split(byte[] source, byte[] separator, int limit) { + List segments = splitRanges(source, separator, limit); + + byte[][] splits = new byte[segments.size()][]; + for (int i = 0; i < segments.size(); i++) { + Range r = segments.get(i); + byte[] tmp = new byte[r.length()]; + if (tmp.length > 0) { + System.arraycopy(source, r.start(), tmp, 0, r.length()); + } + splits[i] = tmp; + } + return splits; + } + + /** + * Returns a list of ranges identifying [start, end) -- closed, open -- + * positions within the source byte array that would be split using the + * separator byte array. + */ + public static List splitRanges(byte[] source, byte[] separator) { + return splitRanges(source, separator, -1); + } + + /** + * Returns a list of ranges identifying [start, end) -- closed, open -- + * positions within the source byte array that would be split using the + * separator byte array. 
+ * @param source the source data + * @param separator the separator pattern to look for + * @param limit the maximum number of splits to identify in the source + */ + public static List splitRanges(byte[] source, byte[] separator, int limit) { + List segments = new ArrayList(); + int start = 0; + itersource: for (int i = 0; i < source.length; i++) { + for (int j = 0; j < separator.length; j++) { + if (source[i + j] != separator[j]) { + continue itersource; + } + } + // all separator elements matched + if (limit > 0 && segments.size() >= (limit-1)) { + // everything else goes in one final segment + break; + } + + segments.add(new Range(start, i)); + start = i + separator.length; + // i will be incremented again in outer for loop + i += separator.length-1; + } + // add in remaining to a final range + if (start <= source.length) { + segments.add(new Range(start, source.length)); + } + return segments; + } + + /** + * converts run id into it's inverse timestamp + * @param flowRunId + * @return inverted long + */ + public static long encodeRunId(Long flowRunId) { + return Long.MAX_VALUE - flowRunId; + } + + /** + * return a value from the Map as a String + * @param key + * @param values + * @return value as a String or "" + */ + public static String getValueAsString(final byte[] key, + final Map values) { + byte[] value = values.get(key); + if (value != null) { + return Bytes.toString(value); + } else { + return ""; + } + } + + /** + * return a value from the Map as a long + * @param key + * @param taskValues + * @return value as Long or 0L + */ + public static long getValueAsLong(final byte[] key, + final Map taskValues) { + byte[] value = taskValues.get(key); + if (value != null) { + try { + long retValue = Bytes.toLong(value); + return retValue; + } catch (NumberFormatException nfe) { + LOG.error("Caught NFE while converting to long ", nfe); + return 0L; + } catch (IllegalArgumentException iae ) { + // for exceptions like java.lang.IllegalArgumentException: + // 
offset (0) + length (8) exceed the capacity of the array: 7 + LOG.error("Caught IAE while converting to long ", iae); + return 0L; + } + } else { + return 0L; + } + } + + /** + * concates the values from a Set to return a single delimited string value + * @param rowKeySeparator + * @param values + * @return Value from the set of strings as a string + */ + public static String getValueAsString(String rowKeySeparator, + Set values) { + + StringBuilder concatStrings = new StringBuilder(); + for (String value : values) { + concatStrings.append(value); + concatStrings.append(rowKeySeparator); + } + // remove the last separator + if(concatStrings.length() > 1) { + concatStrings.deleteCharAt(concatStrings.lastIndexOf(rowKeySeparator)); + } + return concatStrings.toString(); + } + /** + * Constructs a row key prefix for the entity table + * @param clusterId + * @param userId + * @param flowId + * @param flowRunId + * @param appId + * @return byte array with the row key prefix + */ + static byte[] getRowKeyPrefix(String clusterId, String userId, String flowId, + Long flowRunId, String appId) { + return TimelineWriterUtils.join( + TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES, + Bytes.toBytes(cleanse(userId)), Bytes.toBytes(cleanse(clusterId)), + Bytes.toBytes(cleanse(flowId)), + Bytes.toBytes(TimelineWriterUtils.encodeRunId(flowRunId)), + Bytes.toBytes(cleanse(appId))); + } + + /** + * Takes a string token to be used as a key or qualifier and + * cleanses out reserved tokens. + * This operation is not symmetrical. + * Logic is to replace all spaces and separator chars in input with + * underscores. + * + * @param token token to cleanse. 
+ * @return String with no spaces and no separator chars + */ + public static String cleanse(String token) { + if (token == null || token.length() == 0) { return token; }; + + String cleansed = token.replaceAll(SPACE, UNDERSCORE); + cleansed = cleansed.replaceAll( + TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR, UNDERSCORE); + + return cleansed; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java new file mode 100644 index 0000000..98ee2a1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java @@ -0,0 +1,290 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotNull; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type; +import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; +import org.junit.BeforeClass; +import org.junit.AfterClass; +import org.junit.Test; + +/** + * Unit test HBaseTimelineWriterImpl + * YARN 3411 + * + * @throws Exception + */ +public class TestHBaseTimelineWriterImpl { + + private static HBaseTestingUtility util; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + util = new HBaseTestingUtility(); + util.startMiniCluster(); + createSchema(); + } + + private static void createSchema() throws IOException { + byte[][] families = new byte[3][]; + families[0] = EntityColumnFamilyDetails.INFO.getInBytes(); + families[1] = EntityColumnFamilyDetails.CONFIG.getInBytes(); + 
families[2] = EntityColumnFamilyDetails.METRICS.getInBytes(); + TimelineSchemaCreator.createTimelineEntityTable(util.getHBaseAdmin(), + util.getConfiguration()); + } + + @Test + public void testWriteEntityToHBase() throws Exception { + TimelineEntities te = new TimelineEntities(); + TimelineEntity entity = new TimelineEntity(); + String id = "hello"; + String type = "world"; + entity.setId(id); + entity.setType(type); + Long cTime = 1425016501000L; + Long mTime = 1425026901000L; + entity.setCreatedTime(cTime); + entity.setModifiedTime(mTime); + + // add the isRelatedToEntity info + String key = "task"; + String value = "is_related_to_entity_id_here"; + Set isRelatedToSet = new HashSet(); + isRelatedToSet.add(value); + Map> isRelatedTo = new HashMap>(); + isRelatedTo.put(key, isRelatedToSet); + entity.setIsRelatedToEntities(isRelatedTo); + + // add the relatesTo info + key = "container"; + value = "relates_to_entity_id_here"; + Set relatesToSet = new HashSet(); + relatesToSet.add(value); + value = "relates_to_entity_id_here_Second"; + relatesToSet.add(value); + Map> relatesTo = new HashMap>(); + relatesTo.put(key, relatesToSet); + entity.setRelatesToEntities(relatesTo); + + // add some config entries + Map conf = new HashMap(); + conf.put("config_param1", "value1"); + conf.put("config_param2", "value2"); + entity.addConfigs(conf); + + // add metrics + Set metrics = new HashSet<>(); + TimelineMetric m1 = new TimelineMetric(); + m1.setId("MAP_SLOT_MILLIS"); + Map metricValues = new HashMap(); + metricValues.put(1429741609000L, 100000000); + metricValues.put(1429742609000L, 200000000); + metricValues.put(1429743609000L, 300000000); + metricValues.put(1429744609000L, 400000000); + metricValues.put(1429745609000L, 50000000000L); + metricValues.put(1429746609000L, 60000000000L); + m1.setType(Type.TIME_SERIES); + m1.setValues(metricValues); + metrics.add(m1); + entity.addMetrics(metrics); + + te.addEntity(entity); + + HBaseTimelineWriterImpl hbi = null; + try { + 
Configuration c1 = util.getConfiguration(); + hbi = new HBaseTimelineWriterImpl(c1); + hbi.init(c1); + String cluster = "cluster1"; + String user = "user1"; + String flow = "some_flow_name"; + String flowVersion = "AB7822C10F1111"; + long runid = 1002345678919L; + String appName = "some app name"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + hbi.stop(); + + // scan the table and see that entity exists + Scan s = new Scan(); + byte[] startRow = TimelineWriterUtils.getRowKeyPrefix(cluster, user, flow, + runid, appName); + s.setStartRow(startRow); + s.setMaxVersions(Integer.MAX_VALUE); + ResultScanner scanner = null; + TableName entityTableName = TableName + .valueOf(TimelineEntitySchemaConstants.DEFAULT_ENTITY_TABLE_NAME); + Connection conn = ConnectionFactory.createConnection(c1); + Table entityTable = conn.getTable(entityTableName); + int rowCount = 0; + int colCount = 0; + scanner = entityTable.getScanner(s); + for (Result result : scanner) { + if (result != null && !result.isEmpty()) { + rowCount++; + colCount += result.size(); + byte[] row1 = result.getRow(); + assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName, + entity)); + + // check info column family + NavigableMap infoValues = result + .getFamilyMap(EntityColumnFamilyDetails.INFO.getInBytes()); + String id1 = TimelineWriterUtils.getValueAsString( + EntityInfoColumnDetails.ID.getInBytes(), infoValues); + assertEquals(id, id1); + String type1 = TimelineWriterUtils.getValueAsString( + EntityInfoColumnDetails.TYPE.getInBytes(), infoValues); + assertEquals(type, type1); + Long cTime1 = TimelineWriterUtils.getValueAsLong( + EntityInfoColumnDetails.CREATED_TIME.getInBytes(), infoValues); + assertEquals(cTime1, cTime); + Long mTime1 = TimelineWriterUtils.getValueAsLong( + EntityInfoColumnDetails.MODIFIED_TIME.getInBytes(), infoValues); + assertEquals(mTime1, mTime); + checkRelatedEntities(isRelatedTo, infoValues, + 
EntityInfoColumnDetails.PREFIX_IS_RELATED_TO.getInBytes()); + checkRelatedEntities(relatesTo, infoValues, + EntityInfoColumnDetails.PREFIX_RELATES_TO.getInBytes()); + + // check config column family + NavigableMap configValuesResult = result + .getFamilyMap(EntityColumnFamilyDetails.CONFIG.getInBytes()); + checkConfigs(configValuesResult, conf); + + NavigableMap metricsResult = result + .getFamilyMap(EntityColumnFamilyDetails.METRICS.getInBytes()); + checkMetricsSizeAndKey(metricsResult, metrics); + List metricCells = result.getColumnCells( + EntityColumnFamilyDetails.METRICS.getInBytes(), + Bytes.toBytes(m1.getId())); + checkMetricsTimeseries(metricCells, m1); + } + } + assertEquals(1, rowCount); + assertEquals(15, colCount); + + } finally { + hbi.stop(); + hbi.close(); + } + } + + private void checkMetricsTimeseries(List metricCells, + TimelineMetric m1) throws IOException { + Map timeseries = m1.getValues(); + assertEquals(metricCells.size(), timeseries.size()); + for (Cell c1 : metricCells) { + assertTrue(timeseries.containsKey(c1.getTimestamp())); + assertEquals(GenericObjectMapper.read(CellUtil.cloneValue(c1)), + timeseries.get(c1.getTimestamp())); + } + } + + private void checkMetricsSizeAndKey( + NavigableMap metricsResult, Set metrics) { + assertEquals(metrics.size(), metricsResult.size()); + for (TimelineMetric m1 : metrics) { + byte[] key = Bytes.toBytes(m1.getId()); + assertTrue(metricsResult.containsKey(key)); + } + } + + private void checkConfigs(NavigableMap configValuesResult, + Map conf) throws IOException { + + assertEquals(conf.size(), configValuesResult.size()); + byte[] columnName; + for (String key : conf.keySet()) { + columnName = Bytes.toBytes(key); + assertTrue(configValuesResult.containsKey(columnName)); + byte[] value = configValuesResult.get(columnName); + assertNotNull(value); + assertEquals(conf.get(key), GenericObjectMapper.read(value)); + } + } + + private void checkRelatedEntities(Map> isRelatedTo, + NavigableMap infoValues, byte[] 
columnPrefix) { + + for (String key : isRelatedTo.keySet()) { + byte[] columnName = TimelineWriterUtils.join( + TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES, columnPrefix, + Bytes.toBytes(key)); + + byte[] value = infoValues.get(columnName); + assertNotNull(value); + String isRelatedToEntities = Bytes.toString(value); + assertNotNull(isRelatedToEntities); + assertEquals(TimelineWriterUtils.getValueAsString( + TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR, isRelatedTo.get(key)), + isRelatedToEntities); + } + } + + private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user, + String flow, Long runid, String appName, TimelineEntity te) { + + byte[][] rowKeyComponents = TimelineWriterUtils.split(rowKey, + TimelineEntitySchemaConstants.ROW_KEY_SEPARATOR_BYTES); + + assertTrue(rowKeyComponents.length == 7); + assertEquals(user, Bytes.toString(rowKeyComponents[0])); + assertEquals(cluster, Bytes.toString(rowKeyComponents[1])); + assertEquals(flow, Bytes.toString(rowKeyComponents[2])); + assertEquals(TimelineWriterUtils.encodeRunId(runid), + Bytes.toLong(rowKeyComponents[3])); + assertEquals(TimelineWriterUtils.cleanse(appName), Bytes.toString(rowKeyComponents[4])); + assertEquals(te.getType(), Bytes.toString(rowKeyComponents[5])); + assertEquals(te.getId(), Bytes.toString(rowKeyComponents[6])); + return true; + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + util.shutdownMiniCluster(); + } +}