diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java
deleted file mode 100644
index 130cb6c..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixOfflineAggregationWriterImpl.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.storage;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.OfflineAggregationInfo;
-import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
-import org.apache.phoenix.util.PropertiesUtil;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-/**
- * Offline aggregation Phoenix storage. This storage currently consists of two
- * aggregation tables, one for flow level aggregation and one for user level
- * aggregation.
- *
- * Example table record:
- *
- *
- * <pre>
- * |-----------------------------|
- * | Primary   | Column Family   |
- * | key       | metrics         |
- * |-----------------------------|
- * | row_key   | metricId1:      |
- * |           |  metricValue1   |
- * |           |  @timestamp1    |
- * |           |                 |
- * |           | metricId1:      |
- * |           |  metricValue2   |
- * |           |  @timestamp2    |
- * |           |                 |
- * |           | metricId2:      |
- * |           |  metricValue1   |
- * |           |  @timestamp2    |
- * |-----------------------------|
- * </pre>
- *
- *
- * For the flow aggregation table, the primary key contains user, cluster, and
- * flow id. For the user aggregation table, the primary key is the user.
- *
- * Metrics column family stores all aggregated metrics for each record.
- */
-@Private
-@Unstable
-public class PhoenixOfflineAggregationWriterImpl
- extends OfflineAggregationWriter {
-
- private static final Log LOG
- = LogFactory.getLog(PhoenixOfflineAggregationWriterImpl.class);
- private static final String PHOENIX_COL_FAMILY_PLACE_HOLDER
- = "timeline_cf_placeholder";
-
- /** Default Phoenix JDBC driver name. */
- private static final String DRIVER_CLASS_NAME
- = "org.apache.phoenix.jdbc.PhoenixDriver";
-
- /** Default Phoenix timeline config column family. */
- private static final String METRIC_COLUMN_FAMILY = "m.";
- /** Default Phoenix timeline info column family. */
- private static final String INFO_COLUMN_FAMILY = "i.";
- /** Default separator for Phoenix storage. */
- private static final String AGGREGATION_STORAGE_SEPARATOR = ";";
-
- /** Connection string to the deployed Phoenix cluster. */
- private String connString = null;
- private Properties connProperties = new Properties();
-
- public PhoenixOfflineAggregationWriterImpl(Properties prop) {
- super(PhoenixOfflineAggregationWriterImpl.class.getName());
- connProperties = PropertiesUtil.deepCopy(prop);
- }
-
- public PhoenixOfflineAggregationWriterImpl() {
- super(PhoenixOfflineAggregationWriterImpl.class.getName());
- }
-
- @Override
- public void serviceInit(Configuration conf) throws Exception {
- Class.forName(DRIVER_CLASS_NAME);
- // Read the connection string from the configuration, falling back to the default.
- connString =
- conf.get(YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR,
- YarnConfiguration.PHOENIX_OFFLINE_STORAGE_CONN_STR_DEFAULT);
- super.init(conf);
- }
-
- @Override
- public TimelineWriteResponse writeAggregatedEntity(
- TimelineCollectorContext context, TimelineEntities entities,
- OfflineAggregationInfo info) throws IOException {
- TimelineWriteResponse response = new TimelineWriteResponse();
- String sql = "UPSERT INTO " + info.getTableName()
- + " (" + StringUtils.join(info.getPrimaryKeyList(), ",")
- + ", created_time, metric_names) "
- + "VALUES ("
- + StringUtils.repeat("?,", info.getPrimaryKeyList().length)
- + "?, ?)";
- if (LOG.isDebugEnabled()) {
- LOG.debug("TimelineEntity write SQL: " + sql);
- }
-
- try (Connection conn = getConnection();
- PreparedStatement ps = conn.prepareStatement(sql)) {
- for (TimelineEntity entity : entities.getEntities()) {
- HashMap<String, TimelineMetric> formattedMetrics = new HashMap<>();
- if (entity.getMetrics() != null) {
- for (TimelineMetric m : entity.getMetrics()) {
- formattedMetrics.put(m.getId(), m);
- }
- }
- int idx = info.setStringsForPrimaryKey(ps, context, null, 1);
- ps.setLong(idx++, entity.getCreatedTime());
- ps.setString(idx++,
- StringUtils.join(formattedMetrics.keySet().toArray(),
- AGGREGATION_STORAGE_SEPARATOR));
- ps.execute();
-
- storeEntityVariableLengthFields(entity, formattedMetrics, context, conn,
- info);
-
- conn.commit();
- }
- } catch (SQLException se) {
- LOG.error("Failed to add entity to Phoenix " + se.getMessage());
- throw new IOException(se);
- } catch (Exception e) {
- LOG.error("Exception on getting connection: " + e.getMessage());
- throw new IOException(e);
- }
- return response;
- }
-
- /**
- * Create Phoenix tables for offline aggregation storage if the tables do not
- * exist.
- *
- * @throws IOException if any problem happens while creating Phoenix tables.
- */
- public void createPhoenixTables() throws IOException {
- // Create tables if necessary
- try (Connection conn = getConnection();
- Statement stmt = conn.createStatement()) {
- // Table schema defined as in YARN-3817.
- String sql = "CREATE TABLE IF NOT EXISTS "
- + OfflineAggregationInfo.FLOW_AGGREGATION_TABLE_NAME
- + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
- + "flow_name VARCHAR NOT NULL, "
- + "created_time UNSIGNED_LONG, "
- + METRIC_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER
- + " VARBINARY, "
- + "metric_names VARCHAR, info_keys VARCHAR "
- + "CONSTRAINT pk PRIMARY KEY("
- + "user, cluster, flow_name))";
- stmt.executeUpdate(sql);
- sql = "CREATE TABLE IF NOT EXISTS "
- + OfflineAggregationInfo.USER_AGGREGATION_TABLE_NAME
- + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
- + "created_time UNSIGNED_LONG, "
- + METRIC_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER
- + " VARBINARY, "
- + "metric_names VARCHAR, info_keys VARCHAR "
- + "CONSTRAINT pk PRIMARY KEY(user, cluster))";
- stmt.executeUpdate(sql);
- conn.commit();
- } catch (SQLException se) {
- LOG.error("Failed in init data " + se.getLocalizedMessage());
- throw new IOException(se);
- }
- }
-
- // Utility functions
- @Private
- @VisibleForTesting
- Connection getConnection() throws IOException {
- Connection conn;
- try {
- conn = DriverManager.getConnection(connString, connProperties);
- conn.setAutoCommit(false);
- } catch (SQLException se) {
- LOG.error("Failed to connect to phoenix server! "
- + se.getLocalizedMessage());
- throw new IOException(se);
- }
- return conn;
- }
-
- // WARNING: This method will permanently drop a table!
- @Private
- @VisibleForTesting
- void dropTable(String tableName) throws Exception {
- try (Connection conn = getConnection();
- Statement stmt = conn.createStatement()) {
- String sql = "DROP TABLE " + tableName;
- stmt.executeUpdate(sql);
- } catch (SQLException se) {
- LOG.error("Failed in dropping entity table " + se.getLocalizedMessage());
- throw se;
- }
- }
-
- private static class DynamicColumns<K> {
- static final String COLUMN_FAMILY_TYPE_BYTES = " VARBINARY";
- static final String COLUMN_FAMILY_TYPE_STRING = " VARCHAR";
- private String columnFamilyPrefix;
- private String type;
- private Set<K> columns;
-
- public DynamicColumns(String columnFamilyPrefix, String type,
- Set<K> keyValues) {
- this.columnFamilyPrefix = columnFamilyPrefix;
- this.columns = keyValues;
- this.type = type;
- }
- }
-
- private static <K> StringBuilder appendColumnsSQL(
- StringBuilder colNames, DynamicColumns<K> cfInfo) {
- // Prepare the sql template by iterating through all keys
- for (K key : cfInfo.columns) {
- colNames.append(",").append(cfInfo.columnFamilyPrefix)
- .append(key.toString()).append(cfInfo.type);
- }
- return colNames;
- }
-
- private static <K, V> int setValuesForColumnFamily(
- PreparedStatement ps, Map<K, V> keyValues, int startPos,
- boolean convertToBytes) throws SQLException {
- int idx = startPos;
- for (Map.Entry<K, V> entry : keyValues.entrySet()) {
- V value = entry.getValue();
- if (value instanceof Collection) {
- ps.setString(idx++, StringUtils.join(
- (Collection) value, AGGREGATION_STORAGE_SEPARATOR));
- } else {
- if (convertToBytes) {
- try {
- ps.setBytes(idx++, GenericObjectMapper.write(entry.getValue()));
- } catch (IOException ie) {
- LOG.error("Exception in converting values into bytes "
- + ie.getMessage());
- throw new SQLException(ie);
- }
- } else {
- ps.setString(idx++, value.toString());
- }
- }
- }
- return idx;
- }
-
- private static <K, V> int setBytesForColumnFamily(
- PreparedStatement ps, Map<K, V> keyValues, int startPos)
- throws SQLException {
- return setValuesForColumnFamily(ps, keyValues, startPos, true);
- }
-
- private static <K, V> int setStringsForColumnFamily(
- PreparedStatement ps, Map<K, V> keyValues, int startPos)
- throws SQLException {
- return setValuesForColumnFamily(ps, keyValues, startPos, false);
- }
-
- private static void storeEntityVariableLengthFields(TimelineEntity entity,
- Map<String, TimelineMetric> formattedMetrics,
- TimelineCollectorContext context, Connection conn,
- OfflineAggregationInfo aggregationInfo) throws SQLException {
- int numPlaceholders = 0;
- StringBuilder columnDefs = new StringBuilder(
- StringUtils.join(aggregationInfo.getPrimaryKeyList(), ","));
- if (formattedMetrics != null && formattedMetrics.size() > 0) {
- appendColumnsSQL(columnDefs, new DynamicColumns<>(
- METRIC_COLUMN_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_BYTES,
- formattedMetrics.keySet()));
- numPlaceholders += formattedMetrics.keySet().size();
- }
- if (numPlaceholders == 0) {
- return;
- }
- StringBuilder placeholders = new StringBuilder();
- placeholders.append(
- StringUtils.repeat("?,", aggregationInfo.getPrimaryKeyList().length));
- // numPlaceholders >= 1 now
- placeholders.append("?")
- .append(StringUtils.repeat(",?", numPlaceholders - 1));
- String sqlVariableLengthFields = new StringBuilder("UPSERT INTO ")
- .append(aggregationInfo.getTableName()).append(" (").append(columnDefs)
- .append(") VALUES(").append(placeholders).append(")").toString();
- if (LOG.isDebugEnabled()) {
- LOG.debug("SQL statement for variable length fields: "
- + sqlVariableLengthFields);
- }
- // Use a try-with-resources statement for the prepared statement
- try (PreparedStatement psVariableLengthFields =
- conn.prepareStatement(sqlVariableLengthFields)) {
- int idx = aggregationInfo.setStringsForPrimaryKey(
- psVariableLengthFields, context, null, 1);
- if (formattedMetrics != null && formattedMetrics.size() > 0) {
- idx = setBytesForColumnFamily(
- psVariableLengthFields, formattedMetrics, idx);
- }
- psVariableLengthFields.execute();
- }
- }
-}
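
For reference, the deleted writer issued two UPSERT statements per aggregated
entity: a fixed-column statement built in writeAggregatedEntity(), and a
dynamic-column statement built in storeEntityVariableLengthFields(). The
dynamic columns are why createPhoenixTables() only declares the placeholder
column m.timeline_cf_placeholder. A sketch of the generated SQL for a
flow-level row carrying two metrics (the metric ids mapTime and reduceTime are
hypothetical, and FLOW_AGGREGATION_TABLE stands in for the value of
OfflineAggregationInfo.FLOW_AGGREGATION_TABLE_NAME):

    -- Fixed columns: the primary key, creation time, and metric-name index.
    UPSERT INTO FLOW_AGGREGATION_TABLE
        (user,cluster,flow_name, created_time, metric_names)
        VALUES (?,?,?,?, ?)

    -- Phoenix dynamic columns: one VARBINARY column per metric id, prefixed
    -- with the metric column family "m.".
    UPSERT INTO FLOW_AGGREGATION_TABLE
        (user,cluster,flow_name,m.mapTime VARBINARY,m.reduceTime VARBINARY)
        VALUES(?,?,?,?,?)
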
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
index 33f5449..9369d6a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java
@@ -59,7 +59,6 @@ private TimelineSchemaCreator() {
final static String NAME = TimelineSchemaCreator.class.getSimpleName();
private static final Log LOG = LogFactory.getLog(TimelineSchemaCreator.class);
- private static final String PHOENIX_OPTION_SHORT = "p";
private static final String SKIP_EXISTING_TABLE_OPTION_SHORT = "s";
private static final String APP_TABLE_NAME_SHORT = "a";
private static final String APP_TO_FLOW_TABLE_NAME_SHORT = "a2f";
@@ -116,22 +115,6 @@ public static void main(String[] args) throws Exception {
exceptions.add(e);
}
- // Create Phoenix data schema if needed
- if (commandLine.hasOption(PHOENIX_OPTION_SHORT)) {
- Configuration phoenixConf = new Configuration();
- try {
- PhoenixOfflineAggregationWriterImpl phoenixWriter =
- new PhoenixOfflineAggregationWriterImpl();
- phoenixWriter.init(phoenixConf);
- phoenixWriter.start();
- phoenixWriter.createPhoenixTables();
- phoenixWriter.stop();
- LOG.info("Successfully created Phoenix offline aggregation schema. ");
- } catch (IOException e) {
- LOG.error("Error in creating phoenix tables: " + e.getMessage());
- exceptions.add(e);
- }
- }
if (exceptions.size() > 0) {
LOG.warn("Schema creation finished with the following exceptions");
for (Exception e : exceptions) {
@@ -181,11 +164,6 @@ private static CommandLine parseArgs(String[] args) throws ParseException {
// Options without an argument
// No need to set arg name since we do not need an argument here
- o = new Option(PHOENIX_OPTION_SHORT, "usePhoenix", false,
- "create Phoenix offline aggregation tables");
- o.setRequired(false);
- options.addOption(o);
-
o = new Option(SKIP_EXISTING_TABLE_OPTION_SHORT, "skipExistingTable",
false, "skip existing Hbase tables and continue to create new tables");
o.setRequired(false);
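
With the Phoenix option removed, TimelineSchemaCreator only provisions the
HBase tables. A hypothetical invocation that creates the schema while skipping
tables that already exist, using the -s/--skipExistingTable option retained
above (the hadoop-launcher form is an assumption, not part of this patch):

    hadoop org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator -s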