diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
index f974aee..2aad7bd 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
@@ -120,6 +120,19 @@
mockito-all
test
+
+    <dependency>
+      <groupId>org.apache.phoenix</groupId>
+      <artifactId>phoenix-core</artifactId>
+      <version>4.3.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>jline</groupId>
+          <artifactId>jline</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
index 677feb1..87dd78f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
@@ -27,12 +27,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
/**
* Service that handles writes to the timeline service and writes them to the
@@ -46,7 +42,7 @@
public abstract class TimelineCollector extends CompositeService {
private static final Log LOG = LogFactory.getLog(TimelineCollector.class);
- private TimelineWriter writer;
+ private TimelineCollectorManager manager;
public TimelineCollector(String name) {
super(name);
@@ -55,11 +51,6 @@ public TimelineCollector(String name) {
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
- writer = ReflectionUtils.newInstance(conf.getClass(
- YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
- FileSystemTimelineWriterImpl.class,
- TimelineWriter.class), conf);
- writer.init(conf);
}
@Override
@@ -70,11 +61,10 @@ protected void serviceStart() throws Exception {
@Override
protected void serviceStop() throws Exception {
super.serviceStop();
- writer.stop();
}
- public TimelineWriter getWriter() {
- return writer;
+ void attachTo(TimelineCollectorManager m) {
+ this.manager = m;
}
/**
@@ -99,7 +89,7 @@ public TimelineWriteResponse putEntities(TimelineEntities entities,
}
TimelineCollectorContext context = getTimelineEntityContext();
- return writer.write(context.getClusterId(), context.getUserId(),
+ return manager.getWriter().write(context.getClusterId(), context.getUserId(),
context.getFlowId(), context.getFlowRunId(), context.getAppId(),
entities);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
index 5f23c25..517f399 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
@@ -35,8 +35,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.http.lib.StaticUserWebFilter;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -46,6 +46,8 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
@@ -81,6 +83,8 @@
private InetSocketAddress nmCollectorServiceAddress;
+ private TimelineWriter writer;
+
static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager";
static TimelineCollectorManager getInstance() {
@@ -99,7 +103,12 @@ public void serviceInit(Configuration conf) throws Exception {
YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS,
YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS,
YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_PORT);
-
+ writer = ReflectionUtils.newInstance(
+ conf.getClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
+ FileSystemTimelineWriterImpl.class, TimelineWriter.class),
+ conf);
+ writer.init(conf);
+ super.serviceInit(conf);
}
@Override
@@ -114,6 +123,7 @@ protected void serviceStop() throws Exception {
if (timelineRestServer != null) {
timelineRestServer.stop();
}
+ writer.stop();
super.serviceStop();
}
@@ -138,6 +148,7 @@ public TimelineCollector putIfAbsent(ApplicationId appId,
// cleaned up when the parent shuts down
collector.init(getConfig());
collector.start();
+ collector.attachTo(this);
collectors.put(id, collector);
LOG.info("the collector for " + id + " was added");
collectorInTable = collector;
@@ -206,6 +217,13 @@ public boolean containsKey(String id) {
}
/**
+   * Returns the timeline writer of this manager.
+ */
+ TimelineWriter getWriter() {
+ return writer;
+ }
+
+ /**
* Launch the REST web server for this collector manager
*/
private void startWebApp() {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java
new file mode 100644
index 0000000..2bb1732
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+@Private
+@Unstable
+public class PhoenixTimelineWriterImpl extends AbstractService
+ implements TimelineWriter {
+
+ private static final Log LOG = LogFactory.getLog(PhoenixTimelineWriterImpl.class);
+ /** Default Phoenix JDBC driver name */
+ static final String DRIVER_CLASS_NAME = "org.apache.phoenix.jdbc.PhoenixDriver";
+ /** Default Phoenix timeline entity table name */
+ static final String ENTITY_TABLE_NAME = "TIMELINE_ENTITY";
+
+ /** Connection string to the deployed Phoenix cluster */
+ static final String CONN_STRING = "jdbc:phoenix:localhost:2181:/hbase";
+
+ private Connection conn = null;
+
+  PhoenixTimelineWriterImpl() {
+    super(PhoenixTimelineWriterImpl.class.getName());
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    initConnection();
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    try {
+      // conn may be null if initConnection() failed -- guard against NPE
+      if (conn != null) { conn.close(); }
+    } catch (SQLException se) {
+      LOG.error("Failed to close the phoenix connection! ", se);
+    }
+    super.serviceStop();
+  }
+
+  @Override
+  public TimelineWriteResponse write(String clusterId, String userId,
+      String flowId, String flowRunId, String appId,
+      TimelineEntities entities) throws IOException {
+    TimelineWriteResponse response = new TimelineWriteResponse();
+    // Upsert one row per entity; the first seven columns form the
+    // primary key, so rewriting an entity is idempotent.
+    String sql = "UPSERT INTO " + ENTITY_TABLE_NAME
+        + " (cluster, user, flow, run, appid, type, entityid, creationtime, "
+        + "modifiedtime, configs) "
+        + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+    // try-with-resources closes the statement even when a batch call throws
+    try (PreparedStatement ps = conn.prepareStatement(sql)) {
+      for (TimelineEntity entity : entities.getEntities()) {
+        ps.setString(1, clusterId);
+        ps.setString(2, userId);
+        ps.setString(3, flowId);
+        ps.setString(4, flowRunId);
+        ps.setString(5, appId);
+        ps.setString(6, entity.getType());
+        ps.setString(7, entity.getId());
+        ps.setLong(8, entity.getCreatedTime());
+        ps.setLong(9, entity.getModifiedTime());
+        ps.setString(10, entity.getConfigs().toString());
+        ps.addBatch();
+      }
+      ps.executeBatch();
+      conn.commit();
+    } catch (SQLException se) {
+      LOG.error("Failed to add entity to Phoenix ", se);
+    }
+    return response;
+  }
+
+ /**
+ * Aggregates the entity information to the timeline store based on which
+ * track this entity is to be rolled up to. The tracks along which aggregations
+ * are to be done are given by {@link TimelineAggregationTrack}
+ *
+ * Any errors occurring for individual write request objects will be reported
+ * in the response.
+ *
+ * @param data a {@link TimelineEntity} object to aggregate
+ * @param track a {@link TimelineAggregationTrack} enum value specifying
+ *          the aggregation track
+ * @return a {@link TimelineWriteResponse} object.
+ * @throws IOException
+ */
+ public TimelineWriteResponse aggregate(TimelineEntity data,
+ TimelineAggregationTrack track) throws IOException {
+ return null;
+
+ }
+
+  /** Opens the Phoenix JDBC connection and bootstraps the schema. */
+  private void initConnection() {
+    try {
+      Class.forName(DRIVER_CLASS_NAME);
+      conn = DriverManager.getConnection(CONN_STRING);
+      // Initialize cluster for Phoenix storage
+      initializeData();
+    } catch (SQLException se) {
+      // Pass the throwable so the full stack trace is logged
+      LOG.error("Failed to connect to phoenix server! ", se);
+    } catch (ClassNotFoundException e) {
+      LOG.error("Class not found! ", e);
+    }
+  }
+
+  /** Creates the entity table if it does not already exist. */
+  private void initializeData() {
+    // Table schema defined as in YARN-3134
+    String sql = "CREATE TABLE IF NOT EXISTS " + ENTITY_TABLE_NAME
+        + "(cluster VARCHAR NOT NULL, user VARCHAR NOT NULL, "
+        + "flow VARCHAR NOT NULL, run VARCHAR NOT NULL, "
+        + "appid VARCHAR NOT NULL, type VARCHAR NOT NULL, "
+        + "entityid VARCHAR NOT NULL, "
+        + "creationtime UNSIGNED_LONG, modifiedtime UNSIGNED_LONG, "
+        + "parent VARCHAR, queue VARCHAR, configs VARCHAR "
+        + "CONSTRAINT pk PRIMARY KEY(cluster, user, flow, run, appid, type, "
+        + "entityid))";
+    // Create tables if necessary; try-with-resources guarantees the
+    // statement is closed even when executeUpdate throws.
+    try (Statement stmt = conn.createStatement()) {
+      stmt.executeUpdate(sql);
+      conn.commit();
+    } catch (SQLException se) {
+      // Pass the exception so the full stack trace is logged
+      LOG.error("Failed in init data ", se);
+    }
+  }
+
+  @Private
+  @VisibleForTesting
+  // NOTE: the Statement stays open; closing it would close the ResultSet.
+  ResultSet executeQuery(String sql) {
+    ResultSet rs = null;
+    try {
+      rs = conn.createStatement().executeQuery(sql);
+    } catch (SQLException se) {
+      LOG.error("SQL exception! ", se);
+    }
+    return rs;
+  }
+
+  @Private
+  @VisibleForTesting
+  // Drops the entity table; used by tests to clean up between runs.
+  // Note this is destructive on a shared cluster -- test use only.
+  void dropEntityTable() {
+    // try-with-resources closes the statement even if the drop fails
+    try (Statement stmt = conn.createStatement()) {
+      String sql = "DROP TABLE " + ENTITY_TABLE_NAME;
+      stmt.executeUpdate(sql);
+    } catch (SQLException se) {
+      LOG.error("Failed in dropping entity table ", se);
+    }
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java
new file mode 100644
index 0000000..3f76488
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.ResultSet;
+
+public class TestPhoenixTimelineWriterImpl {
+ private PhoenixTimelineWriterImpl writer;
+
+ @Before
+ public void setup() throws Exception {
+ // TODO: launch a miniphoenix cluster, or else we're directly operating on
+ // the active Phoenix cluster
+ YarnConfiguration conf = new YarnConfiguration();
+ writer = createPhoenixWriter(conf);
+ }
+
+  @Test
+  public void testPhoenixReaderBasic() throws Exception {
+    // Set up a list of timeline entities and write them back to Phoenix
+    int numEntity = 10;
+    TimelineEntities te =
+        TestTimelineWriterImpl.getStandardTestTimelineEntities(numEntity);
+    writer.write("cluster_1", "user1", "testFlow", "flowrun1", "app_test_1", te);
+    // Verify if we're doing the right thing
+    String sql = "SELECT COUNT(entityid) FROM "
+        + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME;
+    ResultSet rs = writer.executeQuery(sql);
+    assertNotNull("Fail to execute query " + sql, rs);
+    assertTrue("Result set empty!", rs.next());
+    assertEquals("Number of entities should be " + numEntity,
+        numEntity, rs.getInt(1));
+    rs.close();
+  }
+
+ @After
+ public void cleanup() throws Exception {
+    // Note: it is assumed that we're working on a test-only cluster, or else
+    // this cleanup process will drop the entity table. Remove this line if
+    // we're using a mini Phoenix cluster.
+ writer.dropEntityTable();
+ writer.serviceStop();
+ }
+
+ private static PhoenixTimelineWriterImpl createPhoenixWriter(
+ YarnConfiguration conf) throws Exception{
+ PhoenixTimelineWriterImpl myWriter = new PhoenixTimelineWriterImpl();
+ myWriter.serviceInit(conf);
+ return myWriter;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineWriterImpl.java
new file mode 100644
index 0000000..7762b2a
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineWriterImpl.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+
+public class TestTimelineWriterImpl {
+  // Shared helper for timeline storage tests: builds synthetic entities.
+  // Entity i has id "hello" + i, type "world", and fixed timestamps.
+  static TimelineEntities getStandardTestTimelineEntities(int listSize) {
+    TimelineEntities entities = new TimelineEntities();
+    for (int i = 0; i < listSize; i++) {
+      TimelineEntity entity = new TimelineEntity();
+      entity.setId("hello" + i);
+      entity.setType("world");
+      entity.setCreatedTime(1425016501000L + i);
+      entity.setModifiedTime(1425016502000L + i);
+      entities.addEntity(entity);
+    }
+    return entities;
+  }
+}