diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml index f974aee..33ab23d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml @@ -67,6 +67,38 @@ + org.apache.hbase + hbase-client + 1.0.0 + + + + org.apache.hbase + hbase-server + 1.0.0 + + + + + org.apache.hbase + hbase-testing-util + 1.0.0 + test + true + + + org.apache.hadoop + hadoop-hdfs + + + org.jruby + jruby-complete + + + + + + com.google.guava guava diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java new file mode 100644 index 0000000..ac77656 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java @@ -0,0 +1,33 @@ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Contains the Column names, types and byte representations for + * {@linkplain TimelineEntity} object that is stored in hbase + * + */ +public enum EntityColumnDetails { + ID(String.class), + TYPE(String.class), + CREATED_TIME(Long.class), + MODIFIED_TIME(Long.class); + + private final Class className; + private final byte[] inBytes; + + private EntityColumnDetails(Class className) { + this.className = className; + this.inBytes = Bytes.toBytes(this.name().toLowerCase()); + } + + public Class getClassName() { + return className; + } + + 
+ public byte[] getInBytes() { + return inBytes; + } + +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityTableDetails.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityTableDetails.java new file mode 100644 index 0000000..b3074c0 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityTableDetails.java @@ -0,0 +1,118 @@ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import org.apache.hadoop.hbase.util.Bytes; + +public class EntityTableDetails { + /** default value for entity table name */ + public static final String DEFAULT_ENTITY_TABLE_NAME = "ats.entity"; + + /** in bytes default value for entity table name */ + public static final byte[] DEFAULT_ENTITY_TABLE_NAME_BYTES = + Bytes.toBytes(DEFAULT_ENTITY_TABLE_NAME); + + /** column family info */ + public static final String COLUMN_FAMILY_INFO = "i"; + + /** byte representation for column family info */ + public static final byte[] COLUMN_FAMILY_INFO_BYTES = Bytes + .toBytes(COLUMN_FAMILY_INFO); + + /** column family metrics */ + public static final String COLUMN_FAMILY_METRICS = "m"; + + /** byte representation for column family metrics */ + public static final byte[] COLUMN_FAMILY_METRICS_BYTES = Bytes + .toBytes(COLUMN_FAMILY_METRICS); + + /** column family config */ + public static final String COLUMN_FAMILY_CONFIG = "c"; + + /** byte representation for column family config */ + public static final byte[] COLUMN_FAMILY_CONFIG_BYTES = Bytes + .toBytes(COLUMN_FAMILY_CONFIG); + + /** column prefix for events */ + public static final String COLUMN_PREFIX_EVENTS = "e"; + + /** byte representation for column family 
config */ + public static final byte[] COLUMN_PREFIX_EVENTS_BYTES = Bytes + .toBytes(COLUMN_PREFIX_EVENTS); + + /** column prefix for relatesTo */ + public static final String COLUMN_PREFIX_RELATES_TO = "r"; + + /** byte representation for COLUMN_PREFIX_RELATES_TO */ + public static final byte[] COLUMN_PREFIX_RELATES_TO_BYTES = Bytes + .toBytes(COLUMN_PREFIX_RELATES_TO); + + /** column prefix for isRelatedTo */ + public static final String COLUMN_PREFIX_IS_RELATED_TO = "ir"; + + /** byte representation for COLUMN_PREFIX_RELATES_TO */ + public static final byte[] COLUMN_PREFIX_IS_RELATED_TO_BYTES = Bytes + .toBytes(COLUMN_PREFIX_IS_RELATED_TO); + + /** separator in row key */ + public static final String ROW_KEY_SEPARATOR = "!"; + + /** zero bytes */ + public static final byte ZERO_BYTES = 0; + + /** byte representation of the separator in row key */ + public static final byte[] ROW_KEY_SEPARATOR_BYTES = Bytes + .toBytes(ROW_KEY_SEPARATOR); + + public static class Range { + private int startIdx; + private int endIdx; + + /** + * Defines a range from start index (inclusive) to end index (exclusive). 
+ * + * @param start + * Starting index position + * @param end + * Ending index position (exclusive) + */ + public Range(int start, int end) { + if (start < 0 || end < start) { + throw new IllegalArgumentException( + "Invalid range, required that: 0 <= start <= end; start=" + start + + ", end=" + end); + } + + this.startIdx = start; + this.endIdx = end; + } + + public int start() { + return startIdx; + } + + public int end() { + return endIdx; + } + + public int length() { + return endIdx - startIdx; + } + } + + /** + * Constructs a row key prefix for the entity table + * @param clusterId + * @param userId + * @param flowId + * @param flowRunId + * @param appId + * @return + */ + static byte[] getRowKeyPrefix(String clusterId, String userId, String flowId, + Long flowRunId, String appId) { + return TimelineWriterUtils.join(EntityTableDetails.ROW_KEY_SEPARATOR_BYTES, + Bytes.toBytes(userId), Bytes.toBytes(clusterId), Bytes.toBytes(flowId), + Bytes.toBytes(TimelineWriterUtils.encodeRunId(flowRunId)), + Bytes.toBytes(appId)); + } + +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java new file mode 100644 index 0000000..48f2f59 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -0,0 +1,281 @@ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; 
+import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.EntityTableDetails; + +/** + * This implements a hbase based backend for storing application timeline + * information. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class HBaseTimelineWriterImpl extends AbstractService implements + TimelineWriter { + + private final Table entityTable; + private final Connection conn; + + private static final Log LOG = LogFactory + .getLog(HBaseTimelineWriterImpl.class); + + public HBaseTimelineWriterImpl(Configuration conf) throws IOException { + super(conf.get("yarn.application.id", "1")); + conn = ConnectionFactory.createConnection(conf); + // right now using a default table name for poc, should change later + // to use a config driven table name + TableName entityTableName = TableName + .valueOf(EntityTableDetails.DEFAULT_ENTITY_TABLE_NAME); + entityTable = conn.getTable(entityTableName); + } + + @Override + public TimelineWriteResponse write(String clusterId, String userId, + String flowName, String flowVersion, long flowRunId, String appId, + TimelineEntities data) throws IOException { + + byte[] rowKeyPrefix = EntityTableDetails.getRowKeyPrefix(clusterId, userId, + flowName, flowRunId, appId); + + TimelineWriteResponse putStatus = new TimelineWriteResponse(); + byte row[]; + for (TimelineEntity te : data.getEntities()) { + // get row key + row = TimelineWriterUtils.join(EntityTableDetails.ROW_KEY_SEPARATOR_BYTES, + rowKeyPrefix, Bytes.toBytes(te.getType()), Bytes.toBytes(te.getId())); + + Put infoPut = getInfoPut(row, te); + Put eventPut = getEventPut(row, te.getEvents()); + Put configPut = getConfigPut(row, te.getConfigs()); + Put metricsPut = getMetricsPut(row, te.getMetrics()); + Put isRelatedToPut = getIsRelatedToPuts(row, te.getIsRelatedToEntities()); + Put relatesToPut = getRelatesToPuts(row, te.getRelatesToEntities()); + + List entityPuts = new ArrayList(); + entityPuts.add(infoPut); + if (configPut.size() > 0) { + entityPuts.add(configPut); + } + if (metricsPut.size() > 0) { + entityPuts.add(metricsPut); + } + if (eventPut.size() > 0) { + entityPuts.add(eventPut); + } + if (isRelatedToPut.size() > 0) { + 
entityPuts.add(isRelatedToPut); + } + if (relatesToPut.size() > 0) { + entityPuts.add(relatesToPut); + } + + LOG.info(" entity puts size " + entityPuts.size()); + if (entityPuts.size() > 0) { + entityTable.put(entityPuts); + } else { + LOG.warn("empty entity object?"); + } + } + + return putStatus; + } + + private Put getRelatesToPuts(byte[] row, + Map> relatesToEntities) { + Put relatesToEntitiesPut = new Put(row); + Set value; + for (String key : relatesToEntities.keySet()) { + value = relatesToEntities.get(key); + // create the columnName as prefix!key + byte[] columnName = TimelineWriterUtils.join( + EntityTableDetails.ROW_KEY_SEPARATOR_BYTES, + EntityTableDetails.COLUMN_PREFIX_RELATES_TO_BYTES, + Bytes.toBytes(key)); + relatesToEntitiesPut.addColumn(EntityTableDetails.COLUMN_FAMILY_INFO_BYTES, + columnName, + Bytes.toBytes(TimelineWriterUtils.getValueAsString(EntityTableDetails.ROW_KEY_SEPARATOR, value))); + } + return relatesToEntitiesPut; + } + + + private Put getIsRelatedToPuts(byte[] row, + Map> isRelatedToEntities) { + Put isRelatedToEntitiesPut = new Put(row); + Set value; + for (String key : isRelatedToEntities.keySet()) { + value = isRelatedToEntities.get(key); + // create the columnName as prefix!key + byte[] columnName = TimelineWriterUtils.join( + EntityTableDetails.ROW_KEY_SEPARATOR_BYTES, + EntityTableDetails.COLUMN_PREFIX_IS_RELATED_TO_BYTES, + Bytes.toBytes(key)); + isRelatedToEntitiesPut.addColumn(EntityTableDetails.COLUMN_FAMILY_INFO_BYTES, + columnName, + Bytes.toBytes(TimelineWriterUtils.getValueAsString(EntityTableDetails.ROW_KEY_SEPARATOR, value))); + } + return isRelatedToEntitiesPut; + } + + /** + * prepares the Put for storing {@linkplain TimelineEntity} info + * to the backend + * @param row + * @param te + * @return {@linkplain Put} + */ + private Put getInfoPut(byte[] row, TimelineEntity te) { + Put infoPut = new Put(row); + infoPut.addColumn(EntityTableDetails.COLUMN_FAMILY_INFO_BYTES, + EntityColumnDetails.ID.getInBytes(), 
Bytes.toBytes(te.getId())); + infoPut.addColumn(EntityTableDetails.COLUMN_FAMILY_INFO_BYTES, + EntityColumnDetails.TYPE.getInBytes(), Bytes.toBytes(te.getType())); + infoPut.addColumn(EntityTableDetails.COLUMN_FAMILY_INFO_BYTES, + EntityColumnDetails.CREATED_TIME.getInBytes(), + Bytes.toBytes(te.getCreatedTime())); + infoPut.addColumn(EntityTableDetails.COLUMN_FAMILY_INFO_BYTES, + EntityColumnDetails.MODIFIED_TIME.getInBytes(), + Bytes.toBytes(te.getModifiedTime())); + + if (LOG.isDebugEnabled()) { + try { + LOG.debug("info Put " + infoPut.toJSON()); + } catch (IOException e) { + LOG.debug("Caught exception while logging " + e); + } + } + return infoPut; + } + + /** + * generates a Put for the config object in the {@linkplain TimelineEntity} + * @param rowKey + * @param configs + * @return {@link Put} + */ + private Put getConfigPut(byte[] rowKey, Map configs) { + Put configPut = new Put(rowKey); + String key = ""; + Object value = ""; + for (Map.Entry entry : configs.entrySet()) { + key = entry.getKey(); + value = entry.getValue(); + configPut.addColumn(EntityTableDetails.COLUMN_FAMILY_CONFIG_BYTES, + Bytes.toBytes(key), + // object is stored as it's string value + // POC question: do we expect anything other than a string in the config? 
+ Bytes.toBytes(value.toString())); + } + return configPut; + } + + /** + * creates a put for the {@linkplain TimelineMetric} object + * + * @param row + * @param metrics + * @return {@linkplain Put} + */ + private Put getMetricsPut(byte[] row, Set metrics) { + Put metricsPut = new Put(row); + + String key = ""; + for (TimelineMetric metric : metrics) { + key = metric.getId(); + Map timeseries = metric.getTimeSeries(); + for (Long timestamp : timeseries.keySet()) { + Long metricValue = (Long) timeseries.get(timestamp); + Cell cell = CellUtil.createCell(row, + EntityTableDetails.COLUMN_FAMILY_METRICS_BYTES, Bytes.toBytes(key), + // set the cell timestamp + timestamp, + // KeyValue Type minimum + EntityTableDetails.ZERO_BYTES, + Bytes.toBytes(metricValue)); + try { + metricsPut.add(cell); + } catch (IOException e) { + LOG.error("Caught exception while adding cell to Put " + e); + } + } + } + return metricsPut; + } + + /** + * creates a put for the {@linkplain TimelineEvent} object + * + * @param row + * @param metrics + * @return {@linkplain Put} + */ + private Put getEventPut(byte[] row, Set events) { + Put eventsPut = new Put(row); + String key = ""; + String id = ""; + Long value = 1L; + for (TimelineEvent event : events) { + id = event.getId(); + + Map eventInfo = event.getInfo(); + if (eventInfo != null) { + for (Map.Entry info : eventInfo.entrySet()) { + key = info.getKey(); + eventsPut.addColumn(EntityTableDetails.COLUMN_FAMILY_INFO_BYTES, + TimelineWriterUtils.join( + EntityTableDetails.ROW_KEY_SEPARATOR_BYTES, + EntityTableDetails.COLUMN_PREFIX_EVENTS_BYTES, + Bytes.toBytes(id), Bytes.toBytes(key)), + /* + * the value here may not be string need to confirm what types are + * allowed then perhaps we need a function in TimelineEvent to + * return values appropriately Storing this in bytes is + * independent of which type of hbase table is chosen + */ + Bytes.toBytes(value)); + } + } + } + return eventsPut; + } + + @Override + public TimelineWriteResponse 
aggregate(TimelineEntity data, + TimelineAggregationTrack track) throws IOException { + return null; + } + + @Override + public void stop() { + try { + conn.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java new file mode 100644 index 0000000..5ed4f23 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineWriterUtils.java @@ -0,0 +1,218 @@ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.EntityTableDetails.Range; + +/** + * bunch of utility functions used across TimelineWriter classes + * + */ +public class TimelineWriterUtils { + + /** empty bytes */ + public static final byte[] EMPTY_BYTES = new byte[0]; + + private static final Log LOG = LogFactory + .getLog(TimelineWriterUtils.class); + + /** + * Returns a single byte array containing all of the individual component + * arrays separated by the separator array. + * + * @param separator + * @param components + * @return + */ + public static byte[] join(byte[] separator, byte[]... 
components) { + if (components == null || components.length == 0) { + return EMPTY_BYTES; + } + + int finalSize = 0; + if (separator != null) { + finalSize = separator.length * (components.length - 1); + } + for (byte[] comp : components) { + finalSize += comp.length; + } + + byte[] buf = new byte[finalSize]; + int offset = 0; + for (int i = 0; i < components.length; i++) { + System.arraycopy(components[i], 0, buf, offset, components[i].length); + offset += components[i].length; + if (i < (components.length - 1) && separator != null + && separator.length > 0) { + System.arraycopy(separator, 0, buf, offset, separator.length); + offset += separator.length; + } + } + return buf; + } + + /** + * Splits the source array into multiple array segments using the given + * separator, up to a maximum of count items. This will naturally produce + * copied byte arrays for each of the split segments. To identify the split + * ranges without the array copies, see + * {@link ByteUtil#splitRanges(byte[], byte[])}. + * + * @param source + * @param separator + * @return + */ + public static byte[][] split(byte[] source, byte[] separator) { + return split(source, separator, -1); + } + + /** + * Splits the source array into multiple array segments using the given + * separator, up to a maximum of count items. This will naturally produce + * copied byte arrays for each of the split segments. To identify the split + * ranges without the array copies, see + * {@link ByteUtil#splitRanges(byte[], byte[])}. 
+ * + * @param source + * @param separator + * @return + */ + public static byte[][] split(byte[] source, byte[] separator, int limit) { + List segments = splitRanges(source, separator, limit); + + byte[][] splits = new byte[segments.size()][]; + for (int i = 0; i < segments.size(); i++) { + Range r = segments.get(i); + byte[] tmp = new byte[r.length()]; + if (tmp.length > 0) { + System.arraycopy(source, r.start(), tmp, 0, r.length()); + } + splits[i] = tmp; + } + return splits; + } + + /** + * Returns a list of ranges identifying [start, end) -- closed, open -- + * positions within the source byte array that would be split using the + * separator byte array. + */ + public static List splitRanges(byte[] source, byte[] separator) { + return splitRanges(source, separator, -1); + } + + /** + * Returns a list of ranges identifying [start, end) -- closed, open -- + * positions within the source byte array that would be split using the + * separator byte array. + * @param source the source data + * @param separator the separator pattern to look for + * @param limit the maximum number of splits to identify in the source + */ + public static List splitRanges(byte[] source, byte[] separator, int limit) { + List segments = new ArrayList(); + int start = 0; + itersource: for (int i = 0; i < source.length; i++) { + for (int j = 0; j < separator.length; j++) { + if (source[i + j] != separator[j]) { + continue itersource; + } + } + // all separator elements matched + if (limit > 0 && segments.size() >= (limit-1)) { + // everything else goes in one final segment + break; + } + + segments.add(new Range(start, i)); + start = i + separator.length; + // i will be incremented again in outer for loop + i += separator.length-1; + } + // add in remaining to a final range + if (start <= source.length) { + segments.add(new Range(start, source.length)); + } + return segments; + } + + /** + * converts run id into it's inverse timestamp + * @param flowRunId + * @return inverted long + */ + 
public static long encodeRunId(Long flowRunId) { + return Long.MAX_VALUE - flowRunId; + } + + /** + * return a value from the NavigableMap as a String + * @param key + * @param taskValues + * @return value as a String or "" + */ + public static String getValueAsString(final byte[] key, + final Map taskValues) { + byte[] value = taskValues.get(key); + if (value != null) { + return Bytes.toString(value); + } else { + return ""; + } + } + + /** + * return a value from the NavigableMap as a long + * @param key + * @param taskValues + * @return value as Long or 0L + */ + public static long getValueAsLong(final byte[] key, + final Map taskValues) { + byte[] value = taskValues.get(key); + if (value != null) { + try { + long retValue = Bytes.toLong(value); + return retValue; + } catch (NumberFormatException nfe) { + LOG.error("Caught NFE while converting to long ", nfe); + return 0L; + } catch (IllegalArgumentException iae ) { + // for exceptions like java.lang.IllegalArgumentException: + // offset (0) + length (8) exceed the capacity of the array: 7 + LOG.error("Caught IAE while converting to long ", iae); + return 0L; + } + } else { + return 0L; + } + } + + /** + * concates the values from a Set to return a single delimited string value + * @param rowKeySeparator + * @param values + * @return + */ + public static String getValueAsString(String rowKeySeparator, + Set values) { + + StringBuilder concatStrings = new StringBuilder(); + for (String value : values) { + concatStrings.append(value); + concatStrings.append(rowKeySeparator); + } + // remove the last separator + if(concatStrings.length() > 1) { + concatStrings.deleteCharAt(concatStrings.lastIndexOf(rowKeySeparator)); + } + return concatStrings.toString(); + } + +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/scripts/create_schema.rb 
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# HBase shell script that creates the timeline service entity table.
#
# The table name and the column family names must stay in sync with the
# constants in EntityTableDetails:
#   'i' - info family (id, type, times, events and relations of an entity)
#   'm' - metrics family; every version of a cell is kept (VERSIONS is the
#         maximum int) and cells carry a TTL of 2592000
#         (NOTE(review): presumably seconds, i.e. 30 days - confirm)
#   'c' - config family
#
# NOTE(review): all families ask for LZO compression; this presumably needs
# the LZO codec to be installed on the region servers - confirm before use.

create 'ats.entity',
  {NAME => 'i', COMPRESSION => 'LZO', BLOOMFILTER => 'ROWCOL'},
  {NAME => 'm', VERSIONS => 2147483647, MIN_VERSIONS => 1, COMPRESSION => 'LZO', BLOCKCACHE => false, TTL => '2592000'},
  {NAME => 'c', COMPRESSION => 'LZO', BLOCKCACHE => false, BLOOMFILTER => 'ROWCOL' }
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotNull; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.junit.BeforeClass; +import org.junit.AfterClass; +import org.junit.Test; + +public class TestHBaseTimelineWriterImpl { + + /** + * Unit test for PoC YARN 3411 + * + * @throws Exception + */ + + private static HBaseTestingUtility UTIL; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + UTIL = new HBaseTestingUtility(); + UTIL.startMiniCluster(); + createSchema(); + } + + private static HTable createSchema() throws IOException { + byte[][] families = new byte[3][]; + families[0] = EntityTableDetails.COLUMN_FAMILY_INFO_BYTES; + families[1] = EntityTableDetails.COLUMN_FAMILY_CONFIG_BYTES; + families[2] = 
EntityTableDetails.COLUMN_FAMILY_METRICS_BYTES; + return UTIL.createTable(EntityTableDetails.DEFAULT_ENTITY_TABLE_NAME_BYTES, + families, Integer.MAX_VALUE); + } + + @Test + public void testWriteEntityToHBase() throws Exception { + TimelineEntities te = new TimelineEntities(); + TimelineEntity entity = new TimelineEntity(); + String id = "hello"; + String type = "world"; + entity.setId(id); + entity.setType(type); + Long cTime = 1425016501000L; + Long mTime = 1425026901000L; + entity.setCreatedTime(cTime); + entity.setModifiedTime(mTime); + + // add the isRelatedToEntity info + String key = "task"; + String value = "is_related_to_entity_id_here"; + Set isRelatedToSet = new HashSet(); + isRelatedToSet.add(value); + Map> isRelatedTo = new HashMap>(); + isRelatedTo.put(key, isRelatedToSet); + entity.setIsRelatedToEntities(isRelatedTo); + + // add the relatesTo info + key = "container"; + value = "relates_to_entity_id_here"; + Set relatesToSet = new HashSet(); + relatesToSet.add(value); + value = "relates_to_entity_id_here_Second"; + relatesToSet.add(value); + Map> relatesTo = new HashMap>(); + relatesTo.put(key, relatesToSet); + entity.setRelatesToEntities(relatesTo); + + // add some config entries + Map conf = new HashMap(); + conf.put("config_param1", "value1"); + conf.put("config_param2", "value2"); + entity.addConfigs(conf); + + // add metrics + Set metrics = new HashSet<>(); + TimelineMetric m1 = new TimelineMetric(); + m1.setId("MAP_SLOT_MILLIS"); + Map metricValues = new HashMap(); + metricValues.put(1429741609000L, 1000L); + metricValues.put(1429742609000L, 2000L); + metricValues.put(1429743609000L, 3000L); + metricValues.put(1429744609000L, 4000L); + metricValues.put(1429745609000L, 5000L); + metricValues.put(1429746609000L, 6000L); + m1.setTimeSeries(metricValues); + metrics.add(m1); + entity.addMetrics(metrics); + + te.addEntity(entity); + + HBaseTimelineWriterImpl hbi = null; + try { + Configuration c1 = UTIL.getConfiguration(); + hbi = new 
HBaseTimelineWriterImpl(c1); + hbi.init(c1); + hbi.start(); + String cluster = "cluster1"; + String user = "user1"; + String flow = "some_flow_name"; + String flowVersion = "AB7822C10F1111"; + Long runid = 1002345678919L; + String appName = "some app name"; + hbi.write(cluster, user, flow, flowVersion, runid, appName, te); + + // scan the table and see that entity exists + Scan s = new Scan(); + byte[] startRow = EntityTableDetails.getRowKeyPrefix(cluster, user, flow, + runid, appName); + s.setStartRow(startRow); + s.setMaxVersions(Integer.MAX_VALUE); + ResultScanner scanner = null; + TableName entityTableName = TableName + .valueOf(EntityTableDetails.DEFAULT_ENTITY_TABLE_NAME); + Connection conn = ConnectionFactory.createConnection(c1); + Table entityTable = conn.getTable(entityTableName); + int rowCount = 0; + int colCount = 0; + scanner = entityTable.getScanner(s); + for (Result result : scanner) { + if (result != null && !result.isEmpty()) { + rowCount++; + colCount += result.size(); + byte[] row1 = result.getRow(); + assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid, appName, + entity)); + + // check info column family + NavigableMap infoValues = result + .getFamilyMap(EntityTableDetails.COLUMN_FAMILY_INFO_BYTES); + String id1 = TimelineWriterUtils.getValueAsString( + EntityColumnDetails.ID.getInBytes(), infoValues); + assertEquals(id, id1); + String type1 = TimelineWriterUtils.getValueAsString( + EntityColumnDetails.TYPE.getInBytes(), infoValues); + assertEquals(type, type1); + Long cTime1 = TimelineWriterUtils.getValueAsLong( + EntityColumnDetails.CREATED_TIME.getInBytes(), infoValues); + assertEquals(cTime1, cTime); + Long mTime1 = TimelineWriterUtils.getValueAsLong( + EntityColumnDetails.MODIFIED_TIME.getInBytes(), infoValues); + assertEquals(mTime1, mTime); + checkRelatedEntities(isRelatedTo, infoValues, + EntityTableDetails.COLUMN_PREFIX_IS_RELATED_TO_BYTES); + checkRelatedEntities(relatesTo, infoValues, + 
EntityTableDetails.COLUMN_PREFIX_RELATES_TO_BYTES); + + // check config column family + NavigableMap configValuesResult = result + .getFamilyMap(EntityTableDetails.COLUMN_FAMILY_CONFIG_BYTES); + checkConfigs(configValuesResult, conf); + + NavigableMap metricsResult = result + .getFamilyMap(EntityTableDetails.COLUMN_FAMILY_METRICS_BYTES); + checkMetricsSizeAndKey(metricsResult, metrics); + List metricCells = result.getColumnCells( + EntityTableDetails.COLUMN_FAMILY_METRICS_BYTES, + Bytes.toBytes(m1.getId())); + checkMetricsTimeseries(metricCells, m1); + } + } + assertEquals(1, rowCount); + assertEquals(14, colCount); + + } finally { + hbi.close(); + } + } + + private void checkMetricsTimeseries(List metricCells, + TimelineMetric m1) { + + Map timeseries = m1.getTimeSeries(); + assertEquals(metricCells.size(), timeseries.size()); + for (Cell c1 : metricCells) { + assertTrue(timeseries.containsKey(c1.getTimestamp())); + long value = (long) timeseries.get(c1.getTimestamp()); + assertEquals(Bytes.toLong(CellUtil.cloneValue(c1)), value); + } + } + + private void checkMetricsSizeAndKey( + NavigableMap metricsResult, Set metrics) { + assertEquals(metrics.size(), metricsResult.size()); + for (TimelineMetric m1 : metrics) { + byte[] key = Bytes.toBytes(m1.getId()); + assertTrue(metricsResult.containsKey(key)); + } + } + + private void checkConfigs(NavigableMap configValuesResult, + Map conf) { + + assertEquals(conf.size(), configValuesResult.size()); + byte[] columnName; + for (String key : conf.keySet()) { + columnName = Bytes.toBytes(key); + assertTrue(configValuesResult.containsKey(columnName)); + byte[] value = configValuesResult.get(columnName); + assertNotNull(value); + assertEquals(conf.get(key), Bytes.toString(value)); + } + } + + private void checkRelatedEntities(Map> isRelatedTo, + NavigableMap infoValues, byte[] columnPrefix) { + + for (String key : isRelatedTo.keySet()) { + byte[] columnName = TimelineWriterUtils.join( + 
EntityTableDetails.ROW_KEY_SEPARATOR_BYTES, columnPrefix, + Bytes.toBytes(key)); + + byte[] value = infoValues.get(columnName); + assertNotNull(value); + String isRelatedToEntities = Bytes.toString(value); + assertNotNull(isRelatedToEntities); + assertEquals(TimelineWriterUtils.getValueAsString( + EntityTableDetails.ROW_KEY_SEPARATOR, isRelatedTo.get(key)), + isRelatedToEntities); + } + } + + private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user, + String flow, Long runid, String appName, TimelineEntity te) { + + byte[][] rowKeyComponents = TimelineWriterUtils.split(rowKey, + EntityTableDetails.ROW_KEY_SEPARATOR_BYTES); + + assertTrue(rowKeyComponents.length == 7); + assertEquals(user, Bytes.toString(rowKeyComponents[0])); + assertEquals(cluster, Bytes.toString(rowKeyComponents[1])); + assertEquals(flow, Bytes.toString(rowKeyComponents[2])); + assertEquals(TimelineWriterUtils.encodeRunId(runid), + Bytes.toLong(rowKeyComponents[3])); + assertEquals(appName, Bytes.toString(rowKeyComponents[4])); + assertEquals(te.getType(), Bytes.toString(rowKeyComponents[5])); + assertEquals(te.getId(), Bytes.toString(rowKeyComponents[6])); + return true; + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + UTIL.shutdownMiniCluster(); + } +}