diff --git hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 114851f..d25d1d9 100644
--- hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -485,6 +485,17 @@
+
+
+
+
+
+
+
+
+
+
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
index f974aee..f62230f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
@@ -120,6 +120,23 @@
      <artifactId>mockito-all</artifactId>
      <scope>test</scope>
+    <dependency>
+      <groupId>org.apache.phoenix</groupId>
+      <artifactId>phoenix-core</artifactId>
+      <version>4.3.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>jline</groupId>
+          <artifactId>jline</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
index 4eced5b..bb7db12 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
@@ -27,11 +27,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
/**
@@ -55,11 +52,6 @@ public TimelineCollector(String name) {
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
- writer = ReflectionUtils.newInstance(conf.getClass(
- YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
- FileSystemTimelineWriterImpl.class,
- TimelineWriter.class), conf);
- writer.init(conf);
}
@Override
@@ -70,11 +62,10 @@ protected void serviceStart() throws Exception {
@Override
protected void serviceStop() throws Exception {
super.serviceStop();
- writer.stop();
}
- public TimelineWriter getWriter() {
- return writer;
+ protected void setWriter(TimelineWriter w) {
+ this.writer = w;
}
/**
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
index 7b3da6b..953d9b7 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
@@ -23,9 +23,14 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
import java.util.Collections;
import java.util.HashMap;
@@ -42,6 +47,19 @@
private static final Log LOG =
LogFactory.getLog(TimelineCollectorManager.class);
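+  // Writer shared by all collectors managed by this manager; created once
+  // when the manager initializes.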
+ protected TimelineWriter writer;
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
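+    // Instantiate and initialize the configured writer implementation;
+    // FileSystemTimelineWriterImpl is used when
+    // TIMELINE_SERVICE_WRITER_CLASS is not set.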
+ writer = ReflectionUtils.newInstance(conf.getClass(
+ YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
+ FileSystemTimelineWriterImpl.class,
+ TimelineWriter.class), conf);
+ writer.init(conf);
+ super.serviceInit(conf);
+ }
+
// access to this map is synchronized with the map itself
  private final Map<ApplicationId, TimelineCollector> collectors =
Collections.synchronizedMap(
@@ -69,6 +87,7 @@ public TimelineCollector putIfAbsent(ApplicationId appId,
// initialize, start, and add it to the collection so it can be
// cleaned up when the parent shuts down
collector.init(getConfig());
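+      // Hand the manager's shared writer to the new collector before it starts.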
+ collector.setWriter(writer);
collector.start();
collectors.put(appId, collector);
LOG.info("the collector for " + appId + " was added");
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
index dd8ad06..27e69c4 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
@@ -20,8 +20,9 @@
import java.io.BufferedWriter;
import java.io.File;
-import java.io.FileWriter;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -83,7 +84,8 @@ private void write(String clusterId, String userId, String flowName,
String dir = mkdirs(outputRoot, ENTITIES_DIR, clusterId, userId,flowName,
flowVersion, String.valueOf(flowRun), appId, entity.getType());
String fileName = dir + entity.getId() + TIMELINE_SERVICE_STORAGE_EXTENSION;
- out = new PrintWriter(new BufferedWriter(new FileWriter(fileName, true)));
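+      // Append using an explicit UTF-8 charset instead of the platform default.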
+      out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(
+          new FileOutputStream(fileName, true), "UTF-8")));
out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity));
out.write("\n");
} catch (IOException ioe) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java
new file mode 100644
index 0000000..67daa64
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java
@@ -0,0 +1,498 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+@Private
+@Unstable
+public class PhoenixTimelineWriterImpl extends AbstractService
+ implements TimelineWriter {
+
+ private static final Log LOG = LogFactory.getLog(PhoenixTimelineWriterImpl.class);
+ private static final String PHOENIX_COL_FAMILY_PLACE_HOLDER = "timeline_cf_placeholder";
+  // These column lists are used when building upsert statements; they do
+  // not drive the CREATE TABLE statements below.
+ private static final String[] PHOENIX_STORAGE_PK_LIST
+ = {"cluster", "user", "flow", "run", "appid", "type", "entityid"};
+ private static final String[] TIMELINE_EVENT_EXTRA_PK_LIST =
+ {"timestamp", "eventid"};
+ private static final String[] TIMELINE_METRIC_EXTRA_PK_LIST =
+ {"metricid"};
+
+ /** Default Phoenix JDBC driver name */
+ static final String DRIVER_CLASS_NAME = "org.apache.phoenix.jdbc.PhoenixDriver";
+ /** Default Phoenix timeline entity table name */
+ static final String ENTITY_TABLE_NAME = "TIMELINE_ENTITY";
+ /** Default Phoenix event table name */
+ static final String EVENT_TABLE_NAME = "TIMELINE_EVENT";
+ /** Default Phoenix metric table name */
+ static final String METRIC_TABLE_NAME = "METRIC_SINGLEDATA";
+ /** Default Phoenix timeline config column family */
+ static final String CONFIG_COLUMN_FAMILY = "c.";
+ /** Default Phoenix timeline info column family */
+ static final String INFO_COLUMN_FAMILY = "i.";
+ /** Default Phoenix event info column family */
+ static final String EVENT_INFO_COLUMN_FAMILY = "ei.";
+ /** Default Phoenix isRelatedTo column family */
+ static final String IS_RELATED_TO_FAMILY = "ir.";
+ /** Default Phoenix relatesTo column family */
+ static final String RELATES_TO_FAMILY = "rt.";
+ /** Default separator for Phoenix storage */
+ static final String STORAGE_SEPARATOR = ";";
+
+ /** Connection string to the deployed Phoenix cluster */
+ static final String CONN_STRING = "jdbc:phoenix:localhost:2181:/hbase";
+
+  // Connections in Phoenix are not thread-safe, so we keep one connection
+  // per thread. All Phoenix connections share the same underlying HBase
+  // connection.
+ @Private
+ @VisibleForTesting
+  LoadingCache<Thread, Connection> connectionCache = null;
+
+ PhoenixTimelineWriterImpl() {
+    super(PhoenixTimelineWriterImpl.class.getName());
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
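+    // Connections evicted from the cache are committed and closed by this
+    // removal listener.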
+    RemovalListener<Thread, Connection> removalListener =
+        new RemovalListener<Thread, Connection>() {
+      public void onRemoval(
+          RemovalNotification<Thread, Connection> removal) {
+ try {
+ shutdownConnection(removal.getValue());
+ } catch (Exception e) {
+ LOG.error("Exception on cache removal " + e.getMessage());
+ }
+ }
+ };
+ connectionCache = CacheBuilder.newBuilder().maximumSize(16)
+ .expireAfterAccess(10, TimeUnit.SECONDS).removalListener(
+ removalListener)
+        .build(new CacheLoader<Thread, Connection>() {
+          @Override
+          public Connection load(Thread key) throws Exception {
+ Connection conn = null;
+ try {
+ Class.forName(DRIVER_CLASS_NAME);
+ conn = DriverManager.getConnection(CONN_STRING);
+ conn.setAutoCommit(false);
+ } catch (SQLException se) {
+ LOG.error("Failed to connect to phoenix server! "
+ + se.getLocalizedMessage());
+ } catch (ClassNotFoundException e) {
+ LOG.error("Class not found! " + e.getLocalizedMessage());
+ }
+ return conn;
+ }
+ }
+ );
+ tryInitTable();
+    super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ // Close all Phoenix connections
+ for (Connection conn : connectionCache.asMap().values()) {
+ shutdownConnection(conn);
+ }
+ super.serviceStop();
+ }
+
+ @Override
+ public TimelineWriteResponse write(String clusterId, String userId,
+ String flowName, String flowVersion, long flowRunId, String appId,
+ TimelineEntities entities) throws IOException {
+ TimelineWriteResponse response = new TimelineWriteResponse();
+ TimelineCollectorContext currContext = new TimelineCollectorContext(
+ clusterId, userId, flowName, flowVersion, flowRunId, appId);
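+    // Upsert one row per entity: the primary key columns followed by the
+    // creation time, modification time, and the joined list of config keys.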
+ String sql = "UPSERT INTO " + ENTITY_TABLE_NAME
+ + " (" + StringUtils.join(PHOENIX_STORAGE_PK_LIST, ",")
+ + ", creationtime, modifiedtime, configs) "
+ + "VALUES (" + StringUtils.repeat("?,", PHOENIX_STORAGE_PK_LIST.length)
+ + "?, ?, ?)";
+ Connection conn;
+ try {
+ conn = connectionCache.get(Thread.currentThread());
+ } catch (ExecutionException ee) {
+ throw new IOException(ee);
+ }
+
+ try (PreparedStatement ps = conn.prepareStatement(sql)) {
+ for (TimelineEntity entity : entities.getEntities()) {
+ int idx = setStringsForPrimaryKey(ps, currContext, entity, 1);
+ ps.setLong(idx++, entity.getCreatedTime());
+ ps.setLong(idx++, entity.getModifiedTime());
+ String configKeys = StringUtils.join(
+ entity.getConfigs().keySet(), STORAGE_SEPARATOR);
+ ps.setString(idx++, configKeys);
+ ps.addBatch();
+
+ storeEntityVariableLengthFields(entity, currContext, conn);
+ storeEvents(entity, currContext, conn);
+ storeMetrics(entity, currContext, conn);
+ }
+ ps.executeBatch();
+ conn.commit();
+ } catch (SQLException se) {
+ LOG.error("Failed to add entity to Phoenix " + se.getMessage());
+ }
+ return response;
+ }
+
+  /**
+   * Aggregates the entity information to the timeline store based on which
+   * track this entity is to be rolled up to. The tracks along which
+   * aggregations are to be done are given by {@link TimelineAggregationTrack}.
+   *
+   * Any errors occurring for individual write request objects will be reported
+   * in the response.
+   *
+   * @param data a {@link TimelineEntity} object
+   * @param track a {@link TimelineAggregationTrack} enum value
+   * @return a {@link TimelineWriteResponse} object.
+   * @throws IOException
+   */
+ @Override
+ public TimelineWriteResponse aggregate(TimelineEntity data,
+ TimelineAggregationTrack track) throws IOException {
+ return null;
+
+ }
+
+ // Utility functions
+
+ private void shutdownConnection(Connection conn) throws Exception {
+ try {
+ conn.commit();
+ } catch (SQLException se) {
+ LOG.error("Failed to close the phoenix connection! "
+ + se.getLocalizedMessage());
+ } finally {
+ conn.close();
+ }
+ }
+
+ private void tryInitTable() throws Exception {
+ Connection conn = connectionCache.get(Thread.currentThread());
+ // Create tables if necessary
+ try (Statement stmt = conn.createStatement()) {
+ // Table schema defined as in YARN-3134.
+ String sql = "CREATE TABLE IF NOT EXISTS " + ENTITY_TABLE_NAME
+ + "(cluster VARCHAR NOT NULL, user VARCHAR NOT NULL, "
+ + "flow VARCHAR NOT NULL, run UNSIGNED_LONG NOT NULL, "
+ + "appid VARCHAR NOT NULL, type VARCHAR NOT NULL, "
+ + "entityid VARCHAR NOT NULL, "
+ + "creationtime UNSIGNED_LONG, modifiedtime UNSIGNED_LONG, "
+ + "parent VARCHAR, queue VARCHAR, configs VARCHAR, "
+ + CONFIG_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARCHAR, "
+ + INFO_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARCHAR, "
+ + IS_RELATED_TO_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARCHAR, "
+ + RELATES_TO_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARCHAR "
+ + "CONSTRAINT pk PRIMARY KEY("
+ + "cluster, user, flow, run DESC, appid, type, entityid))";
+ stmt.executeUpdate(sql);
+ sql = "CREATE TABLE IF NOT EXISTS " + EVENT_TABLE_NAME
+ + "(cluster VARCHAR NOT NULL, user VARCHAR NOT NULL, "
+ + "flow VARCHAR NOT NULL, run UNSIGNED_LONG NOT NULL, "
+ + "appid VARCHAR NOT NULL, type VARCHAR NOT NULL, "
+ + "entityid VARCHAR NOT NULL, "
+ + "timestamp UNSIGNED_LONG NOT NULL, eventid VARCHAR NOT NULL, "
+ + EVENT_INFO_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARCHAR "
+ + "CONSTRAINT pk PRIMARY KEY("
+ + "cluster, user, flow, run DESC, appid, type, entityid, "
+ + "timestamp DESC, eventid))";
+ stmt.executeUpdate(sql);
+ sql = "CREATE TABLE IF NOT EXISTS " + METRIC_TABLE_NAME
+ + "(cluster VARCHAR NOT NULL, user VARCHAR NOT NULL, "
+ + "flow VARCHAR NOT NULL, run UNSIGNED_LONG NOT NULL, "
+ + "appid VARCHAR NOT NULL, type VARCHAR NOT NULL, "
+ + "entityid VARCHAR NOT NULL, "
+ + "metricid VARCHAR NOT NULL, "
+ + "singledata VARBINARY "
+ + "CONSTRAINT pk PRIMARY KEY("
+ + "cluster, user, flow, run DESC, appid, type, entityid, metricid))";
+ stmt.executeUpdate(sql);
+      // The statement is closed by try-with-resources; keep the cached
+      // connection open so later writes on this thread can reuse it.
+      conn.commit();
+    } catch (SQLException se) {
+      LOG.error("Failed to initialize Phoenix tables: "
+          + se.getLocalizedMessage());
+    }
+  }
+
+  private static class ColumnFamilyInfo<K> {
+    String columnFamilyPrefix;
+    Set<K> columns;
+
+    public ColumnFamilyInfo(String columnFamilyPrefix, Set<K> keyValues) {
+ this.columnFamilyPrefix = columnFamilyPrefix;
+ this.columns = keyValues;
+ }
+ }
+
+  private <K> StringBuilder appendVarcharColumnsSQL(
+      StringBuilder colNames, ColumnFamilyInfo<K> cfInfo) {
+ return appendColumnsSQL(colNames, cfInfo, " VARCHAR");
+ }
+
+  private <K> StringBuilder appendColumnsSQL(
+      StringBuilder colNames, ColumnFamilyInfo<K> cfInfo, String type) {
+ // Prepare the sql template by iterating through all keys
+ for (K key : cfInfo.columns) {
+ colNames.append(",").append(cfInfo.columnFamilyPrefix)
+ .append(key.toString()).append(type);
+ }
+ return colNames;
+ }
+
+  private <K, V> int setStringsForColumnFamily(
+      PreparedStatement ps, Map<K, V> keyValues, int startPos)
+      throws SQLException {
+ int idx = startPos;
+    for (Map.Entry<K, V> entry : keyValues.entrySet()) {
+ V value = entry.getValue();
+ if (value instanceof Collection) {
+ ps.setString(idx++, StringUtils.join(
+ (Collection) value, STORAGE_SEPARATOR));
+ } else {
+ ps.setString(idx++, value.toString());
+ }
+ }
+ return idx;
+ }
+
+  private <K, V> int setBytesForColumnFamily(
+      PreparedStatement ps, Map<K, V> keyValues, int startPos)
+      throws SQLException {
+ int idx = startPos;
+    for (Map.Entry<K, V> entry : keyValues.entrySet()) {
+ try {
+ ps.setBytes(idx++, GenericObjectMapper.write(entry.getValue()));
+ } catch (IOException ie) {
+ LOG.error("Exception in converting values into bytes "
+ + ie.getMessage());
+ }
+ }
+ return idx;
+ }
+
+ private int setStringsForPrimaryKey(PreparedStatement ps,
+ TimelineCollectorContext context, TimelineEntity entity, int startPos)
+ throws SQLException {
+ int idx = startPos;
+ ps.setString(idx++, context.getClusterId());
+ ps.setString(idx++, context.getUserId());
+ ps.setString(idx++,
+ context.getFlowName() + STORAGE_SEPARATOR + context.getFlowVersion());
+ ps.setLong(idx++, context.getFlowRunId());
+ ps.setString(idx++, context.getAppId());
+ ps.setString(idx++, entity.getType());
+ ps.setString(idx++, entity.getId());
+ return idx;
+ }
+
+ private void storeEntityVariableLengthFields(TimelineEntity entity,
+ TimelineCollectorContext context, Connection conn) throws SQLException {
+ if (entity.getConfigs() == null
+ && entity.getInfo() == null
+ && entity.getIsRelatedToEntities() == null
+ && entity.getRelatesToEntities() == null) {
+ return;
+ }
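+    // Each config, info, isRelatedTo and relatesTo key becomes a Phoenix
+    // dynamic column; count the placeholders needed for the upsert statement.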
+ int numPlaceholders = 0;
+ StringBuilder sqlColumns = new StringBuilder(
+ StringUtils.join(PHOENIX_STORAGE_PK_LIST, ","));
+ if (entity.getConfigs() != null) {
+ appendColumnsSQL(sqlColumns, new ColumnFamilyInfo<>(
+ CONFIG_COLUMN_FAMILY, entity.getConfigs().keySet()), " VARBINARY");
+ numPlaceholders += entity.getConfigs().keySet().size();
+ }
+ if (entity.getInfo() != null) {
+ appendColumnsSQL(sqlColumns, new ColumnFamilyInfo<>(
+ INFO_COLUMN_FAMILY, entity.getInfo().keySet()), " VARBINARY");
+ numPlaceholders += entity.getInfo().keySet().size();
+ }
+ if (entity.getIsRelatedToEntities() != null) {
+ appendVarcharColumnsSQL(sqlColumns, new ColumnFamilyInfo<>(
+ IS_RELATED_TO_FAMILY, entity.getIsRelatedToEntities().keySet()));
+ numPlaceholders += entity.getIsRelatedToEntities().keySet().size();
+ }
+ if (entity.getRelatesToEntities() != null) {
+ appendVarcharColumnsSQL(sqlColumns, new ColumnFamilyInfo<>(
+ RELATES_TO_FAMILY, entity.getRelatesToEntities().keySet()));
+ numPlaceholders += entity.getRelatesToEntities().keySet().size();
+ }
+ if (numPlaceholders == 0) {
+ return;
+ }
+ StringBuilder placeholders = new StringBuilder();
+ placeholders.append(
+ StringUtils.repeat("?,", PHOENIX_STORAGE_PK_LIST.length));
+ // numPlaceholders >= 1 now
+ placeholders.append("?")
+ .append(StringUtils.repeat(",?", numPlaceholders - 1));
+ String sqlVariableLengthFields = new StringBuilder("UPSERT INTO ")
+ .append(ENTITY_TABLE_NAME).append(" (").append(sqlColumns)
+ .append(") VALUES(").append(placeholders).append(")").toString();
+ // Use try with resource statement for the prepared statement
+ try (PreparedStatement psVariableLengthFields =
+ conn.prepareStatement(sqlVariableLengthFields)) {
+ int idx = setStringsForPrimaryKey(
+ psVariableLengthFields, context, entity, 1);
+ if (entity.getConfigs() != null) {
+ idx = setBytesForColumnFamily(
+ psVariableLengthFields, entity.getConfigs(), idx);
+ }
+ if (entity.getInfo() != null) {
+ idx = setBytesForColumnFamily(
+ psVariableLengthFields, entity.getInfo(), idx);
+ }
+ if (entity.getIsRelatedToEntities() != null) {
+ idx = setStringsForColumnFamily(
+ psVariableLengthFields, entity.getIsRelatedToEntities(), idx);
+ }
+ if (entity.getRelatesToEntities() != null) {
+ idx = setStringsForColumnFamily(
+ psVariableLengthFields, entity.getRelatesToEntities(), idx);
+ }
+ psVariableLengthFields.execute();
+ }
+ }
+
+ private void storeMetrics(TimelineEntity entity,
+ TimelineCollectorContext context, Connection conn) throws SQLException {
+ if (entity.getMetrics() == null) {
+ return;
+ }
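+    // Each metric becomes one row keyed by the entity primary key plus the
+    // metric id; only the latest single data value is stored for now.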
+    Set<TimelineMetric> metrics = entity.getMetrics();
+ for (TimelineMetric metric : metrics) {
+ StringBuilder sqlColumns = new StringBuilder(
+ StringUtils.join(PHOENIX_STORAGE_PK_LIST, ","));
+ sqlColumns.append(",")
+ .append(StringUtils.join(TIMELINE_METRIC_EXTRA_PK_LIST, ","));
+ sqlColumns.append(",").append("singledata");
+ StringBuilder placeholders = new StringBuilder();
+ placeholders.append(
+ StringUtils.repeat("?,", PHOENIX_STORAGE_PK_LIST.length))
+ .append(StringUtils.repeat("?,", TIMELINE_METRIC_EXTRA_PK_LIST.length));
+ placeholders.append("?");
+ String sqlMetric = new StringBuilder("UPSERT INTO ")
+ .append(METRIC_TABLE_NAME).append(" (").append(sqlColumns)
+ .append(") VALUES(").append(placeholders).append(")").toString();
+ try (PreparedStatement psMetrics = conn.prepareStatement(sqlMetric)) {
+ int idx = setStringsForPrimaryKey(psMetrics, context, entity, 1);
+ psMetrics.setString(idx++, metric.getId());
+ // TODO: support time series storage
+ psMetrics.setBytes(idx++,
+ GenericObjectMapper.write(metric.getSingleData()));
+ psMetrics.execute();
+ } catch (IOException ie) {
+ LOG.error("Exception on converting single data to bytes: "
+ + ie.getMessage());
+ }
+ }
+ }
+
+ private void storeEvents(TimelineEntity entity,
+ TimelineCollectorContext context, Connection conn) throws SQLException {
+ if (entity.getEvents() == null) {
+ return;
+ }
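+    // Event info keys become dynamic columns under the event info column
+    // family; events without any info fields are skipped.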
+    Set<TimelineEvent> events = entity.getEvents();
+ for (TimelineEvent event : events) {
+ // We need this number to check if the incoming event's info field is empty
+ int numPlaceholders = 0;
+ StringBuilder sqlColumns = new StringBuilder(
+ StringUtils.join(PHOENIX_STORAGE_PK_LIST, ","));
+ sqlColumns.append(",")
+ .append(StringUtils.join(TIMELINE_EVENT_EXTRA_PK_LIST, ","));
+ appendVarcharColumnsSQL(sqlColumns, new ColumnFamilyInfo<>(
+ EVENT_INFO_COLUMN_FAMILY, event.getInfo().keySet()));
+ numPlaceholders += event.getInfo().keySet().size();
+ if (numPlaceholders == 0) {
+ continue;
+ }
+ StringBuilder placeholders = new StringBuilder();
+ placeholders.append(
+ StringUtils.repeat("?,", PHOENIX_STORAGE_PK_LIST.length))
+ .append(StringUtils.repeat("?,", TIMELINE_EVENT_EXTRA_PK_LIST.length));
+ // numPlaceholders >= 1 now
+ placeholders.append("?")
+ .append(StringUtils.repeat(",?", numPlaceholders - 1));
+ String sqlEvents = new StringBuilder("UPSERT INTO ")
+ .append(EVENT_TABLE_NAME).append(" (").append(sqlColumns)
+ .append(") VALUES(").append(placeholders).append(")").toString();
+ try (PreparedStatement psEvent = conn.prepareStatement(sqlEvents)) {
+ int idx = setStringsForPrimaryKey(psEvent, context, entity, 1);
+ psEvent.setLong(idx++, event.getTimestamp());
+ psEvent.setString(idx++, event.getId());
+ setStringsForColumnFamily(psEvent, event.getInfo(), idx);
+ psEvent.execute();
+ }
+ }
+ }
+
+ @Private
+ @VisibleForTesting
+  void dropTable(String tableName) throws Exception {
+    try (Statement stmt =
+        connectionCache.get(Thread.currentThread()).createStatement()) {
+      String sql = "DROP TABLE " + tableName;
+      stmt.executeUpdate(sql);
+    } catch (SQLException se) {
+      LOG.error("Failed to drop table " + tableName + ": "
+          + se.getLocalizedMessage());
+    }
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java
new file mode 100644
index 0000000..b00d3dc
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+public class TestPhoenixTimelineWriterImpl {
+ private PhoenixTimelineWriterImpl writer;
+
+ @Before
+ public void setup() throws Exception {
+    // TODO: launch a mini Phoenix cluster; until then this test operates
+    // directly on the active Phoenix cluster.
+ YarnConfiguration conf = new YarnConfiguration();
+ writer = createPhoenixWriter(conf);
+ }
+
+ @Ignore
+ @Test
+ public void testPhoenixWriterBasic() throws Exception {
+ // Set up a list of timeline entities and write them back to Phoenix
+ int numEntity = 12;
+ TimelineEntities te =
+ TestTimelineWriterImpl.getStandardTestTimelineEntities(numEntity);
+ writer.write("cluster_1", "user1", "testFlow", "version1", 1l, "app_test_1", te);
+ // Verify if we're storing all entities
+ String sql = "SELECT COUNT(entityid) FROM "
+ + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME;
+ verifySQLWithCount(sql, numEntity, "Number of entities should be ");
+ // Check config (half of all entities)
+ sql = "SELECT COUNT(c.config) FROM "
+ + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME + "(c.config VARCHAR) ";
+ verifySQLWithCount(sql, (numEntity / 2),
+ "Number of entities with config should be ");
+ // Check info (half of all entities)
+ sql = "SELECT COUNT(i.info) FROM "
+ + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME + "(i.info VARCHAR) ";
+ verifySQLWithCount(sql, (numEntity / 2),
+ "Number of entities with info should be ");
+ // Check config and info (a quarter of all entities)
+ sql = "SELECT COUNT(entityid) FROM "
+ + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME
+ + "(c.config VARCHAR, i.info VARCHAR) "
+ + "WHERE c.config IS NOT NULL AND i.info IS NOT NULL";
+ verifySQLWithCount(sql, (numEntity / 4),
+ "Number of entities with both config and info should be ");
+ // Check relatesToEntities and isRelatedToEntities
+ sql = "SELECT COUNT(entityid) FROM "
+ + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME
+ + "(rt.testType VARCHAR, ir.testType VARCHAR) "
+ + "WHERE rt.testType IS NOT NULL AND ir.testType IS NOT NULL";
+ verifySQLWithCount(sql, numEntity - 2,
+ "Number of entities with both relatesTo and isRelatedTo should be ");
+ // Check event
+ sql = "SELECT COUNT(entityid) FROM "
+ + PhoenixTimelineWriterImpl.EVENT_TABLE_NAME;
+ verifySQLWithCount(sql, (numEntity / 4), "Number of events should be ");
+ // Check metrics
+ sql = "SELECT COUNT(entityid) FROM "
+ + PhoenixTimelineWriterImpl.METRIC_TABLE_NAME;
+ verifySQLWithCount(sql, (numEntity / 4), "Number of events should be ");
+ }
+
+ @After
+ public void cleanup() throws Exception {
+ // Note: it is assumed that we're working on a test only cluster, or else
+ // this cleanup process will drop the entity table.
+ writer.dropTable(PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME);
+ writer.dropTable(PhoenixTimelineWriterImpl.EVENT_TABLE_NAME);
+ writer.dropTable(PhoenixTimelineWriterImpl.METRIC_TABLE_NAME);
+ writer.serviceStop();
+ }
+
+ private static PhoenixTimelineWriterImpl createPhoenixWriter(
+      YarnConfiguration conf) throws Exception {
+ PhoenixTimelineWriterImpl myWriter = new PhoenixTimelineWriterImpl();
+ myWriter.serviceInit(conf);
+ return myWriter;
+ }
+
+ private void verifySQLWithCount(String sql, int targetCount, String message)
+      throws Exception {
+ try (
+ Statement stmt =
+ writer.connectionCache.get(Thread.currentThread()).createStatement();
+ ResultSet rs = stmt.executeQuery(sql)) {
+ assertTrue("Result set empty on statement " + sql, rs.next());
+ assertNotNull("Fail to execute query " + sql, rs);
+ assertEquals(message + " " + targetCount, targetCount, rs.getInt(1));
+ } catch (SQLException se) {
+ fail("SQL exception on query: " + sql
+ + " With exception message: " + se.getLocalizedMessage());
+ }
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineWriterImpl.java
new file mode 100644
index 0000000..c802509
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineWriterImpl.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+
+public class TestTimelineWriterImpl {
+ static TimelineEntities getStandardTestTimelineEntities(int listSize) {
+ TimelineEntities te = new TimelineEntities();
+ for (int i = 0; i < listSize; i++) {
+ TimelineEntity entity = new TimelineEntity();
+ String id = "hello" + i;
+ String type = "testType";
+ entity.setId(id);
+ entity.setType(type);
+ entity.setCreatedTime(1425016501000L + i);
+ entity.setModifiedTime(1425016502000L + i);
+ if (i > 0) {
+ entity.addRelatesToEntity(type, "hello" + i);
+ entity.addRelatesToEntity(type, "hello" + (i - 1));
+ }
+ if (i < listSize - 1) {
+ entity.addIsRelatedToEntity(type, "hello" + i);
+ entity.addIsRelatedToEntity(type, "hello" + (i + 1));
+ }
+ int category = i % 4;
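+      // Entities cycle through four categories: 0 gets config and info,
+      // 1 gets info only, 2 gets nothing extra, and 3 gets config, one event
+      // and one metric.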
+ switch (category) {
+ case 0:
+ entity.addConfig("config", "config" + i);
+ // Fall through deliberately
+ case 1:
+ entity.addInfo("info", "info text");
+ // Fall through deliberately
+ case 2:
+ break;
+ case 3:
+ entity.addConfig("config", "config" + i);
+ TimelineEvent event = new TimelineEvent();
+ event.setId("test event");
+ event.setTimestamp(1425016501100L + i);
+ event.addInfo("test_info", "content for " + entity.getId());
+ event.addInfo("test_info1", "content1");
+ entity.addEvent(event);
+ TimelineMetric metric = new TimelineMetric();
+ metric.setId("HDFS_BYTES_READ");
+ metric.setSingleData(8000 + i);
+ entity.addMetric(metric);
+ break;
+ }
+ te.addEntity(entity);
+ }
+ return te;
+ }
+}