diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java index 2d85bab..18f975a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java @@ -32,8 +32,10 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.client.Result; @@ -56,6 +58,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; import org.apache.hadoop.yarn.util.ConverterUtils; /** @@ -887,4 +890,21 @@ public static String getAggregationCompactionDimension(List tags) { Set eventsSet = new HashSet<>(eventsMap.values()); entity.addEvents(eventsSet); } + + public static boolean isFlowRunTable(HRegionInfo hRegionInfo, + Configuration conf) { + String regionTableName = hRegionInfo.getTable().getNameAsString(); + String flowRunTableName = conf.get(FlowRunTable.TABLE_NAME_CONF_NAME, + FlowRunTable.DEFAULT_TABLE_NAME); + if (LOG.isDebugEnabled()) { + LOG.debug("regionTableName=" + regionTableName); + } + if (flowRunTableName.equalsIgnoreCase(regionTableName)) { + if (LOG.isDebugEnabled()) { + LOG.debug(" table is the flow run table!! " + flowRunTableName); + } + return true; + } + return false; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java index 3e3e3ab..b194f07 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java @@ -84,7 +84,7 @@ + ".table.metrics.ttl"; /** default value for entity table name. */ - private static final String DEFAULT_TABLE_NAME = "timelineservice.entity"; + public static final String DEFAULT_TABLE_NAME = "timelineservice.entity"; /** default TTL is 30 days for metrics timeseries. */ private static final int DEFAULT_METRICS_TTL = 2592000; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java index 450640a..5edc9a9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java @@ -57,6 +57,7 @@ public class FlowRunCoprocessor extends BaseRegionObserver { private static final Log LOG = LogFactory.getLog(FlowRunCoprocessor.class); + private boolean isFlowRunRegion = false; private HRegion region; /** @@ -70,9 +71,15 @@ public void start(CoprocessorEnvironment e) throws IOException { if (e instanceof RegionCoprocessorEnvironment) { RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e; this.region = env.getRegion(); + isFlowRunRegion = TimelineStorageUtils.isFlowRunTable( + region.getRegionInfo(), env.getConfiguration()); } } + public boolean isFlowRunRegion() { + return isFlowRunRegion; + } + /* * (non-Javadoc) * @@ -93,6 +100,9 @@ public void prePut(ObserverContext e, Put put, WALEdit edit, Durability durability) throws IOException { Map attributes = put.getAttributesMap(); + if(!isFlowRunRegion) { + return; + } // Assumption is that all the cells in a put are the same operation. List tags = new ArrayList<>(); if ((attributes != null) && (attributes.size() > 0)) { @@ -160,6 +170,10 @@ private long getCellTimestamp(long timestamp, List tags) { @Override public void preGetOp(ObserverContext e, Get get, List results) throws IOException { + if(!isFlowRunRegion) { + return; + } + Scan scan = new Scan(get); scan.setMaxVersions(); RegionScanner scanner = null; @@ -190,11 +204,14 @@ public void preGetOp(ObserverContext e, @Override public RegionScanner preScannerOpen( ObserverContext e, Scan scan, - RegionScanner s) throws IOException { - // set max versions for scan to see all - // versions to aggregate for metrics - scan.setMaxVersions(); - return s; + RegionScanner scanner) throws IOException { + + if(isFlowRunRegion) { + // set max versions for scan to see all + // versions to aggregate for metrics + scan.setMaxVersions(); + } + return scanner; } /* @@ -213,6 +230,9 @@ public RegionScanner preScannerOpen( public RegionScanner postScannerOpen( ObserverContext e, Scan scan, RegionScanner scanner) throws IOException { + if(!isFlowRunRegion) { + return scanner; + } return new FlowScanner(e.getEnvironment(), scan.getBatch(), scanner, FlowScannerOperation.READ); } @@ -221,6 +241,9 @@ public RegionScanner postScannerOpen( public InternalScanner preFlush( ObserverContext c, Store store, InternalScanner scanner) throws IOException { + if(!isFlowRunRegion) { + return scanner; + } if (LOG.isDebugEnabled()) { if (store != null) { LOG.debug("preFlush store = " + store.getColumnFamilyName() @@ -241,6 +264,9 @@ public InternalScanner preFlush( @Override public void postFlush(ObserverContext c, Store store, StoreFile resultFile) { + if(!isFlowRunRegion) { + return; + } if (LOG.isDebugEnabled()) { if (store != null) { LOG.debug("postFlush store = " + store.getColumnFamilyName() @@ -262,6 +288,9 @@ public InternalScanner preCompact( InternalScanner scanner, ScanType scanType, CompactionRequest request) throws IOException { + if(!isFlowRunRegion) { + return scanner; + } FlowScannerOperation requestOp = FlowScannerOperation.MINOR_COMPACTION; if (request != null) { requestOp = (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java index 0ace529..398d7b4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java @@ -210,7 +210,7 @@ private boolean nextInternal(List cells, int cellLimit) if (comp.compare(currentColumnQualifier, newColumnQualifier) != 0) { if (converter != null && isNumericConverter(converter)) { addedCnt += emitCells(cells, currentColumnCells, currentAggOp, - (NumericValueConverter)converter, currentTimestamp); + converter, currentTimestamp); } resetState(currentColumnCells, alreadySeenAggDim); currentColumnQualifier = newColumnQualifier; @@ -219,6 +219,7 @@ private boolean nextInternal(List cells, int cellLimit) } // No operation needs to be performed on non numeric converters. if (!isNumericConverter(converter)) { + currentColumnCells.add(cell); nextCell(cellLimit); continue; } @@ -228,7 +229,7 @@ private boolean nextInternal(List cells, int cellLimit) } if (!currentColumnCells.isEmpty()) { addedCnt += emitCells(cells, currentColumnCells, currentAggOp, - (NumericValueConverter)converter, currentTimestamp); + converter, currentTimestamp); if (LOG.isDebugEnabled()) { if (addedCnt > 0) { LOG.debug("emitted cells. " + addedCnt + " for " + this.action @@ -345,7 +346,7 @@ private void collectCells(SortedSet currentColumnCells, * parameter. */ private int emitCells(List cells, SortedSet currentColumnCells, - AggregationOperation currentAggOp, NumericValueConverter converter, + AggregationOperation currentAggOp, ValueConverter converter, long currentTimestamp) throws IOException { if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) { return 0; @@ -372,12 +373,14 @@ private int emitCells(List cells, SortedSet currentColumnCells, cells.addAll(currentColumnCells); return currentColumnCells.size(); case READ: - Cell sumCell = processSummation(currentColumnCells, converter); + Cell sumCell = processSummation(currentColumnCells, + (NumericValueConverter) converter); cells.add(sumCell); return 1; case MAJOR_COMPACTION: List finalCells = processSummationMajorCompaction( - currentColumnCells, converter, currentTimestamp); + currentColumnCells, (NumericValueConverter) converter, + currentTimestamp); cells.addAll(finalCells); return finalCells.size(); default: diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java index a724db2..801d43c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java @@ -19,18 +19,21 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.flow; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; import java.util.EnumSet; +import java.util.List; import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Get; @@ -38,6 +41,8 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; @@ -57,6 +62,8 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -84,6 +91,60 @@ private static void createSchema() throws IOException { TimelineSchemaCreator.createAllTables(util.getConfiguration(), false); } + @Test + public void checkCoProcessorOff() throws IOException, InterruptedException { + Configuration hbaseConf = util.getConfiguration(); + TableName table = TableName.valueOf(hbaseConf.get( + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); + Connection conn = null; + conn = ConnectionFactory.createConnection(hbaseConf); + Admin admin = conn.getAdmin(); + if (admin == null) { + throw new IOException("Can't check tables since admin is null"); + } + if (admin.tableExists(table)) { + // check the regions. + // check in flow run table + util.waitUntilAllRegionsAssigned(table); + HRegionServer server = util.getRSForFirstRegionInTable(table); + List regions = server.getOnlineRegions(table); + for (HRegion region : regions) { + assertTrue(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), + hbaseConf)); + } + } + + table = TableName.valueOf(hbaseConf.get( + FlowActivityTable.TABLE_NAME_CONF_NAME, + FlowActivityTable.DEFAULT_TABLE_NAME)); + if (admin.tableExists(table)) { + // check the regions. + // check in flow activity table + util.waitUntilAllRegionsAssigned(table); + HRegionServer server = util.getRSForFirstRegionInTable(table); + List regions = server.getOnlineRegions(table); + for (HRegion region : regions) { + assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), + hbaseConf)); + } + } + + table = TableName.valueOf(hbaseConf.get( + EntityTable.TABLE_NAME_CONF_NAME, + EntityTable.DEFAULT_TABLE_NAME)); + if (admin.tableExists(table)) { + // check the regions. + // check in entity run table + util.waitUntilAllRegionsAssigned(table); + HRegionServer server = util.getRSForFirstRegionInTable(table); + List regions = server.getOnlineRegions(table); + for (HRegion region : regions) { + assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), + hbaseConf)); + } + } + } + /** * Writes 4 timeline entities belonging to one flow run through the * {@link HBaseTimelineWriterImpl} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java index 2738e6a..e7e7ba4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java @@ -39,6 +39,8 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; @@ -87,6 +89,40 @@ private static void createSchema() throws IOException { TimelineSchemaCreator.createAllTables(util.getConfiguration(), false); } + /** writes non numeric data into flow run table + * reads it back + * + * @throws Exception + */ + @Test + public void testWriteNonNumericData() throws Exception { + String rowKey = "nonNumericRowKey"; + String column = "nonNumericColumnName"; + String value = "nonNumericValue"; + byte[] rowKeyBytes = Bytes.toBytes(rowKey); + byte[] columnNameBytes = Bytes.toBytes(column); + byte[] valueBytes = Bytes.toBytes(value); + Put p = new Put(rowKeyBytes); + p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes, + valueBytes); + Configuration hbaseConf = util.getConfiguration(); + TableName table = TableName.valueOf(hbaseConf.get( + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); + Connection conn = null; + conn = ConnectionFactory.createConnection(hbaseConf); + Table flowRunTable = conn.getTable(table); + flowRunTable.put(p); + + Get g = new Get(rowKeyBytes); + Result r = flowRunTable.get(g); + assertNotNull(r); + assertTrue(r.size() >= 1); + Cell actualValue = r.getColumnLatestCell( + FlowRunColumnFamily.INFO.getBytes(), columnNameBytes); + assertNotNull(CellUtil.cloneValue(actualValue)); + assertEquals(Bytes.toString(CellUtil.cloneValue(actualValue)), value); + } + @Test public void testWriteFlowRunCompaction() throws Exception { String cluster = "kompaction_cluster1";