diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index ff2aa4f..987d02c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1965,6 +1965,18 @@ public static boolean isAclEnabled(Configuration conf) { public static final String TIMELINE_SERVICE_READER_CLASS = TIMELINE_SERVICE_PREFIX + "reader.class"; + /** + * The default schema prefix for HBase tables. + */ + public static final String DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX = + "dev."; + + /** + * The config param name to override the schema prefix. + */ + public static final String TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME = + TIMELINE_SERVICE_PREFIX + "schema.prefix"; + /** The setting that controls how often the timeline collector flushes the * timeline writer. */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageSchema.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageSchema.java new file mode 100644 index 0000000..f9a4a7e --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorageSchema.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.NavigableMap; +import java.util.NavigableSet; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Table; + +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; + +public class TestHBaseTimelineStorageSchema { + private static HBaseTestingUtility util; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + util = new HBaseTestingUtility(); + util.startMiniCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + util.shutdownMiniCluster(); + } + + private static void createSchema(Configuration conf) throws IOException { + TimelineSchemaCreator.createAllTables(conf, false); + } + + @Test + public void createWithDefaultPrefix() throws IOException { + Configuration hbaseConf = util.getConfiguration(); + createSchema(hbaseConf); + Connection conn = null; + conn = ConnectionFactory.createConnection(hbaseConf); + Admin admin = conn.getAdmin(); + + TableName entityTableName = BaseTable.getTableName(hbaseConf, + EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME); + assertTrue(admin.tableExists(entityTableName)); + assertTrue(entityTableName.getNameAsString().startsWith( + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX)); + Table entityTable = conn.getTable(BaseTable.getTableName(hbaseConf, + EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME)); + assertNotNull(entityTable); + + TableName flowRunTableName = BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME); + assertTrue(admin.tableExists(flowRunTableName)); + assertTrue(flowRunTableName.getNameAsString().startsWith( + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX)); + Table flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); + assertNotNull(flowRunTable); + } + + @Test + public void createWithSetPrefix() throws IOException { + Configuration hbaseConf = util.getConfiguration(); + String prefix = "unit-test."; + hbaseConf.set(YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME, + prefix); + createSchema(hbaseConf); + Connection conn = null; + conn = ConnectionFactory.createConnection(hbaseConf); + Admin admin = conn.getAdmin(); + + TableName entityTableName = BaseTable.getTableName(hbaseConf, + EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME); + assertTrue(admin.tableExists(entityTableName)); +
assertTrue(entityTableName.getNameAsString().startsWith(prefix)); + Table entityTable = conn.getTable(BaseTable.getTableName(hbaseConf, + EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME)); + assertNotNull(entityTable); + + TableName flowRunTableName = BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME); + assertTrue(admin.tableExists(flowRunTableName)); + assertTrue(flowRunTableName.getNameAsString().startsWith(prefix)); + Table flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); + assertNotNull(flowRunTable); + + // create another set with a diff prefix + hbaseConf + .unset(YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME); + prefix = "yet-another-unit-test."; + hbaseConf.set(YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME, + prefix); + createSchema(hbaseConf); + entityTableName = BaseTable.getTableName(hbaseConf, + EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME); + assertTrue(admin.tableExists(entityTableName)); + assertTrue(entityTableName.getNameAsString().startsWith(prefix)); + entityTable = conn.getTable(BaseTable.getTableName(hbaseConf, + EntityTable.TABLE_NAME_CONF_NAME, EntityTable.DEFAULT_TABLE_NAME)); + assertNotNull(entityTable); + + flowRunTableName = BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME); + assertTrue(admin.tableExists(flowRunTableName)); + assertTrue(flowRunTableName.getNameAsString().startsWith(prefix)); + flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); + assertNotNull(flowRunTable); + hbaseConf + .unset(YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java index 0535a13..a8f0f47 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestFlowDataGenerator.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + import java.util.HashMap; import java.util.HashSet; import java.util.Map; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java index 37490ff..552f5d1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.junit.AfterClass; @@ -155,8 +156,9 @@ public void testWriteFlowRunMinMax() throws Exception { Connection conn = ConnectionFactory.createConnection(c1); // check in flow activity table - Table table1 = conn.getTable(TableName - .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME)); + Table table1 = conn.getTable( + BaseTable.getTableName(c1, FlowActivityTable.TABLE_NAME_CONF_NAME, + FlowActivityTable.DEFAULT_TABLE_NAME)); byte[] startRow = new FlowActivityRowKey(cluster, minStartTs, user, flow).getRowKey(); Get g = new Get(startRow); @@ -286,8 +288,9 @@ private void checkFlowActivityTable(String cluster, String user, String flow, .getRowKey(); s.setStopRow(stopRow); Connection conn = ConnectionFactory.createConnection(c1); - Table table1 = conn.getTable(TableName - .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME)); + Table table1 = conn.getTable( + BaseTable.getTableName(c1, FlowActivityTable.TABLE_NAME_CONF_NAME, + FlowActivityTable.DEFAULT_TABLE_NAME)); ResultScanner scanner = table1.getScanner(s); int rowCount = 0; for (Result result : scanner) { @@ -428,8 +431,9 @@ private void checkFlowActivityTableSeveralRuns(String cluster, String user, new FlowActivityRowKey(clusterStop, appCreatedTime, user, flow).getRowKey(); s.setStopRow(stopRow); Connection conn = ConnectionFactory.createConnection(c1); - Table table1 = conn.getTable(TableName - .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME)); + Table table1 = conn.getTable( + BaseTable.getTableName(c1, FlowActivityTable.TABLE_NAME_CONF_NAME, + FlowActivityTable.DEFAULT_TABLE_NAME)); ResultScanner scanner = table1.getScanner(s); int rowCount = 0; for (Result result : scanner) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java index 6c4c810..c32c6a7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java @@ -61,9 +61,11 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; +import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -94,8 +96,8 @@ private static void createSchema() throws IOException { @Test public void checkCoProcessorOff() throws IOException, InterruptedException { Configuration hbaseConf = util.getConfiguration(); - TableName table = TableName.valueOf(hbaseConf.get( - FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); + TableName table = BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME); Connection conn = null; conn = ConnectionFactory.createConnection(hbaseConf); Admin admin = conn.getAdmin(); @@ -109,14 +111,14 @@ public void checkCoProcessorOff() throws IOException, InterruptedException { HRegionServer server = util.getRSForFirstRegionInTable(table); List regions = server.getOnlineRegions(table); for (Region region : regions) { - assertTrue(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), + assertTrue(FlowRunTable.isFlowRunTable(region.getRegionInfo(), hbaseConf)); } } - table = TableName.valueOf(hbaseConf.get( + table = BaseTable.getTableName(hbaseConf, FlowActivityTable.TABLE_NAME_CONF_NAME, - FlowActivityTable.DEFAULT_TABLE_NAME)); + FlowActivityTable.DEFAULT_TABLE_NAME); if (admin.tableExists(table)) { // check the regions. // check in flow activity table @@ -124,14 +126,13 @@ public void checkCoProcessorOff() throws IOException, InterruptedException { HRegionServer server = util.getRSForFirstRegionInTable(table); List regions = server.getOnlineRegions(table); for (Region region : regions) { - assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), + assertFalse(FlowRunTable.isFlowRunTable(region.getRegionInfo(), hbaseConf)); } } - table = TableName.valueOf(hbaseConf.get( - EntityTable.TABLE_NAME_CONF_NAME, - EntityTable.DEFAULT_TABLE_NAME)); + table = BaseTable.getTableName(hbaseConf, EntityTable.TABLE_NAME_CONF_NAME, + EntityTable.DEFAULT_TABLE_NAME); if (admin.tableExists(table)) { // check the regions. 
// check in entity run table @@ -139,7 +140,7 @@ public void checkCoProcessorOff() throws IOException, InterruptedException { HRegionServer server = util.getRSForFirstRegionInTable(table); List regions = server.getOnlineRegions(table); for (Region region : regions) { - assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), + assertFalse(FlowRunTable.isFlowRunTable(region.getRegionInfo(), hbaseConf)); } } @@ -220,8 +221,8 @@ public void testWriteFlowRunMinMax() throws Exception { Connection conn = ConnectionFactory.createConnection(c1); // check in flow run table - Table table1 = conn.getTable(TableName - .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + Table table1 = conn.getTable(BaseTable.getTableName(c1, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); // scan the table and see that we get back the right min and max // timestamps byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey(); @@ -370,8 +371,8 @@ void checkFlowRunTableBatchLimit(String cluster, String user, byte[] stopRow = new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey(); s.setStopRow(stopRow); Connection conn = ConnectionFactory.createConnection(c1); - Table table1 = conn - .getTable(TableName.valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + Table table1 = conn.getTable(BaseTable.getTableName(c1, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); ResultScanner scanner = table1.getScanner(s); int loopCount = 0; @@ -515,8 +516,8 @@ private void checkFlowRunTable(String cluster, String user, String flow, new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey(); s.setStopRow(stopRow); Connection conn = ConnectionFactory.createConnection(c1); - Table table1 = conn.getTable(TableName - .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + Table table1 = conn.getTable(BaseTable.getTableName(c1, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); ResultScanner scanner = table1.getScanner(s); int rowCount = 0; @@ -780,8 +781,8 @@ private void checkMinMaxFlush(Configuration c1, long minTS, long startTs, boolean checkMax) throws IOException { Connection conn = ConnectionFactory.createConnection(c1); // check in flow run table - Table table1 = conn.getTable(TableName - .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + Table table1 = conn.getTable(BaseTable.getTableName(c1, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); // scan the table and see that we get back the right min and max // timestamps byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java index 71523b8..9bf8677 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java @@ -53,6 +53,7 @@ import 
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; @@ -106,11 +107,10 @@ public void testWriteNonNumericData() throws Exception { p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes, valueBytes); Configuration hbaseConf = util.getConfiguration(); - TableName table = TableName.valueOf(hbaseConf.get( - FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); Connection conn = null; conn = ConnectionFactory.createConnection(hbaseConf); - Table flowRunTable = conn.getTable(table); + Table flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); flowRunTable.put(p); Get g = new Get(rowKeyBytes); @@ -156,11 +156,10 @@ public void testWriteScanBatchLimit() throws Exception { value4Bytes); Configuration hbaseConf = util.getConfiguration(); - TableName table = TableName.valueOf(hbaseConf.get( - FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); Connection conn = null; conn = ConnectionFactory.createConnection(hbaseConf); - Table flowRunTable = conn.getTable(table); + Table flowRunTable = conn.getTable(BaseTable.getTableName(hbaseConf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); flowRunTable.put(p); String rowKey2 = "nonNumericRowKey2"; @@ -321,10 +320,11 @@ public void testWriteFlowRunCompaction() throws Exception { } // check in flow run table - HRegionServer server = util.getRSForFirstRegionInTable(TableName - .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); - List regions = server.getOnlineRegions(TableName - .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + HRegionServer server = util.getRSForFirstRegionInTable( + BaseTable.getTableName(c1, FlowRunTable.TABLE_NAME_CONF_NAME, + FlowRunTable.DEFAULT_TABLE_NAME)); + List regions = server.getOnlineRegions(BaseTable.getTableName(c1, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); assertTrue("Didn't find any regions for primary table!", regions.size() > 0); // flush and compact all the regions of the primary table for (Region region : regions) { @@ -348,8 +348,8 @@ private void checkFlowRunTable(String cluster, String user, String flow, new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey(); s.setStopRow(stopRow); Connection conn = ConnectionFactory.createConnection(c1); - Table table1 = conn.getTable(TableName - .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); + Table table1 = conn.getTable(BaseTable.getTableName(c1, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); ResultScanner scanner = table1.getScanner(s); int rowCount = 0; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java index 8581aa4..e109f6c 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BaseTable.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.yarn.conf.YarnConfiguration; /** * Implements behavior common to tables used in the timeline service storage. It @@ -114,16 +115,41 @@ public Result getResult(Configuration hbaseConf, Connection conn, Get get) } /** + * Get the schema-prefixed table name for the given table name. + * @param conf HBase configuration from which the schema prefix is read. + * @param tableName the table name to be prefixed. + * @return A {@link TableName} object. + */ + public static TableName getTableName(Configuration conf, String tableName) { + String tableSchemaPrefix = conf.get( + YarnConfiguration.TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX_NAME, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_HBASE_SCHEMA_PREFIX); + return TableName.valueOf(tableSchemaPrefix + tableName); + } + + /** * Get the table name for this table. * - * @param hbaseConf HBase configuration from which table name will be fetched. + * @param conf HBase configuration from which table name will be fetched. * @return A {@link TableName} object. */ - public TableName getTableName(Configuration hbaseConf) { - TableName table = - TableName.valueOf(hbaseConf.get(tableNameConfName, defaultTableName)); - return table; + public TableName getTableName(Configuration conf) { + String tableName = conf.get(tableNameConfName, defaultTableName); + return getTableName(conf, tableName); + } + /** + * Get the table name based on the input config parameters. + * + * @param conf HBase configuration from which table name will be fetched. + * @param tableNameInConf the table name parameter in conf. + * @param defaultTableName the default table name. + * @return A {@link TableName} object.
+ */ + public static TableName getTableName(Configuration conf, + String tableNameInConf, String defaultTableName) { + String tableName = conf.get(tableNameInConf, defaultTableName); + return getTableName(conf, tableName); } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java index aa9a793..fad0dae 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; @@ -566,21 +567,4 @@ public static String getAggregationCompactionDimension(List tags) { } return appId; } - - public static boolean isFlowRunTable(HRegionInfo hRegionInfo, - Configuration conf) { - String regionTableName = hRegionInfo.getTable().getNameAsString(); - String flowRunTableName = conf.get(FlowRunTable.TABLE_NAME_CONF_NAME, - FlowRunTable.DEFAULT_TABLE_NAME); - if (LOG.isDebugEnabled()) { - LOG.debug("regionTableName=" + regionTableName); - } - if (flowRunTableName.equalsIgnoreCase(regionTableName)) { - if (LOG.isDebugEnabled()) { - LOG.debug(" table is the flow run table!! 
" + flowRunTableName); - } - return true; - } - return false; - } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java index a9dcfaa..5c7b069 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java @@ -71,7 +71,7 @@ public void start(CoprocessorEnvironment e) throws IOException { if (e instanceof RegionCoprocessorEnvironment) { RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e; this.region = env.getRegion(); - isFlowRunRegion = TimelineStorageUtils.isFlowRunTable( + isFlowRunRegion = FlowRunTable.isFlowRunTable( region.getRegionInfo(), env.getConfiguration()); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java index 547bef0..4cd581b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunTable.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; @@ -138,4 +139,23 @@ public void createTable(Admin admin, Configuration hbaseConf) LOG.info("Status of table creation for " + table.getNameAsString() + "=" + admin.tableExists(table)); } + + public static boolean isFlowRunTable(HRegionInfo hRegionInfo, + Configuration conf) { + String regionTableName = hRegionInfo.getTable().getNameAsString(); + if (LOG.isDebugEnabled()) { + LOG.debug("regionTableName=" + regionTableName); + } + String flowRunTableName = BaseTable.getTableName(conf, + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME) + .getNameAsString(); + if (flowRunTableName.equalsIgnoreCase(regionTableName)) { + if (LOG.isDebugEnabled()) { + LOG.debug(" table is the flow run table!! " + + flowRunTableName); + } + return true; + } + return false; + } }