diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServicePerformanceV2.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServicePerformanceV2.java
index de46617..2029117 100644
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServicePerformanceV2.java
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TimelineServicePerformanceV2.java
@@ -20,6 +20,8 @@
 import java.io.IOException;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -262,6 +264,10 @@ public void map(IntWritable key, IntWritable val, Context context)
       TimelineMetric metric = new TimelineMetric();
       metric.setId("foo_metric");
       metric.setSingleData(123456789L);
+      Map<Long, Number> metricValues = new HashMap<Long, Number>();
+      long ts = System.currentTimeMillis();
+      metricValues.put(ts, ts / 11);
+      metric.addTimeSeries(metricValues);
       entity.addMetric(metric);
       // add a config
       entity.addConfig("foo", "bar");
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
index f974aee..77b6cbf 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
@@ -67,6 +67,45 @@
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+      <version>1.0.1</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-testing-util</artifactId>
+      <version>1.0.1</version>
+      <scope>test</scope>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-hdfs</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.jruby</groupId>
+          <artifactId>jruby-complete</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/CreateSchema.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/CreateSchema.java
new file mode 100644
index 0000000..f9e1b39
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/CreateSchema.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * This creates the schema for a hbase based backend for storing application + * timeline information. + */ +public class CreateSchema { + + private static final Log LOG = LogFactory.getLog(CreateSchema.class); + + public static void main(String[] args) throws Exception { + int res = createAllTables(args); + System.exit(res); + } + + private static int createAllTables(String[] args) { + int res = createTimelineEntityTable(); + // TODO calls to create other tables + return res; + } + + private static int createTimelineEntityTable() { + try { + Configuration config = HBaseConfiguration.create(); + // add the hbase configuration details from classpath + config.addResource("hbase-site.xml"); + Connection conn = ConnectionFactory.createConnection(config); + Admin admin = conn.getAdmin(); + TableName table = TableName + .valueOf(EntityTableDetails.DEFAULT_ENTITY_TABLE_NAME); + + if (admin.tableExists(table)) { + // do not disable / delete existing table + // similar to the approach taken by map-reduce jobs when + // output directory exists + LOG.error("Table " + table.getNameAsString() + " already exists."); + return 1; + } + + HTableDescriptor entityTableDescp = new HTableDescriptor(table); + HColumnDescriptor cf1 = new HColumnDescriptor( + EntityTableDetails.COLUMN_FAMILY_INFO_BYTES); + cf1.setBloomFilterType(BloomType.ROWCOL); + entityTableDescp.addFamily(cf1); + + HColumnDescriptor cf2 = new HColumnDescriptor( + EntityTableDetails.COLUMN_FAMILY_CONFIG_BYTES); + cf2.setBloomFilterType(BloomType.ROWCOL); + cf2.setBlockCacheEnabled(true); + entityTableDescp.addFamily(cf2); + + HColumnDescriptor cf3 = new HColumnDescriptor( + EntityTableDetails.COLUMN_FAMILY_METRICS_BYTES); + entityTableDescp.addFamily(cf3); + cf3.setBlockCacheEnabled(true); + // always keep 1 version (the latest) + cf3.setMinVersions(1); + // TODO confirm what we want to set as max number of versions for + // timeseries data + cf3.setMaxVersions(Integer.MAX_VALUE); + // TTL is 30 days, need to make it configurable perhaps + cf3.setTimeToLive(2592000); + admin.createTable(entityTableDescp); + } catch (IOException ioe) { + LOG.error("Caught exception during create table " + ioe); + return 1; + } + return 0; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java new file mode 100644 index 0000000..b90700e --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Contains the Column names, types and byte representations for + * {@linkplain TimelineEntity} object that is stored in hbase + * + */ +public enum EntityColumnDetails { + ID(String.class), + TYPE(String.class), + CREATED_TIME(Long.class), + MODIFIED_TIME(Long.class), + FLOW_VERSION(String.class); + + private final Class className; + private final byte[] inBytes; + + private EntityColumnDetails(Class className) { + this.className = className; + this.inBytes = Bytes.toBytes(this.name().toLowerCase()); + } + + public Class getClassName() { + return className; + } + + + public byte[] getInBytes() { + return inBytes; + } + +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityTableDetails.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityTableDetails.java new file mode 100644 index 0000000..4ded643 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityTableDetails.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class EntityTableDetails {
+  /** default value for entity table name */
+  public static final String DEFAULT_ENTITY_TABLE_NAME = "timelineservice.entity";
+
+  /** in bytes default value for entity table name */
+  public static final byte[] DEFAULT_ENTITY_TABLE_NAME_BYTES =
+      Bytes.toBytes(DEFAULT_ENTITY_TABLE_NAME);
+
+  /** column family info */
+  public static final String COLUMN_FAMILY_INFO = "i";
+
+  /** byte representation for column family info */
+  public static final byte[] COLUMN_FAMILY_INFO_BYTES = Bytes
+      .toBytes(COLUMN_FAMILY_INFO);
+
+  /** column family metrics */
+  public static final String COLUMN_FAMILY_METRICS = "m";
+
+  /** byte representation for column family metrics */
+  public static final byte[] COLUMN_FAMILY_METRICS_BYTES = Bytes
+      .toBytes(COLUMN_FAMILY_METRICS);
+
+  /** column family config */
+  public static final String COLUMN_FAMILY_CONFIG = "c";
+
+  /** byte representation for column family config */
+  public static final byte[] COLUMN_FAMILY_CONFIG_BYTES = Bytes
+      .toBytes(COLUMN_FAMILY_CONFIG);
+
+  /** column prefix for events */
+  public static final String COLUMN_PREFIX_EVENTS = "e";
+
+  /** byte representation for column prefix for events */
+  public static final byte[] COLUMN_PREFIX_EVENTS_BYTES = Bytes
+      .toBytes(COLUMN_PREFIX_EVENTS);
+
+  /** column prefix for relatesTo */
+  public static final String COLUMN_PREFIX_RELATES_TO = "r";
+
+  /** byte representation for COLUMN_PREFIX_RELATES_TO */
+  public static final byte[] COLUMN_PREFIX_RELATES_TO_BYTES = Bytes
+      .toBytes(COLUMN_PREFIX_RELATES_TO);
+
+  /** column prefix for isRelatedTo */
+  public static final String COLUMN_PREFIX_IS_RELATED_TO = "ir";
+
+  /** byte representation for COLUMN_PREFIX_IS_RELATED_TO */
+  public static final byte[] COLUMN_PREFIX_IS_RELATED_TO_BYTES = Bytes
+      .toBytes(COLUMN_PREFIX_IS_RELATED_TO);
+
+  /** separator in row key */
+  public static final String ROW_KEY_SEPARATOR = "!";
+
+  /** zero byte, used as the KeyValue type when creating metric cells */
+  public static final byte ZERO_BYTES = 0;
+
+  /** byte representation of the separator in row key */
+  public static final byte[] ROW_KEY_SEPARATOR_BYTES = Bytes
+      .toBytes(ROW_KEY_SEPARATOR);
+
+  public static class Range {
+    private int startIdx;
+    private int endIdx;
+
+    /**
+     * Defines a range from start index (inclusive) to end index (exclusive).
+     *
+     * @param start Starting index position
+     * @param end Ending index position (exclusive)
+     */
+    public Range(int start, int end) {
+      if (start < 0 || end < start) {
+        throw new IllegalArgumentException(
+            "Invalid range, required that: 0 <= start <= end; start=" + start
+                + ", end=" + end);
+      }
+
+      this.startIdx = start;
+      this.endIdx = end;
+    }
+
+    public int start() {
+      return startIdx;
+    }
+
+    public int end() {
+      return endIdx;
+    }
+
+    public int length() {
+      return endIdx - startIdx;
+    }
+  }
+
+  /**
+   * Constructs a row key prefix for the entity table:
+   * user!cluster!flow!inverted run id!app id
+   *
+   * @param clusterId the cluster this entity belongs to
+   * @param userId the user that ran the flow
+   * @param flowId the flow name
+   * @param flowRunId the flow run id, stored inverted so later runs sort first
+   * @param appId the application id
+   * @return the serialized row key prefix
+   */
+  static byte[] getRowKeyPrefix(String clusterId, String userId, String flowId,
+      Long flowRunId, String appId) {
+    return TimelineWriterUtils.join(EntityTableDetails.ROW_KEY_SEPARATOR_BYTES,
+        Bytes.toBytes(userId), Bytes.toBytes(clusterId), Bytes.toBytes(flowId),
+        Bytes.toBytes(TimelineWriterUtils.encodeRunId(flowRunId)),
+        Bytes.toBytes(appId));
+  }
+
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
new file mode 100644
index 0000000..7277d63
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.EntityTableDetails; +import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; + +/** + * This implements a hbase based backend for storing application timeline entity + * information. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class HBaseTimelineWriterImpl extends AbstractService implements + TimelineWriter { + + private final TableName entityTableName; + private Table entityTable; + private Connection conn; + + private static final Log LOG = LogFactory + .getLog(HBaseTimelineWriterImpl.class); + + public HBaseTimelineWriterImpl(Configuration conf) throws IOException { + super(conf.get("yarn.application.id", "1")); + // TODO right now using a default table name + // change later to use a config driven table name + entityTableName = TableName + .valueOf(EntityTableDetails.DEFAULT_ENTITY_TABLE_NAME); + conf.addResource("hbase-site.xml"); + LOG.info("hBaseConf=" + conf.toString()); + } + + @Override + public void init(Configuration conf) { + try { + serviceInit(conf); + } catch (Exception e) { + LOG.error("caught exception during serviceInit " + e); + } + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + conn = ConnectionFactory.createConnection(conf); + entityTable = conn.getTable(entityTableName); + } + + @Override + public TimelineWriteResponse write(String clusterId, String userId, + String flowName, String flowVersion, long flowRunId, String appId, + TimelineEntities data) throws IOException { + + byte[] rowKeyPrefix = EntityTableDetails.getRowKeyPrefix(clusterId, userId, + flowName, flowRunId, appId); + + TimelineWriteResponse putStatus = new TimelineWriteResponse(); + byte row[]; + for (TimelineEntity te : data.getEntities()) { + // get row key + row = TimelineWriterUtils.join( + EntityTableDetails.ROW_KEY_SEPARATOR_BYTES, rowKeyPrefix, + Bytes.toBytes(te.getType()), Bytes.toBytes(te.getId())); + + Put infoPut = getInfoPut(row, te, flowVersion); + Put eventPut = getEventPut(row, te.getEvents()); + Put configPut = getConfigPut(row, te.getConfigs()); + Put metricsPut = getMetricsPut(row, te.getMetrics()); + Put isRelatedToPut = getIsRelatedToPuts(row, te.getIsRelatedToEntities()); + Put relatesToPut = getRelatesToPuts(row, 
          te.getRelatesToEntities());
+
+      List<Put> entityPuts = new ArrayList<Put>();
+      entityPuts.add(infoPut);
+      if (configPut.size() > 0) {
+        entityPuts.add(configPut);
+      }
+      if (metricsPut.size() > 0) {
+        entityPuts.add(metricsPut);
+      }
+      if (eventPut.size() > 0) {
+        entityPuts.add(eventPut);
+      }
+      if (isRelatedToPut.size() > 0) {
+        entityPuts.add(isRelatedToPut);
+      }
+      if (relatesToPut.size() > 0) {
+        entityPuts.add(relatesToPut);
+      }
+
+      if (entityPuts.size() > 0) {
+        LOG.info("Storing " + entityPuts.size() + " puts to "
+            + this.entityTableName.getNameAsString());
+        entityTable.put(entityPuts);
+      } else {
+        LOG.warn("empty entity object?");
+      }
+    }
+
+    return putStatus;
+  }
+
+  private Put getRelatesToPuts(byte[] row,
+      Map<String, Set<String>> relatesToEntities) {
+    Put relatesToEntitiesPut = new Put(row);
+    Set<String> value;
+    for (String key : relatesToEntities.keySet()) {
+      value = relatesToEntities.get(key);
+      // create the columnName as prefix!key
+      byte[] columnName = TimelineWriterUtils
+          .join(EntityTableDetails.ROW_KEY_SEPARATOR_BYTES,
+              EntityTableDetails.COLUMN_PREFIX_RELATES_TO_BYTES,
+              Bytes.toBytes(key));
+      relatesToEntitiesPut.addColumn(
+          EntityTableDetails.COLUMN_FAMILY_INFO_BYTES, columnName, Bytes
+              .toBytes(TimelineWriterUtils.getValueAsString(
+                  EntityTableDetails.ROW_KEY_SEPARATOR, value)));
+    }
+    return relatesToEntitiesPut;
+  }
+
+  private Put getIsRelatedToPuts(byte[] row,
+      Map<String, Set<String>> isRelatedToEntities) {
+    Put isRelatedToEntitiesPut = new Put(row);
+    Set<String> value;
+    for (String key : isRelatedToEntities.keySet()) {
+      value = isRelatedToEntities.get(key);
+      // create the columnName as prefix!key
+      byte[] columnName = TimelineWriterUtils.join(
+          EntityTableDetails.ROW_KEY_SEPARATOR_BYTES,
+          EntityTableDetails.COLUMN_PREFIX_IS_RELATED_TO_BYTES,
+          Bytes.toBytes(key));
+      isRelatedToEntitiesPut.addColumn(
+          EntityTableDetails.COLUMN_FAMILY_INFO_BYTES, columnName, Bytes
+              .toBytes(TimelineWriterUtils.getValueAsString(
+                  EntityTableDetails.ROW_KEY_SEPARATOR, value)));
+    }
+    return isRelatedToEntitiesPut;
+  }
+
+  /**
+   * prepares the Put for storing {@linkplain TimelineEntity} info to the
+   * backend
+   *
+   * @param row the row key for this entity
+   * @param te the entity being stored
+   * @param flowVersion the version string of the flow this entity belongs to
+   * @return {@linkplain Put}
+   */
+  private Put getInfoPut(byte[] row, TimelineEntity te, String flowVersion) {
+    Put infoPut = new Put(row);
+    infoPut.addColumn(EntityTableDetails.COLUMN_FAMILY_INFO_BYTES,
+        EntityColumnDetails.ID.getInBytes(), Bytes.toBytes(te.getId()));
+    infoPut.addColumn(EntityTableDetails.COLUMN_FAMILY_INFO_BYTES,
+        EntityColumnDetails.TYPE.getInBytes(), Bytes.toBytes(te.getType()));
+    infoPut.addColumn(EntityTableDetails.COLUMN_FAMILY_INFO_BYTES,
+        EntityColumnDetails.CREATED_TIME.getInBytes(),
+        Bytes.toBytes(te.getCreatedTime()));
+    infoPut.addColumn(EntityTableDetails.COLUMN_FAMILY_INFO_BYTES,
+        EntityColumnDetails.MODIFIED_TIME.getInBytes(),
+        Bytes.toBytes(te.getModifiedTime()));
+    infoPut.addColumn(EntityTableDetails.COLUMN_FAMILY_INFO_BYTES,
+        EntityColumnDetails.FLOW_VERSION.getInBytes(),
+        Bytes.toBytes(flowVersion));
+
+    if (LOG.isDebugEnabled()) {
+      try {
+        LOG.debug("info Put " + infoPut.toJSON());
+      } catch (IOException e) {
+        LOG.debug("Caught exception while logging " + e);
+      }
+    }
+    return infoPut;
+  }
+
+  /**
+   * generates a Put for the config object in the {@linkplain TimelineEntity}
+   *
+   * @param rowKey the row key for this entity
+   * @param configs the config key-value pairs to store
+   * @return {@link Put}
+   * @throws IOException
+   */
+  private Put getConfigPut(byte[] rowKey, Map<String, String> configs)
+      throws IOException {
+    Put configPut = new Put(rowKey);
+    String key = "";
+    Object value = "";
+    for (Map.Entry<String, String> entry : configs.entrySet()) {
+      key = entry.getKey();
+      value = entry.getValue();
+      configPut.addColumn(EntityTableDetails.COLUMN_FAMILY_CONFIG_BYTES,
+          Bytes.toBytes(key), GenericObjectMapper.write(value));
+    }
+    return configPut;
+  }
+
+  /**
+   * creates a put for the {@linkplain TimelineMetric} object
+   *
+   * @param row the row key for this entity
+   * @param metrics the metrics (with their time series) to store
+   * @return {@linkplain Put}
+   * @throws IOException
+   */
+  private Put getMetricsPut(byte[] row, Set<TimelineMetric> metrics)
+      throws IOException {
+    Put metricsPut = new Put(row);
+
+    String key = "";
+    for (TimelineMetric metric : metrics) {
+      key = metric.getId();
+      Map<Long, Number> timeseries = metric.getTimeSeries();
+      for (Long timestamp : timeseries.keySet()) {
+        Cell cell = CellUtil.createCell(row,
+            EntityTableDetails.COLUMN_FAMILY_METRICS_BYTES, Bytes.toBytes(key),
+            // set the cell timestamp
+            timestamp,
+            // KeyValue Type minimum
+            EntityTableDetails.ZERO_BYTES,
+            GenericObjectMapper.write(timeseries.get(timestamp)));
+        try {
+          metricsPut.add(cell);
+        } catch (IOException e) {
+          LOG.error("Caught exception while adding cell to Put " + e);
+        }
+      }
+    }
+    return metricsPut;
+  }
+
+  /**
+   * creates a put for the {@linkplain TimelineEvent} object
+   *
+   * @param row the row key for this entity
+   * @param events the events to store
+   * @return {@linkplain Put}
+   * @throws IOException
+   */
+  private Put getEventPut(byte[] row, Set<TimelineEvent> events)
+      throws IOException {
+    Put eventsPut = new Put(row);
+    String key = "";
+    String id = "";
+    for (TimelineEvent event : events) {
+      id = event.getId();
+
+      Map<String, Object> eventInfo = event.getInfo();
+      if (eventInfo != null) {
+        for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
+          key = info.getKey();
+          if ((info.getValue() != null) && (id != null)) {
+            eventsPut.addColumn(EntityTableDetails.COLUMN_FAMILY_INFO_BYTES,
+                TimelineWriterUtils.join(
+                    EntityTableDetails.ROW_KEY_SEPARATOR_BYTES,
+                    EntityTableDetails.COLUMN_PREFIX_EVENTS_BYTES,
+                    Bytes.toBytes(id), Bytes.toBytes(key)), GenericObjectMapper
+                    .write(info.getValue()));
+          }
+        }
+      }
+    }
+    return eventsPut;
+  }
+
+  @Override
+  public TimelineWriteResponse aggregate(TimelineEntity data,
+      TimelineAggregationTrack track) throws IOException {
+    return null;
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    try {
+      conn.close();
+    } catch (IOException e) {
+      LOG.error("Caught exception while closing connection ", e);
+    }
+  }
+
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java
new file mode 100644
index 0000000..19253ca
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.server.timelineservice.storage.EntityTableDetails.Range;
+
+/**
+ * Utility functions used across the TimelineWriter classes.
+ */
+public class TimelineWriterUtils {
+
+  /** empty bytes */
+  public static final byte[] EMPTY_BYTES = new byte[0];
+
+  private static final Log LOG = LogFactory
+      .getLog(TimelineWriterUtils.class);
+
+  /**
+   * Returns a single byte array containing all of the individual component
+   * arrays separated by the separator array.
+   *
+   * @param separator the separator to place between components
+   * @param components the byte arrays to join
+   * @return the joined byte array
+   */
+  public static byte[] join(byte[] separator, byte[]... components) {
+    if (components == null || components.length == 0) {
+      return EMPTY_BYTES;
+    }
+
+    int finalSize = 0;
+    if (separator != null) {
+      finalSize = separator.length * (components.length - 1);
+    }
+    for (byte[] comp : components) {
+      finalSize += comp.length;
+    }
+
+    byte[] buf = new byte[finalSize];
+    int offset = 0;
+    for (int i = 0; i < components.length; i++) {
+      System.arraycopy(components[i], 0, buf, offset, components[i].length);
+      offset += components[i].length;
+      if (i < (components.length - 1) && separator != null
+          && separator.length > 0) {
+        System.arraycopy(separator, 0, buf, offset, separator.length);
+        offset += separator.length;
+      }
+    }
+    return buf;
+  }
+
+  /**
+   * Splits the source array into multiple array segments using the given
+   * separator. This will naturally produce copied byte arrays for each of the
+   * split segments. To identify the split ranges without the array copies,
+   * see {@link #splitRanges(byte[], byte[])}.
+   *
+   * @param source the source data
+   * @param separator the separator pattern to look for
+   * @return the split byte array segments
+   */
+  public static byte[][] split(byte[] source, byte[] separator) {
+    return split(source, separator, -1);
+  }
+
+  /**
+   * Splits the source array into multiple array segments using the given
+   * separator, up to a maximum of limit items. This will naturally produce
+   * copied byte arrays for each of the split segments. To identify the split
+   * ranges without the array copies, see
+   * {@link #splitRanges(byte[], byte[], int)}.
+   *
+   * @param source the source data
+   * @param separator the separator pattern to look for
+   * @param limit the maximum number of splits to identify in the source
+   * @return the split byte array segments
+   */
+  public static byte[][] split(byte[] source, byte[] separator, int limit) {
+    List<Range> segments = splitRanges(source, separator, limit);
+
+    byte[][] splits = new byte[segments.size()][];
+    for (int i = 0; i < segments.size(); i++) {
+      Range r = segments.get(i);
+      byte[] tmp = new byte[r.length()];
+      if (tmp.length > 0) {
+        System.arraycopy(source, r.start(), tmp, 0, r.length());
+      }
+      splits[i] = tmp;
+    }
+    return splits;
+  }
+
+  /**
+   * Returns a list of ranges identifying [start, end) -- closed, open --
+   * positions within the source byte array that would be split using the
+   * separator byte array.
+   */
+  public static List<Range> splitRanges(byte[] source, byte[] separator) {
+    return splitRanges(source, separator, -1);
+  }
+
+  /**
+   * Returns a list of ranges identifying [start, end) -- closed, open --
+   * positions within the source byte array that would be split using the
+   * separator byte array.
+   *
+   * @param source the source data
+   * @param separator the separator pattern to look for
+   * @param limit the maximum number of splits to identify in the source
+   */
+  public static List<Range> splitRanges(byte[] source, byte[] separator, int limit) {
+    List<Range> segments = new ArrayList<Range>();
+    int start = 0;
+    itersource: for (int i = 0; i < source.length; i++) {
+      for (int j = 0; j < separator.length; j++) {
+        // guard against a partial separator match running off the end
+        if (i + j >= source.length || source[i + j] != separator[j]) {
+          continue itersource;
+        }
+      }
+      // all separator elements matched
+      if (limit > 0 && segments.size() >= (limit - 1)) {
+        // everything else goes in one final segment
+        break;
+      }
+
+      segments.add(new Range(start, i));
+      start = i + separator.length;
+      // i will be incremented again in outer for loop
+      i += separator.length - 1;
+    }
+    // add in remaining to a final range
+    if (start <= source.length) {
+      segments.add(new Range(start, source.length));
+    }
+    return segments;
+  }
+
+  /**
+   * converts the run id into its inverse timestamp
+   *
+   * @param flowRunId the flow run id to invert
+   * @return inverted long
+   */
+  public static long encodeRunId(Long flowRunId) {
+    return Long.MAX_VALUE - flowRunId;
+  }
+
+  /**
+   * returns a value from the map of column qualifiers to values as a String
+   *
+   * @param key the column qualifier
+   * @param taskValues the qualifier-to-value map for one column family
+   * @return value as a String or ""
+   */
+  public static String getValueAsString(final byte[] key,
+      final Map<byte[], byte[]> taskValues) {
+    byte[] value = taskValues.get(key);
+    if (value != null) {
+      return Bytes.toString(value);
+    } else {
+      return "";
+    }
+  }
+
+  /**
+   * returns a value from the map of column qualifiers to values as a long
+   *
+   * @param key the column qualifier
+   * @param taskValues the qualifier-to-value map for one column family
+   * @return value as Long or 0L
+   */
+  public static long getValueAsLong(final byte[] key,
+      final Map<byte[], byte[]> taskValues) {
+    byte[] value = taskValues.get(key);
+    if (value != null) {
+      try {
+        long retValue = Bytes.toLong(value);
+        return retValue;
+      } catch (NumberFormatException nfe) {
+        LOG.error("Caught NFE while converting to long ", nfe);
+        return 0L;
+      } catch (IllegalArgumentException iae) {
+        // for exceptions like java.lang.IllegalArgumentException:
+        // offset (0) + length (8) exceed the capacity of the array: 7
+        LOG.error("Caught IAE while converting to long ", iae);
+        return 0L;
+      }
+    } else {
+      return 0L;
+    }
+  }
+
+  /**
+   * concatenates the values from a Set to return a single delimited string
+   *
+   * @param rowKeySeparator the separator to place between values
+   * @param values the values to concatenate
+   * @return the delimited string
+   */
+  public static String getValueAsString(String rowKeySeparator,
+      Set<String> values) {
+
+    StringBuilder concatStrings = new StringBuilder();
+    for (String value : values) {
+      concatStrings.append(value);
+      concatStrings.append(rowKeySeparator);
+    }
+    // remove the last separator
+    if (concatStrings.length() > 1) {
+      concatStrings.deleteCharAt(concatStrings.lastIndexOf(rowKeySeparator));
+    }
+    return concatStrings.toString();
+  }
+
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
new file mode 100644
index 0000000..2760f01
--- /dev/null
+++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java @@ -0,0 +1,291 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotNull; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; +import org.junit.BeforeClass; +import org.junit.AfterClass; +import org.junit.Test; + +public class TestHBaseTimelineWriterImpl { + + /** + * Unit test for PoC YARN 3411 + * + * @throws Exception + */ + + private static HBaseTestingUtility UTIL; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + UTIL = new HBaseTestingUtility(); + UTIL.startMiniCluster(); + createSchema(); + } + + private static HTable createSchema() throws IOException { + byte[][] families = new byte[3][]; + families[0] = EntityTableDetails.COLUMN_FAMILY_INFO_BYTES; + families[1] = EntityTableDetails.COLUMN_FAMILY_CONFIG_BYTES; + families[2] = EntityTableDetails.COLUMN_FAMILY_METRICS_BYTES; + return UTIL.createTable(EntityTableDetails.DEFAULT_ENTITY_TABLE_NAME_BYTES, + families, Integer.MAX_VALUE); + } + + @Test + public void testWriteEntityToHBase() throws Exception { + TimelineEntities te = new TimelineEntities(); + TimelineEntity entity = new TimelineEntity(); + String id = "hello"; + String type = "world"; + entity.setId(id); + entity.setType(type); + Long cTime = 1425016501000L; + Long mTime = 1425026901000L; + entity.setCreatedTime(cTime); + entity.setModifiedTime(mTime); + + // add the isRelatedToEntity info 
+    String key = "task";
+    String value = "is_related_to_entity_id_here";
+    Set<String> isRelatedToSet = new HashSet<String>();
+    isRelatedToSet.add(value);
+    Map<String, Set<String>> isRelatedTo = new HashMap<String, Set<String>>();
+    isRelatedTo.put(key, isRelatedToSet);
+    entity.setIsRelatedToEntities(isRelatedTo);
+
+    // add the relatesTo info
+    key = "container";
+    value = "relates_to_entity_id_here";
+    Set<String> relatesToSet = new HashSet<String>();
+    relatesToSet.add(value);
+    value = "relates_to_entity_id_here_Second";
+    relatesToSet.add(value);
+    Map<String, Set<String>> relatesTo = new HashMap<String, Set<String>>();
+    relatesTo.put(key, relatesToSet);
+    entity.setRelatesToEntities(relatesTo);
+
+    // add some config entries
+    Map<String, String> conf = new HashMap<String, String>();
+    conf.put("config_param1", "value1");
+    conf.put("config_param2", "value2");
+    entity.addConfigs(conf);
+
+    // add metrics
+    Set<TimelineMetric> metrics = new HashSet<>();
+    TimelineMetric m1 = new TimelineMetric();
+    m1.setId("MAP_SLOT_MILLIS");
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    metricValues.put(1429741609000L, 100000000);
+    metricValues.put(1429742609000L, 200000000);
+    metricValues.put(1429743609000L, 300000000);
+    metricValues.put(1429744609000L, 400000000);
+    metricValues.put(1429745609000L, 50000000000L);
+    metricValues.put(1429746609000L, 60000000000L);
+    m1.setTimeSeries(metricValues);
+    metrics.add(m1);
+    entity.addMetrics(metrics);
+
+    te.addEntity(entity);
+
+    HBaseTimelineWriterImpl hbi = null;
+    try {
+      Configuration c1 = UTIL.getConfiguration();
+      hbi = new HBaseTimelineWriterImpl(c1);
+      hbi.serviceInit(c1);
+      // hbi.start();
+      String cluster = "cluster1";
+      String user = "user1";
+      String flow = "some_flow_name";
+      String flowVersion = "AB7822C10F1111";
+      Long runid = 1002345678919L;
+      String appName = "some app name";
+      hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+      // scan the table and see that entity exists
+      Scan s = new Scan();
+      byte[] startRow = EntityTableDetails.getRowKeyPrefix(cluster, user, flow,
+          runid, appName);
+      s.setStartRow(startRow);
+      s.setMaxVersions(Integer.MAX_VALUE);
+      ResultScanner scanner = null;
+      TableName entityTableName = TableName
+          .valueOf(EntityTableDetails.DEFAULT_ENTITY_TABLE_NAME);
+      Connection conn = ConnectionFactory.createConnection(c1);
+      Table entityTable = conn.getTable(entityTableName);
+      int rowCount = 0;
+      int colCount = 0;
+      scanner = entityTable.getScanner(s);
+      for (Result result : scanner) {
+        if (result != null && !result.isEmpty()) {
+          rowCount++;
+          colCount += result.size();
+          byte[] row1 = result.getRow();
+          assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName,
+              entity));
+
+          // check info column family
+          NavigableMap<byte[], byte[]> infoValues = result
+              .getFamilyMap(EntityTableDetails.COLUMN_FAMILY_INFO_BYTES);
+          String id1 = TimelineWriterUtils.getValueAsString(
+              EntityColumnDetails.ID.getInBytes(), infoValues);
+          assertEquals(id, id1);
+          String type1 = TimelineWriterUtils.getValueAsString(
+              EntityColumnDetails.TYPE.getInBytes(), infoValues);
+          assertEquals(type, type1);
+          Long cTime1 = TimelineWriterUtils.getValueAsLong(
+              EntityColumnDetails.CREATED_TIME.getInBytes(), infoValues);
+          assertEquals(cTime1, cTime);
+          Long mTime1 = TimelineWriterUtils.getValueAsLong(
+              EntityColumnDetails.MODIFIED_TIME.getInBytes(), infoValues);
+          assertEquals(mTime1, mTime);
+          checkRelatedEntities(isRelatedTo, infoValues,
+              EntityTableDetails.COLUMN_PREFIX_IS_RELATED_TO_BYTES);
+          checkRelatedEntities(relatesTo, infoValues,
+              EntityTableDetails.COLUMN_PREFIX_RELATES_TO_BYTES);
+
+          // check config column family
+          NavigableMap<byte[], byte[]> configValuesResult = result
+              .getFamilyMap(EntityTableDetails.COLUMN_FAMILY_CONFIG_BYTES);
+          checkConfigs(configValuesResult, conf);
+
+          // check metrics column family
+          NavigableMap<byte[], byte[]> metricsResult = result
+              .getFamilyMap(EntityTableDetails.COLUMN_FAMILY_METRICS_BYTES);
+          checkMetricsSizeAndKey(metricsResult, metrics);
+          List<Cell> metricCells = result.getColumnCells(
+              EntityTableDetails.COLUMN_FAMILY_METRICS_BYTES,
+              Bytes.toBytes(m1.getId()));
+          checkMetricsTimeseries(metricCells, m1);
+        }
+      }
+      assertEquals(1, rowCount);
+      assertEquals(15, colCount);
+
+    } finally {
+      hbi.serviceStop();
+      hbi.close();
+    }
+  }
+
+  private void checkMetricsTimeseries(List<Cell> metricCells,
+      TimelineMetric m1) throws IOException {
+
+    Map<Long, Number> timeseries = m1.getTimeSeries();
+    assertEquals(metricCells.size(), timeseries.size());
+    for (Cell c1 : metricCells) {
+      assertTrue(timeseries.containsKey(c1.getTimestamp()));
+      assertEquals(GenericObjectMapper.read(CellUtil.cloneValue(c1)),
+          timeseries.get(c1.getTimestamp()));
+    }
+  }
+
+  private void checkMetricsSizeAndKey(
+      NavigableMap<byte[], byte[]> metricsResult,
+      Set<TimelineMetric> metrics) {
+    assertEquals(metrics.size(), metricsResult.size());
+    for (TimelineMetric m1 : metrics) {
+      byte[] key = Bytes.toBytes(m1.getId());
+      assertTrue(metricsResult.containsKey(key));
+    }
+  }
+
+  private void checkConfigs(NavigableMap<byte[], byte[]> configValuesResult,
+      Map<String, String> conf) throws IOException {
+
+    assertEquals(conf.size(), configValuesResult.size());
+    byte[] columnName;
+    for (String key : conf.keySet()) {
+      columnName = Bytes.toBytes(key);
+      assertTrue(configValuesResult.containsKey(columnName));
+      byte[] value = configValuesResult.get(columnName);
+      assertNotNull(value);
+      assertEquals(conf.get(key), GenericObjectMapper.read(value));
+    }
+  }
+
+  private void checkRelatedEntities(Map<String, Set<String>> isRelatedTo,
+      NavigableMap<byte[], byte[]> infoValues, byte[] columnPrefix) {
+
+    for (String key : isRelatedTo.keySet()) {
+      byte[] columnName = TimelineWriterUtils.join(
+          EntityTableDetails.ROW_KEY_SEPARATOR_BYTES, columnPrefix,
+          Bytes.toBytes(key));
+
+      byte[] value = infoValues.get(columnName);
+      assertNotNull(value);
+      String isRelatedToEntities = Bytes.toString(value);
+      assertNotNull(isRelatedToEntities);
+      assertEquals(TimelineWriterUtils.getValueAsString(
+          EntityTableDetails.ROW_KEY_SEPARATOR, isRelatedTo.get(key)),
+          isRelatedToEntities);
+    }
+  }
+
+  private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user,
+      String flow, Long runid, String appName, TimelineEntity te) {
+
+    byte[][] rowKeyComponents = TimelineWriterUtils.split(rowKey,
+        EntityTableDetails.ROW_KEY_SEPARATOR_BYTES);
+
+    assertTrue(rowKeyComponents.length == 7);
+    assertEquals(user, Bytes.toString(rowKeyComponents[0]));
+    assertEquals(cluster, Bytes.toString(rowKeyComponents[1]));
+    assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
+    assertEquals(TimelineWriterUtils.encodeRunId(runid),
+        Bytes.toLong(rowKeyComponents[3]));
+    assertEquals(appName, Bytes.toString(rowKeyComponents[4]));
+    assertEquals(te.getType(), Bytes.toString(rowKeyComponents[5]));
+    assertEquals(te.getId(), Bytes.toString(rowKeyComponents[6]));
+    return true;
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+}
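
A minimal standalone sketch of the same write path driven outside the mini-cluster test is below. It is not part of the patch: the class name TimelineWriterSketch and the cluster/user/flow/app values are purely illustrative, and it assumes an hbase-site.xml on the classpath plus an entity table already created with CreateSchema. Only classes and method signatures introduced above are used.

package org.apache.hadoop.yarn.server.timelineservice.storage;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;

// Hypothetical driver (not included in the patch) that exercises
// HBaseTimelineWriterImpl end to end against an existing HBase cluster.
public class TimelineWriterSketch {
  public static void main(String[] args) throws Exception {
    // hbase-site.xml on the classpath supplies the cluster connection details
    Configuration conf = HBaseConfiguration.create();

    TimelineEntity entity = new TimelineEntity();
    entity.setId("entity_1");
    entity.setType("YARN_CONTAINER");
    entity.setCreatedTime(System.currentTimeMillis());
    entity.setModifiedTime(System.currentTimeMillis());

    TimelineEntities entities = new TimelineEntities();
    entities.addEntity(entity);

    HBaseTimelineWriterImpl writer = new HBaseTimelineWriterImpl(conf);
    writer.init(conf);
    try {
      // the row key becomes user!cluster!flow!inverted run id!app!type!id
      writer.write("cluster1", "user1", "flow_name", "flow_version_1",
          1002345678919L, "application_1111111111_0001", entities);
    } finally {
      // stop() drives serviceStop(), which closes the HBase connection
      writer.stop();
    }
  }
}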