diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java new file mode 100644 index 0000000..67aad1e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Contains the Column names, types and byte representations for + * {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity} + * object that is stored in hbase + * + */ +public enum EntityColumnDetails { + ID(String.class), + TYPE(String.class), + CREATED_TIME(Long.class), + MODIFIED_TIME(Long.class), + FLOW_VERSION(String.class); + + private final Class className; + private final byte[] inBytes; + + private EntityColumnDetails(Class className) { + this.className = className; + this.inBytes = Bytes.toBytes(this.name().toLowerCase()); + } + + public Class getClassName() { + return className; + } + + + byte[] getInBytes() { + return inBytes; + } + +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityTableDetails.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityTableDetails.java new file mode 100644 index 0000000..39ac0d0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityTableDetails.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import org.apache.hadoop.hbase.util.Bytes; + +public class EntityTableDetails { + /** default value for entity table name */ + public static final String DEFAULT_ENTITY_TABLE_NAME = "timelineservice.entity"; + + /** in bytes default value for entity table name */ + public static final byte[] DEFAULT_ENTITY_TABLE_NAME_BYTES = + Bytes.toBytes(DEFAULT_ENTITY_TABLE_NAME); + + /** column family info */ + public static final String COLUMN_FAMILY_INFO = "i"; + + /** byte representation for column family info */ + static final byte[] COLUMN_FAMILY_INFO_BYTES = Bytes + .toBytes(COLUMN_FAMILY_INFO); + + /** column family metrics */ + public static final String COLUMN_FAMILY_METRICS = "m"; + + /** byte representation for column family metrics */ + static final byte[] COLUMN_FAMILY_METRICS_BYTES = Bytes + .toBytes(COLUMN_FAMILY_METRICS); + + /** column family config */ + public static final String COLUMN_FAMILY_CONFIG = "c"; + + /** byte representation for column family config */ + static final byte[] COLUMN_FAMILY_CONFIG_BYTES = Bytes + .toBytes(COLUMN_FAMILY_CONFIG); + + /** column prefix for events */ + public static final String COLUMN_PREFIX_EVENTS = "e"; + + /** byte representation for column family config */ + static final byte[] COLUMN_PREFIX_EVENTS_BYTES = Bytes + .toBytes(COLUMN_PREFIX_EVENTS); + + /** column prefix for relatesTo */ + public static final String COLUMN_PREFIX_RELATES_TO = "r"; + + /** byte representation for COLUMN_PREFIX_RELATES_TO */ + static final byte[] 
COLUMN_PREFIX_RELATES_TO_BYTES = Bytes + .toBytes(COLUMN_PREFIX_RELATES_TO); + + /** column prefix for isRelatedTo */ + public static final String COLUMN_PREFIX_IS_RELATED_TO = "ir"; + + /** byte representation for COLUMN_PREFIX_RELATES_TO */ + static final byte[] COLUMN_PREFIX_IS_RELATED_TO_BYTES = Bytes + .toBytes(COLUMN_PREFIX_IS_RELATED_TO); + + /** separator in row key */ + public static final String ROW_KEY_SEPARATOR = "!"; + + /** zero bytes */ + public static final byte ZERO_BYTES = 0; + + /** byte representation of the separator in row key */ + static final byte[] ROW_KEY_SEPARATOR_BYTES = Bytes + .toBytes(ROW_KEY_SEPARATOR); + + /** config param name that specifies the entity table name */ + public static final String ENTITY_TABLE_NAME = "entity.table.name"; + + /** config param name that specifies the TTL for metrics column family in entity table */ + public static final String ENTITY_TABLE_METRICS_TTL = "entity.table.metrics.ttl"; + + /** default TTL is 30 days for metrics timeseries */ + public static final int ENTITY_TABLE_METRICS_TTL_DEFAULT = 2592000; + + /** default max number of versions */ + public static final int ENTITY_TABLE_METRICS_MAX_VERSIONS_DEFAULT = 200; + + /** + * Constructs a row key prefix for the entity table + * @param clusterId + * @param userId + * @param flowId + * @param flowRunId + * @param appId + * @return byte array with the row key prefix + */ + static byte[] getRowKeyPrefix(String clusterId, String userId, String flowId, + Long flowRunId, String appId) { + return TimelineWriterUtils.join(EntityTableDetails.ROW_KEY_SEPARATOR_BYTES, + Bytes.toBytes(userId), Bytes.toBytes(clusterId), Bytes.toBytes(flowId), + Bytes.toBytes(TimelineWriterUtils.encodeRunId(flowRunId)), + Bytes.toBytes(appId)); + } + +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java new file mode 100644 index 0000000..1f6fa0b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -0,0 +1,299 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.EntityTableDetails; +import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; + +/** + * This implements a hbase based backend for storing application timeline entity + * information. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class HBaseTimelineWriterImpl extends AbstractService implements + TimelineWriter { + + private final TableName entityTableName; + private Table entityTable; + private Connection conn; + + private static final Log LOG = LogFactory + .getLog(HBaseTimelineWriterImpl.class); + + public HBaseTimelineWriterImpl(Configuration conf) throws IOException { + super(conf.get("yarn.application.id", "1")); + entityTableName = TableName.valueOf(conf.get( + EntityTableDetails.ENTITY_TABLE_NAME, + EntityTableDetails.DEFAULT_ENTITY_TABLE_NAME)); + conf.addResource("hbase-site.xml"); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + conn = ConnectionFactory.createConnection(conf); + entityTable = conn.getTable(entityTableName); + } + + @Override + public TimelineWriteResponse write(String clusterId, String userId, + String flowName, String flowVersion, long flowRunId, String appId, + TimelineEntities data) throws IOException { + + byte[] rowKeyPrefix = EntityTableDetails.getRowKeyPrefix(clusterId, userId, + flowName, flowRunId, appId); + + TimelineWriteResponse putStatus = new TimelineWriteResponse(); + byte row[]; + for (TimelineEntity te : data.getEntities()) { + // get row key + row = TimelineWriterUtils.join( + EntityTableDetails.ROW_KEY_SEPARATOR_BYTES, rowKeyPrefix, + Bytes.toBytes(te.getType()), Bytes.toBytes(te.getId())); + + Put infoPut = getInfoPut(row, te, flowVersion); + Put eventPut = getEventPut(row, te.getEvents()); + Put configPut = getConfigPut(row, te.getConfigs()); + Put metricsPut = getMetricsPut(row, te.getMetrics()); + Put isRelatedToPut = getRelationPuts(row, te.getIsRelatedToEntities(), + EntityTableDetails.COLUMN_PREFIX_IS_RELATED_TO_BYTES); + Put relatesToPut = getRelationPuts(row, te.getRelatesToEntities(), + EntityTableDetails.COLUMN_PREFIX_RELATES_TO_BYTES); + + List entityPuts = new ArrayList(); + 
entityPuts.add(infoPut); + if (configPut.size() > 0) { + entityPuts.add(configPut); + } + if (metricsPut.size() > 0) { + entityPuts.add(metricsPut); + } + if (eventPut.size() > 0) { + entityPuts.add(eventPut); + } + if (isRelatedToPut.size() > 0) { + entityPuts.add(isRelatedToPut); + } + if (relatesToPut.size() > 0) { + entityPuts.add(relatesToPut); + } + + if (entityPuts.size() > 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Storing " + entityPuts.size() + " to " + + this.entityTableName.getNameAsString()); + } + entityTable.put(entityPuts); + } else { + LOG.warn("empty entity object?"); + } + } + + return putStatus; + } + + private Put getRelationPuts(byte[] row, + Map> connectedEntities, byte[] columnNamePrefix) { + Put relatesToEntitiesPut = new Put(row); + for (Map.Entry> entry : connectedEntities.entrySet()) { + String key = entry.getKey(); + Set value = entry.getValue(); + if ((value != null) && (value.size() > 0)) { + // create the columnName as prefix!key + byte[] columnName = TimelineWriterUtils.join( + EntityTableDetails.ROW_KEY_SEPARATOR_BYTES, + columnNamePrefix, + Bytes.toBytes(key)); + relatesToEntitiesPut.addColumn( + EntityTableDetails.COLUMN_FAMILY_INFO_BYTES, columnName, Bytes + .toBytes(TimelineWriterUtils.getValueAsString( + EntityTableDetails.ROW_KEY_SEPARATOR, value))); + } + } + return relatesToEntitiesPut; + } + + /** + * prepares the Put for storing {@linkplain TimelineEntity} info to the + * backend + * + * @param row + * @param te + * @return {@linkplain Put} + */ + private Put getInfoPut(byte[] row, TimelineEntity te, String flowVersion) { + Put infoPut = new Put(row); + infoPut.addColumn(EntityTableDetails.COLUMN_FAMILY_INFO_BYTES, + EntityColumnDetails.ID.getInBytes(), Bytes.toBytes(te.getId())); + infoPut.addColumn(EntityTableDetails.COLUMN_FAMILY_INFO_BYTES, + EntityColumnDetails.TYPE.getInBytes(), Bytes.toBytes(te.getType())); + infoPut.addColumn(EntityTableDetails.COLUMN_FAMILY_INFO_BYTES, + 
EntityColumnDetails.CREATED_TIME.getInBytes(), + Bytes.toBytes(te.getCreatedTime())); + infoPut.addColumn(EntityTableDetails.COLUMN_FAMILY_INFO_BYTES, + EntityColumnDetails.MODIFIED_TIME.getInBytes(), + Bytes.toBytes(te.getModifiedTime())); + infoPut.addColumn(EntityTableDetails.COLUMN_FAMILY_INFO_BYTES, + EntityColumnDetails.FLOW_VERSION.getInBytes(), + Bytes.toBytes(flowVersion)); + + if (LOG.isDebugEnabled()) { + try { + LOG.debug("info Put " + infoPut.toJSON()); + } catch (IOException e) { + LOG.debug("Caught exception while logging " + e); + } + } + return infoPut; + } + + /** + * generates a Put for the config object in the {@linkplain TimelineEntity} + * + * @param rowKey + * @param map + * @return {@link Put} + * @throws IOException + */ + private Put getConfigPut(byte[] rowKey, Map map) + throws IOException { + Put configPut = new Put(rowKey); + for (Map.Entry entry : map.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + if (value != null) { + configPut.addColumn(EntityTableDetails.COLUMN_FAMILY_CONFIG_BYTES, + Bytes.toBytes(key), GenericObjectMapper.write(value)); + } + } + return configPut; + } + + /** + * creates a put for the {@linkplain TimelineMetric} object + * + * @param row + * @param metrics + * @return {@linkplain Put} + * @throws IOException + */ + private Put getMetricsPut(byte[] row, Set metrics) + throws IOException { + Put metricsPut = new Put(row); + + for (TimelineMetric metric : metrics) { + String key = metric.getId(); + Map timeseries = metric.getValues(); + for (Map.Entry entry : timeseries.entrySet()) { + Long timestamp = entry.getKey(); + Number value = entry.getValue(); + if (value != null) { + Cell cell = CellUtil.createCell(row, + EntityTableDetails.COLUMN_FAMILY_METRICS_BYTES, + Bytes.toBytes(key), + // set the cell timestamp + timestamp, + // KeyValue Type minimum + EntityTableDetails.ZERO_BYTES, + GenericObjectMapper.write(value)); + try { + metricsPut.add(cell); + } catch (IOException e) { + 
LOG.error("Caught exception while adding cell to Put ", e); + } + } + } + } + return metricsPut; + } + + /** + * creates a put for the {@linkplain TimelineEvent} object + * + * @param row + * @param metrics + * @return {@linkplain Put} + * @throws IOException + */ + private Put getEventPut(byte[] row, Set events) + throws IOException { + Put eventsPut = new Put(row); + for (TimelineEvent event : events) { + String id = event.getId(); + + Map eventInfo = event.getInfo(); + if (eventInfo != null) { + for (Map.Entry info : eventInfo.entrySet()) { + String key = info.getKey(); + if ((info.getValue() != null) && (id != null)) { + eventsPut.addColumn(EntityTableDetails.COLUMN_FAMILY_INFO_BYTES, + TimelineWriterUtils.join( + EntityTableDetails.ROW_KEY_SEPARATOR_BYTES, + EntityTableDetails.COLUMN_PREFIX_EVENTS_BYTES, + Bytes.toBytes(id), Bytes.toBytes(key)), GenericObjectMapper + .write(info.getValue())); + } + } + } + } + return eventsPut; + } + + @Override + public TimelineWriteResponse aggregate(TimelineEntity data, + TimelineAggregationTrack track) throws IOException { + return null; + } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + try { + conn.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/Range.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/Range.java new file mode 100644 index 0000000..0eeb1b4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/Range.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
/**
 * An immutable half-open index range [start, end) over an array or buffer.
 */
public class Range {
  private final int startIdx;
  private final int endIdx;

  /**
   * Defines a range from {@code start} (inclusive) to {@code end}
   * (exclusive).
   *
   * @param start
   *          Starting index position
   * @param end
   *          Ending index position (exclusive)
   * @throws IllegalArgumentException unless {@code 0 <= start <= end}
   */
  public Range(int start, int end) {
    boolean valid = start >= 0 && end >= start;
    if (!valid) {
      throw new IllegalArgumentException(
          "Invalid range, required that: 0 <= start <= end; start=" + start
              + ", end=" + end);
    }
    this.startIdx = start;
    this.endIdx = end;
  }

  /** @return the inclusive start index */
  public int start() {
    return startIdx;
  }

  /** @return the exclusive end index */
  public int end() {
    return endIdx;
  }

  /** @return the number of positions covered by this range */
  public int length() {
    return endIdx - startIdx;
  }
}
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java @@ -0,0 +1,215 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.PosixParser; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.hadoop.util.GenericOptionsParser; + +/** + * This creates the schema for a hbase based backend for storing application + * timeline information. 
+ */ +public class TimelineSchemaCreator { + + final static String NAME = TimelineSchemaCreator.class.getSimpleName(); + private static final Log LOG = LogFactory + .getLog(TimelineSchemaCreator.class); + final static byte[][] splits = { Bytes.toBytes("a"), + Bytes.toBytes("ad"), + Bytes.toBytes("an"), + Bytes.toBytes("b"), + Bytes.toBytes("ca"), + Bytes.toBytes("cl"), + Bytes.toBytes("d"), + Bytes.toBytes("e"), + Bytes.toBytes("f"), + Bytes.toBytes("g"), + Bytes.toBytes("h"), + Bytes.toBytes("i"), + Bytes.toBytes("j"), + Bytes.toBytes("k"), + Bytes.toBytes("l"), + Bytes.toBytes("m"), + Bytes.toBytes("n"), + Bytes.toBytes("o"), + Bytes.toBytes("q"), + Bytes.toBytes("r"), + Bytes.toBytes("s"), + Bytes.toBytes("se"), + Bytes.toBytes("t"), + Bytes.toBytes("u"), + Bytes.toBytes("v"), + Bytes.toBytes("w"), + Bytes.toBytes("x"), + Bytes.toBytes("y"), + Bytes.toBytes("z") }; + + public static final String SPLIT_KEY_PREFIX_LENGTH = "4"; + + public static void main(String[] args) throws Exception { + + Configuration hbaseConf = HBaseConfiguration.create(); + // Grab input args and allow for -Dxyz style arguments + String[] otherArgs = new GenericOptionsParser(hbaseConf, args) + .getRemainingArgs(); + + // Grab the arguments we're looking for. + CommandLine commandLine = parseArgs(otherArgs); + + // Grab the entityTableName argument + String entityTableName = commandLine.getOptionValue("e"); + if (StringUtils.isNotBlank(entityTableName)) { + hbaseConf.set(EntityTableDetails.ENTITY_TABLE_NAME, entityTableName); + } + String entityTable_TTL_Metrics = commandLine.getOptionValue("m"); + if (StringUtils.isNotBlank(entityTable_TTL_Metrics)) { + hbaseConf.set(EntityTableDetails.ENTITY_TABLE_METRICS_TTL, + entityTable_TTL_Metrics); + } + createAllTables(hbaseConf); + } + + /** + * Parse command-line arguments. + * + * @param args + * command line arguments passed to program. + * @return parsed command line. 
+ * @throws ParseException + */ + private static CommandLine parseArgs(String[] args) throws ParseException { + Options options = new Options(); + + // Input + Option o = new Option("e", "entityTableName", true, "entity table name"); + o.setArgName("entityTableName"); + o.setRequired(false); + options.addOption(o); + + o = new Option("m", "metricsTTL", true, "TTL for metrics column family"); + o.setArgName("metricsTTL"); + o.setRequired(false); + options.addOption(o); + + CommandLineParser parser = new PosixParser(); + CommandLine commandLine = null; + try { + commandLine = parser.parse(options, args); + } catch (Exception e) { + System.err.println("ERROR: " + e.getMessage() + "\n"); + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(NAME + " ", options, true); + System.exit(-1); + } + + // Set debug level right away + if (commandLine.hasOption("d")) { + Logger log = Logger.getLogger(TimelineSchemaCreator.class); + log.setLevel(Level.DEBUG); + } + + return commandLine; + } + + private static void createAllTables(Configuration hbaseConf) + throws IOException { + + Connection conn = null; + try { + conn = ConnectionFactory.createConnection(hbaseConf); + Admin admin = conn.getAdmin(); + if (admin == null) { + throw new IOException("Cannot create table since admin is null"); + } + createTimelineEntityTable(admin, hbaseConf); + } finally { + if (conn != null) { + conn.close(); + } + } + } + + public static void createTimelineEntityTable(Admin admin, + Configuration hbaseConf) throws IOException { + + TableName table = TableName.valueOf(hbaseConf.get( + EntityTableDetails.ENTITY_TABLE_NAME, + EntityTableDetails.DEFAULT_ENTITY_TABLE_NAME)); + if (admin.tableExists(table)) { + // do not disable / delete existing table + // similar to the approach taken by map-reduce jobs when + // output directory exists + throw new IOException("Table " + table.getNameAsString() + + " already exists."); + } + + HTableDescriptor entityTableDescp = new 
HTableDescriptor(table); + HColumnDescriptor cf1 = new HColumnDescriptor( + EntityTableDetails.COLUMN_FAMILY_INFO_BYTES); + cf1.setBloomFilterType(BloomType.ROWCOL); + entityTableDescp.addFamily(cf1); + + HColumnDescriptor cf2 = new HColumnDescriptor( + EntityTableDetails.COLUMN_FAMILY_CONFIG_BYTES); + cf2.setBloomFilterType(BloomType.ROWCOL); + cf2.setBlockCacheEnabled(true); + entityTableDescp.addFamily(cf2); + + HColumnDescriptor cf3 = new HColumnDescriptor( + EntityTableDetails.COLUMN_FAMILY_METRICS_BYTES); + entityTableDescp.addFamily(cf3); + cf3.setBlockCacheEnabled(true); + // always keep 1 version (the latest) + cf3.setMinVersions(1); + cf3.setMaxVersions(EntityTableDetails.ENTITY_TABLE_METRICS_MAX_VERSIONS_DEFAULT); + cf3.setTimeToLive(hbaseConf.getInt( + EntityTableDetails.ENTITY_TABLE_METRICS_TTL, + EntityTableDetails.ENTITY_TABLE_METRICS_TTL_DEFAULT)); + entityTableDescp + .setRegionSplitPolicyClassName("org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy"); + entityTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length", + SPLIT_KEY_PREFIX_LENGTH); + admin.createTable(entityTableDescp, splits); + LOG.info("Status of table creation for " + table.getNameAsString() + "=" + + admin.tableExists(table)); + + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java new file mode 100644 index 0000000..c396547 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java @@ -0,0 +1,238 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.Range; + +/** + * bunch of utility functions used across TimelineWriter classes + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class TimelineWriterUtils { + + /** empty bytes */ + public static final byte[] EMPTY_BYTES = new byte[0]; + + private static final Log LOG = LogFactory + .getLog(TimelineWriterUtils.class); + + /** + * Returns a single byte array containing all of the individual component + * arrays separated by the separator array. + * + * @param separator + * @param components + * @return byte array after joining the components + */ + public static byte[] join(byte[] separator, byte[]... 
components) { + if (components == null || components.length == 0) { + return EMPTY_BYTES; + } + + int finalSize = 0; + if (separator != null) { + finalSize = separator.length * (components.length - 1); + } + for (byte[] comp : components) { + finalSize += comp.length; + } + + byte[] buf = new byte[finalSize]; + int offset = 0; + for (int i = 0; i < components.length; i++) { + System.arraycopy(components[i], 0, buf, offset, components[i].length); + offset += components[i].length; + if (i < (components.length - 1) && separator != null + && separator.length > 0) { + System.arraycopy(separator, 0, buf, offset, separator.length); + offset += separator.length; + } + } + return buf; + } + + /** + * Splits the source array into multiple array segments using the given + * separator, up to a maximum of count items. This will naturally produce + * copied byte arrays for each of the split segments. To identify the split + * ranges without the array copies, see + * {@link TimelineWriterUtils#splitRanges(byte[], byte[])}. + * + * @param source + * @param separator + * @return byte[] array after splitting the source + */ + public static byte[][] split(byte[] source, byte[] separator) { + return split(source, separator, -1); + } + + /** + * Splits the source array into multiple array segments using the given + * separator, up to a maximum of count items. This will naturally produce + * copied byte arrays for each of the split segments. To identify the split + * ranges without the array copies, see + * {@link TimelineWriterUtils#splitRanges(byte[], byte[])}. 
+ * + * @param source + * @param separator + * @return byte[][] after splitting the input source + */ + public static byte[][] split(byte[] source, byte[] separator, int limit) { + List segments = splitRanges(source, separator, limit); + + byte[][] splits = new byte[segments.size()][]; + for (int i = 0; i < segments.size(); i++) { + Range r = segments.get(i); + byte[] tmp = new byte[r.length()]; + if (tmp.length > 0) { + System.arraycopy(source, r.start(), tmp, 0, r.length()); + } + splits[i] = tmp; + } + return splits; + } + + /** + * Returns a list of ranges identifying [start, end) -- closed, open -- + * positions within the source byte array that would be split using the + * separator byte array. + */ + public static List splitRanges(byte[] source, byte[] separator) { + return splitRanges(source, separator, -1); + } + + /** + * Returns a list of ranges identifying [start, end) -- closed, open -- + * positions within the source byte array that would be split using the + * separator byte array. 
+ * @param source the source data + * @param separator the separator pattern to look for + * @param limit the maximum number of splits to identify in the source + */ + public static List splitRanges(byte[] source, byte[] separator, int limit) { + List segments = new ArrayList(); + int start = 0; + itersource: for (int i = 0; i < source.length; i++) { + for (int j = 0; j < separator.length; j++) { + if (source[i + j] != separator[j]) { + continue itersource; + } + } + // all separator elements matched + if (limit > 0 && segments.size() >= (limit-1)) { + // everything else goes in one final segment + break; + } + + segments.add(new Range(start, i)); + start = i + separator.length; + // i will be incremented again in outer for loop + i += separator.length-1; + } + // add in remaining to a final range + if (start <= source.length) { + segments.add(new Range(start, source.length)); + } + return segments; + } + + /** + * converts run id into it's inverse timestamp + * @param flowRunId + * @return inverted long + */ + public static long encodeRunId(Long flowRunId) { + return Long.MAX_VALUE - flowRunId; + } + + /** + * return a value from the NavigableMap as a String + * @param key + * @param taskValues + * @return value as a String or "" + */ + public static String getValueAsString(final byte[] key, + final Map taskValues) { + byte[] value = taskValues.get(key); + if (value != null) { + return Bytes.toString(value); + } else { + return ""; + } + } + + /** + * return a value from the NavigableMap as a long + * @param key + * @param taskValues + * @return value as Long or 0L + */ + public static long getValueAsLong(final byte[] key, + final Map taskValues) { + byte[] value = taskValues.get(key); + if (value != null) { + try { + long retValue = Bytes.toLong(value); + return retValue; + } catch (NumberFormatException nfe) { + LOG.error("Caught NFE while converting to long ", nfe); + return 0L; + } catch (IllegalArgumentException iae ) { + // for exceptions like 
java.lang.IllegalArgumentException: + // offset (0) + length (8) exceed the capacity of the array: 7 + LOG.error("Caught IAE while converting to long ", iae); + return 0L; + } + } else { + return 0L; + } + } + + /** + * concates the values from a Set to return a single delimited string value + * @param rowKeySeparator + * @param values + * @return Value from the set of strings as a string + */ + public static String getValueAsString(String rowKeySeparator, + Set values) { + + StringBuilder concatStrings = new StringBuilder(); + for (String value : values) { + concatStrings.append(value); + concatStrings.append(rowKeySeparator); + } + // remove the last separator + if(concatStrings.length() > 1) { + concatStrings.deleteCharAt(concatStrings.lastIndexOf(rowKeySeparator)); + } + return concatStrings.toString(); + } + +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java new file mode 100644 index 0000000..6611e0a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java @@ -0,0 +1,292 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.timelineservice.storage;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotNull;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
import org.junit.BeforeClass;
import org.junit.AfterClass;
import org.junit.Test;

/**
 * Unit test for the PoC of YARN-3411: writes a TimelineEntity to an hbase
 * mini-cluster through {@link HBaseTimelineWriterImpl} and verifies the row
 * key and the info/config/metrics column families by scanning the table back.
 */
public class TestHBaseTimelineWriterImpl {

  private static HBaseTestingUtility UTIL;

  @BeforeClass
  public static void setupBeforeClass() throws Exception {
    UTIL = new HBaseTestingUtility();
    UTIL.startMiniCluster();
    createSchema();
  }

  /** Creates the timeline entity table used by the writer. */
  private static void createSchema() throws IOException {
    // NOTE: the schema creator builds all column families itself; the
    // previously constructed (and unused) families array was removed.
    TimelineSchemaCreator.createTimelineEntityTable(UTIL.getHBaseAdmin(),
        UTIL.getConfiguration());
  }

  @Test
  public void testWriteEntityToHBase() throws Exception {
    TimelineEntities te = new TimelineEntities();
    TimelineEntity entity = new TimelineEntity();
    String id = "hello";
    String type = "world";
    entity.setId(id);
    entity.setType(type);
    Long cTime = 1425016501000L;
    Long mTime = 1425026901000L;
    entity.setCreatedTime(cTime);
    entity.setModifiedTime(mTime);

    // add the isRelatedToEntity info
    String key = "task";
    String value = "is_related_to_entity_id_here";
    Set<String> isRelatedToSet = new HashSet<String>();
    isRelatedToSet.add(value);
    Map<String, Set<String>> isRelatedTo =
        new HashMap<String, Set<String>>();
    isRelatedTo.put(key, isRelatedToSet);
    entity.setIsRelatedToEntities(isRelatedTo);

    // add the relatesTo info
    key = "container";
    value = "relates_to_entity_id_here";
    Set<String> relatesToSet = new HashSet<String>();
    relatesToSet.add(value);
    value = "relates_to_entity_id_here_Second";
    relatesToSet.add(value);
    Map<String, Set<String>> relatesTo = new HashMap<String, Set<String>>();
    relatesTo.put(key, relatesToSet);
    entity.setRelatesToEntities(relatesTo);

    // add some config entries
    Map<String, String> conf = new HashMap<String, String>();
    conf.put("config_param1", "value1");
    conf.put("config_param2", "value2");
    entity.addConfigs(conf);

    // add metrics: a time series mixing int and long values
    Set<TimelineMetric> metrics = new HashSet<>();
    TimelineMetric m1 = new TimelineMetric();
    m1.setId("MAP_SLOT_MILLIS");
    Map<Long, Number> metricValues = new HashMap<Long, Number>();
    metricValues.put(1429741609000L, 100000000);
    metricValues.put(1429742609000L, 200000000);
    metricValues.put(1429743609000L, 300000000);
    metricValues.put(1429744609000L, 400000000);
    metricValues.put(1429745609000L, 50000000000L);
    metricValues.put(1429746609000L, 60000000000L);
    m1.setType(Type.TIME_SERIES);
    m1.setValues(metricValues);
    metrics.add(m1);
    entity.addMetrics(metrics);

    te.addEntity(entity);

    HBaseTimelineWriterImpl hbi = null;
    Connection conn = null;
    Table entityTable = null;
    ResultScanner scanner = null;
    try {
      Configuration c1 = UTIL.getConfiguration();
      hbi = new HBaseTimelineWriterImpl(c1);
      hbi.serviceInit(c1);
      // hbi.start();
      String cluster = "cluster1";
      String user = "user1";
      String flow = "some_flow_name";
      String flowVersion = "AB7822C10F1111";
      Long runid = 1002345678919L;
      String appName = "some app name";
      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);

      // scan the table and see that entity exists
      Scan s = new Scan();
      byte[] startRow = EntityTableDetails.getRowKeyPrefix(cluster, user,
          flow, runid, appName);
      s.setStartRow(startRow);
      s.setMaxVersions(Integer.MAX_VALUE);
      TableName entityTableName = TableName
          .valueOf(EntityTableDetails.DEFAULT_ENTITY_TABLE_NAME);
      conn = ConnectionFactory.createConnection(c1);
      entityTable = conn.getTable(entityTableName);
      int rowCount = 0;
      int colCount = 0;
      scanner = entityTable.getScanner(s);
      for (Result result : scanner) {
        if (result != null && !result.isEmpty()) {
          rowCount++;
          colCount += result.size();
          byte[] row1 = result.getRow();
          assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid,
              appName, entity));

          // check info column family
          NavigableMap<byte[], byte[]> infoValues = result
              .getFamilyMap(EntityTableDetails.COLUMN_FAMILY_INFO_BYTES);
          String id1 = TimelineWriterUtils.getValueAsString(
              EntityColumnDetails.ID.getInBytes(), infoValues);
          assertEquals(id, id1);
          String type1 = TimelineWriterUtils.getValueAsString(
              EntityColumnDetails.TYPE.getInBytes(), infoValues);
          assertEquals(type, type1);
          Long cTime1 = TimelineWriterUtils.getValueAsLong(
              EntityColumnDetails.CREATED_TIME.getInBytes(), infoValues);
          assertEquals(cTime, cTime1);
          Long mTime1 = TimelineWriterUtils.getValueAsLong(
              EntityColumnDetails.MODIFIED_TIME.getInBytes(), infoValues);
          assertEquals(mTime, mTime1);
          checkRelatedEntities(isRelatedTo, infoValues,
              EntityTableDetails.COLUMN_PREFIX_IS_RELATED_TO_BYTES);
          checkRelatedEntities(relatesTo, infoValues,
              EntityTableDetails.COLUMN_PREFIX_RELATES_TO_BYTES);

          // check config column family
          NavigableMap<byte[], byte[]> configValuesResult = result
              .getFamilyMap(EntityTableDetails.COLUMN_FAMILY_CONFIG_BYTES);
          checkConfigs(configValuesResult, conf);

          // check metrics column family: one column per metric id, one cell
          // version per timestamp in the series
          NavigableMap<byte[], byte[]> metricsResult = result
              .getFamilyMap(EntityTableDetails.COLUMN_FAMILY_METRICS_BYTES);
          checkMetricsSizeAndKey(metricsResult, metrics);
          List<Cell> metricCells = result.getColumnCells(
              EntityTableDetails.COLUMN_FAMILY_METRICS_BYTES,
              Bytes.toBytes(m1.getId()));
          checkMetricsTimeseries(metricCells, m1);
        }
      }
      assertEquals(1, rowCount);
      assertEquals(15, colCount);

    } finally {
      // BUGFIX: close scan/table/connection resources and null-guard hbi so
      // a failure during construction does not get masked by an NPE here.
      if (scanner != null) {
        scanner.close();
      }
      if (entityTable != null) {
        entityTable.close();
      }
      if (conn != null) {
        conn.close();
      }
      if (hbi != null) {
        hbi.serviceStop();
        hbi.close();
      }
    }
  }

  /** Verifies every cell timestamp/value pair against the metric series. */
  private void checkMetricsTimeseries(List<Cell> metricCells,
      TimelineMetric m1) throws IOException {
    Map<Long, Number> timeseries = m1.getValues();
    // expected first, actual second
    assertEquals(timeseries.size(), metricCells.size());
    for (Cell c1 : metricCells) {
      assertTrue(timeseries.containsKey(c1.getTimestamp()));
      assertEquals(timeseries.get(c1.getTimestamp()),
          GenericObjectMapper.read(CellUtil.cloneValue(c1)));
    }
  }

  /** Verifies one metrics column exists per metric id. */
  private void checkMetricsSizeAndKey(
      NavigableMap<byte[], byte[]> metricsResult,
      Set<TimelineMetric> metrics) {
    assertEquals(metrics.size(), metricsResult.size());
    for (TimelineMetric m1 : metrics) {
      // byte[] lookup works here because getFamilyMap returns a NavigableMap
      // ordered by byte content, not object identity
      byte[] key = Bytes.toBytes(m1.getId());
      assertTrue(metricsResult.containsKey(key));
    }
  }

  /** Verifies every config entry round-trips through the config family. */
  private void checkConfigs(NavigableMap<byte[], byte[]> configValuesResult,
      Map<String, String> conf) throws IOException {
    assertEquals(conf.size(), configValuesResult.size());
    for (Map.Entry<String, String> entry : conf.entrySet()) {
      byte[] columnName = Bytes.toBytes(entry.getKey());
      assertTrue(configValuesResult.containsKey(columnName));
      byte[] value = configValuesResult.get(columnName);
      assertNotNull(value);
      assertEquals(entry.getValue(), GenericObjectMapper.read(value));
    }
  }

  /**
   * Verifies that each related-entity key is stored as a prefixed column
   * whose value is the separator-joined set of related ids.
   */
  private void checkRelatedEntities(Map<String, Set<String>> isRelatedTo,
      NavigableMap<byte[], byte[]> infoValues, byte[] columnPrefix) {
    for (Map.Entry<String, Set<String>> entry : isRelatedTo.entrySet()) {
      byte[] columnName = TimelineWriterUtils.join(
          EntityTableDetails.ROW_KEY_SEPARATOR_BYTES, columnPrefix,
          Bytes.toBytes(entry.getKey()));

      byte[] value = infoValues.get(columnName);
      assertNotNull(value);
      String isRelatedToEntities = Bytes.toString(value);
      assertNotNull(isRelatedToEntities);
      assertEquals(TimelineWriterUtils.getValueAsString(
          EntityTableDetails.ROW_KEY_SEPARATOR, entry.getValue()),
          isRelatedToEntities);
    }
  }

  /**
   * Asserts the row key layout:
   * user!cluster!flow!invertedRunId!app!entityType!entityId.
   * NOTE(review): this relies on none of the components containing the
   * separator byte (including the 8-byte inverted run id) -- confirm against
   * the row key encoding in the writer.
   */
  private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user,
      String flow, Long runid, String appName, TimelineEntity te) {
    byte[][] rowKeyComponents = TimelineWriterUtils.split(rowKey,
        EntityTableDetails.ROW_KEY_SEPARATOR_BYTES);

    assertTrue(rowKeyComponents.length == 7);
    assertEquals(user, Bytes.toString(rowKeyComponents[0]));
    assertEquals(cluster, Bytes.toString(rowKeyComponents[1]));
    assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
    assertEquals(TimelineWriterUtils.encodeRunId(runid),
        Bytes.toLong(rowKeyComponents[3]));
    assertEquals(appName, Bytes.toString(rowKeyComponents[4]));
    assertEquals(te.getType(), Bytes.toString(rowKeyComponents[5]));
    assertEquals(te.getId(), Bytes.toString(rowKeyComponents[6]));
    return true;
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }
}