diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 3bb73f5..6c35e9e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1974,6 +1974,18 @@ public static boolean isAclEnabled(Configuration conf) { public static final String TIMELINE_SERVICE_READER_CLASS = TIMELINE_SERVICE_PREFIX + "reader.class"; + /** + * default schema prefix for hbase tables + */ + public static final String DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX = + "dev."; + + /** + * config param name to override schema prefix + */ + public static final String TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME = + TIMELINE_SERVICE_PREFIX + "schema.prefix"; + /** The setting that controls how often the timeline collector flushes the * timeline writer. */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageSchema.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageSchema.java new file mode 100644 index 0000000..f9a4a7e --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageSchema.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.NavigableMap; +import java.util.NavigableSet; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Table; + +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; + +public class TestHBaseTimelineStorageSchema { + private static HBaseTestingUtility util; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + util = new HBaseTestingUtility(); + util.startMiniCluster(); + } + + private static void createSchema(Configuration conf) throws IOException { + TimelineSchemaCreator.createAllTables(conf, false); + } + + @Test + public void createWithDefaultPrefix() throws IOException { + Configuration hbaseConf = util.getConfiguration(); + createSchema(hbaseConf); + Connection conn = null; + conn = ConnectionFactory.createConnection(hbaseConf); + Admin admin = conn.getAdmin(); + + TableName entityTableName = BaseTable.getTableName(hbaseConf, + EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME); + assertTrue(admin.tableExists(entityTableName)); + assertTrue(entityTableName.getNameAsString().startsWith( + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX)); + Table entityTable = conn.getTable(BaseTable.getTableName(hbaseConf, + EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME)); + assertNotNull(entityTable); + + TableName flowRunTableName = BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME); + assertTrue(admin.tableExists(flowRunTableName)); + assertTrue(flowRunTableName.getNameAsString().startsWith( + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX)); + Table flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); + assertNotNull(flowRunTable); + } + + @Test + public void createWithSetPrefix() throws IOException { + Configuration hbaseConf = util.getConfiguration(); + String prefix = "unit-test."; + hbaseConf.set(YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME, + prefix); + createSchema(hbaseConf); + Connection conn = null; + conn = ConnectionFactory.createConnection(hbaseConf); + Admin admin = conn.getAdmin(); + + TableName entityTableName = BaseTable.getTableName(hbaseConf, + EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME); + assertTrue(admin.tableExists(entityTableName)); + assertTrue(entityTableName.getNameAsString().startsWith(prefix)); + Table entityTable = conn.getTable(BaseTable.getTableName(hbaseConf, + EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME)); + assertNotNull(entityTable); + + TableName flowRunTableName = BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME); + assertTrue(admin.tableExists(flowRunTableName)); + assertTrue(flowRunTableName.getNameAsString().startsWith(prefix)); + Table flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); + assertNotNull(flowRunTable); + + // create another set with a diff prefix + hbaseConf + .unset(YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME); + prefix = "yet-another-unit-test."; + hbaseConf.set(YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME, + prefix); + createSchema(hbaseConf); + entityTableName = BaseTable.getTableName(hbaseConf, + EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME); + assertTrue(admin.tableExists(entityTableName)); + assertTrue(entityTableName.getNameAsString().startsWith(prefix)); + entityTable = conn.getTable(BaseTable.getTableName(hbaseConf, + EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME)); + assertNotNull(entityTable); + + flowRunTableName = BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME); + assertTrue(admin.tableExists(flowRunTableName)); + assertTrue(flowRunTableName.getNameAsString().startsWith(prefix)); + flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); + assertNotNull(flowRunTable); + hbaseConf + .unset(YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java index 1906574..552f5d1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.junit.AfterClass; @@ -58,7 +59,7 @@ import org.junit.Test; /** - * Tests the FlowRun and FlowActivity Tables. + * Tests the FlowRun and FlowActivity Tables */ public class TestHBaseStorageFlowActivity { @@ -114,7 +115,7 @@ public void testWriteFlowRunMinMax() throws Exception { String appName = "application_100000000000_1111"; long minStartTs = 1424995200300L; long greaterStartTs = 1424995200300L + 864000L; - long endTs = 1424995200300L + 86000000L; + long endTs = 1424995200300L + 86000000L;; TimelineEntity entityMinStartTime = TestFlowDataGenerator .getEntityMinStartTime(minStartTs); @@ -155,8 +156,9 @@ public void testWriteFlowRunMinMax() throws Exception { Connection conn = ConnectionFactory.createConnection(c1); // check in flow activity table - Table table1 = conn.getTable(TableName - .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME)); + Table table1 = conn.getTable( + BaseTable.getTableName(c1, FlowActivityTable.TABLE_NAME_CONF_NAME, + FlowActivityTable.DEFAULT_TABLE_NAME)); byte[] startRow = new FlowActivityRowKey(cluster, minStartTs, user, flow).getRowKey(); Get g = new Get(startRow); @@ -209,7 +211,7 @@ public void testWriteFlowRunMinMax() throws Exception { /** * Write 1 application entity and checks the record for today in the flow - * activity table. + * activity table */ @Test public void testWriteFlowActivityOneFlow() throws Exception { @@ -286,8 +288,9 @@ private void checkFlowActivityTable(String cluster, String user, String flow, .getRowKey(); s.setStopRow(stopRow); Connection conn = ConnectionFactory.createConnection(c1); - Table table1 = conn.getTable(TableName - .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME)); + Table table1 = conn.getTable( + BaseTable.getTableName(c1, FlowActivityTable.TABLE_NAME_CONF_NAME, + FlowActivityTable.DEFAULT_TABLE_NAME)); ResultScanner scanner = table1.getScanner(s); int rowCount = 0; for (Result result : scanner) { @@ -313,10 +316,10 @@ private void checkFlowActivityTable(String cluster, String user, String flow, /** * Writes 3 applications each with a different run id and version for the same - * {cluster, user, flow}. + * {cluster, user, flow} * * They should be getting inserted into one record in the flow activity table - * with 3 columns, one per run id. + * with 3 columns, one per run id */ @Test public void testFlowActivityTableOneFlowMultipleRunIds() throws IOException { @@ -425,12 +428,12 @@ private void checkFlowActivityTableSeveralRuns(String cluster, String user, s.setStartRow(startRow); String clusterStop = cluster + "1"; byte[] stopRow = - new FlowActivityRowKey(clusterStop, appCreatedTime, user, flow) - .getRowKey(); + new FlowActivityRowKey(clusterStop, appCreatedTime, user, flow).getRowKey(); s.setStopRow(stopRow); Connection conn = ConnectionFactory.createConnection(c1); - Table table1 = conn.getTable(TableName - .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME)); + Table table1 = conn.getTable( + BaseTable.getTableName(c1, FlowActivityTable.TABLE_NAME_CONF_NAME, + FlowActivityTable.DEFAULT_TABLE_NAME)); ResultScanner scanner = table1.getScanner(s); int rowCount = 0; for (Result result : scanner) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java index 74b9e50..c32c6a7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java @@ -61,15 +61,17 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; /** - * Tests the FlowRun and FlowActivity Tables. + * Tests the FlowRun and FlowActivity Tables */ public class TestHBaseStorageFlowRun { @@ -94,8 +96,8 @@ private static void createSchema() throws IOException { @Test public void checkCoProcessorOff() throws IOException, InterruptedException { Configuration hbaseConf = util.getConfiguration(); - TableName table = TableName.valueOf(hbaseConf.get( - FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); + TableName table = BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME); Connection conn = null; conn = ConnectionFactory.createConnection(hbaseConf); Admin admin = conn.getAdmin(); @@ -109,14 +111,14 @@ public void checkCoProcessorOff() throws IOException, InterruptedException { HRegionServer server = util.getRSForFirstRegionInTable(table); List regions = server.getOnlineRegions(table); for (Region region : regions) { - assertTrue(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), + assertTrue(FlowRunTable.isFlowRunTable(region.getRegionInfo(), hbaseConf)); } } - table = TableName.valueOf(hbaseConf.get( + table = BaseTable.getTableName(hbaseConf, FlowActivityTable.TABLE_NAME_CONF_NAME, - FlowActivityTable.DEFAULT_TABLE_NAME)); + FlowActivityTable.DEFAULT_TABLE_NAME); if (admin.tableExists(table)) { // check the regions. // check in flow activity table @@ -124,14 +126,13 @@ public void checkCoProcessorOff() throws IOException, InterruptedException { HRegionServer server = util.getRSForFirstRegionInTable(table); List regions = server.getOnlineRegions(table); for (Region region : regions) { - assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), + assertFalse(FlowRunTable.isFlowRunTable(region.getRegionInfo(), hbaseConf)); } } - table = TableName.valueOf(hbaseConf.get( - EntityTable.TABLE_NAME_CONF_NAME, - EntityTable.DEFAULT_TABLE_NAME)); + table = BaseTable.getTableName(hbaseConf, EntityTable.TABLE_NAME_CONF_NAME, + EntityTable.DEFAULT_TABLE_NAME); if (admin.tableExists(table)) { // check the regions. // check in entity run table @@ -139,7 +140,7 @@ public void checkCoProcessorOff() throws IOException, InterruptedException { HRegionServer server = util.getRSForFirstRegionInTable(table); List regions = server.getOnlineRegions(table); for (Region region : regions) { - assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), + assertFalse(FlowRunTable.isFlowRunTable(region.getRegionInfo(), hbaseConf)); } } @@ -220,8 +221,8 @@ public void testWriteFlowRunMinMax() throws Exception { Connection conn = ConnectionFactory.createConnection(c1); // check in flow run table - Table table1 = conn.getTable(TableName - .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + Table table1 = conn.getTable(BaseTable.getTableName(c1, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); // scan the table and see that we get back the right min and max // timestamps byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey(); @@ -356,24 +357,22 @@ public void testWriteFlowRunMetricsOneFlow() throws Exception { /* * checks the batch limits on a scan */ - void checkFlowRunTableBatchLimit(String cluster, String user, + void checkFlowRunTableBatchLimit(String cluster, String user, String flow, long runid, Configuration c1) throws IOException { Scan s = new Scan(); s.addFamily(FlowRunColumnFamily.INFO.getBytes()); - byte[] startRow = - new FlowRunRowKey(cluster, user, flow, runid).getRowKey(); + byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey(); s.setStartRow(startRow); // set a batch limit int batchLimit = 2; s.setBatch(batchLimit); String clusterStop = cluster + "1"; - byte[] stopRow = - new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey(); + byte[] stopRow = new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey(); s.setStopRow(stopRow); Connection conn = ConnectionFactory.createConnection(c1); - Table table1 = conn - .getTable(TableName.valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + Table table1 = conn.getTable(BaseTable.getTableName(c1, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); ResultScanner scanner = table1.getScanner(s); int loopCount = 0; @@ -517,8 +516,8 @@ private void checkFlowRunTable(String cluster, String user, String flow, new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey(); s.setStopRow(stopRow); Connection conn = ConnectionFactory.createConnection(c1); - Table table1 = conn.getTable(TableName - .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + Table table1 = conn.getTable(BaseTable.getTableName(c1, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); ResultScanner scanner = table1.getScanner(s); int rowCount = 0; @@ -782,8 +781,8 @@ private void checkMinMaxFlush(Configuration c1, long minTS, long startTs, boolean checkMax) throws IOException { Connection conn = ConnectionFactory.createConnection(c1); // check in flow run table - Table table1 = conn.getTable(TableName - .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + Table table1 = conn.getTable(BaseTable.getTableName(c1, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); // scan the table and see that we get back the right min and max // timestamps byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java index 3094088..9bf8677 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; @@ -63,14 +64,14 @@ import org.junit.Test; /** - * Tests the FlowRun and FlowActivity Tables. + * Tests the FlowRun and FlowActivity Tables */ public class TestHBaseStorageFlowRunCompaction { private static HBaseTestingUtility util; - private static final String METRIC_1 = "MAP_SLOT_MILLIS"; - private static final String METRIC_2 = "HDFS_BYTES_READ"; + private static final String metric1 = "MAP_SLOT_MILLIS"; + private static final String metric2 = "HDFS_BYTES_READ"; private final byte[] aRowKey = Bytes.toBytes("a"); private final byte[] aFamily = Bytes.toBytes("family"); @@ -89,8 +90,8 @@ private static void createSchema() throws IOException { TimelineSchemaCreator.createAllTables(util.getConfiguration(), false); } - /** Writes non numeric data into flow run table - * reads it back. + /** writes non numeric data into flow run table + * reads it back * * @throws Exception */ @@ -106,11 +107,10 @@ public void testWriteNonNumericData() throws Exception { p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes, valueBytes); Configuration hbaseConf = util.getConfiguration(); - TableName table = TableName.valueOf(hbaseConf.get( - FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); Connection conn = null; conn = ConnectionFactory.createConnection(hbaseConf); - Table flowRunTable = conn.getTable(table); + Table flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); flowRunTable.put(p); Get g = new Get(rowKeyBytes); @@ -156,11 +156,10 @@ public void testWriteScanBatchLimit() throws Exception { value4Bytes); Configuration hbaseConf = util.getConfiguration(); - TableName table = TableName.valueOf(hbaseConf.get( - FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); Connection conn = null; conn = ConnectionFactory.createConnection(hbaseConf); - Table flowRunTable = conn.getTable(table); + Table flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); flowRunTable.put(p); String rowKey2 = "nonNumericRowKey2"; @@ -262,7 +261,7 @@ public void testWriteScanBatchLimit() throws Exception { .getFamilyMap(FlowRunColumnFamily.INFO.getBytes()); // we expect all back in one next call assertEquals(4, values.size()); - System.out.println(" values size " + values.size() + " " + batchLimit); + System.out.println(" values size " + values.size() + " " + batchLimit ); rowCount++; } // should get back 1 row with each invocation @@ -321,16 +320,16 @@ public void testWriteFlowRunCompaction() throws Exception { } // check in flow run table - HRegionServer server = util.getRSForFirstRegionInTable(TableName - .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); - List regions = server.getOnlineRegions(TableName - .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); - assertTrue("Didn't find any regions for primary table!", - regions.size() > 0); + HRegionServer server = util.getRSForFirstRegionInTable( + BaseTable.getTableName(c1, FlowRunTable.TABLE_NAME_CONF_NAME, + FlowRunTable.DEFAULT_TABLE_NAME)); + List regions = server.getOnlineRegions(BaseTable.getTableName(c1, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); + assertTrue("Didn't find any regions for primary table!", regions.size() > 0); // flush and compact all the regions of the primary table for (Region region : regions) { - region.flush(true); - region.compact(true); + region.flush(true); + region.compact(true); } // check flow run for one flow many apps @@ -349,8 +348,8 @@ private void checkFlowRunTable(String cluster, String user, String flow, new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey(); s.setStopRow(stopRow); Connection conn = ConnectionFactory.createConnection(c1); - Table table1 = conn.getTable(TableName - .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + Table table1 = conn.getTable(BaseTable.getTableName(c1, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); ResultScanner scanner = table1.getScanner(s); int rowCount = 0; @@ -364,13 +363,13 @@ private void checkFlowRunTable(String cluster, String user, String flow, rowCount++; // check metric1 byte[] q = ColumnHelper.getColumnQualifier( - FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC_1); + FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric1); assertTrue(values.containsKey(q)); assertEquals(141, Bytes.toLong(values.get(q))); // check metric2 q = ColumnHelper.getColumnQualifier( - FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), METRIC_2); + FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(), metric2); assertTrue(values.containsKey(q)); assertEquals(57, Bytes.toLong(values.get(q))); } @@ -386,7 +385,7 @@ private FlowScanner getFlowScannerForTestingCompaction() { // okay to pass in nulls for the constructor arguments // because all we want to do is invoke the process summation FlowScanner fs = new FlowScanner(null, null, - (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION + (request.isMajor() == true ? FlowScannerOperation.MAJOR_COMPACTION : FlowScannerOperation.MINOR_COMPACTION)); assertNotNull(fs); return fs; @@ -405,7 +404,7 @@ public void checkProcessSummationMoreCellsSumFinal2() long currentTimestamp = System.currentTimeMillis(); long cell1Ts = 1200120L; long cell2Ts = TimestampGenerator.getSupplementedTimestamp( - System.currentTimeMillis(), "application_123746661110_11202"); + System.currentTimeMillis(),"application_123746661110_11202"); long cell3Ts = 1277719L; long cell4Ts = currentTimestamp - 10; @@ -572,8 +571,7 @@ public void checkProcessSummationMoreCellsSumFinalMany() throws IOException { // of type SUM and SUM_FINAL // NOT cells of SUM_FINAL will expire @Test - public void checkProcessSummationMoreCellsSumFinalVariedTags() - throws IOException { + public void checkProcessSummationMoreCellsSumFinalVariedTags() throws IOException { FlowScanner fs = getFlowScannerForTestingCompaction(); int countFinal = 20100; int countNotFinal = 1000; @@ -587,8 +585,7 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags() long cellTsFinalStart = 10001120L; long cellTsFinal = cellTsFinalStart; - long cellTsFinalStartNotExpire = - TimestampGenerator.getSupplementedTimestamp( + long cellTsFinalStartNotExpire = TimestampGenerator.getSupplementedTimestamp( System.currentTimeMillis(), "application_10266666661166_118821"); long cellTsFinalNotExpire = cellTsFinalStartNotExpire; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java index 8581aa4..e109f6c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.yarn.conf.YarnConfiguration; /** * Implements behavior common to tables used in the timeline service storage. It @@ -114,16 +115,41 @@ public Result getResult(Configuration hbaseConf, Connection conn, Get get) } /** + * Get the table name for the input table. + * + * @param hbaseConf HBase configuration from which table name will be fetched. + * @return A {@link TableName} object. + */ + public static TableName getTableName(Configuration conf, String tableName) { + String tableSchemaPrefix = conf.get( + YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX); + return TableName.valueOf(tableSchemaPrefix + tableName); + } + + /** * Get the table name for this table. * * @param hbaseConf HBase configuration from which table name will be fetched. * @return A {@link TableName} object. */ - public TableName getTableName(Configuration hbaseConf) { - TableName table = - TableName.valueOf(hbaseConf.get(tableNameConfName, defaultTableName)); - return table; + public TableName getTableName(Configuration conf) { + String tableName = conf.get(tableNameConfName, defaultTableName); + return getTableName(conf, tableName); + } + /** + * Get the table name based on the input config parameters. + * + * @param hbaseConf HBase configuration from which table name will be fetched. + * @param tableNameInConf the table name parameter in conf. + * @param defaultTable the default table name. + * @return A {@link TableName} object. + */ + public static TableName getTableName(Configuration conf, + String tableNameInConf, String defaultTableName) { + String tableName = conf.get(tableNameInConf, defaultTableName); + return getTableName(conf, tableName); } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java index aa9a793..fad0dae 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; @@ -566,21 +567,4 @@ public static String getAggregationCompactionDimension(List tags) { } return appId; } - - public static boolean isFlowRunTable(HRegionInfo hRegionInfo, - Configuration conf) { - String regionTableName = hRegionInfo.getTable().getNameAsString(); - String flowRunTableName = conf.get(FlowRunTable.TABLE_NAME_CONF_NAME, - FlowRunTable.DEFAULT_TABLE_NAME); - if (LOG.isDebugEnabled()) { - LOG.debug("regionTableName=" + regionTableName); - } - if (flowRunTableName.equalsIgnoreCase(regionTableName)) { - if (LOG.isDebugEnabled()) { - LOG.debug(" table is the flow run table!! " + flowRunTableName); - } - return true; - } - return false; - } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java index a9dcfaa..5c7b069 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java @@ -71,7 +71,7 @@ public void start(CoprocessorEnvironment e) throws IOException { if (e instanceof RegionCoprocessorEnvironment) { RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e; this.region = env.getRegion(); - isFlowRunRegion = TimelineStorageUtils.isFlowRunTable( + isFlowRunRegion = FlowRunTable.isFlowRunTable( region.getRegionInfo(), env.getConfiguration()); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java index 547bef0..4cd581b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; @@ -138,4 +139,23 @@ public void createTable(Admin admin, Configuration hbaseConf) LOG.info("Status of table creation for " + table.getNameAsString() + "=" + admin.tableExists(table)); } + + public static boolean isFlowRunTable(HRegionInfo hRegionInfo, + Configuration conf) { + String regionTableName = hRegionInfo.getTable().getNameAsString(); + if (LOG.isDebugEnabled()) { + LOG.debug("regionTableName=" + regionTableName); + } + String flowRunTableName = BaseTable.getTableName(conf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME) + .getNameAsString(); + if (flowRunTableName.equalsIgnoreCase(regionTableName)) { + if (LOG.isDebugEnabled()) { + LOG.debug(" table is the flow run table!! " + + flowRunTableName); + } + return true; + } + return false; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md index b6a0da4..00da4f9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/TimelineServiceV2.md @@ -143,6 +143,7 @@ New configuration parameters that are introduced with v.2 are marked bold. | **`yarn.timeline-service.writer.class`** | The class for the backend storage writer. Defaults to HBase storage writer. | | **`yarn.timeline-service.reader.class`** | The class for the backend storage reader. Defaults to HBase storage reader. | | **`yarn.system-metrics-publisher.enabled`** | The setting that controls whether yarn system metrics is published on the Timeline service or not by RM And NM. Defaults to `false`. | +| **`yarn.timeline-service.schema.prefix`** | The schema prefix for hbase tables. Defaults to ".dev". | #### Advanced configuration @@ -194,7 +195,8 @@ Finally, run the schema creator tool to create the necessary tables: The `TimelineSchemaCreator` tool supports a few options that may come handy especially when you are testing. For example, you can use `-skipExistingTable` (`-s` for short) to skip existing tables -and continue to create other tables rather than failing the schema creation. +and continue to create other tables rather than failing the schema creation. By default, the tables +will have a schema prefix of "dev." #### Enabling Timeline Service v.2 Following are the basic configurations to start Timeline service v.2: