diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml index f974aee..2aad7bd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml @@ -120,6 +120,19 @@ mockito-all test + + + org.apache.phoenix + phoenix-core + 4.3.0 + + + + jline + jline + + + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java index 677feb1..87dd78f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java @@ -27,12 +27,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; /** * Service that handles writes to the timeline service and writes them to the @@ -46,7 +42,7 @@ public abstract class TimelineCollector extends CompositeService { private static final Log LOG = LogFactory.getLog(TimelineCollector.class); - private TimelineWriter writer; + private TimelineCollectorManager manager; public TimelineCollector(String name) { super(name); @@ -55,11 +51,6 @@ public TimelineCollector(String name) { @Override protected void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); - writer = ReflectionUtils.newInstance(conf.getClass( - YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS, - FileSystemTimelineWriterImpl.class, - TimelineWriter.class), conf); - writer.init(conf); } @Override @@ -70,11 +61,10 @@ protected void serviceStart() throws Exception { @Override protected void serviceStop() throws Exception { super.serviceStop(); - writer.stop(); } - public TimelineWriter getWriter() { - return writer; + void attachTo(TimelineCollectorManager m) { + this.manager = m; } /** @@ -99,7 +89,7 @@ public TimelineWriteResponse putEntities(TimelineEntities entities, } TimelineCollectorContext context = getTimelineEntityContext(); - return writer.write(context.getClusterId(), context.getUserId(), + return manager.getWriter().write(context.getClusterId(), context.getUserId(), context.getFlowId(), context.getFlowRunId(), context.getAppId(), entities); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java index 5f23c25..517f399 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java @@ -35,8 +35,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.http.HttpServer2; import org.apache.hadoop.http.lib.StaticUserWebFilter; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -46,6 +46,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; +import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; import org.apache.hadoop.yarn.webapp.util.WebAppUtils; @@ -81,6 +83,8 @@ private InetSocketAddress nmCollectorServiceAddress; + private TimelineWriter writer; + static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager"; static TimelineCollectorManager getInstance() { @@ -99,7 +103,12 @@ public void serviceInit(Configuration conf) throws Exception { YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS, YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS, YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_PORT); - + writer = ReflectionUtils.newInstance( + conf.getClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS, + FileSystemTimelineWriterImpl.class, TimelineWriter.class), + conf); + writer.init(conf); + super.serviceInit(conf); } @Override @@ -114,6 +123,7 @@ protected void serviceStop() throws Exception { if (timelineRestServer != null) { timelineRestServer.stop(); } + writer.stop(); super.serviceStop(); } @@ -138,6 +148,7 @@ public TimelineCollector putIfAbsent(ApplicationId appId, // cleaned up when the parent shuts down collector.init(getConfig()); collector.start(); + collector.attachTo(this); collectors.put(id, collector); LOG.info("the collector for " + id + " was added"); collectorInTable = collector; @@ -206,6 +217,13 @@ public boolean containsKey(String id) { } /** + * Returns the timeline writer of this manager + */ + TimelineWriter getWriter() { + return writer; + } + + /** * Launch the REST web server for this collector manager */ private void startWebApp() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java new file mode 100644 index 0000000..2bb1732 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +@Private +@Unstable +public class PhoenixTimelineWriterImpl extends AbstractService + implements TimelineWriter { + + private static final Log LOG = LogFactory.getLog(PhoenixTimelineWriterImpl.class); + /** Default Phoenix JDBC driver name */ + static final String DRIVER_CLASS_NAME = "org.apache.phoenix.jdbc.PhoenixDriver"; + /** Default Phoenix timeline entity table name */ + static final String ENTITY_TABLE_NAME = "TIMELINE_ENTITY"; + + /** Connection string to the deployed Phoenix cluster */ + static final String CONN_STRING = "jdbc:phoenix:localhost:2181:/hbase"; + + private Connection conn = null; + + PhoenixTimelineWriterImpl() { + super((PhoenixTimelineWriterImpl.class.getName())); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + initConnection(); + super.init(conf); + } + + @Override + protected void serviceStop() throws Exception { + try { + conn.close(); + } catch (SQLException se) { + LOG.error("Failed to close the phoenix connection! " + + se.getLocalizedMessage()); + } + super.serviceStop(); + } + + @Override + public TimelineWriteResponse write(String clusterId, String userId, + String flowId, String flowRunId, String appId, + TimelineEntities entities) throws IOException{ + TimelineWriteResponse response = new TimelineWriteResponse(); + try { + // Insert + String sql = "UPSERT INTO " + ENTITY_TABLE_NAME + + " (cluster, user, flow, run, appid, type, entityid, creationtime, " + + "modifiedtime, configs) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + PreparedStatement ps = conn.prepareStatement(sql); + for (TimelineEntity entity : entities.getEntities()) { + ps.setString(1, clusterId); + ps.setString(2, userId); + ps.setString(3, flowId); + ps.setString(4, flowRunId); + ps.setString(5, appId); + ps.setString(6, entity.getType()); + ps.setString(7, entity.getId()); + ps.setLong(8, entity.getCreatedTime()); + ps.setLong(9, entity.getModifiedTime()); + ps.setString(10, entity.getConfigs().toString()); + ps.addBatch(); + } + ps.executeBatch(); + ps.close(); + conn.commit(); + } catch (SQLException se) { + LOG.error("Failed to add entity to Phoenix " + se.getMessage()); + } + return response; + } + + /** + * Aggregates the entity information to the timeline store based on which + * track this entity is to be rolled up to The tracks along which aggregations + * are to be done are given by {@link TimelineAggregationTrack} + * + * Any errors occurring for individual write request objects will be reported + * in the response. + * + * @param data + * a {@link TimelineEntity} object + * a {@link TimelineAggregationTrack} enum value + * @return a {@link TimelineWriteResponse} object. + * @throws IOException + */ + public TimelineWriteResponse aggregate(TimelineEntity data, + TimelineAggregationTrack track) throws IOException { + return null; + + } + + private void initConnection() { + try { + Class.forName(DRIVER_CLASS_NAME); + conn = DriverManager.getConnection(CONN_STRING); + // Initialize cluster for Phoenix storage + initializeData(); + } catch (SQLException se) { + LOG.error("Failed to connect to phoenix server! " + + se.getLocalizedMessage()); + } catch (ClassNotFoundException e) { + LOG.error("Class not found! " + e.getLocalizedMessage()); + } + } + + private void initializeData() { + try { + // Create tables if necessary + Statement stmt = conn.createStatement(); + // Table schema defined as in YARN-3134 + String sql = "CREATE TABLE IF NOT EXISTS " + ENTITY_TABLE_NAME + + "(cluster VARCHAR NOT NULL, user VARCHAR NOT NULL, " + + "flow VARCHAR NOT NULL, run VARCHAR NOT NULL, " + + "appid VARCHAR NOT NULL, type VARCHAR NOT NULL, " + + "entityid VARCHAR NOT NULL, " + + "creationtime UNSIGNED_LONG, modifiedtime UNSIGNED_LONG, " + + "parent VARCHAR, queue VARCHAR, configs VARCHAR " + + "CONSTRAINT pk PRIMARY KEY(cluster, user, flow, run, appid, type, " + + "entityid))"; + stmt.executeUpdate(sql); + stmt.close(); + conn.commit(); + } catch (SQLException se) { + LOG.error("Failed in init data " + se.getLocalizedMessage()); + } + return; + } + + @Private + @VisibleForTesting + ResultSet executeQuery(String sql) { + ResultSet rs = null; + try { + Statement stmt = conn.createStatement(); + rs = stmt.executeQuery(sql); + } catch (SQLException se) { + LOG.error("SQL exception! " + se.getLocalizedMessage()); + } + return rs; + } + + @Private + @VisibleForTesting + void dropEntityTable() { + try { + // Drop table + Statement stmt = conn.createStatement(); + String sql = "DROP TABLE " + ENTITY_TABLE_NAME; + stmt.executeUpdate(sql); + stmt.close(); + } catch (SQLException se) { + LOG.error("Failed in dropping entity table " + se.getLocalizedMessage()); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java new file mode 100644 index 0000000..3f76488 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.sql.ResultSet; + +public class TestPhoenixTimelineWriterImpl { + private PhoenixTimelineWriterImpl writer; + + @Before + public void setup() throws Exception { + // TODO: launch a miniphoenix cluster, or else we're directly operating on + // the active Phoenix cluster + YarnConfiguration conf = new YarnConfiguration(); + writer = createPhoenixWriter(conf); + } + + @Test + public void testPhoenixReaderBasic() throws Exception { + // Set up a list of timeline entities and write them back to Phoenix + int numEntity = 10; + TimelineEntities te = + TestTimelineWriterImpl.getStandardTestTimelineEntities(numEntity); + writer.write("cluster_1", "user1", "testFlow", "flowrun1", "app_test_1", te); + // Verify if we're doing the right thing + String sql = "SELECT COUNT(entityid) FROM " + + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME; + ResultSet rs = writer.executeQuery(sql); + assertTrue("Result set empty!", rs.next()); + assertNotNull("Fail to execute query " + sql, rs); + assertEquals("Number of entities should be " + numEntity, + numEntity, rs.getInt(1)); + rs.close(); + } + + @After + public void cleanup() throws Exception { + // Note: it is assumed that we're working on a test only cluster, or else + // this cleanup process will drop the entity table. Remove this line if we're + // using a miniphoenix cluster + writer.dropEntityTable(); + writer.serviceStop(); + } + + private static PhoenixTimelineWriterImpl createPhoenixWriter( + YarnConfiguration conf) throws Exception{ + PhoenixTimelineWriterImpl myWriter = new PhoenixTimelineWriterImpl(); + myWriter.serviceInit(conf); + return myWriter; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineWriterImpl.java new file mode 100644 index 0000000..7762b2a --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineWriterImpl.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; + +public class TestTimelineWriterImpl { + static TimelineEntities getStandardTestTimelineEntities(int listSize) { + TimelineEntities te = new TimelineEntities(); + for (int i = 0; i < listSize; i++) { + TimelineEntity entity = new TimelineEntity(); + String id = "hello" + i; + String type = "world"; + entity.setId(id); + entity.setType(type); + entity.setCreatedTime(1425016501000L + i); + entity.setModifiedTime(1425016502000L + i); + te.addEntity(entity); + } + return te; + } +}