diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
index f974aee..ce88897 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
@@ -67,6 +67,30 @@
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+      <version>1.0.0</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <version>1.0.0</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-testing-util</artifactId>
+      <version>1.0.0</version>
+    </dependency>
+
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java
new file mode 100644
index 0000000..ac77656
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityColumnDetails.java
@@ -0,0 +1,33 @@
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Contains the column names, value types and byte representations for the
+ * {@linkplain TimelineEntity} fields that are stored in HBase.
+ */
+public enum EntityColumnDetails {
+ ID(String.class),
+ TYPE(String.class),
+ CREATED_TIME(Long.class),
+ MODIFIED_TIME(Long.class);
+
+  private final Class<?> className;
+  private final byte[] inBytes;
+
+  private EntityColumnDetails(Class<?> className) {
+    this.className = className;
+    this.inBytes = Bytes.toBytes(this.name().toLowerCase());
+  }
+
+  public Class<?> getClassName() {
+    return className;
+  }
+
+
+ public byte[] getInBytes() {
+ return inBytes;
+ }
+
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityTableDetails.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityTableDetails.java
new file mode 100644
index 0000000..1561ef1
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/EntityTableDetails.java
@@ -0,0 +1,294 @@
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+
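+/**
+ * Constants and byte[] utilities for the entity table used by the HBase
+ * timeline writer: the default table name, column family and column prefix
+ * names, the row key separator, and helpers for joining and splitting
+ * separator-delimited row keys.
+ */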
+public class EntityTableDetails {
+ /** default value for entity table name */
+ public static final String DEFAULT_ENTITY_TABLE_NAME = "ats.entity";
+
+  /** byte representation of the default entity table name */
+ public static final byte[] DEFAULT_ENTITY_TABLE_NAME_BYTES =
+ Bytes.toBytes(DEFAULT_ENTITY_TABLE_NAME);
+
+ /** column family info */
+ public static final String COLUMN_FAMILY_INFO = "i";
+
+ /** byte representation for column family info */
+ public static final byte[] COLUMN_FAMILY_INFO_BYTES = Bytes
+ .toBytes(COLUMN_FAMILY_INFO);
+
+ /** column family metrics */
+ public static final String COLUMN_FAMILY_METRICS = "m";
+
+ /** byte representation for column family metrics */
+ public static final byte[] COLUMN_FAMILY_METRICS_BYTES = Bytes
+ .toBytes(COLUMN_FAMILY_METRICS);
+
+ /** column family config */
+ public static final String COLUMN_FAMILY_CONFIG = "c";
+
+ /** byte representation for column family config */
+ public static final byte[] COLUMN_FAMILY_CONFIG_BYTES = Bytes
+ .toBytes(COLUMN_FAMILY_CONFIG);
+
+ /** column prefix for events */
+ public static final String COLUMN_PREFIX_EVENTS = "e";
+
+  /** byte representation for COLUMN_PREFIX_EVENTS */
+ public static final byte[] COLUMN_PREFIX_EVENTS_BYTES = Bytes
+ .toBytes(COLUMN_PREFIX_EVENTS);
+
+ /** column prefix for relatesTo */
+ public static final String COLUMN_PREFIX_RELATES_TO = "r";
+
+ /** byte representation for COLUMN_PREFIX_RELATES_TO */
+ public static final byte[] COLUMN_PREFIX_RELATES_TO_BYTES = Bytes
+ .toBytes(COLUMN_PREFIX_RELATES_TO);
+
+ /** separator in row key */
+ public static final String ROW_KEY_SEPARATOR = "!";
+
+ /** empty bytes */
+ public static final byte[] EMPTY_BYTES = new byte[0];
+
+  /** a single zero byte, used as the cell type when creating metric cells */
+ public static final byte ZERO_BYTES = 0;
+
+ /** byte representation of the separator in row key */
+ public static final byte[] ROW_KEY_SEPARATOR_BYTES = Bytes
+ .toBytes(ROW_KEY_SEPARATOR);
+
+ private static final Log LOG = LogFactory.getLog(EntityTableDetails.class);
+
+ public static class Range {
+ private int startIdx;
+ private int endIdx;
+
+ /**
+ * Defines a range from start index (inclusive) to end index (exclusive).
+ *
+ * @param start
+ * Starting index position
+ * @param end
+ * Ending index position (exclusive)
+ */
+ public Range(int start, int end) {
+ if (start < 0 || end < start) {
+ throw new IllegalArgumentException(
+ "Invalid range, required that: 0 <= start <= end; start=" + start
+ + ", end=" + end);
+ }
+
+ this.startIdx = start;
+ this.endIdx = end;
+ }
+
+ public int start() {
+ return startIdx;
+ }
+
+ public int end() {
+ return endIdx;
+ }
+
+ public int length() {
+ return endIdx - startIdx;
+ }
+ }
+
+  /**
+   * Constructs a row key prefix for the entity table of the form
+   * userId!clusterId!flowId!invertedFlowRunId!appId.
+   *
+   * @param clusterId the cluster the entity belongs to
+   * @param userId the user that ran the flow
+   * @param flowId the flow name
+   * @param flowRunId the flow run id, stored in inverted form
+   * @param appId the application id
+   * @return the separator-joined row key prefix bytes
+   */
+ static byte[] getRowKeyPrefix(String clusterId, String userId, String flowId,
+ Long flowRunId, String appId) {
+ return join(EntityTableDetails.ROW_KEY_SEPARATOR_BYTES,
+ Bytes.toBytes(userId), Bytes.toBytes(clusterId), Bytes.toBytes(flowId),
+ Bytes.toBytes(encodeRunId(flowRunId)),
+ Bytes.toBytes(appId));
+ }
+
+ /**
+ * Returns a single byte array containing all of the individual component
+ * arrays separated by the separator array.
+ *
+   * @param separator the byte array to place between components (may be null)
+   * @param components the byte arrays to concatenate
+   * @return the concatenated byte array, or an empty array if no components
+   *         were given
+ */
+ public static byte[] join(byte[] separator, byte[]... components) {
+ if (components == null || components.length == 0) {
+ return EMPTY_BYTES;
+ }
+
+ int finalSize = 0;
+ if (separator != null) {
+ finalSize = separator.length * (components.length - 1);
+ }
+ for (byte[] comp : components) {
+ finalSize += comp.length;
+ }
+
+ byte[] buf = new byte[finalSize];
+ int offset = 0;
+ for (int i = 0; i < components.length; i++) {
+ System.arraycopy(components[i], 0, buf, offset, components[i].length);
+ offset += components[i].length;
+ if (i < (components.length - 1) && separator != null
+ && separator.length > 0) {
+ System.arraycopy(separator, 0, buf, offset, separator.length);
+ offset += separator.length;
+ }
+ }
+ return buf;
+ }
+
+  /**
+   * Splits the source array into multiple array segments using the given
+   * separator. This will naturally produce copied byte arrays for each of the
+   * split segments. To identify the split ranges without the array copies,
+   * see {@link #splitRanges(byte[], byte[])}.
+   *
+   * @param source the byte array to split
+   * @param separator the separator to split on
+   * @return the split segments
+   */
+ public static byte[][] split(byte[] source, byte[] separator) {
+ return split(source, separator, -1);
+ }
+
+  /**
+   * Splits the source array into multiple array segments using the given
+   * separator, up to a maximum of limit segments. This will naturally produce
+   * copied byte arrays for each of the split segments. To identify the split
+   * ranges without the array copies, see
+   * {@link #splitRanges(byte[], byte[], int)}.
+   *
+   * @param source the byte array to split
+   * @param separator the separator to split on
+   * @param limit the maximum number of segments to return, or -1 for no limit
+   * @return the split segments
+   */
+  public static byte[][] split(byte[] source, byte[] separator, int limit) {
+    List<Range> segments = splitRanges(source, separator, limit);
+
+ byte[][] splits = new byte[segments.size()][];
+ for (int i = 0; i < segments.size(); i++) {
+ Range r = segments.get(i);
+ byte[] tmp = new byte[r.length()];
+ if (tmp.length > 0) {
+ System.arraycopy(source, r.start(), tmp, 0, r.length());
+ }
+ splits[i] = tmp;
+ }
+ return splits;
+ }
+
+ /**
+ * Returns a list of ranges identifying [start, end) -- closed, open --
+ * positions within the source byte array that would be split using the
+ * separator byte array.
+ */
+  public static List<Range> splitRanges(byte[] source, byte[] separator) {
+ return splitRanges(source, separator, -1);
+ }
+
+ /**
+ * Returns a list of ranges identifying [start, end) -- closed, open --
+ * positions within the source byte array that would be split using the
+ * separator byte array.
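+   * <p>
+   * For example, splitting the bytes of "user1!cluster1!flow1" on the "!"
+   * separator yields ranges covering "user1", "cluster1" and "flow1".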
+ * @param source the source data
+ * @param separator the separator pattern to look for
+ * @param limit the maximum number of splits to identify in the source
+ */
+  public static List<Range> splitRanges(byte[] source, byte[] separator,
+      int limit) {
+    List<Range> segments = new ArrayList<Range>();
+    int start = 0;
+    itersource: for (int i = 0; i < source.length; i++) {
+      for (int j = 0; j < separator.length; j++) {
+        // guard against matching past the end of the source array
+        if (i + j >= source.length || source[i + j] != separator[j]) {
+          continue itersource;
+        }
+      }
+ // all separator elements matched
+ if (limit > 0 && segments.size() >= (limit-1)) {
+ // everything else goes in one final segment
+ break;
+ }
+
+ segments.add(new Range(start, i));
+ start = i + separator.length;
+ // i will be incremented again in outer for loop
+ i += separator.length-1;
+ }
+ // add in remaining to a final range
+ if (start <= source.length) {
+ segments.add(new Range(start, source.length));
+ }
+ return segments;
+ }
+
+  /**
+   * Converts the flow run id into its inverse (Long.MAX_VALUE - flowRunId) so
+   * that more recent runs sort ahead of older ones in the row key.
+   *
+   * @param flowRunId the flow run id to invert
+   * @return the inverted long value
+   */
+ public static long encodeRunId(Long flowRunId) {
+ return Long.MAX_VALUE - flowRunId;
+ }
+
+  /**
+   * Returns a value from the family map as a String.
+   *
+   * @param key the column qualifier to look up
+   * @param taskValues the family map of qualifiers to values
+   * @return value as a String, or "" if absent
+   */
+  public static String getValueAsString(final byte[] key,
+      final Map<byte[], byte[]> taskValues) {
+ byte[] value = taskValues.get(key);
+ if (value != null) {
+ return Bytes.toString(value);
+ } else {
+ return "";
+ }
+ }
+
+  /**
+   * Returns a value from the family map as a long.
+   *
+   * @param key the column qualifier to look up
+   * @param taskValues the family map of qualifiers to values
+   * @return value as a long, or 0L if absent or unparseable
+   */
+  public static long getValueAsLong(final byte[] key,
+      final Map<byte[], byte[]> taskValues) {
+ byte[] value = taskValues.get(key);
+ if (value != null) {
+ try {
+ long retValue = Bytes.toLong(value);
+ return retValue;
+ } catch (NumberFormatException nfe) {
+ LOG.error("Caught NFE while converting to long ", nfe);
+ return 0L;
+ } catch (IllegalArgumentException iae ) {
+ // for exceptions like java.lang.IllegalArgumentException:
+ // offset (0) + length (8) exceed the capacity of the array: 7
+ LOG.error("Caught IAE while converting to long ", iae);
+ return 0L;
+ }
+ } else {
+ return 0L;
+ }
+ }
+
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
new file mode 100644
index 0000000..c91e4cb
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -0,0 +1,240 @@
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * An HBase-based backend for storing application timeline information.
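+ * <p>
+ * A minimal usage sketch (an illustration only; it assumes an HBase client
+ * configuration, e.g. from HBaseConfiguration.create(), and that the entity
+ * table and its column families already exist):
+ *
+ * <pre>
+ *   Configuration conf = HBaseConfiguration.create();
+ *   HBaseTimelineWriterImpl writer = new HBaseTimelineWriterImpl(conf);
+ *   writer.init(conf);
+ *   writer.start();
+ *   writer.write(clusterId, userId, flowName, flowVersion, flowRunId, appId,
+ *       entities);
+ *   writer.stop();
+ * </pre>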
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class HBaseTimelineWriterImpl extends AbstractService implements
+ TimelineWriter {
+
+ private final Table entityTable;
+ private final Connection conn;
+
+ private static final Log LOG = LogFactory
+ .getLog(HBaseTimelineWriterImpl.class);
+
+ public HBaseTimelineWriterImpl(Configuration conf) throws IOException {
+    super(HBaseTimelineWriterImpl.class.getName());
+ conn = ConnectionFactory.createConnection(conf);
+ // right now using a default table name for poc, should change later
+ // to use a config driven table name
+ TableName entityTableName = TableName
+ .valueOf(EntityTableDetails.DEFAULT_ENTITY_TABLE_NAME);
+ entityTable = conn.getTable(entityTableName);
+ }
+
+ @Override
+ public TimelineWriteResponse write(String clusterId, String userId,
+ String flowName, String flowVersion, long flowRunId, String appId,
+ TimelineEntities data) throws IOException {
+
+ byte[] rowKeyPrefix = EntityTableDetails.getRowKeyPrefix(clusterId, userId,
+ flowName, flowRunId, appId);
+
+ TimelineWriteResponse putStatus = new TimelineWriteResponse();
+    byte[] row;
+ for (TimelineEntity te : data.getEntities()) {
+ // get row key
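+      // the resulting row key layout is
+      // userId!clusterId!flowName!invertedFlowRunId!appId!entityType!entityId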
+ row = EntityTableDetails.join(EntityTableDetails.ROW_KEY_SEPARATOR_BYTES,
+ rowKeyPrefix, Bytes.toBytes(te.getType()), Bytes.toBytes(te.getId()));
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" row key prefix: " + Bytes.toString(rowKeyPrefix));
+ }
+
+ // it would be nice to have a way
+ // of distinguishing what info is new in this TimelineEntity object
+ // then only that part can be written to the backend
+ Put infoPut = getInfoPut(row, te);
+ Put eventPut = getEventPut(row, te.getEvents());
+ Put configPut = getConfigPut(row, te.getConfigs());
+ Put metricsPut = getMetricsPut(row, te.getMetrics());
+      List<Put> entityPuts = new ArrayList<Put>();
+ entityPuts.add(infoPut);
+ if (configPut.size() > 0) {
+ entityPuts.add(configPut);
+ }
+ if (metricsPut.size() > 0) {
+ entityPuts.add(metricsPut);
+ }
+ if (eventPut.size() > 0) {
+ entityPuts.add(eventPut);
+ }
+
+ LOG.info(" entity puts size " + entityPuts.size());
+ if (entityPuts.size() > 0) {
+ entityTable.put(entityPuts);
+ } else {
+ LOG.warn("empty entity object?");
+ }
+ }
+
+ return putStatus;
+ }
+
+  /**
+   * Prepares the Put for storing the {@linkplain TimelineEntity} info fields
+   * (id, type, created and modified times) in the info column family.
+   *
+   * @param row the row key for this entity
+   * @param te the timeline entity to store
+   * @return {@linkplain Put}
+   */
+ private Put getInfoPut(byte[] row, TimelineEntity te) {
+ Put infoPut = new Put(row);
+ infoPut.addColumn(EntityTableDetails.COLUMN_FAMILY_INFO_BYTES,
+ EntityColumnDetails.ID.getInBytes(), Bytes.toBytes(te.getId()));
+ infoPut.addColumn(EntityTableDetails.COLUMN_FAMILY_INFO_BYTES,
+ EntityColumnDetails.TYPE.getInBytes(), Bytes.toBytes(te.getType()));
+ infoPut.addColumn(EntityTableDetails.COLUMN_FAMILY_INFO_BYTES,
+ EntityColumnDetails.CREATED_TIME.getInBytes(),
+ Bytes.toBytes(te.getCreatedTime()));
+ infoPut.addColumn(EntityTableDetails.COLUMN_FAMILY_INFO_BYTES,
+ EntityColumnDetails.MODIFIED_TIME.getInBytes(),
+ Bytes.toBytes(te.getModifiedTime()));
+
+ if (LOG.isDebugEnabled()) {
+ try {
+ LOG.debug("info Put " + infoPut.toJSON());
+ } catch (IOException e) {
+ LOG.debug("Caught exception while logging " + e);
+ }
+ }
+ return infoPut;
+ }
+
+  /**
+   * Generates a Put for the config map of the {@linkplain TimelineEntity}.
+   *
+   * @param rowKey the row key for this entity
+   * @param configs map of config name to config value
+   * @return {@link Put}
+   */
+  private Put getConfigPut(byte[] rowKey, Map<String, Object> configs) {
+    Put configPut = new Put(rowKey);
+    String key = "";
+    Object value = "";
+    for (Map.Entry<String, Object> entry : configs.entrySet()) {
+ key = entry.getKey();
+ value = entry.getValue();
+ configPut.addColumn(EntityTableDetails.COLUMN_FAMILY_CONFIG_BYTES,
+ Bytes.toBytes(key),
+ // object is stored as it's string value
+ // POC question: do we expect anything other than a string in the config?
+ Bytes.toBytes(value.toString()));
+ }
+ return configPut;
+ }
+
+  /**
+   * Creates a Put for the {@linkplain TimelineMetric} objects of this entity,
+   * stored in the metrics column family.
+   *
+   * @param row the row key for this entity
+   * @param metrics the set of metrics for this entity
+   * @return {@linkplain Put}
+   */
+  private Put getMetricsPut(byte[] row, Set<TimelineMetric> metrics) {
+ Put metricsPut = new Put(row);
+
+ String key = "";
+ Long value = 1L;
+ for (TimelineMetric metric : metrics) {
+ key = metric.getId();
+      // POC note: TimelineMetric would need a getLatestValue() API,
+      // e.g. metric.getLatestValue(); until then a placeholder value is stored
+      value = 1L;
+ Cell cell = CellUtil.createCell(row,
+ EntityTableDetails.COLUMN_FAMILY_METRICS_BYTES,
+ Bytes.toBytes(key),
+ // set the cell timestamp
+ metric.getEndTime(), // perhaps add a getLatestTimestamp() to TimelineMetric,
+ EntityTableDetails.ZERO_BYTES,
+ Bytes.toBytes(value));
+ try {
+ metricsPut.add(cell);
+ } catch (IOException e) {
+ LOG.error("Caught exception while adding cell to Put " + e);
+ }
+ }
+ return metricsPut;
+ }
+
+  /**
+   * Creates a Put for the {@linkplain TimelineEvent} objects of this entity.
+   *
+   * @param row the row key for this entity
+   * @param events the set of events for this entity
+   * @return {@linkplain Put}
+   */
+  private Put getEventPut(byte[] row, Set<TimelineEvent> events) {
+ Put eventsPut = new Put(row);
+ String key = "";
+ String id = "";
+ Long value = 1L;
+ for (TimelineEvent event : events) {
+ id = event.getId();
+
+      Map<String, Object> eventInfo = event.getInfo();
+      if (eventInfo != null) {
+        for (Map.Entry<String, Object> info : eventInfo.entrySet()) {
+ key = info.getKey();
+ eventsPut.addColumn(EntityTableDetails.COLUMN_FAMILY_INFO_BYTES,
+ EntityTableDetails.join(
+ EntityTableDetails.ROW_KEY_SEPARATOR_BYTES,
+ EntityTableDetails.COLUMN_PREFIX_EVENTS_BYTES,
+ Bytes.toBytes(id), Bytes.toBytes(key)),
+ /*
+ * the value here may not be string need to confirm what types are
+ * allowed then perhaps we need a function in TimelineEvent to
+ * return values appropriately Storing this in bytes is
+ * independent of which type of hbase table is chosen
+ */
+ Bytes.toBytes(value));
+ }
+ }
+ }
+ return eventsPut;
+ }
+
+ @Override
+ public TimelineWriteResponse aggregate(TimelineEntity data,
+ TimelineAggregationTrack track) throws IOException {
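+    // aggregation is not implemented in this PoC writer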
+ return null;
+ }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (conn != null) {
+      LOG.info("closing the hbase Connection");
+      conn.close();
+    }
+    super.serviceStop();
+  }
+
+
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
new file mode 100644
index 0000000..4471728
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineWriterImpl.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+import java.io.IOException;
+import java.util.NavigableMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
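+import org.junit.AfterClass;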
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Unit test for the PoC HBase timeline writer (YARN-3411).
+ */
+public class TestHBaseTimelineWriterImpl {
+
+  private static HBaseTestingUtility UTIL;
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ UTIL = new HBaseTestingUtility();
+ UTIL.startMiniCluster();
+ System.out.println(" in setup");
+ createSchema();
+ }
+
+ private static HTable createSchema() throws IOException {
+ byte [][] families = new byte[3][];
+ families[0] = EntityTableDetails.COLUMN_FAMILY_INFO_BYTES;
+ families[1] = EntityTableDetails.COLUMN_FAMILY_CONFIG_BYTES;
+ families[2] = EntityTableDetails.COLUMN_FAMILY_METRICS_BYTES;
+ return UTIL.createTable(EntityTableDetails.DEFAULT_ENTITY_TABLE_NAME_BYTES,
+ families);
+ }
+
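+  // shut the mini cluster down once all tests in this class have run
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+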
+ @Test
+ public void testWriteEntityToHBase() throws Exception {
+ TimelineEntities te = new TimelineEntities();
+ TimelineEntity entity = new TimelineEntity();
+ String id = "hello";
+ String type = "world";
+ entity.setId(id);
+ entity.setType(type);
+ Long cTime = 1425016501000L;
+ Long mTime = 1425026901000L;
+ entity.setCreatedTime(cTime);
+ entity.setModifiedTime(mTime);
+ te.addEntity(entity);
+
+ HBaseTimelineWriterImpl hbi = null;
+ try {
+ Configuration c1 = UTIL.getConfiguration();
+ hbi = new HBaseTimelineWriterImpl(c1);
+ hbi.init(c1);
+ hbi.start();
+ String cluster = "cluster1";
+ String user = "user1";
+ String flow = "some_flow_name";
+ String flowVersion = "AB7822C10F1111";
+ Long runid = 1002345678919L;
+ String appName = "some app name";
+ hbi.write(cluster, user, flow, flowVersion, runid, appName, te);
+
+ // scan the table and see that entity exists
+ Scan s = new Scan();
+      byte[] startRow = EntityTableDetails.getRowKeyPrefix(cluster, user,
+          flow, runid, appName);
+ s.setStartRow(startRow);
+ ResultScanner scanner = null;
+ TableName entityTableName = TableName.valueOf(
+ EntityTableDetails.DEFAULT_ENTITY_TABLE_NAME);
+ Connection conn = ConnectionFactory.createConnection(c1);
+ Table entityTable = conn.getTable(entityTableName);
+ int rowCount = 0;
+ int colCount = 0;
+ scanner = entityTable.getScanner(s);
+ for (Result result : scanner) {
+ if (result != null && !result.isEmpty()) {
+ rowCount++;
+ colCount += result.size();
+ byte[] row1 = result.getRow();
+          assertTrue(isRowKeyCorrect(row1, cluster, user, flow, runid,
+              appName, entity));
+          NavigableMap<byte[], byte[]> infoValues =
+              result.getFamilyMap(EntityTableDetails.COLUMN_FAMILY_INFO_BYTES);
+          String id1 = EntityTableDetails.getValueAsString(
+              EntityColumnDetails.ID.getInBytes(), infoValues);
+          assertEquals(id, id1);
+          String type1 = EntityTableDetails.getValueAsString(
+              EntityColumnDetails.TYPE.getInBytes(), infoValues);
+          assertEquals(type, type1);
+          Long cTime1 = EntityTableDetails.getValueAsLong(
+              EntityColumnDetails.CREATED_TIME.getInBytes(), infoValues);
+          assertEquals(cTime1, cTime);
+          Long mTime1 = EntityTableDetails.getValueAsLong(
+              EntityColumnDetails.MODIFIED_TIME.getInBytes(), infoValues);
+          assertEquals(mTime1, mTime);
+ }
+ }
+      assertEquals(1, rowCount);
+      assertEquals(4, colCount);
+
+    } finally {
+      if (hbi != null) {
+        hbi.stop();
+      }
+    }
+ }
+
+  private boolean isRowKeyCorrect(byte[] rowKey, String cluster, String user,
+      String flow, Long runid, String appName, TimelineEntity te) {
+
+    byte[][] rowKeyComponents = EntityTableDetails.split(rowKey,
+        EntityTableDetails.ROW_KEY_SEPARATOR_BYTES);
+
+    assertEquals(7, rowKeyComponents.length);
+ assertEquals(user, Bytes.toString(rowKeyComponents[0]));
+ assertEquals(cluster, Bytes.toString(rowKeyComponents[1]));
+ assertEquals(flow, Bytes.toString(rowKeyComponents[2]));
+    assertEquals(EntityTableDetails.encodeRunId(runid),
+        Bytes.toLong(rowKeyComponents[3]));
+ assertEquals(appName, Bytes.toString(rowKeyComponents[4]));
+ assertEquals(te.getType(), Bytes.toString(rowKeyComponents[5]));
+ assertEquals(te.getId(), Bytes.toString(rowKeyComponents[6]));
+ return true;
+ }
+
+}