diff --git hadoop-project/pom.xml hadoop-project/pom.xml
index 8160438..2c53318 100644
--- hadoop-project/pom.xml
+++ hadoop-project/pom.xml
@@ -45,6 +45,9 @@
     <xerces.jdiff.version>2.11.0</xerces.jdiff.version>
+    <hbase.version>1.0.1</hbase.version>
+    <phoenix.version>4.5.0-SNAPSHOT</phoenix.version>
+
     <hadoop.assemblies.version>${project.version}</hadoop.assemblies.version>
     <commons-daemon.version>1.0.13</commons-daemon.version>
@@ -982,6 +985,55 @@
+      <dependency>
+        <groupId>org.apache.hbase</groupId>
+        <artifactId>hbase-client</artifactId>
+        <version>${hbase.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.phoenix</groupId>
+        <artifactId>phoenix-core</artifactId>
+        <version>${phoenix.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>jline</groupId>
+            <artifactId>jline</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.phoenix</groupId>
+        <artifactId>phoenix-core</artifactId>
+        <type>test-jar</type>
+        <version>${phoenix.version}</version>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hbase</groupId>
+        <artifactId>hbase-it</artifactId>
+        <version>${hbase.version}</version>
+        <scope>test</scope>
+        <classifier>tests</classifier>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hbase</groupId>
+        <artifactId>hbase-testing-util</artifactId>
+        <version>${hbase.version}</version>
+        <scope>test</scope>
+        <optional>true</optional>
+        <exclusions>
+          <exclusion>
+            <groupId>org.jruby</groupId>
+            <artifactId>jruby-complete</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
index f62230f..1e914de 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
@@ -122,21 +122,44 @@
     <dependency>
-      <groupId>org.apache.phoenix</groupId>
-      <artifactId>phoenix-core</artifactId>
-      <version>4.3.0</version>
-      <exclusions>
-        <exclusion>
-          <groupId>jline</groupId>
-          <artifactId>jline</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.google.guava</groupId>
-          <artifactId>guava</artifactId>
-        </exclusion>
-      </exclusions>
+      <groupId>org.apache.phoenix</groupId>
+      <artifactId>phoenix-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.phoenix</groupId>
+      <artifactId>phoenix-core</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-it</artifactId>
+      <scope>test</scope>
+      <classifier>tests</classifier>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-testing-util</artifactId>
+      <scope>test</scope>
+      <optional>true</optional>
+    </dependency>
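
The test-scoped hbase-testing-util dependency added above is what lets this
module's tests run against an in-process HBase mini cluster. As a point of
reference only, a minimal sketch of that facility; the actual test below
obtains its cluster through Phoenix's BaseTest harness instead of using
this class directly:

    // Sketch only; HBaseTestingUtility ships in hbase-testing-util.
    import org.apache.hadoop.hbase.HBaseTestingUtility;

    public class MiniHBaseClusterSketch {
      public static void main(String[] args) throws Exception {
        HBaseTestingUtility util = new HBaseTestingUtility();
        util.startMiniCluster();   // embedded ZooKeeper + HBase
        try {
          // Tests would run against util.getConfiguration() here.
        } finally {
          util.shutdownMiniCluster();
        }
      }
    }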
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java
index af8a233..5b4442c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java
@@ -31,6 +31,7 @@
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
@@ -43,6 +44,7 @@
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
@Private
@@ -50,6 +52,13 @@
public class PhoenixTimelineWriterImpl extends AbstractService
implements TimelineWriter {
+ public static final String TIMELINE_SERVICE_PHOENIX_STORAGE_CONN_STR
+ = YarnConfiguration.TIMELINE_SERVICE_PREFIX
+ + "writer.phoenix.connectionString";
+
+  public static final String TIMELINE_SERVICE_PHOENIX_STORAGE_CONN_STR_DEFAULT
+ = "jdbc:phoenix:localhost:2181:/hbase";
+
private static final Log LOG
= LogFactory.getLog(PhoenixTimelineWriterImpl.class);
private static final String PHOENIX_COL_FAMILY_PLACE_HOLDER
@@ -90,7 +99,10 @@
private static final String PHOENIX_STORAGE_SEPARATOR = ";";
/** Connection string to the deployed Phoenix cluster */
- static final String CONN_STRING = "jdbc:phoenix:localhost:2181:/hbase";
+ @VisibleForTesting
+ String connString = null;
+ @VisibleForTesting
+ Properties connProperties = new Properties();
PhoenixTimelineWriterImpl() {
super((PhoenixTimelineWriterImpl.class.getName()));
@@ -98,6 +110,10 @@
@Override
protected void serviceInit(Configuration conf) throws Exception {
+    // Read the Phoenix connection string from the configuration, falling
+    // back to the default when it is not overridden.
+ connString =
+ conf.get(TIMELINE_SERVICE_PHOENIX_STORAGE_CONN_STR,
+        TIMELINE_SERVICE_PHOENIX_STORAGE_CONN_STR_DEFAULT);
createTables();
super.init(conf);
}
@@ -174,11 +190,11 @@ public TimelineWriteResponse aggregate(TimelineEntity data,
// Utility functions
@Private
@VisibleForTesting
- static Connection getConnection() throws IOException {
+ Connection getConnection() throws IOException {
Connection conn;
try {
Class.forName(DRIVER_CLASS_NAME);
- conn = DriverManager.getConnection(CONN_STRING);
+ conn = DriverManager.getConnection(connString, connProperties);
conn.setAutoCommit(false);
} catch (SQLException se) {
LOG.error("Failed to connect to phoenix server! "
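
With this change the writer reads its Phoenix JDBC URL from the
configuration instead of the old hard-coded CONN_STRING. A minimal wiring
sketch, assuming a caller in the same package (the constructor is
package-private); the ZooKeeper quorum in the URL is illustrative:

    // Sketch only; the property and class names come from this patch,
    // the JDBC URL does not.
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class PhoenixWriterWiringSketch {
      public static void main(String[] args) {
        YarnConfiguration conf = new YarnConfiguration();
        conf.set(
            PhoenixTimelineWriterImpl.TIMELINE_SERVICE_PHOENIX_STORAGE_CONN_STR,
            "jdbc:phoenix:zk.example.com:2181:/hbase");
        PhoenixTimelineWriterImpl writer = new PhoenixTimelineWriterImpl();
        writer.init(conf);   // serviceInit() picks up the connection string
        writer.start();
        // ... write timeline entities, then:
        writer.stop();
      }
    }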
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java
index a55893e..dece83d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java
@@ -23,30 +23,37 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
+import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
-public class TestPhoenixTimelineWriterImpl {
- private PhoenixTimelineWriterImpl writer;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
- @Before
- public void setup() throws Exception {
- // TODO: launch a miniphoenix cluster, or else we're directly operating on
- // the active Phoenix cluster
+public class TestPhoenixTimelineWriterImpl extends BaseTest {
+ private static PhoenixTimelineWriterImpl writer;
+ private static final int BATCH_SIZE = 3;
+
+ @BeforeClass
+ public static void setup() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
- writer = createPhoenixWriter(conf);
+ writer = setupPhoenixClusterAndWriterForTest(conf);
}
- @Ignore
- @Test
+ @Test(timeout = 90000)
public void testPhoenixWriterBasic() throws Exception {
// Set up a list of timeline entities and write them back to Phoenix
int numEntity = 12;
@@ -91,28 +98,48 @@ public void testPhoenixWriterBasic() throws Exception {
verifySQLWithCount(sql, (numEntity / 4), "Number of events should be ");
}
- @After
- public void cleanup() throws Exception {
- // Note: it is assumed that we're working on a test only cluster, or else
- // this cleanup process will drop the entity table.
+ @AfterClass
+ public static void cleanup() throws Exception {
writer.dropTable(PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME);
writer.dropTable(PhoenixTimelineWriterImpl.EVENT_TABLE_NAME);
writer.dropTable(PhoenixTimelineWriterImpl.METRIC_TABLE_NAME);
writer.serviceStop();
+ tearDownMiniCluster();
}
- private static PhoenixTimelineWriterImpl createPhoenixWriter(
+ private static PhoenixTimelineWriterImpl setupPhoenixClusterAndWriterForTest(
YarnConfiguration conf) throws Exception{
+    Map<String, String> props = new HashMap<>();
+ // Must update config before starting server
+ props.put(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
+ Boolean.FALSE.toString());
+ props.put("java.security.krb5.realm", "");
+ props.put("java.security.krb5.kdc", "");
+ props.put(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER,
+ Boolean.FALSE.toString());
+ props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(5000));
+ props.put(IndexWriterUtils.HTABLE_THREAD_KEY, Integer.toString(100));
+ // Make a small batch size to test multiple calls to reserve sequences
+ props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB,
+ Long.toString(BATCH_SIZE));
+ // Must update config before starting server
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+
PhoenixTimelineWriterImpl myWriter = new PhoenixTimelineWriterImpl();
+ // Change connection settings for test
+ conf.set(
+ PhoenixTimelineWriterImpl.TIMELINE_SERVICE_PHOENIX_STORAGE_CONN_STR,
+ getUrl());
+ myWriter.connProperties = PropertiesUtil.deepCopy(TEST_PROPERTIES);
myWriter.serviceInit(conf);
return myWriter;
}
private void verifySQLWithCount(String sql, int targetCount, String message)
- throws Exception{
+ throws Exception {
try (
Statement stmt =
- PhoenixTimelineWriterImpl.getConnection().createStatement();
+ writer.getConnection().createStatement();
ResultSet rs = stmt.executeQuery(sql)) {
assertTrue("Result set empty on statement " + sql, rs.next());
assertNotNull("Fail to execute query " + sql, rs);
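
Both the writer's getConnection() and the test's verifySQLWithCount() lean
on the same plain-JDBC pattern. A standalone sketch of it, with an
illustrative URL and table name; the driver class is Phoenix's standard
org.apache.phoenix.jdbc.PhoenixDriver:

    // Sketch only: open a Phoenix connection with explicit Properties,
    // as the writer now does, and check a row count.
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.util.Properties;

    public class PhoenixCountSketch {
      public static void main(String[] args) throws Exception {
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
        try (Connection conn = DriverManager.getConnection(
                 "jdbc:phoenix:localhost:2181:/hbase", new Properties());
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                 "SELECT COUNT(*) FROM TIMELINE_ENTITY")) { // illustrative
          if (rs.next()) {
            System.out.println("row count: " + rs.getLong(1));
          }
        }
      }
    }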