diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java index 7b647eb..fd5a7f5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java @@ -70,12 +70,14 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName; import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; import org.junit.After; import org.junit.AfterClass; @@ -649,8 +651,9 @@ public void testWriteApplicationToHBase() throws Exception { infoMap.putAll(infoMap1); // retrieve the row - byte[] rowKey = - ApplicationRowKey.getRowKey(cluster, user, flow, runid, appId); + ApplicationRowKey applicationRowKey = + new ApplicationRowKey(cluster, user, flow, runid, appId); + byte[] rowKey = applicationRowKey.getRowKey(); Get get = new Get(rowKey); get.setMaxVersions(Integer.MAX_VALUE); Connection conn = ConnectionFactory.createConnection(c1); @@ -674,7 +677,7 @@ public void testWriteApplicationToHBase() throws Exception { Map infoColumns = ApplicationColumnPrefix.INFO.readResults(result, - StringKeyConverter.getInstance()); + new StringKeyConverter()); assertEquals(infoMap, infoColumns); // Remember isRelatedTo is of type Map> @@ -710,15 +713,16 @@ public void testWriteApplicationToHBase() throws Exception { } } + KeyConverter stringKeyConverter = new StringKeyConverter(); // Configuration Map configColumns = - ApplicationColumnPrefix.CONFIG.readResults(result, - StringKeyConverter.getInstance()); + ApplicationColumnPrefix.CONFIG + .readResults(result, stringKeyConverter); assertEquals(conf, configColumns); NavigableMap> metricsResult = - ApplicationColumnPrefix.METRIC.readResultsWithTimestamps( - result, StringKeyConverter.getInstance()); + ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result, + stringKeyConverter); NavigableMap metricMap = metricsResult.get(m1.getId()); matchMetrics(metricValues, metricMap); @@ -908,7 +912,8 @@ public void testWriteEntityToHBase() throws Exception { // scan the table and see that entity exists Scan s = new 
Scan(); byte[] startRow = - EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName); + new EntityRowKeyPrefix(cluster, user, flow, runid, appName) + .getRowKeyPrefix(); s.setStartRow(startRow); s.setMaxVersions(Integer.MAX_VALUE); Connection conn = ConnectionFactory.createConnection(c1); @@ -916,6 +921,7 @@ public void testWriteEntityToHBase() throws Exception { int rowCount = 0; int colCount = 0; + KeyConverter stringKeyConverter = new StringKeyConverter(); for (Result result : scanner) { if (result != null && !result.isEmpty()) { rowCount++; @@ -936,7 +942,7 @@ public void testWriteEntityToHBase() throws Exception { Map infoColumns = EntityColumnPrefix.INFO.readResults(result, - StringKeyConverter.getInstance()); + new StringKeyConverter()); assertEquals(infoMap, infoColumns); // Remember isRelatedTo is of type Map> @@ -975,13 +981,12 @@ public void testWriteEntityToHBase() throws Exception { // Configuration Map configColumns = - EntityColumnPrefix.CONFIG.readResults(result, - StringKeyConverter.getInstance()); + EntityColumnPrefix.CONFIG.readResults(result, stringKeyConverter); assertEquals(conf, configColumns); NavigableMap> metricsResult = - EntityColumnPrefix.METRIC.readResultsWithTimestamps( - result, StringKeyConverter.getInstance()); + EntityColumnPrefix.METRIC.readResultsWithTimestamps(result, + stringKeyConverter); NavigableMap metricMap = metricsResult.get(m1.getId()); matchMetrics(metricValues, metricMap); @@ -1116,8 +1121,9 @@ public void testEvents() throws IOException { hbi.stop(); // retrieve the row - byte[] rowKey = - ApplicationRowKey.getRowKey(cluster, user, flow, runid, appName); + ApplicationRowKey applicationRowKey = + new ApplicationRowKey(cluster, user, flow, runid, appName); + byte[] rowKey = applicationRowKey.getRowKey(); Get get = new Get(rowKey); get.setMaxVersions(Integer.MAX_VALUE); Connection conn = ConnectionFactory.createConnection(c1); @@ -1132,7 +1138,7 @@ public void testEvents() throws IOException { Map eventsResult = ApplicationColumnPrefix.EVENT.readResults(result, - EventColumnNameConverter.getInstance()); + new EventColumnNameConverter()); // there should be only one event assertEquals(1, eventsResult.size()); for (Map.Entry e : eventsResult.entrySet()) { @@ -1212,7 +1218,8 @@ public void testEventsWithEmptyInfo() throws IOException { String appName = ApplicationId.newInstance(System.currentTimeMillis() + 9000000L, 1).toString(); byte[] startRow = - EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName); + new EntityRowKeyPrefix(cluster, user, flow, runid, appName) + .getRowKeyPrefix(); hbi.write(cluster, user, flow, flowVersion, runid, appName, entities); hbi.stop(); // scan the table and see that entity exists @@ -1234,7 +1241,7 @@ public void testEventsWithEmptyInfo() throws IOException { Map eventsResult = EntityColumnPrefix.EVENT.readResults(result, - EventColumnNameConverter.getInstance()); + new EventColumnNameConverter()); // there should be only one event assertEquals(1, eventsResult.size()); for (Map.Entry e : eventsResult.entrySet()) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java index 589b78d..37490ff 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java @@ -158,7 +158,7 @@ public void testWriteFlowRunMinMax() throws Exception { Table table1 = conn.getTable(TableName .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME)); byte[] startRow = - FlowActivityRowKey.getRowKey(cluster, minStartTs, user, flow); + new FlowActivityRowKey(cluster, minStartTs, user, flow).getRowKey(); Get g = new Get(startRow); Result r1 = table1.get(g); assertNotNull(r1); @@ -278,11 +278,12 @@ private void checkFlowActivityTable(String cluster, String user, String flow, Scan s = new Scan(); s.addFamily(FlowActivityColumnFamily.INFO.getBytes()); byte[] startRow = - FlowActivityRowKey.getRowKey(cluster, appCreatedTime, user, flow); + new FlowActivityRowKey(cluster, appCreatedTime, user, flow).getRowKey(); s.setStartRow(startRow); String clusterStop = cluster + "1"; byte[] stopRow = - FlowActivityRowKey.getRowKey(clusterStop, appCreatedTime, user, flow); + new FlowActivityRowKey(clusterStop, appCreatedTime, user, flow) + .getRowKey(); s.setStopRow(stopRow); Connection conn = ConnectionFactory.createConnection(c1); Table table1 = conn.getTable(TableName @@ -420,11 +421,11 @@ private void checkFlowActivityTableSeveralRuns(String cluster, String user, Scan s = new Scan(); s.addFamily(FlowActivityColumnFamily.INFO.getBytes()); byte[] startRow = - FlowActivityRowKey.getRowKey(cluster, appCreatedTime, user, flow); + new FlowActivityRowKey(cluster, appCreatedTime, user, flow).getRowKey(); s.setStartRow(startRow); String clusterStop = cluster + "1"; byte[] stopRow = - FlowActivityRowKey.getRowKey(clusterStop, appCreatedTime, user, flow); + new FlowActivityRowKey(clusterStop, appCreatedTime, user, flow).getRowKey(); s.setStopRow(stopRow); Connection conn = ConnectionFactory.createConnection(c1); Table table1 = conn.getTable(TableName diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java index a443b50..328b25a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java @@ -59,8 +59,8 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import 
org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; @@ -224,7 +224,7 @@ public void testWriteFlowRunMinMax() throws Exception { .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); // scan the table and see that we get back the right min and max // timestamps - byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid); + byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey(); Get g = new Get(startRow); g.addColumn(FlowRunColumnFamily.INFO.getBytes(), FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()); @@ -354,10 +354,11 @@ private void checkFlowRunTable(String cluster, String user, String flow, long runid, Configuration c1) throws IOException { Scan s = new Scan(); s.addFamily(FlowRunColumnFamily.INFO.getBytes()); - byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid); + byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey(); s.setStartRow(startRow); String clusterStop = cluster + "1"; - byte[] stopRow = FlowRunRowKey.getRowKey(clusterStop, user, flow, runid); + byte[] stopRow = + new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey(); s.setStopRow(stopRow); Connection conn = ConnectionFactory.createConnection(c1); Table table1 = conn.getTable(TableName @@ -629,7 +630,7 @@ private void checkMinMaxFlush(Configuration c1, long minTS, long startTs, .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); // scan the table and see that we get back the right min and max // timestamps - byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid); + byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey(); Get g = new Get(startRow); g.addColumn(FlowRunColumnFamily.INFO.getBytes(), FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java index 6b0ee5c..e1bef53 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java @@ -19,24 +19,24 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.flow; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertNotEquals; import java.io.IOException; -import java.util.Map; +import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.SortedSet; import java.util.TreeSet; -import java.util.ArrayList; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import 
org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Get; @@ -46,21 +46,21 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; /** * Tests the FlowRun and FlowActivity Tables @@ -194,10 +194,11 @@ private void checkFlowRunTable(String cluster, String user, String flow, long runid, Configuration c1, int valueCount) throws IOException { Scan s = new Scan(); s.addFamily(FlowRunColumnFamily.INFO.getBytes()); - byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid); + byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey(); s.setStartRow(startRow); String clusterStop = cluster + "1"; - byte[] stopRow = FlowRunRowKey.getRowKey(clusterStop, user, flow, runid); + byte[] stopRow = + new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey(); s.setStopRow(stopRow); Connection conn = ConnectionFactory.createConnection(c1); Table table1 = conn.getTable(TableName @@ -302,8 +303,9 @@ public void checkProcessSummationMoreCellsSumFinal2() cell4Ts, Bytes.toBytes(cellValue4), tagByteArray); currentColumnCells.add(c4); - List cells = fs.processSummationMajorCompaction(currentColumnCells, - LongConverter.getInstance(), currentTimestamp); + List cells = + fs.processSummationMajorCompaction(currentColumnCells, + new LongConverter(), currentTimestamp); assertNotNull(cells); // we should be getting back 4 cells @@ -387,8 +389,9 @@ public void checkProcessSummationMoreCellsSumFinalMany() throws IOException { cellTsNotFinal++; } - List cells = fs.processSummationMajorCompaction(currentColumnCells, - LongConverter.getInstance(), currentTimestamp); + List cells = + fs.processSummationMajorCompaction(currentColumnCells, + new LongConverter(), currentTimestamp); assertNotNull(cells); // we should be getting back count + 1 cells @@ -489,8 +492,9 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags() throws 
IOExceptio cellTsNotFinal++; } - List cells = fs.processSummationMajorCompaction(currentColumnCells, - LongConverter.getInstance(), currentTimestamp); + List cells = + fs.processSummationMajorCompaction(currentColumnCells, + new LongConverter(), currentTimestamp); assertNotNull(cells); // we should be getting back @@ -554,7 +558,7 @@ public void testProcessSummationMoreCellsSumFinal() throws IOException { 130L, Bytes.toBytes(cellValue2), tagByteArray); currentColumnCells.add(c2); List cells = fs.processSummationMajorCompaction(currentColumnCells, - LongConverter.getInstance(), currentTimestamp); + new LongConverter(), currentTimestamp); assertNotNull(cells); // we should be getting back two cells @@ -602,7 +606,7 @@ public void testProcessSummationOneCellSumFinal() throws IOException { currentColumnCells.add(c1); List cells = fs.processSummationMajorCompaction(currentColumnCells, - LongConverter.getInstance(), currentTimestamp); + new LongConverter(), currentTimestamp); assertNotNull(cells); // we should not get the same cell back // but we get back the flow cell @@ -639,7 +643,7 @@ public void testProcessSummationOneCell() throws IOException { currentTimestamp, Bytes.toBytes(1110L), tagByteArray); currentColumnCells.add(c1); List cells = fs.processSummationMajorCompaction(currentColumnCells, - LongConverter.getInstance(), currentTimestamp); + new LongConverter(), currentTimestamp); assertNotNull(cells); // we expect the same cell back assertEquals(1, cells.size()); @@ -653,15 +657,19 @@ public void testProcessSummationEmpty() throws IOException { FlowScanner fs = getFlowScannerForTestingCompaction(); long currentTimestamp = System.currentTimeMillis(); + LongConverter longConverter = new LongConverter(); + SortedSet currentColumnCells = null; - List cells = fs.processSummationMajorCompaction(currentColumnCells, - LongConverter.getInstance(), currentTimestamp); + List cells = + fs.processSummationMajorCompaction(currentColumnCells, longConverter, + currentTimestamp); assertNotNull(cells); assertEquals(0, cells.size()); currentColumnCells = new TreeSet(KeyValue.COMPARATOR); - cells = fs.processSummationMajorCompaction(currentColumnCells, - LongConverter.getInstance(), currentTimestamp); + cells = + fs.processSummationMajorCompaction(currentColumnCells, longConverter, + currentTimestamp); assertNotNull(cells); assertEquals(0, cells.size()); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java index 036746b..cccae26 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java @@ -31,14 +31,9 @@ import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.FilterList.Operator; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column; import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; import org.apache.hadoop.hbase.filter.QualifierFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; @@ -209,39 +204,6 @@ private static SingleColumnValueFilter createHBaseSingleColValueFilter( return singleColValFilter; } - private static byte[] createColQualifierPrefix(ColumnPrefix colPrefix, - String column) { - if (colPrefix == ApplicationColumnPrefix.EVENT || - colPrefix == EntityColumnPrefix.EVENT) { - return EventColumnNameConverter.getInstance().encode( - new EventColumnName(column, null, null)); - } else { - return StringKeyConverter.getInstance().encode(column); - } - } - - /** - * Create a filter list of qualifier filters based on passed set of columns. - * - * @param Describes the type of column prefix. - * @param colPrefix Column Prefix. - * @param columns set of column qualifiers. - * @return filter list. - */ - public static FilterList createFiltersFromColumnQualifiers( - ColumnPrefix colPrefix, Set columns) { - FilterList list = new FilterList(Operator.MUST_PASS_ONE); - for (String column : columns) { - // For columns which have compound column qualifiers (eg. events), we need - // to include the required separator. - byte[] compoundColQual = createColQualifierPrefix(colPrefix, column); - list.addFilter(new QualifierFilter(CompareOp.EQUAL, - new BinaryPrefixComparator( - colPrefix.getColumnPrefixBytes(compoundColQual)))); - } - return list; - } - /** * Fetch columns from filter list containing exists and multivalue equality * filters. 
This is done to fetch only required columns from back-end and diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index f8b5a65..9750d64 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName; import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter; @@ -86,6 +87,17 @@ private TypedBufferedMutator flowActivityTable; private TypedBufferedMutator flowRunTable; + /** + * Used to convert strings key components to and from storage format. + */ + private final KeyConverter stringKeyConverter = + new StringKeyConverter(); + + /** + * Used to convert Long key components to and from storage format. + */ + protected final KeyConverter longKeyConverter = new LongKeyConverter(); + public HBaseTimelineWriterImpl() { super(HBaseTimelineWriterImpl.class.getName()); } @@ -139,11 +151,17 @@ public TimelineWriteResponse write(String clusterId, String userId, // if the entity is the application, the destination is the application // table boolean isApplication = TimelineStorageUtils.isApplicationEntity(te); - byte[] rowKey = isApplication ? 
- ApplicationRowKey.getRowKey(clusterId, userId, flowName, flowRunId, - appId) : - EntityRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId, - te.getType(), te.getId()); + byte[] rowKey; + if (isApplication) { + ApplicationRowKey applicationRowKey = + new ApplicationRowKey(clusterId, userId, flowName, flowRunId, appId); + rowKey = applicationRowKey.getRowKey(); + } else { + EntityRowKey entityRowKey = + new EntityRowKey(clusterId, userId, flowName, flowRunId, appId, + te.getType(), te.getId()); + rowKey = entityRowKey.getRowKey(); + } storeInfo(rowKey, te, flowVersion, isApplication); storeEvents(rowKey, te.getEvents(), isApplication); @@ -155,8 +173,9 @@ public TimelineWriteResponse write(String clusterId, String userId, TimelineEvent event = TimelineStorageUtils.getApplicationEvent(te, ApplicationMetricsConstants.CREATED_EVENT_TYPE); if (event != null) { - onApplicationCreated(clusterId, userId, flowName, flowVersion, - flowRunId, appId, te, event.getTimestamp()); + FlowRunRowKey flowRunRowKey = new FlowRunRowKey(clusterId, userId, flowName, flowRunId); + AppToFlowRowKey appToFlowRowKey = new AppToFlowRowKey(clusterId, appId); + onApplicationCreated(flowRunRowKey, appToFlowRowKey, appId, userId, flowVersion, te, event.getTimestamp()); } // if it's an application entity, store metrics storeFlowMetricsAppRunning(clusterId, userId, flowName, flowRunId, @@ -174,55 +193,44 @@ public TimelineWriteResponse write(String clusterId, String userId, return putStatus; } - private void onApplicationCreated(String clusterId, String userId, - String flowName, String flowVersion, long flowRunId, String appId, - TimelineEntity te, long appCreatedTimeStamp) throws IOException { + private void onApplicationCreated(FlowRunRowKey flowRunRowKey, + AppToFlowRowKey appToFlowRowKey, String appId, String userId, + String flowVersion, TimelineEntity te, long appCreatedTimeStamp) throws IOException { + + String flowName = flowRunRowKey.getFlowName(); + Long flowRunId = flowRunRowKey.getFlowRunId(); + // store in App to flow table - storeInAppToFlowTable(clusterId, userId, flowName, flowRunId, appId, te); + byte[] rowKey = appToFlowRowKey.getRowKey(); + AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTable, null, flowName); + AppToFlowColumn.FLOW_RUN_ID.store(rowKey, appToFlowTable, null, flowRunId); + AppToFlowColumn.USER_ID.store(rowKey, appToFlowTable, null, userId); + // store in flow run table - storeAppCreatedInFlowRunTable(clusterId, userId, flowName, flowVersion, - flowRunId, appId, te); - // store in flow activity table - storeInFlowActivityTable(clusterId, userId, flowName, flowVersion, - flowRunId, appId, appCreatedTimeStamp); - } + storeAppCreatedInFlowRunTable(flowRunRowKey, appId, te); - /* - * updates the {@link FlowActivityTable} with the Application TimelineEntity - * information - */ - private void storeInFlowActivityTable(String clusterId, String userId, - String flowName, String flowVersion, long flowRunId, String appId, - long activityTimeStamp) throws IOException { - byte[] rowKey = FlowActivityRowKey.getRowKey(clusterId, activityTimeStamp, - userId, flowName); - byte[] qualifier = LongKeyConverter.getInstance().encode(flowRunId); - FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier, - null, flowVersion, + // store in flow activity table + byte[] flowActivityRowKeyBytes = + new FlowActivityRowKey(flowRunRowKey.getClusterId(), + appCreatedTimeStamp, flowRunRowKey.getUserId(), flowName) + .getRowKey(); + byte[] qualifier = 
longKeyConverter.encode(flowRunRowKey.getFlowRunId()); + FlowActivityColumnPrefix.RUN_ID.store(flowActivityRowKeyBytes, + flowActivityTable, qualifier, null, flowVersion, AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); } /* * updates the {@link FlowRunTable} with Application Created information */ - private void storeAppCreatedInFlowRunTable(String clusterId, String userId, - String flowName, String flowVersion, long flowRunId, String appId, + private void storeAppCreatedInFlowRunTable(FlowRunRowKey flowRunRowKey, String appId, TimelineEntity te) throws IOException { - byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName, - flowRunId); + byte[] rowKey = flowRunRowKey.getRowKey(); FlowRunColumn.MIN_START_TIME.store(rowKey, flowRunTable, null, te.getCreatedTime(), AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); } - private void storeInAppToFlowTable(String clusterId, String userId, - String flowName, long flowRunId, String appId, TimelineEntity te) - throws IOException { - byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId); - AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTable, null, flowName); - AppToFlowColumn.FLOW_RUN_ID.store(rowKey, appToFlowTable, null, flowRunId); - AppToFlowColumn.USER_ID.store(rowKey, appToFlowTable, null, userId); - } /* * updates the {@link FlowRunTable} and {@link FlowActivityTable} when an @@ -236,8 +244,13 @@ private void onApplicationFinished(String clusterId, String userId, appId, te, appFinishedTimeStamp); // indicate in the flow activity table that the app has finished - storeInFlowActivityTable(clusterId, userId, flowName, flowVersion, - flowRunId, appId, appFinishedTimeStamp); + byte[] rowKey = + new FlowActivityRowKey(clusterId, appFinishedTimeStamp, userId, + flowName).getRowKey(); + byte[] qualifier = longKeyConverter.encode(flowRunId); + FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier, + null, flowVersion, + AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); } /* @@ -247,7 +260,7 @@ private void storeAppFinishedInFlowRunTable(String clusterId, String userId, String flowName, long flowRunId, String appId, TimelineEntity te, long appFinishedTimeStamp) throws IOException { byte[] rowKey = - FlowRunRowKey.getRowKey(clusterId, userId, flowName, flowRunId); + new FlowRunRowKey(clusterId, userId, flowName, flowRunId).getRowKey(); Attribute attributeAppId = AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId); FlowRunColumn.MAX_END_TIME.store(rowKey, flowRunTable, null, @@ -269,8 +282,8 @@ private void storeFlowMetricsAppRunning(String clusterId, String userId, throws IOException { Set metrics = te.getMetrics(); if (metrics != null) { - byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName, - flowRunId); + byte[] rowKey = new FlowRunRowKey(clusterId, userId, flowName, + flowRunId).getRowKey(); storeFlowMetrics(rowKey, metrics, AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId), AggregationOperation.SUM.getAttribute()); @@ -280,8 +293,7 @@ private void storeFlowMetricsAppRunning(String clusterId, String userId, private void storeFlowMetrics(byte[] rowKey, Set metrics, Attribute... 
attributes) throws IOException { for (TimelineMetric metric : metrics) { - byte[] metricColumnQualifier = - StringKeyConverter.getInstance().encode(metric.getId()); + byte[] metricColumnQualifier = stringKeyConverter.encode(metric.getId()); Map timeseries = metric.getValues(); for (Map.Entry timeseriesEntry : timeseries.entrySet()) { Long timestamp = timeseriesEntry.getKey(); @@ -320,8 +332,8 @@ private void storeRelations(byte[] rowKey, TimelineEntity te, String compoundValue = Separator.VALUES.joinEncoded(connectedEntity.getValue()); columnPrefix.store(rowKey, table, - StringKeyConverter.getInstance().encode(connectedEntity.getKey()), - null, compoundValue); + stringKeyConverter.encode(connectedEntity.getKey()), null, + compoundValue); } } @@ -341,7 +353,7 @@ private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion, if (info != null) { for (Map.Entry entry : info.entrySet()) { ApplicationColumnPrefix.INFO.store(rowKey, applicationTable, - StringKeyConverter.getInstance().encode(entry.getKey()), null, + stringKeyConverter.encode(entry.getKey()), null, entry.getValue()); } } @@ -355,7 +367,7 @@ private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion, if (info != null) { for (Map.Entry entry : info.entrySet()) { EntityColumnPrefix.INFO.store(rowKey, entityTable, - StringKeyConverter.getInstance().encode(entry.getKey()), null, + stringKeyConverter.encode(entry.getKey()), null, entry.getValue()); } } @@ -371,8 +383,7 @@ private void storeConfig(byte[] rowKey, Map config, return; } for (Map.Entry entry : config.entrySet()) { - byte[] configKey = - StringKeyConverter.getInstance().encode(entry.getKey()); + byte[] configKey = stringKeyConverter.encode(entry.getKey()); if (isApplication) { ApplicationColumnPrefix.CONFIG.store(rowKey, applicationTable, configKey, null, entry.getValue()); @@ -392,7 +403,7 @@ private void storeMetrics(byte[] rowKey, Set metrics, if (metrics != null) { for (TimelineMetric metric : metrics) { byte[] metricColumnQualifier = - StringKeyConverter.getInstance().encode(metric.getId()); + stringKeyConverter.encode(metric.getId()); Map timeseries = metric.getValues(); for (Map.Entry timeseriesEntry : timeseries.entrySet()) { Long timestamp = timeseriesEntry.getKey(); @@ -425,8 +436,7 @@ private void storeEvents(byte[] rowKey, Set events, "! 
Using the current timestamp"); eventTimestamp = System.currentTimeMillis(); } - EventColumnNameConverter converter = - EventColumnNameConverter.getInstance(); + EventColumnNameConverter converter = new EventColumnNameConverter(); Map eventInfo = event.getInfo(); if ((eventInfo == null) || (eventInfo.size() == 0)) { byte[] columnQualifierBytes = converter.encode( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java index 80fcf8c..dde3911 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java @@ -45,7 +45,7 @@ * When the application was created. */ CREATED_TIME(ApplicationColumnFamily.INFO, "created_time", - LongConverter.getInstance()), + new LongConverter()), /** * The version of the flow that this app belongs to. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java index 0febc67..42488f4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java @@ -67,8 +67,7 @@ /** * Metrics are stored with the metric name as the column name. 
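A note on the converter pattern that runs through this patch: every KeyConverter and ValueConverter loses its getInstance() singleton in favor of a public constructor. The converters are stateless, so call sites either construct one inline (as the tests above now do) or keep a reusable field (as HBaseTimelineWriterImpl now does with stringKeyConverter and longKeyConverter). A minimal sketch of the field-reuse pattern; the MetricQualifiers class and qualifierFor method are illustrative, not part of the patch:

    import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
    import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;

    public class MetricQualifiers {
      // One stateless converter instance can serve every encode call.
      private final KeyConverter<String> stringKeyConverter =
          new StringKeyConverter();

      public byte[] qualifierFor(String metricId) {
        return stringKeyConverter.encode(metricId);
      }
    }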
  */
-  METRIC(ApplicationColumnFamily.METRICS, null,
-      LongConverter.getInstance());
+  METRIC(ApplicationColumnFamily.METRICS, null, new LongConverter());
 
   private final ColumnHelper<ApplicationTable> column;
   private final ColumnFamily<ApplicationTable> columnFamily;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
index e476b21..663f6cf 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java
@@ -18,6 +18,9 @@
 package org.apache.hadoop.yarn.server.timelineservice.storage.application;
 
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+
+
 /**
  * Represents a rowkey for the application table.
  */
@@ -27,6 +30,8 @@
   private final String flowName;
   private final Long flowRunId;
   private final String appId;
+  private final KeyConverter<ApplicationRowKey> appRowKeyConverter =
+      new ApplicationRowKeyConverter();
 
   public ApplicationRowKey(String clusterId, String userId, String flowName,
       Long flowRunId, String appId) {
@@ -58,37 +63,6 @@ public String getAppId() {
   }
 
   /**
-   * Constructs a row key prefix for the application table as follows:
-   * {@code clusterId!userName!flowName!}.
-   *
-   * @param clusterId Cluster Id.
-   * @param userId User Id.
-   * @param flowName Flow Name.
-   * @return byte array with the row key prefix
-   */
-  public static byte[] getRowKeyPrefix(String clusterId, String userId,
-      String flowName) {
-    return ApplicationRowKeyConverter.getInstance().encode(
-        new ApplicationRowKey(clusterId, userId, flowName, null, null));
-  }
-
-  /**
-   * Constructs a row key prefix for the application table as follows:
-   * {@code clusterId!userName!flowName!flowRunId!}.
-   *
-   * @param clusterId Cluster Id.
-   * @param userId User Id.
-   * @param flowName Flow Name.
-   * @param flowRunId Run Id for the flow.
-   * @return byte array with the row key prefix
-   */
-  public static byte[] getRowKeyPrefix(String clusterId, String userId,
-      String flowName, Long flowRunId) {
-    return ApplicationRowKeyConverter.getInstance().encode(
-        new ApplicationRowKey(clusterId, userId, flowName, flowRunId, null));
-  }
-
-  /**
    * Constructs a row key for the application table as follows:
    * {@code clusterId!userName!flowName!flowRunId!AppId}.
    *
@@ -99,10 +73,8 @@ public String getAppId() {
    * @param appId App Id.
    * @return byte array with the row key
    */
-  public static byte[] getRowKey(String clusterId, String userId,
-      String flowName, Long flowRunId, String appId) {
-    return ApplicationRowKeyConverter.getInstance().encode(
-        new ApplicationRowKey(clusterId, userId, flowName, flowRunId, appId));
+  public byte[] getRowKey() {
+    return appRowKeyConverter.encode(this);
   }
 
   /**
@@ -112,6 +84,6 @@ public String getAppId() {
    * @return An ApplicationRowKey object.
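With the statics gone, callers build an ApplicationRowKey and ask it for its bytes, as the test and writer hunks earlier in this patch now do. A minimal sketch of the new call pattern; the literal values are illustrative:

    ApplicationRowKey appRowKey = new ApplicationRowKey("cluster1", "user1",
        "flow_name", 1002345678919L, "application_1111111111_2222");
    byte[] rowKey = appRowKey.getRowKey();

    // The encoded bytes round-trip through the converter-backed parser.
    ApplicationRowKey parsed = ApplicationRowKey.parseRowKey(rowKey);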
*/ public static ApplicationRowKey parseRowKey(byte[] rowKey) { - return ApplicationRowKeyConverter.getInstance().decode(rowKey); + return new ApplicationRowKeyConverter().decode(rowKey); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyConverter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyConverter.java index 3b054a5..1e71f26 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyConverter.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyConverter.java @@ -32,14 +32,14 @@ */ public final class ApplicationRowKeyConverter implements KeyConverter { - private static final ApplicationRowKeyConverter INSTANCE = - new ApplicationRowKeyConverter(); - public static ApplicationRowKeyConverter getInstance() { - return INSTANCE; - } + private final static KeyConverter appIdKeyConverter = + new AppIdKeyConverter(); - private ApplicationRowKeyConverter() { + /** + * Intended for use in ApplicationRowKey only. + */ + public ApplicationRowKeyConverter() { } // Application row key is of the form @@ -92,7 +92,7 @@ private ApplicationRowKeyConverter() { if (rowKey.getAppId() == null || rowKey.getAppId().isEmpty()) { return Separator.QUALIFIERS.join(first, second, Separator.EMPTY_BYTES); } - byte[] third = AppIdKeyConverter.getInstance().encode(rowKey.getAppId()); + byte[] third = appIdKeyConverter.encode(rowKey.getAppId()); return Separator.QUALIFIERS.join(first, second, third); } @@ -124,7 +124,7 @@ public ApplicationRowKey decode(byte[] rowKey) { Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); Long flowRunId = TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3])); - String appId = AppIdKeyConverter.getInstance().decode(rowKeyComponents[4]); + String appId = appIdKeyConverter.decode(rowKeyComponents[4]); return new ApplicationRowKey(clusterId, userId, flowName, flowRunId, appId); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java index 6a38e32..cb4a1dd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java @@ -17,12 +17,16 @@ */ package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; + /** * Represents a rowkey for the app_flow table. 
*/ public class AppToFlowRowKey { private final String clusterId; private final String appId; + private final KeyConverter appToFlowRowKeyConverter = + new AppToFlowRowKeyConverter(); public AppToFlowRowKey(String clusterId, String appId) { this.clusterId = clusterId; @@ -45,9 +49,8 @@ public String getAppId() { * @param appId Application Id. * @return byte array with the row key */ - public static byte[] getRowKey(String clusterId, String appId) { - return AppToFlowRowKeyConverter.getInstance().encode( - new AppToFlowRowKey(clusterId, appId)); + public byte[] getRowKey() { + return appToFlowRowKeyConverter.encode(this); } /** @@ -57,6 +60,6 @@ public String getAppId() { * @return an AppToFlowRowKey object. */ public static AppToFlowRowKey parseRowKey(byte[] rowKey) { - return AppToFlowRowKeyConverter.getInstance().decode(rowKey); + return new AppToFlowRowKeyConverter().decode(rowKey); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKeyConverter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKeyConverter.java index 0f0b879d..b237e54 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKeyConverter.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKeyConverter.java @@ -31,14 +31,14 @@ */ public final class AppToFlowRowKeyConverter implements KeyConverter { - private static final AppToFlowRowKeyConverter INSTANCE = - new AppToFlowRowKeyConverter(); - public static AppToFlowRowKeyConverter getInstance() { - return INSTANCE; - } + private final static KeyConverter appIdKeyConverter = + new AppIdKeyConverter(); - private AppToFlowRowKeyConverter() { + /** + * Intended for use in AppToFlowRowKey only. 
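AppToFlowRowKey gets the same treatment: getRowKey() becomes an instance method backed by a converter field, with parseRowKey() for the reverse direction. A round-trip sketch with illustrative values:

    AppToFlowRowKey appToFlowRowKey =
        new AppToFlowRowKey("cluster1", "application_1111111111_2222");
    byte[] rowKeyBytes = appToFlowRowKey.getRowKey();
    AppToFlowRowKey parsed = AppToFlowRowKey.parseRowKey(rowKeyBytes);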
+ */ + public AppToFlowRowKeyConverter() { } // App to flow row key is of the form clusterId!appId with the 2 segments @@ -66,7 +66,7 @@ private AppToFlowRowKeyConverter() { public byte[] encode(AppToFlowRowKey rowKey) { byte[] first = Separator.encode(rowKey.getClusterId(), Separator.SPACE, Separator.TAB, Separator.QUALIFIERS); - byte[] second = AppIdKeyConverter.getInstance().encode(rowKey.getAppId()); + byte[] second = appIdKeyConverter.encode(rowKey.getAppId()); return Separator.QUALIFIERS.join(first, second); } @@ -90,7 +90,7 @@ public AppToFlowRowKey decode(byte[] rowKey) { } String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[0]), Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); - String appId = AppIdKeyConverter.getInstance().decode(rowKeyComponents[1]); + String appId = appIdKeyConverter.decode(rowKeyComponents[1]); return new AppToFlowRowKey(clusterId, appId); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java index a173b0f..3cd837e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java @@ -28,13 +28,8 @@ * (long - 8 bytes) followed by sequence id section of app id (int - 4 bytes). */ public final class AppIdKeyConverter implements KeyConverter { - private static final AppIdKeyConverter INSTANCE = new AppIdKeyConverter(); - public static AppIdKeyConverter getInstance() { - return INSTANCE; - } - - private AppIdKeyConverter() { + public AppIdKeyConverter() { } /* diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java index 6018f86..7a3a75c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java @@ -26,6 +26,8 @@ private final String id; private final Long timestamp; private final String infoKey; + private final KeyConverter eventColumnNameConverter = + new EventColumnNameConverter(); public EventColumnName(String id, Long timestamp, String infoKey) { this.id = id; @@ -45,4 +47,13 @@ public String getInfoKey() { return infoKey; } + /** + * @return a byte array with each components/fields separated by + * Separator#VALUES. This leads to an event column name of the form + * eventId=timestamp=infokey. 
+   */
+  public byte[] getColumnQualifier() {
+    return eventColumnNameConverter.encode(this);
+  }
+
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java
index 32ef1c3..13872e3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnNameConverter.java
@@ -30,14 +30,8 @@
  */
 public final class EventColumnNameConverter
     implements KeyConverter<EventColumnName> {
-  private static final EventColumnNameConverter INSTANCE =
-      new EventColumnNameConverter();
-  public static EventColumnNameConverter getInstance() {
-    return INSTANCE;
-  }
-
-  private EventColumnNameConverter() {
+  public EventColumnNameConverter() {
   }
 
   // eventId=timestamp=infokey are of types String, Long String
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java
index 48c56f9..85ad66f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java
@@ -27,13 +27,8 @@
  * decodes a set of bytes as a Long.
  */
 public final class LongConverter implements NumericValueConverter {
-  private static final LongConverter INSTANCE = new LongConverter();
-  private LongConverter() {
-  }
-
-  public static LongConverter getInstance() {
-    return INSTANCE;
+  public LongConverter() {
   }
 
   @Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java
index 3954145..23f50d4 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java
@@ -23,13 +23,13 @@
  * Encodes and decodes column names / row keys which are long.
  */
 public final class LongKeyConverter implements KeyConverter<Long> {
-  private static final LongKeyConverter INSTANCE = new LongKeyConverter();
-  public static LongKeyConverter getInstance() {
-    return INSTANCE;
-  }
+  /**
+   * To delegate the actual work to.
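LongKeyConverter keeps its KeyConverter<Long> contract but now forwards to the LongConverter field introduced below; as the following hunks show, encode and decode catch the IOException declared by encodeValue/decodeValue and return null, since a Long key cannot actually fail conversion. A usage sketch mirroring how the writer encodes flow run ids as column qualifiers; the value is illustrative:

    KeyConverter<Long> longKeyConverter = new LongKeyConverter();
    byte[] qualifier = longKeyConverter.encode(1002345678919L);
    Long flowRunId = longKeyConverter.decode(qualifier);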
+ */ + private static final LongConverter longConverter = new LongConverter(); - private LongKeyConverter() { + public LongKeyConverter() { } /* @@ -44,7 +44,7 @@ private LongKeyConverter() { try { // IOException will not be thrown here as we are explicitly passing // Long. - return LongConverter.getInstance().encodeValue(key); + return longConverter.encodeValue(key); } catch (IOException e) { return null; } @@ -60,7 +60,7 @@ private LongKeyConverter() { @Override public Long decode(byte[] bytes) { try { - return (Long) LongConverter.getInstance().decodeValue(bytes); + return (Long) longConverter.decodeValue(bytes); } catch (IOException e) { return null; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java index b0f6d55..282848e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java @@ -24,13 +24,8 @@ * added later, if required in the associated ColumnPrefix implementations. */ public final class StringKeyConverter implements KeyConverter { - private static final StringKeyConverter INSTANCE = new StringKeyConverter(); - public static StringKeyConverter getInstance() { - return INSTANCE; - } - - private StringKeyConverter() { + public StringKeyConverter() { } /* diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java index d52a5d7..e7214ae 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.common; import java.io.IOException; -import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -52,7 +51,6 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; @@ -182,21 +180,6 @@ public static boolean 
isApplicationFinished(TimelineEntity te) { } /** - * Check if we have a certain field amongst fields to retrieve. This method - * checks against {@link Field#ALL} as well because that would mean field - * passed needs to be matched. - * - * @param fieldsToRetrieve fields to be retrieved. - * @param requiredField fields to be checked in fieldsToRetrieve. - * @return true if has the required field, false otherwise. - */ - public static boolean hasField(EnumSet fieldsToRetrieve, - Field requiredField) { - return fieldsToRetrieve.contains(Field.ALL) || - fieldsToRetrieve.contains(requiredField); - } - - /** * Checks if the input TimelineEntity object is an ApplicationEntity. * * @param te TimelineEntity object. @@ -647,60 +630,6 @@ public static String getAggregationCompactionDimension(List tags) { } /** - * Helper method for reading relationship. - * - * @param Describes the type of column prefix. - * @param entity entity to fill. - * @param result result from HBase. - * @param prefix column prefix. - * @param isRelatedTo if true, means relationship is to be added to - * isRelatedTo, otherwise its added to relatesTo. - * @throws IOException if any problem is encountered while reading result. - */ - public static void readRelationship( - TimelineEntity entity, Result result, ColumnPrefix prefix, - boolean isRelatedTo) throws IOException { - // isRelatedTo and relatesTo are of type Map> - Map columns = - prefix.readResults(result, StringKeyConverter.getInstance()); - for (Map.Entry column : columns.entrySet()) { - for (String id : Separator.VALUES.splitEncoded( - column.getValue().toString())) { - if (isRelatedTo) { - entity.addIsRelatedToEntity(column.getKey(), id); - } else { - entity.addRelatesToEntity(column.getKey(), id); - } - } - } - } - - /** - * Helper method for reading key-value pairs for either info or config. - * - * @param Describes the type of column prefix. - * @param entity entity to fill. - * @param result result from HBase. - * @param prefix column prefix. - * @param isConfig if true, means we are reading configs, otherwise info. - * @throws IOException if any problem is encountered while reading result. - */ - public static void readKeyValuePairs( - TimelineEntity entity, Result result, ColumnPrefix prefix, - boolean isConfig) throws IOException { - // info and configuration are of type Map - Map columns = - prefix.readResults(result, StringKeyConverter.getInstance()); - if (isConfig) { - for (Map.Entry column : columns.entrySet()) { - entity.addConfig(column.getKey(), column.getValue().toString()); - } - } else { - entity.addInfo(columns); - } - } - - /** * Read events from the entity table or the application table. The column name * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted * if there is no info associated with the event. 
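Event columns go through the same constructor-based converters, and EventColumnName can now produce its own qualifier via getColumnQualifier(). A sketch of the encode/decode pair; the event id, timestamp, and info key are illustrative, and the converter handles the timestamp inversion internally:

    EventColumnName event = new EventColumnName("YARN_APPLICATION_CREATED",
        1436512802126L, "someInfoKey");
    byte[] qualifier = event.getColumnQualifier();
    EventColumnName decoded = new EventColumnNameConverter().decode(qualifier);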
@@ -715,7 +644,7 @@ public static String getAggregationCompactionDimension(List tags) { ColumnPrefix prefix) throws IOException { Map eventsMap = new HashMap<>(); Map eventsResult = - prefix.readResults(result, EventColumnNameConverter.getInstance()); + prefix.readResults(result, new EventColumnNameConverter()); for (Map.Entry eventResult : eventsResult.entrySet()) { EventColumnName eventColumnName = eventResult.getKey(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java index 775879a..93b4b36 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java @@ -49,8 +49,7 @@ /** * When the entity was created. */ - CREATED_TIME(EntityColumnFamily.INFO, "created_time", - LongConverter.getInstance()), + CREATED_TIME(EntityColumnFamily.INFO, "created_time", new LongConverter()), /** * The version of the flow that this entity belongs to. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java index 02a4bb3..e410549 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java @@ -67,8 +67,7 @@ /** * Metrics are stored with the metric name as the column name. */ - METRIC(EntityColumnFamily.METRICS, null, - LongConverter.getInstance()); + METRIC(EntityColumnFamily.METRICS, null, new LongConverter()); private final ColumnHelper column; private final ColumnFamily columnFamily; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java index 6d08390..11f0c95 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.yarn.server.timelineservice.storage.entity; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; + /** * Represents a rowkey for the entity table. 
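The helpers deleted here (hasField, readRelationship, readKeyValuePairs) are not dropped outright: the reader hunks below call hasField(...), readRelationship(...) and readKeyValuePairs(...) unqualified, i.e. as inherited instance methods, so the patch relocates them into the reader class hierarchy. The host class is outside these hunks; a sketch of the relocated field check, with the body taken verbatim from the removed code:

```java
import java.util.EnumSet;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;

// Assumed host: the abstract reader base class, which this diff does not show.
abstract class TimelineEntityReaderSketch {
  protected boolean hasField(EnumSet<Field> fieldsToRetrieve,
      Field requiredField) {
    // Field.ALL implicitly covers every concrete field.
    return fieldsToRetrieve.contains(Field.ALL)
        || fieldsToRetrieve.contains(requiredField);
  }
}
```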
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
index 6d08390..11f0c95 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKey.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.yarn.server.timelineservice.storage.entity;
 
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+
 /**
  * Represents a rowkey for the entity table.
  */
@@ -28,6 +30,8 @@
   private final String appId;
   private final String entityType;
   private final String entityId;
+  private final KeyConverter<EntityRowKey> entityRowKeyConverter =
+      new EntityRowKeyConverter();
 
   public EntityRowKey(String clusterId, String userId, String flowName,
       Long flowRunId, String appId, String entityType, String entityId) {
@@ -69,61 +73,14 @@ public String getEntityId() {
   }
 
   /**
-   * Constructs a row key prefix for the entity table as follows:
-   * {@code userName!clusterId!flowName!flowRunId!AppId}.
-   *
-   * @param clusterId Context cluster id.
-   * @param userId User name.
-   * @param flowName Flow name.
-   * @param flowRunId Run Id for the flow.
-   * @param appId Application Id.
-   * @return byte array with the row key prefix.
-   */
-  public static byte[] getRowKeyPrefix(String clusterId, String userId,
-      String flowName, Long flowRunId, String appId) {
-    return EntityRowKeyConverter.getInstance().encode(new EntityRowKey(
-        clusterId, userId, flowName, flowRunId, appId, null, null));
-  }
-
-  /**
-   * Constructs a row key prefix for the entity table as follows:
-   * {@code userName!clusterId!flowName!flowRunId!AppId!entityType!}.
-   * Typically used while querying multiple entities of a particular entity
-   * type.
-   *
-   * @param clusterId Context cluster id.
-   * @param userId User name.
-   * @param flowName Flow name.
-   * @param flowRunId Run Id for the flow.
-   * @param appId Application Id.
-   * @param entityType Entity type.
-   * @return byte array with the row key prefix.
-   */
-  public static byte[] getRowKeyPrefix(String clusterId, String userId,
-      String flowName, Long flowRunId, String appId, String entityType) {
-    return EntityRowKeyConverter.getInstance().encode(new EntityRowKey(
-        clusterId, userId, flowName, flowRunId, appId, entityType, null));
-  }
-
-  /**
    * Constructs a row key for the entity table as follows:
    * {@code userName!clusterId!flowName!flowRunId!AppId!entityType!entityId}.
    * Typically used while querying a specific entity.
    *
-   * @param clusterId Context cluster id.
-   * @param userId User name.
-   * @param flowName Flow name.
-   * @param flowRunId Run Id for the flow.
-   * @param appId Application Id.
-   * @param entityType Entity type.
-   * @param entityId Entity Id.
    * @return byte array with the row key.
    */
-  public static byte[] getRowKey(String clusterId, String userId,
-      String flowName, Long flowRunId, String appId, String entityType,
-      String entityId) {
-    return EntityRowKeyConverter.getInstance().encode(new EntityRowKey(
-        clusterId, userId, flowName, flowRunId, appId, entityType, entityId));
+  public byte[] getRowKey() {
+    return entityRowKeyConverter.encode(this);
   }
 
   /**
@@ -133,6 +90,6 @@ public String getEntityId() {
    * @return An EntityRowKey object.
    */
   public static EntityRowKey parseRowKey(byte[] rowKey) {
-    return EntityRowKeyConverter.getInstance().decode(rowKey);
+    return new EntityRowKeyConverter().decode(rowKey);
   }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyConverter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyConverter.java
index 43c0569..5f23bd3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyConverter.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyConverter.java
@@ -32,14 +32,11 @@
  * {@link AppIdKeyConverter} and rest are strings.
  */
 public final class EntityRowKeyConverter implements KeyConverter<EntityRowKey> {
-  private static final EntityRowKeyConverter INSTANCE =
-      new EntityRowKeyConverter();
 
-  public static EntityRowKeyConverter getInstance() {
-    return INSTANCE;
-  }
+  private final static AppIdKeyConverter appIdKeyConverter =
+      new AppIdKeyConverter();
 
-  private EntityRowKeyConverter() {
+  public EntityRowKeyConverter() {
   }
 
   // Entity row key is of the form
@@ -90,7 +87,7 @@ private EntityRowKeyConverter() {
     // time.
     byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(
         rowKey.getFlowRunId()));
-    byte[] third = AppIdKeyConverter.getInstance().encode(rowKey.getAppId());
+    byte[] third = appIdKeyConverter.encode(rowKey.getAppId());
     if (rowKey.getEntityType() == null) {
       return Separator.QUALIFIERS.join(
           first, second, third, Separator.EMPTY_BYTES);
@@ -132,7 +129,7 @@ public EntityRowKey decode(byte[] rowKey) {
         Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
     Long flowRunId =
         TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
-    String appId = AppIdKeyConverter.getInstance().decode(rowKeyComponents[4]);
+    String appId = appIdKeyConverter.decode(rowKeyComponents[4]);
     String entityType = Separator.decode(Bytes.toString(rowKeyComponents[5]),
         Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
     String entityId = Separator.decode(Bytes.toString(rowKeyComponents[6]),
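Row-key construction is now split between two instance-based types: EntityRowKey builds the fully qualified key for point lookups, while the new EntityRowKeyPrefix (imported by the readers further down) covers the two removed getRowKeyPrefix() overloads. A hedged sketch with illustrative values; the five-argument prefix constructor is inferred from its call sites:

```java
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix;

public class EntityRowKeySketch {
  public static void main(String[] args) {
    // Full row key for a Get against the entity table.
    EntityRowKey rowKey = new EntityRowKey("cluster1", "user1", "flow1",
        1002345678919L, "application_1111111111_1111", "YARN_CONTAINER",
        "container_1111111111_1111_01_000001");
    byte[] row = rowKey.getRowKey();

    // Prefix for a Scan over all entities of one application.
    byte[] prefix = new EntityRowKeyPrefix("cluster1", "user1", "flow1",
        1002345678919L, "application_1111111111_1111").getRowKeyPrefix();

    // And back again: decode() reverses encode().
    EntityRowKey parsed = EntityRowKey.parseRowKey(row);
  }
}
```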
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
index eea38a5..f660749 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
 
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 
 /**
@@ -28,11 +29,36 @@
   private final Long dayTs;
   private final String userId;
   private final String flowName;
+  private final KeyConverter<FlowActivityRowKey> flowActivityRowKeyConverter =
+      new FlowActivityRowKeyConverter();
 
+  /**
+   * @param clusterId identifying the cluster
+   * @param dayTs to be converted to the top of the day timestamp
+   * @param userId identifying user
+   * @param flowName identifying the flow
+   */
   public FlowActivityRowKey(String clusterId, Long dayTs, String userId,
       String flowName) {
+    this(clusterId, dayTs, userId, flowName, true);
+  }
+
+  /**
+   * @param clusterId identifying the cluster
+   * @param timestamp when the flow activity happened
+   * @param userId identifying user
+   * @param flowName identifying the flow
+   * @param convertDayTsToTopOfDay if true and timestamp isn't null, then
+   *          timestamp will be converted to the top-of-the-day timestamp
+   */
+  protected FlowActivityRowKey(String clusterId, Long timestamp, String userId,
+      String flowName, boolean convertDayTsToTopOfDay) {
     this.clusterId = clusterId;
-    this.dayTs = dayTs;
+    if (convertDayTsToTopOfDay && (timestamp != null)) {
+      this.dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(timestamp);
+    } else {
+      this.dayTs = timestamp;
+    }
     this.userId = userId;
     this.flowName = flowName;
   }
@@ -54,46 +80,13 @@ public String getFlowName() {
   }
 
   /**
-   * Constructs a row key prefix for the flow activity table as follows:
-   * {@code clusterId!}.
-   *
-   * @param clusterId Cluster Id.
-   * @return byte array with the row key prefix
-   */
-  public static byte[] getRowKeyPrefix(String clusterId) {
-    return FlowActivityRowKeyConverter.getInstance().encode(
-        new FlowActivityRowKey(clusterId, null, null, null));
-  }
-
-  /**
-   * Constructs a row key prefix for the flow activity table as follows:
-   * {@code clusterId!dayTimestamp!}.
-   *
-   * @param clusterId Cluster Id.
-   * @param dayTs Start of the day timestamp.
-   * @return byte array with the row key prefix
-   */
-  public static byte[] getRowKeyPrefix(String clusterId, long dayTs) {
-    return FlowActivityRowKeyConverter.getInstance().encode(
-        new FlowActivityRowKey(clusterId, dayTs, null, null));
-  }
-
-  /**
    * Constructs a row key for the flow activity table as follows:
    * {@code clusterId!dayTimestamp!user!flowName}.
    *
-   * @param clusterId Cluster Id.
-   * @param eventTs event's TimeStamp.
-   * @param userId User Id.
-   * @param flowName Flow Name.
    * @return byte array for the row key
    */
-  public static byte[] getRowKey(String clusterId, long eventTs, String userId,
-      String flowName) {
-    // convert it to Day's time stamp
-    eventTs = TimelineStorageUtils.getTopOfTheDayTimestamp(eventTs);
-    return FlowActivityRowKeyConverter.getInstance().encode(
-        new FlowActivityRowKey(clusterId, eventTs, userId, flowName));
+  public byte[] getRowKey() {
+    return flowActivityRowKeyConverter.encode(this);
   }
 
   /**
@@ -103,6 +96,6 @@ public String getFlowName() {
    * @return A FlowActivityRowKey object.
    */
   public static FlowActivityRowKey parseRowKey(byte[] rowKey) {
-    return FlowActivityRowKeyConverter.getInstance().decode(rowKey);
+    return new FlowActivityRowKeyConverter().decode(rowKey);
   }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyConverter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyConverter.java
index 9dc4c98..6952675 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyConverter.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKeyConverter.java
@@ -30,14 +30,8 @@
  */
 public final class FlowActivityRowKeyConverter implements
     KeyConverter<FlowActivityRowKey> {
-  private static final FlowActivityRowKeyConverter INSTANCE =
-      new FlowActivityRowKeyConverter();
 
-  public static FlowActivityRowKeyConverter getInstance() {
-    return INSTANCE;
-  }
-
-  private FlowActivityRowKeyConverter() {
+  public FlowActivityRowKeyConverter() {
   }
 
   // Flow activity row key is of the form clusterId!dayTimestamp!user!flowName
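Note the behavioral detail preserved by this move: the old static getRowKey() rounded the event timestamp down to the day before encoding, and that rounding now lives in the public constructor (via the convertDayTsToTopOfDay=true delegation). Illustrative sketch:

```java
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;

public class FlowActivityRowKeySketch {
  public static void main(String[] args) {
    long eventTs = System.currentTimeMillis();
    // The constructor normalizes eventTs to the top of the day via
    // TimelineStorageUtils.getTopOfTheDayTimestamp(), so callers no longer
    // pre-round timestamps themselves.
    FlowActivityRowKey key =
        new FlowActivityRowKey("cluster1", eventTs, "user1", "flow1");
    byte[] row = key.getRowKey();
  }
}
```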
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
index f1553b8..2e7a9d8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
@@ -25,10 +25,10 @@
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
 
 /**
@@ -41,14 +41,14 @@
    * application start times.
    */
   MIN_START_TIME(FlowRunColumnFamily.INFO, "min_start_time",
-      AggregationOperation.GLOBAL_MIN, LongConverter.getInstance()),
+      AggregationOperation.GLOBAL_MIN, new LongConverter()),
 
   /**
    * When the flow ended. This is the maximum of currently known application end
    * times.
    */
   MAX_END_TIME(FlowRunColumnFamily.INFO, "max_end_time",
-      AggregationOperation.GLOBAL_MAX, LongConverter.getInstance()),
+      AggregationOperation.GLOBAL_MAX, new LongConverter()),
 
   /**
    * The version of the flow that this flow belongs to.
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
index 0f14c89..e74282a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
@@ -41,7 +41,7 @@
   /**
    * To store flow run info values.
    */
-  METRIC(FlowRunColumnFamily.INFO, "m", null, LongConverter.getInstance());
+  METRIC(FlowRunColumnFamily.INFO, "m", null, new LongConverter());
 
   private final ColumnHelper<FlowRunTable> column;
   private final ColumnFamily<FlowRunTable> columnFamily;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
index 925242b..fb68815 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
@@ -25,6 +25,8 @@
   private final String userId;
   private final String flowName;
   private final Long flowRunId;
+  private final FlowRunRowKeyConverter flowRunRowKeyConverter =
+      new FlowRunRowKeyConverter();
 
   public FlowRunRowKey(String clusterId, String userId, String flowName,
       Long flowRunId) {
@@ -51,21 +53,6 @@ public Long getFlowRunId() {
   }
 
   /**
-   * Constructs a row key prefix for the flow run table as follows: {
-   * clusterId!userI!flowName!}.
-   *
-   * @param clusterId Cluster Id.
-   * @param userId User Id.
-   * @param flowName Flow Name.
-   * @return byte array with the row key prefix
-   */
-  public static byte[] getRowKeyPrefix(String clusterId, String userId,
-      String flowName) {
-    return FlowRunRowKeyConverter.getInstance().encode(new FlowRunRowKey(
-        clusterId, userId, flowName, null));
-  }
-
-  /**
    * Constructs a row key for the entity table as follows: {
    * clusterId!userId!flowName!Inverted Flow Run Id}.
    *
@@ -75,12 +62,11 @@ public Long getFlowRunId() {
    * @param flowRunId Run Id for the flow name.
    * @return byte array with the row key
    */
-  public static byte[] getRowKey(String clusterId, String userId,
-      String flowName, Long flowRunId) {
-    return FlowRunRowKeyConverter.getInstance().encode(new FlowRunRowKey(
-        clusterId, userId, flowName, flowRunId));
+  public byte[] getRowKey() {
+    return flowRunRowKeyConverter.encode(this);
   }
 
+
   /**
    * Given the raw row key as bytes, returns the row key as an object.
    *
@@ -88,7 +74,7 @@ public Long getFlowRunId() {
    * @return A FlowRunRowKey object.
    */
   public static FlowRunRowKey parseRowKey(byte[] rowKey) {
-    return FlowRunRowKeyConverter.getInstance().decode(rowKey);
+    return new FlowRunRowKeyConverter().decode(rowKey);
   }
 
   /**
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyConverter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyConverter.java
index 642f065..1b9126b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyConverter.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyConverter.java
@@ -30,14 +30,8 @@
  */
 public final class FlowRunRowKeyConverter implements
     KeyConverter<FlowRunRowKey> {
-  private static final FlowRunRowKeyConverter INSTANCE =
-      new FlowRunRowKeyConverter();
 
-  public static FlowRunRowKeyConverter getInstance() {
-    return INSTANCE;
-  }
-
-  private FlowRunRowKeyConverter() {
+  public FlowRunRowKeyConverter() {
   }
 
   // Flow run row key is of the form
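FlowRunRowKey gets the same treatment, and because the converter implements both directions, getRowKey() and parseRowKey() form a round trip (values illustrative; the run id is stored inverted on disk, as the entity-key converter above shows for its flowRunId component):

```java
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;

public class FlowRunRowKeySketch {
  public static void main(String[] args) {
    FlowRunRowKey flowRunRowKey =
        new FlowRunRowKey("cluster1", "user1", "flow1", 1002345678919L);
    byte[] row = flowRunRowKey.getRowKey();
    // decode() re-inverts the stored run id, so the round trip holds.
    FlowRunRowKey parsed = FlowRunRowKey.parseRowKey(row);
    assert parsed.getFlowRunId() == 1002345678919L;
  }
}
```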
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
index 8e806bc..b56176c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
@@ -46,7 +46,10 @@
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.RowKeyPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
@@ -150,13 +153,13 @@ private FilterList createFilterListForColsOfInfoFamily()
     EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
     // If INFO field has to be retrieved, add a filter for fetching columns
     // with INFO column prefix.
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
+    if (hasField(fieldsToRetrieve, Field.INFO)) {
       infoFamilyColsFilter.addFilter(
           TimelineFilterUtils.createHBaseQualifierFilter(
               CompareOp.EQUAL, ApplicationColumnPrefix.INFO));
     }
     TimelineFilterList relatesTo = getFilters().getRelatesTo();
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
+    if (hasField(fieldsToRetrieve, Field.RELATES_TO)) {
       // If RELATES_TO field has to be retrieved, add a filter for fetching
       // columns with RELATES_TO column prefix.
       infoFamilyColsFilter.addFilter(
@@ -169,12 +172,11 @@ private FilterList createFilterListForColsOfInfoFamily()
       // matched after fetching rows from HBase.
       Set<String> relatesToCols =
           TimelineFilterUtils.fetchColumnsFromFilterList(relatesTo);
-      infoFamilyColsFilter.addFilter(
-          TimelineFilterUtils.createFiltersFromColumnQualifiers(
-              ApplicationColumnPrefix.RELATES_TO, relatesToCols));
+      infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
+          ApplicationColumnPrefix.RELATES_TO, relatesToCols));
     }
     TimelineFilterList isRelatedTo = getFilters().getIsRelatedTo();
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
+    if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
       // If IS_RELATED_TO field has to be retrieved, add a filter for fetching
       // columns with IS_RELATED_TO column prefix.
       infoFamilyColsFilter.addFilter(
@@ -187,12 +189,11 @@ private FilterList createFilterListForColsOfInfoFamily()
       // matched after fetching rows from HBase.
       Set<String> isRelatedToCols =
           TimelineFilterUtils.fetchColumnsFromFilterList(isRelatedTo);
-      infoFamilyColsFilter.addFilter(
-          TimelineFilterUtils.createFiltersFromColumnQualifiers(
-              ApplicationColumnPrefix.IS_RELATED_TO, isRelatedToCols));
+      infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
+          ApplicationColumnPrefix.IS_RELATED_TO, isRelatedToCols));
     }
     TimelineFilterList eventFilters = getFilters().getEventFilters();
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
+    if (hasField(fieldsToRetrieve, Field.EVENTS)) {
       // If EVENTS field has to be retrieved, add a filter for fetching columns
       // with EVENT column prefix.
       infoFamilyColsFilter.addFilter(
@@ -205,9 +206,8 @@ private FilterList createFilterListForColsOfInfoFamily()
       // fetching rows from HBase.
       Set<String> eventCols =
           TimelineFilterUtils.fetchColumnsFromFilterList(eventFilters);
-      infoFamilyColsFilter.addFilter(
-          TimelineFilterUtils.createFiltersFromColumnQualifiers(
-              ApplicationColumnPrefix.EVENT, eventCols));
+      infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
+          ApplicationColumnPrefix.EVENT, eventCols));
     }
     return infoFamilyColsFilter;
   }
@@ -222,25 +222,25 @@ private FilterList createFilterListForColsOfInfoFamily()
   private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) {
     EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
     // Events not required.
-    if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
+    if (!hasField(fieldsToRetrieve, Field.EVENTS)) {
       infoColFamilyList.addFilter(
           TimelineFilterUtils.createHBaseQualifierFilter(
               CompareOp.NOT_EQUAL, ApplicationColumnPrefix.EVENT));
     }
     // info not required.
-    if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
+    if (!hasField(fieldsToRetrieve, Field.INFO)) {
       infoColFamilyList.addFilter(
           TimelineFilterUtils.createHBaseQualifierFilter(
               CompareOp.NOT_EQUAL, ApplicationColumnPrefix.INFO));
     }
     // is related to not required.
-    if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
+    if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
       infoColFamilyList.addFilter(
           TimelineFilterUtils.createHBaseQualifierFilter(
               CompareOp.NOT_EQUAL, ApplicationColumnPrefix.IS_RELATED_TO));
     }
     // relates to not required.
-    if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
+    if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) {
       infoColFamilyList.addFilter(
           TimelineFilterUtils.createHBaseQualifierFilter(
               CompareOp.NOT_EQUAL, ApplicationColumnPrefix.RELATES_TO));
@@ -308,9 +308,10 @@ protected FilterList constructFilterListBasedOnFields() throws IOException {
   protected Result getResult(Configuration hbaseConf, Connection conn,
       FilterList filterList) throws IOException {
     TimelineReaderContext context = getContext();
-    byte[] rowKey =
-        ApplicationRowKey.getRowKey(cluster, user, flow, runid, appId);
+    ApplicationRowKey applicationRowKey =
+        new ApplicationRowKey(context.getClusterId(), context.getUserId(),
            context.getFlowName(), context.getFlowRunId(), context.getAppId());
+    byte[] rowKey = applicationRowKey.getRowKey();
     Get get = new Get(rowKey);
     get.setMaxVersions(getDataToRetrieve().getMetricsLimit());
     if (filterList != null && !filterList.getFilters().isEmpty()) {
@@ -345,10 +346,13 @@ protected void augmentParams(Configuration hbaseConf, Connection conn)
     TimelineReaderContext context = getContext();
     if (isSingleEntityRead()) {
       // Get flow context information from AppToFlow table.
-      if (context.getFlowName() == null || context.getFlowRunId() == null ||
-          context.getUserId() == null) {
-        FlowContext flowContext = lookupFlowContext(
-            context.getClusterId(), context.getAppId(), hbaseConf, conn);
+      if (context.getFlowName() == null || context.getFlowRunId() == null
+          || context.getUserId() == null) {
+        AppToFlowRowKey appToFlowRowKey =
+            new AppToFlowRowKey(context.getClusterId(), context.getAppId());
+        FlowContext flowContext =
+            lookupFlowContext(appToFlowRowKey,
+                hbaseConf, conn);
         context.setFlowName(flowContext.getFlowName());
         context.setFlowRunId(flowContext.getFlowRunId());
         context.setUserId(flowContext.getUserId());
@@ -367,15 +371,12 @@ protected ResultScanner getResults(Configuration hbaseConf, Connection conn,
       FilterList filterList) throws IOException {
     Scan scan = new Scan();
     TimelineReaderContext context = getContext();
-    if (context.getFlowRunId() != null) {
-      scan.setRowPrefixFilter(ApplicationRowKey.
-          getRowKeyPrefix(context.getClusterId(), context.getUserId(),
-              context.getFlowName(), context.getFlowRunId()));
-    } else {
-      scan.setRowPrefixFilter(ApplicationRowKey.
-          getRowKeyPrefix(context.getClusterId(), context.getUserId(),
-              context.getFlowName()));
-    }
+    // Whether or not flowRunID is null doesn't matter, the
+    // ApplicationRowKeyPrefix will do the right thing.
+    RowKeyPrefix<ApplicationRowKey> applicationRowKeyPrefix =
+        new ApplicationRowKeyPrefix(context.getClusterId(),
+            context.getUserId(), context.getFlowName(), context.getFlowRunId());
+    scan.setRowPrefixFilter(applicationRowKeyPrefix.getRowKeyPrefix());
     FilterList newList = new FilterList();
     newList.addFilter(new PageFilter(getFilters().getLimit()));
     if (filterList != null && !filterList.getFilters().isEmpty()) {
@@ -409,15 +410,14 @@ protected TimelineEntity parseEntity(Result result) throws IOException {
     boolean checkIsRelatedTo =
         !isSingleEntityRead() && filters.getIsRelatedTo() != null &&
             filters.getIsRelatedTo().getFilterList().size() > 0;
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO) ||
-        checkIsRelatedTo) {
-      TimelineStorageUtils.readRelationship(
-          entity, result, ApplicationColumnPrefix.IS_RELATED_TO, true);
+    if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO) || checkIsRelatedTo) {
+      readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO,
+          true);
       if (checkIsRelatedTo && !TimelineStorageUtils.matchIsRelatedTo(entity,
           filters.getIsRelatedTo())) {
         return null;
       }
-      if (!TimelineStorageUtils.hasField(fieldsToRetrieve,
+      if (!hasField(fieldsToRetrieve,
           Field.IS_RELATED_TO)) {
         entity.getIsRelatedToEntities().clear();
       }
@@ -430,29 +430,27 @@ protected TimelineEntity parseEntity(Result result) throws IOException {
     boolean checkRelatesTo =
         !isSingleEntityRead() && filters.getRelatesTo() != null &&
             filters.getRelatesTo().getFilterList().size() > 0;
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO) ||
+    if (hasField(fieldsToRetrieve, Field.RELATES_TO) ||
        checkRelatesTo) {
-      TimelineStorageUtils.readRelationship(
-          entity, result, ApplicationColumnPrefix.RELATES_TO, false);
+      readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO,
+          false);
       if (checkRelatesTo && !TimelineStorageUtils.matchRelatesTo(entity,
           filters.getRelatesTo())) {
         return null;
       }
-      if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
+      if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) {
         entity.getRelatesToEntities().clear();
       }
     }
 
     // fetch info if fieldsToRetrieve contains INFO or ALL.
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
-      TimelineStorageUtils.readKeyValuePairs(
-          entity, result, ApplicationColumnPrefix.INFO, false);
+    if (hasField(fieldsToRetrieve, Field.INFO)) {
+      readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false);
     }
 
     // fetch configs if fieldsToRetrieve contains CONFIGS or ALL.
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.CONFIGS)) {
-      TimelineStorageUtils.readKeyValuePairs(
-          entity, result, ApplicationColumnPrefix.CONFIG, true);
+    if (hasField(fieldsToRetrieve, Field.CONFIGS)) {
+      readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true);
     }
 
     // fetch events and match event filters if they exist. If event filters do
@@ -462,7 +460,7 @@ protected TimelineEntity parseEntity(Result result) throws IOException {
     boolean checkEvents =
         !isSingleEntityRead() && filters.getEventFilters() != null &&
             filters.getEventFilters().getFilterList().size() > 0;
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS) ||
+    if (hasField(fieldsToRetrieve, Field.EVENTS) ||
        checkEvents) {
       TimelineStorageUtils.readEvents(
           entity, result, ApplicationColumnPrefix.EVENT);
@@ -470,13 +468,13 @@ protected TimelineEntity parseEntity(Result result) throws IOException {
           filters.getEventFilters())) {
         return null;
       }
-      if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
+      if (!hasField(fieldsToRetrieve, Field.EVENTS)) {
         entity.getEvents().clear();
       }
     }
 
     // fetch metrics if fieldsToRetrieve contains METRICS or ALL.
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.METRICS)) {
+    if (hasField(fieldsToRetrieve, Field.METRICS)) {
       readMetrics(entity, result, ApplicationColumnPrefix.METRIC);
     }
     return entity;
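The getResults() change above is the payoff of the RowKeyPrefix abstraction: one prefix type absorbs the null/non-null flowRunId branching. A minimal sketch of the pattern (standalone wrapper and the RowKeyPrefix type parameter are my assumptions):

```java
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.application.RowKeyPrefix;

public class ApplicationScanSketch {
  public static Scan scanForFlow(String clusterId, String userId,
      String flowName, Long flowRunId /* may be null */) {
    // A null run id simply yields a shorter prefix, so the old
    // if (flowRunId != null) branch at the call site disappears.
    RowKeyPrefix<ApplicationRowKey> prefix = new ApplicationRowKeyPrefix(
        clusterId, userId, flowName, flowRunId);
    Scan scan = new Scan();
    scan.setRowPrefixFilter(prefix.getRowKeyPrefix());
    return scan;
  }
}
```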
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
index faecd14..f2f83d1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
@@ -35,9 +35,9 @@
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable;
 
 import com.google.common.base.Preconditions;
@@ -105,15 +105,14 @@ protected ResultScanner getResults(Configuration hbaseConf,
     if (getFilters().getCreatedTimeBegin() == 0L &&
         getFilters().getCreatedTimeEnd() == Long.MAX_VALUE) {
       // All records have to be chosen.
-      scan.setRowPrefixFilter(FlowActivityRowKey.getRowKeyPrefix(clusterId));
+      scan.setRowPrefixFilter(new FlowActivityRowKeyPrefix(clusterId)
+          .getRowKeyPrefix());
     } else {
-      scan.setStartRow(
-          FlowActivityRowKey.getRowKeyPrefix(clusterId,
-              getFilters().getCreatedTimeEnd()));
-      scan.setStopRow(
-          FlowActivityRowKey.getRowKeyPrefix(clusterId,
-              (getFilters().getCreatedTimeBegin() <= 0 ? 0 :
-                  (getFilters().getCreatedTimeBegin() - 1))));
+      scan.setStartRow(new FlowActivityRowKeyPrefix(clusterId, getFilters()
+          .getCreatedTimeEnd()).getRowKeyPrefix());
+      scan.setStopRow(new FlowActivityRowKeyPrefix(clusterId, (getFilters()
+          .getCreatedTimeBegin() <= 0 ? 0
+          : (getFilters().getCreatedTimeBegin() - 1))).getRowKeyPrefix());
     }
     // use the page filter to limit the result to the page size
     // the scanner may still return more than the limit; therefore we need to
@@ -137,8 +136,7 @@ protected TimelineEntity parseEntity(Result result) throws IOException {
     // get the list of run ids along with the version that are associated with
     // this flow on this day
     Map<Long, Object> runIdsMap =
-        FlowActivityColumnPrefix.RUN_ID.readResults(result,
-            LongKeyConverter.getInstance());
+        FlowActivityColumnPrefix.RUN_ID.readResults(result, longKeyConverter);
     for (Map.Entry<Long, Object> e : runIdsMap.entrySet()) {
       Long runId = e.getKey();
       String version = (String)e.getValue();
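The start/stop rows look reversed at first glance: the scan starts at createdTimeEnd and stops just short of createdTimeBegin. That is deliberate if the day timestamp is stored inverted (as the entity-key converter does for its run id), so newer days sort first. Sketch of the resulting window (standalone wrapper assumed):

```java
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyPrefix;

public class FlowActivityScanSketch {
  public static Scan scanWindow(String clusterId, long createdTimeBegin,
      long createdTimeEnd) {
    Scan scan = new Scan();
    // Inverted day timestamps: the row for createdTimeEnd sorts before the
    // row for createdTimeBegin, hence "end" as start row and "begin" as stop.
    scan.setStartRow(new FlowActivityRowKeyPrefix(clusterId,
        createdTimeEnd).getRowKeyPrefix());
    scan.setStopRow(new FlowActivityRowKeyPrefix(clusterId,
        createdTimeBegin <= 0 ? 0 : createdTimeBegin - 1).getRowKeyPrefix());
    return scan;
  }
}
```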
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
index e1695ef..e100d53 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
@@ -28,12 +28,12 @@
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.FamilyFilter;
 import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
 import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.hbase.filter.QualifierFilter;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.FilterList.Operator;
 import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
@@ -42,12 +42,13 @@
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.RowKeyPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKeyPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable;
 import org.apache.hadoop.yarn.webapp.BadRequestException;
@@ -81,8 +82,8 @@ public FlowRunEntityReader(TimelineReaderContext ctxt,
   @Override
   protected void validateParams() {
     Preconditions.checkNotNull(getContext(), "context shouldn't be null");
-    Preconditions.checkNotNull(
-        getDataToRetrieve(), "data to retrieve shouldn't be null");
+    Preconditions.checkNotNull(getDataToRetrieve(),
+        "data to retrieve shouldn't be null");
     Preconditions.checkNotNull(getContext().getClusterId(),
         "clusterId shouldn't be null");
     Preconditions.checkNotNull(getContext().getUserId(),
@@ -97,8 +98,8 @@ protected void validateParams() {
     if (!isSingleEntityRead() && fieldsToRetrieve != null) {
       for (Field field : fieldsToRetrieve) {
         if (field != Field.ALL && field != Field.METRICS) {
-          throw new BadRequestException("Invalid field " + field +
-              " specified while querying flow runs.");
+          throw new BadRequestException("Invalid field " + field
+              + " specified while querying flow runs.");
         }
       }
     }
@@ -119,23 +120,22 @@ protected FilterList constructFilterListBasedOnFilters() throws IOException {
     Long createdTimeBegin = getFilters().getCreatedTimeBegin();
     Long createdTimeEnd = getFilters().getCreatedTimeEnd();
     if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) {
-      listBasedOnFilters.addFilter(
-          TimelineFilterUtils.createSingleColValueFiltersByRange(
-              FlowRunColumn.MIN_START_TIME, createdTimeBegin, createdTimeEnd));
+      listBasedOnFilters.addFilter(TimelineFilterUtils
+          .createSingleColValueFiltersByRange(FlowRunColumn.MIN_START_TIME,
+              createdTimeBegin, createdTimeEnd));
     }
     // Filter based on metric filters.
     TimelineFilterList metricFilters = getFilters().getMetricFilters();
     if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) {
-      listBasedOnFilters.addFilter(
-          TimelineFilterUtils.createHBaseFilterList(
-              FlowRunColumnPrefix.METRIC, metricFilters));
+      listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList(
+          FlowRunColumnPrefix.METRIC, metricFilters));
     }
     return listBasedOnFilters;
   }
 
   /**
-   * Add {@link QualifierFilter} filters to filter list for each column of
-   * flow run table.
+   * Add {@link QualifierFilter} filters to filter list for each column of flow
+   * run table.
    *
    * @return filter list to which qualifier filters have been added.
    */
@@ -153,20 +153,19 @@ protected FilterList constructFilterListBasedOnFields() throws IOException {
     FilterList list = new FilterList(Operator.MUST_PASS_ONE);
     // By default fetch everything in INFO column family.
     FamilyFilter infoColumnFamily =
-        new FamilyFilter(CompareOp.EQUAL,
-            new BinaryComparator(FlowRunColumnFamily.INFO.getBytes()));
+        new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(
+            FlowRunColumnFamily.INFO.getBytes()));
     TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
     // If multiple entities have to be retrieved, check if metrics have to be
     // retrieved and if not, add a filter so that metrics can be excluded.
     // Metrics are always returned if we are reading a single entity.
-    if (!isSingleEntityRead() && !TimelineStorageUtils.hasField(
-        dataToRetrieve.getFieldsToRetrieve(), Field.METRICS)) {
+    if (!isSingleEntityRead()
+        && !hasField(dataToRetrieve.getFieldsToRetrieve(), Field.METRICS)) {
       FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE);
       infoColFamilyList.addFilter(infoColumnFamily);
-      infoColFamilyList.addFilter(
-          new QualifierFilter(CompareOp.NOT_EQUAL,
-              new BinaryPrefixComparator(
-                  FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(""))));
+      infoColFamilyList.addFilter(new QualifierFilter(CompareOp.NOT_EQUAL,
+          new BinaryPrefixComparator(FlowRunColumnPrefix.METRIC
+              .getColumnPrefixBytes(""))));
       list.addFilter(infoColFamilyList);
     } else {
       // Check if metricsToRetrieve are specified and if they are, create a
@@ -176,14 +175,13 @@ protected FilterList constructFilterListBasedOnFields() throws IOException {
       // (in augmentParams()).
       TimelineFilterList metricsToRetrieve =
           dataToRetrieve.getMetricsToRetrieve();
-      if (metricsToRetrieve != null &&
-          !metricsToRetrieve.getFilterList().isEmpty()) {
+      if (metricsToRetrieve != null
+          && !metricsToRetrieve.getFilterList().isEmpty()) {
         FilterList infoColFamilyList = new FilterList();
         infoColFamilyList.addFilter(infoColumnFamily);
         FilterList columnsList = updateFixedColumns();
-        columnsList.addFilter(
-            TimelineFilterUtils.createHBaseFilterList(
-                FlowRunColumnPrefix.METRIC, metricsToRetrieve));
+        columnsList.addFilter(TimelineFilterUtils.createHBaseFilterList(
+            FlowRunColumnPrefix.METRIC, metricsToRetrieve));
         infoColFamilyList.addFilter(columnsList);
         list.addFilter(infoColFamilyList);
       }
@@ -195,9 +193,10 @@ protected Result getResult(Configuration hbaseConf, Connection conn,
       FilterList filterList) throws IOException {
     TimelineReaderContext context = getContext();
-    byte[] rowKey =
-        FlowRunRowKey.getRowKey(context.getClusterId(), context.getUserId(),
+    FlowRunRowKey flowRunRowKey =
+        new FlowRunRowKey(context.getClusterId(), context.getUserId(),
            context.getFlowName(), context.getFlowRunId());
+    byte[] rowKey = flowRunRowKey.getRowKey();
     Get get = new Get(rowKey);
     get.setMaxVersions(Integer.MAX_VALUE);
     if (filterList != null && !filterList.getFilters().isEmpty()) {
@@ -207,13 +206,14 @@
   }
 
   @Override
-  protected ResultScanner getResults(Configuration hbaseConf,
-      Connection conn, FilterList filterList) throws IOException {
+  protected ResultScanner getResults(Configuration hbaseConf, Connection conn,
+      FilterList filterList) throws IOException {
     Scan scan = new Scan();
     TimelineReaderContext context = getContext();
-    scan.setRowPrefixFilter(
-        FlowRunRowKey.getRowKeyPrefix(context.getClusterId(),
-            context.getUserId(), context.getFlowName()));
+    RowKeyPrefix<FlowRunRowKey> flowRunRowKeyPrefix =
+        new FlowRunRowKeyPrefix(context.getClusterId(), context.getUserId(),
+            context.getFlowName());
+    scan.setRowPrefixFilter(flowRunRowKeyPrefix.getRowKeyPrefix());
     FilterList newList = new FilterList();
     newList.addFilter(new PageFilter(getFilters().getLimit()));
     if (filterList != null && !filterList.getFilters().isEmpty()) {
@@ -238,27 +238,27 @@ protected TimelineEntity parseEntity(Result result) throws IOException {
     }
 
     // read the start time
-    Long startTime = (Long)FlowRunColumn.MIN_START_TIME.readResult(result);
+    Long startTime = (Long) FlowRunColumn.MIN_START_TIME.readResult(result);
     if (startTime != null) {
       flowRun.setStartTime(startTime.longValue());
     }
 
     // read the end time if available
-    Long endTime = (Long)FlowRunColumn.MAX_END_TIME.readResult(result);
+    Long endTime = (Long) FlowRunColumn.MAX_END_TIME.readResult(result);
     if (endTime != null) {
       flowRun.setMaxEndTime(endTime.longValue());
     }
 
     // read the flow version
-    String version = (String)FlowRunColumn.FLOW_VERSION.readResult(result);
+    String version = (String) FlowRunColumn.FLOW_VERSION.readResult(result);
     if (version != null) {
       flowRun.setVersion(version);
     }
 
     // read metrics if its a single entity query or if METRICS are part of
     // fieldsToRetrieve.
-    if (isSingleEntityRead() || TimelineStorageUtils.hasField(
-        getDataToRetrieve().getFieldsToRetrieve(), Field.METRICS)) {
+    if (isSingleEntityRead()
+        || hasField(getDataToRetrieve().getFieldsToRetrieve(), Field.METRICS)) {
       readMetrics(flowRun, result, FlowRunColumnPrefix.METRIC);
     }
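The densest hunk above is the metrics-exclusion filter; unwound into a standalone sketch, the structure it assembles is just this (taken from the hunk; the wrapper class is mine):

```java
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix;

public class FlowRunFilterSketch {
  public static FilterList metricsExcludedFilter() {
    // Mirrors the hunk: a MUST_PASS_ONE list over (a) an EQUAL family filter
    // on INFO and (b) a NOT_EQUAL prefix match on the metric qualifier "m".
    FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE);
    infoColFamilyList.addFilter(new FamilyFilter(CompareOp.EQUAL,
        new BinaryComparator(FlowRunColumnFamily.INFO.getBytes())));
    infoColFamilyList.addFilter(new QualifierFilter(CompareOp.NOT_EQUAL,
        new BinaryPrefixComparator(
            FlowRunColumnPrefix.METRIC.getColumnPrefixBytes(""))));
    return infoColFamilyList;
  }
}
```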
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; import org.apache.hadoop.yarn.webapp.NotFoundException; @@ -95,32 +103,29 @@ protected FilterList constructFilterListBasedOnFilters() throws IOException { long createdTimeBegin = filters.getCreatedTimeBegin(); long createdTimeEnd = filters.getCreatedTimeEnd(); if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) { - listBasedOnFilters.addFilter( - TimelineFilterUtils.createSingleColValueFiltersByRange( - EntityColumn.CREATED_TIME, createdTimeBegin, createdTimeEnd)); + listBasedOnFilters.addFilter(TimelineFilterUtils + .createSingleColValueFiltersByRange(EntityColumn.CREATED_TIME, + createdTimeBegin, createdTimeEnd)); } // Create filter list based on metric filters and add it to // listBasedOnFilters. TimelineFilterList metricFilters = filters.getMetricFilters(); if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) { - listBasedOnFilters.addFilter( - TimelineFilterUtils.createHBaseFilterList( - EntityColumnPrefix.METRIC, metricFilters)); + listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList( + EntityColumnPrefix.METRIC, metricFilters)); } // Create filter list based on config filters and add it to // listBasedOnFilters. TimelineFilterList configFilters = filters.getConfigFilters(); if (configFilters != null && !configFilters.getFilterList().isEmpty()) { - listBasedOnFilters.addFilter( - TimelineFilterUtils.createHBaseFilterList( - EntityColumnPrefix.CONFIG, configFilters)); + listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList( + EntityColumnPrefix.CONFIG, configFilters)); } // Create filter list based on info filters and add it to listBasedOnFilters TimelineFilterList infoFilters = filters.getInfoFilters(); if (infoFilters != null && !infoFilters.getFilterList().isEmpty()) { - listBasedOnFilters.addFilter( - TimelineFilterUtils.createHBaseFilterList( - EntityColumnPrefix.INFO, infoFilters)); + listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList( + EntityColumnPrefix.INFO, infoFilters)); } return listBasedOnFilters; } @@ -130,10 +135,10 @@ protected FilterList constructFilterListBasedOnFilters() throws IOException { * * @return true if we need to fetch some of the columns, false otherwise. 
*/ - private static boolean fetchPartialEventCols(TimelineFilterList eventFilters, + private boolean fetchPartialEventCols(TimelineFilterList eventFilters, EnumSet fieldsToRetrieve) { - return (eventFilters != null && !eventFilters.getFilterList().isEmpty() && - !TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)); + return (eventFilters != null && !eventFilters.getFilterList().isEmpty() && !hasField( + fieldsToRetrieve, Field.EVENTS)); } /** @@ -141,10 +146,10 @@ private static boolean fetchPartialEventCols(TimelineFilterList eventFilters, * * @return true if we need to fetch some of the columns, false otherwise. */ - private static boolean fetchPartialRelatesToCols(TimelineFilterList relatesTo, + private boolean fetchPartialRelatesToCols(TimelineFilterList relatesTo, EnumSet fieldsToRetrieve) { - return (relatesTo != null && !relatesTo.getFilterList().isEmpty() && - !TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)); + return (relatesTo != null && !relatesTo.getFilterList().isEmpty() && !hasField( + fieldsToRetrieve, Field.RELATES_TO)); } /** @@ -152,10 +157,10 @@ private static boolean fetchPartialRelatesToCols(TimelineFilterList relatesTo, * * @return true if we need to fetch some of the columns, false otherwise. */ - private static boolean fetchPartialIsRelatedToCols( - TimelineFilterList isRelatedTo, EnumSet fieldsToRetrieve) { - return (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty() && - !TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)); + private boolean fetchPartialIsRelatedToCols(TimelineFilterList isRelatedTo, + EnumSet fieldsToRetrieve) { + return (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty() && !hasField( + fieldsToRetrieve, Field.IS_RELATED_TO)); } /** @@ -163,19 +168,20 @@ private static boolean fetchPartialIsRelatedToCols( * relatesto and isrelatedto from info family. * * @return true, if we need to fetch only some of the columns, false if we - * need to fetch all the columns under info column family. + * need to fetch all the columns under info column family. */ protected boolean fetchPartialColsFromInfoFamily() { EnumSet fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); TimelineEntityFilters filters = getFilters(); - return fetchPartialEventCols(filters.getEventFilters(), fieldsToRetrieve) || - fetchPartialRelatesToCols(filters.getRelatesTo(), fieldsToRetrieve) || - fetchPartialIsRelatedToCols(filters.getIsRelatedTo(), fieldsToRetrieve); + return fetchPartialEventCols(filters.getEventFilters(), fieldsToRetrieve) + || fetchPartialRelatesToCols(filters.getRelatesTo(), fieldsToRetrieve) + || fetchPartialIsRelatedToCols(filters.getIsRelatedTo(), + fieldsToRetrieve); } /** - * Check if we need to create filter list based on fields. We need to create - * a filter list iff all fields need not be retrieved or we have some specific + * Check if we need to create filter list based on fields. We need to create a + * filter list iff all fields need not be retrieved or we have some specific * fields or metrics to retrieve. We also need to create a filter list if we * have relationships(relatesTo/isRelatedTo) and event filters specified for * the query. @@ -188,22 +194,24 @@ protected boolean needCreateFilterListBasedOnFields() { // be retrieved, also check if we have some metrics or configs to // retrieve specified for the query because then a filter list will have // to be created. 
- boolean flag = !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) || - (dataToRetrieve.getConfsToRetrieve() != null && - !dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty()) || - (dataToRetrieve.getMetricsToRetrieve() != null && - !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty()); + boolean flag = + !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) + || (dataToRetrieve.getConfsToRetrieve() != null && !dataToRetrieve + .getConfsToRetrieve().getFilterList().isEmpty()) + || (dataToRetrieve.getMetricsToRetrieve() != null && !dataToRetrieve + .getMetricsToRetrieve().getFilterList().isEmpty()); // Filters need to be checked only if we are reading multiple entities. If // condition above is false, we check if there are relationships(relatesTo/ // isRelatedTo) and event filters specified for the query. if (!flag && !isSingleEntityRead()) { TimelineEntityFilters filters = getFilters(); - flag = (filters.getEventFilters() != null && - !filters.getEventFilters().getFilterList().isEmpty()) || - (filters.getIsRelatedTo() != null && - !filters.getIsRelatedTo().getFilterList().isEmpty()) || - (filters.getRelatesTo() != null && - !filters.getRelatesTo().getFilterList().isEmpty()); + flag = + (filters.getEventFilters() != null && !filters.getEventFilters() + .getFilterList().isEmpty()) + || (filters.getIsRelatedTo() != null && !filters.getIsRelatedTo() + .getFilterList().isEmpty()) + || (filters.getRelatesTo() != null && !filters.getRelatesTo() + .getFilterList().isEmpty()); } return flag; } @@ -216,8 +224,8 @@ protected boolean needCreateFilterListBasedOnFields() { */ protected void updateFixedColumns(FilterList list) { for (EntityColumn column : EntityColumn.values()) { - list.addFilter(new QualifierFilter(CompareOp.EQUAL, - new BinaryComparator(column.getColumnQualifierBytes()))); + list.addFilter(new QualifierFilter(CompareOp.EQUAL, new BinaryComparator( + column.getColumnQualifierBytes()))); } } @@ -226,30 +234,29 @@ protected void updateFixedColumns(FilterList list) { * qualifiers in the info column family will be returned in result. * * @param isApplication If true, it means operations are to be performed for - * application table, otherwise for entity table. + * application table, otherwise for entity table. * @return filter list. * @throws IOException if any problem occurs while creating filter list. */ - private FilterList createFilterListForColsOfInfoFamily() - throws IOException { + private FilterList createFilterListForColsOfInfoFamily() throws IOException { FilterList infoFamilyColsFilter = new FilterList(Operator.MUST_PASS_ONE); // Add filters for each column in entity table. updateFixedColumns(infoFamilyColsFilter); EnumSet fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); // If INFO field has to be retrieved, add a filter for fetching columns // with INFO column prefix. - if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) { - infoFamilyColsFilter.addFilter( - TimelineFilterUtils.createHBaseQualifierFilter( + if (hasField(fieldsToRetrieve, Field.INFO)) { + infoFamilyColsFilter + .addFilter(TimelineFilterUtils.createHBaseQualifierFilter( CompareOp.EQUAL, EntityColumnPrefix.INFO)); } TimelineFilterList relatesTo = getFilters().getRelatesTo(); - if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) { + if (hasField(fieldsToRetrieve, Field.RELATES_TO)) { // If RELATES_TO field has to be retrieved, add a filter for fetching // columns with RELATES_TO column prefix. 
-      infoFamilyColsFilter.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
-              CompareOp.EQUAL, EntityColumnPrefix.RELATES_TO));
+      infoFamilyColsFilter.addFilter(TimelineFilterUtils
+          .createHBaseQualifierFilter(CompareOp.EQUAL,
+              EntityColumnPrefix.RELATES_TO));
     } else if (relatesTo != null && !relatesTo.getFilterList().isEmpty()) {
       // Even if fields to retrieve does not contain RELATES_TO, we still
       // need to have a filter to fetch some of the column qualifiers if
@@ -257,17 +264,16 @@ private FilterList createFilterListForColsOfInfoFamily()
       // matched after fetching rows from HBase.
       Set<String> relatesToCols =
           TimelineFilterUtils.fetchColumnsFromFilterList(relatesTo);
-      infoFamilyColsFilter.addFilter(
-          TimelineFilterUtils.createFiltersFromColumnQualifiers(
-              EntityColumnPrefix.RELATES_TO, relatesToCols));
+      infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
+          EntityColumnPrefix.RELATES_TO, relatesToCols));
     }
     TimelineFilterList isRelatedTo = getFilters().getIsRelatedTo();
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
+    if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
       // If IS_RELATED_TO field has to be retrieved, add a filter for fetching
       // columns with IS_RELATED_TO column prefix.
-      infoFamilyColsFilter.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
-              CompareOp.EQUAL, EntityColumnPrefix.IS_RELATED_TO));
+      infoFamilyColsFilter.addFilter(TimelineFilterUtils
+          .createHBaseQualifierFilter(CompareOp.EQUAL,
+              EntityColumnPrefix.IS_RELATED_TO));
     } else if (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty()) {
       // Even if fields to retrieve does not contain IS_RELATED_TO, we still
       // need to have a filter to fetch some of the column qualifiers if
@@ -275,27 +281,25 @@ private FilterList createFilterListForColsOfInfoFamily()
       // matched after fetching rows from HBase.
       Set<String> isRelatedToCols =
           TimelineFilterUtils.fetchColumnsFromFilterList(isRelatedTo);
-      infoFamilyColsFilter.addFilter(
-          TimelineFilterUtils.createFiltersFromColumnQualifiers(
-              EntityColumnPrefix.IS_RELATED_TO, isRelatedToCols));
+      infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
+          EntityColumnPrefix.IS_RELATED_TO, isRelatedToCols));
     }
     TimelineFilterList eventFilters = getFilters().getEventFilters();
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
+    if (hasField(fieldsToRetrieve, Field.EVENTS)) {
       // If EVENTS field has to be retrieved, add a filter for fetching columns
       // with EVENT column prefix.
-      infoFamilyColsFilter.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
+      infoFamilyColsFilter
+          .addFilter(TimelineFilterUtils.createHBaseQualifierFilter(
               CompareOp.EQUAL, EntityColumnPrefix.EVENT));
-    } else if (eventFilters != null && !eventFilters.getFilterList().isEmpty()){
+    } else if (eventFilters != null && !eventFilters.getFilterList().isEmpty()) {
       // Even if fields to retrieve does not contain EVENTS, we still need to
       // have a filter to fetch some of the column qualifiers on the basis of
       // event filters specified. Event filters will then be matched after
       // fetching rows from HBase.
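Aside: the branches above either pull in a whole column prefix (EQUAL) or, when only filters reference it, just the named qualifiers; excludeFieldsFromInfoColFamily() further below does the inverse with NOT_EQUAL. A sketch of the include/exclude duality, with made-up "e!"/"i!" prefixes standing in for the real EntityColumnPrefix encodings:

import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class PrefixFilterSketch {
  // EQUAL keeps cells whose qualifier starts with the prefix; NOT_EQUAL
  // drops them. BinaryPrefixComparator compares only the leading bytes.
  static QualifierFilter prefixFilter(CompareOp op, String prefix) {
    return new QualifierFilter(op,
        new BinaryPrefixComparator(Bytes.toBytes(prefix)));
  }

  public static void main(String[] args) {
    // MUST_PASS_ALL: every NOT_EQUAL exclusion has to hold at once.
    FilterList exclusions = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    exclusions.addFilter(prefixFilter(CompareOp.NOT_EQUAL, "e!")); // no events
    exclusions.addFilter(prefixFilter(CompareOp.NOT_EQUAL, "i!")); // no info
    System.out.println(exclusions.getFilters().size()); // 2
  }
}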
       Set<String> eventCols =
           TimelineFilterUtils.fetchColumnsFromFilterList(eventFilters);
-      infoFamilyColsFilter.addFilter(
-          TimelineFilterUtils.createFiltersFromColumnQualifiers(
-              EntityColumnPrefix.EVENT, eventCols));
+      infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers(
+          EntityColumnPrefix.EVENT, eventCols));
     }
     return infoFamilyColsFilter;
   }
@@ -310,28 +314,28 @@ private FilterList createFilterListForColsOfInfoFamily
   private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) {
     EnumSet<Field> fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve();
     // Events not required.
-    if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
-      infoColFamilyList.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
-              CompareOp.NOT_EQUAL, EntityColumnPrefix.EVENT));
+    if (!hasField(fieldsToRetrieve, Field.EVENTS)) {
+      infoColFamilyList.addFilter(TimelineFilterUtils
+          .createHBaseQualifierFilter(CompareOp.NOT_EQUAL,
+              EntityColumnPrefix.EVENT));
     }
     // info not required.
-    if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
-      infoColFamilyList.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
-              CompareOp.NOT_EQUAL, EntityColumnPrefix.INFO));
+    if (!hasField(fieldsToRetrieve, Field.INFO)) {
+      infoColFamilyList.addFilter(TimelineFilterUtils
+          .createHBaseQualifierFilter(CompareOp.NOT_EQUAL,
+              EntityColumnPrefix.INFO));
     }
     // is related to not required.
-    if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
-      infoColFamilyList.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
-              CompareOp.NOT_EQUAL, EntityColumnPrefix.IS_RELATED_TO));
+    if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
+      infoColFamilyList.addFilter(TimelineFilterUtils
+          .createHBaseQualifierFilter(CompareOp.NOT_EQUAL,
+              EntityColumnPrefix.IS_RELATED_TO));
     }
     // relates to not required.
-    if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
-      infoColFamilyList.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
-              CompareOp.NOT_EQUAL, EntityColumnPrefix.RELATES_TO));
+    if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) {
+      infoColFamilyList.addFilter(TimelineFilterUtils
+          .createHBaseQualifierFilter(CompareOp.NOT_EQUAL,
+              EntityColumnPrefix.RELATES_TO));
     }
   }
 
@@ -348,18 +352,18 @@ private void updateFilterForConfsAndMetricsToRetrieve(
     // CONFS to fields to retrieve in augmentParams() even if not specified.
     if (dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS)) {
       // Create a filter list for configs.
-      listBasedOnFields.addFilter(TimelineFilterUtils.
-          createFilterForConfsOrMetricsToRetrieve(
-              dataToRetrieve.getConfsToRetrieve(),
-              EntityColumnFamily.CONFIGS, EntityColumnPrefix.CONFIG));
+      listBasedOnFields.addFilter(TimelineFilterUtils
+          .createFilterForConfsOrMetricsToRetrieve(
+              dataToRetrieve.getConfsToRetrieve(), EntityColumnFamily.CONFIGS,
+              EntityColumnPrefix.CONFIG));
     }
 
     // Please note that if metricsToRetrieve is specified, we would have added
     // METRICS to fields to retrieve in augmentParams() even if not specified.
     if (dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS)) {
       // Create a filter list for metrics.
-      listBasedOnFields.addFilter(TimelineFilterUtils.
-          createFilterForConfsOrMetricsToRetrieve(
+      listBasedOnFields.addFilter(TimelineFilterUtils
+          .createFilterForConfsOrMetricsToRetrieve(
               dataToRetrieve.getMetricsToRetrieve(),
               EntityColumnFamily.METRICS, EntityColumnPrefix.METRIC));
     }
   }
 
@@ -375,8 +379,8 @@ protected FilterList constructFilterListBasedOnFields() throws IOException {
     FilterList infoColFamilyList = new FilterList();
     // By default fetch everything in INFO column family.
     FamilyFilter infoColumnFamily =
-        new FamilyFilter(CompareOp.EQUAL,
-            new BinaryComparator(EntityColumnFamily.INFO.getBytes()));
+        new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(
+            EntityColumnFamily.INFO.getBytes()));
     infoColFamilyList.addFilter(infoColumnFamily);
     if (!isSingleEntityRead() && fetchPartialColsFromInfoFamily()) {
       // We can fetch only some of the columns from info family.
@@ -394,27 +398,26 @@ protected FilterList constructFilterListBasedOnFields() throws IOException {
   /**
    * Looks up flow context from AppToFlow table.
    *
-   * @param clusterId Cluster Id.
-   * @param appId App Id.
+   * @param appToFlowRowKey to identify Cluster and App Ids.
    * @param hbaseConf HBase configuration.
    * @param conn HBase Connection.
    * @return flow context information.
    * @throws IOException if any problem occurs while fetching flow information.
    */
-  protected FlowContext lookupFlowContext(String clusterId, String appId,
+  protected FlowContext lookupFlowContext(AppToFlowRowKey appToFlowRowKey,
       Configuration hbaseConf, Connection conn) throws IOException {
-    byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
+    byte[] rowKey = appToFlowRowKey.getRowKey();
     Get get = new Get(rowKey);
     Result result = appToFlowTable.getResult(hbaseConf, conn, get);
     if (result != null && !result.isEmpty()) {
-      return new FlowContext(
-          AppToFlowColumn.USER_ID.readResult(result).toString(),
-          AppToFlowColumn.FLOW_ID.readResult(result).toString(),
-          ((Number)AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue());
+      return new FlowContext(AppToFlowColumn.USER_ID.readResult(result)
+          .toString(), AppToFlowColumn.FLOW_ID.readResult(result).toString(),
+          ((Number) AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue());
     } else {
       throw new NotFoundException(
-          "Unable to find the context flow ID and flow run ID for clusterId=" +
-          clusterId + ", appId=" + appId);
+          "Unable to find the context flow ID and flow run ID for clusterId="
+              + appToFlowRowKey.getClusterId() + ", appId="
+              + appToFlowRowKey.getAppId());
     }
   }
 
@@ -425,17 +428,21 @@ protected FlowContext lookupFlowContext(String clusterId, String appId,
     private final String userId;
     private final String flowName;
     private final Long flowRunId;
+
     public FlowContext(String user, String flowName, Long flowRunId) {
       this.userId = user;
       this.flowName = flowName;
       this.flowRunId = flowRunId;
     }
+
     protected String getUserId() {
       return userId;
     }
+
     protected String getFlowName() {
       return flowName;
     }
+
     protected Long getFlowRunId() {
       return flowRunId;
     }
@@ -444,8 +451,8 @@ protected Long getFlowRunId() {
   @Override
   protected void validateParams() {
     Preconditions.checkNotNull(getContext(), "context shouldn't be null");
-    Preconditions.checkNotNull(
-        getDataToRetrieve(), "data to retrieve shouldn't be null");
+    Preconditions.checkNotNull(getDataToRetrieve(),
+        "data to retrieve shouldn't be null");
     Preconditions.checkNotNull(getContext().getClusterId(),
         "clusterId shouldn't be null");
     Preconditions.checkNotNull(getContext().getAppId(),
@@ -463,11 +470,13 @@ protected void augmentParams(Configuration hbaseConf, Connection conn) throws
       IOException {
     TimelineReaderContext context = getContext();
     // In reality all three should be null or neither should be null
-    if (context.getFlowName() == null || context.getFlowRunId() == null ||
-        context.getUserId() == null) {
+    if (context.getFlowName() == null || context.getFlowRunId() == null
+        || context.getUserId() == null) {
       // Get flow context information from AppToFlow table.
-      FlowContext flowContext = lookupFlowContext(
-          context.getClusterId(), context.getAppId(), hbaseConf, conn);
+      AppToFlowRowKey appToFlowRowKey =
+          new AppToFlowRowKey(context.getClusterId(), context.getAppId());
+      FlowContext flowContext =
+          lookupFlowContext(appToFlowRowKey, hbaseConf, conn);
       context.setFlowName(flowContext.flowName);
       context.setFlowRunId(flowContext.flowRunId);
       context.setUserId(flowContext.userId);
@@ -485,9 +494,9 @@ protected Result getResult(Configuration hbaseConf, Connection conn,
       FilterList filterList) throws IOException {
     TimelineReaderContext context = getContext();
     byte[] rowKey =
-        EntityRowKey.getRowKey(context.getClusterId(), context.getUserId(),
+        new EntityRowKey(context.getClusterId(), context.getUserId(),
             context.getFlowName(), context.getFlowRunId(), context.getAppId(),
-            context.getEntityType(), context.getEntityId());
+            context.getEntityType(), context.getEntityId()).getRowKey();
     Get get = new Get(rowKey);
     get.setMaxVersions(getDataToRetrieve().getMetricsLimit());
     if (filterList != null && !filterList.getFilters().isEmpty()) {
@@ -497,15 +506,15 @@ protected Result getResult(Configuration hbaseConf, Connection conn,
   }
 
   @Override
-  protected ResultScanner getResults(Configuration hbaseConf,
-      Connection conn, FilterList filterList) throws IOException {
+  protected ResultScanner getResults(Configuration hbaseConf, Connection conn,
+      FilterList filterList) throws IOException {
     // Scan through part of the table to find the entities belong to one app
     // and one type
     Scan scan = new Scan();
     TimelineReaderContext context = getContext();
-    scan.setRowPrefixFilter(EntityRowKey.getRowKeyPrefix(
-        context.getClusterId(), context.getUserId(), context.getFlowName(),
-        context.getFlowRunId(), context.getAppId(), context.getEntityType()));
+    RowKeyPrefix<EntityRowKey> entityRowKeyPrefix = new EntityRowKeyPrefix(
+        context.getClusterId(), context.getUserId(), context.getFlowName(),
+        context.getFlowRunId(), context.getAppId(), context.getEntityType());
+    scan.setRowPrefixFilter(entityRowKeyPrefix.getRowKeyPrefix());
     scan.setMaxVersions(getDataToRetrieve().getMetricsLimit());
     if (filterList != null && !filterList.getFilters().isEmpty()) {
       scan.setFilter(filterList);
@@ -535,18 +544,16 @@ protected TimelineEntity parseEntity(Result result) throws IOException {
     // locally as relevant HBase filters to filter out rows on the basis of
     // isRelatedTo are not set in HBase scan.
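Aside: this is the core of the refactoring, row keys become small value objects that encode themselves, so getResult() asks a full key for getRowKey() and getResults() asks a prefix for getRowKeyPrefix(). A sketch of the calling pattern with a simplified stand-in key class; DemoRowKey and its '!' separator are illustrative only, the real encoding lives in EntityRowKey, EntityRowKeyPrefix and their converters:

import java.nio.charset.StandardCharsets;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;

public class RowKeyObjectSketch {
  static final class DemoRowKey {
    private final String clusterId;
    private final String appId;

    DemoRowKey(String clusterId, String appId) {
      this.clusterId = clusterId;
      this.appId = appId;
    }

    byte[] getRowKey() { // full key: every component present
      return (clusterId + "!" + appId).getBytes(StandardCharsets.UTF_8);
    }

    byte[] getRowKeyPrefix() { // prefix: trailing components omitted
      return (clusterId + "!").getBytes(StandardCharsets.UTF_8);
    }
  }

  public static void main(String[] args) {
    DemoRowKey key = new DemoRowKey("cluster1", "application_1_0001");
    Get get = new Get(key.getRowKey()); // point read, as in getResult()
    Scan scan = new Scan();
    scan.setRowPrefixFilter(key.getRowKeyPrefix()); // range, as in getResults()
    System.out.println(get.getRow().length + " / " + scan.getStartRow().length);
  }
}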
     boolean checkIsRelatedTo =
-        !isSingleEntityRead() && filters.getIsRelatedTo() != null &&
-        filters.getIsRelatedTo().getFilterList().size() > 0;
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO) ||
-        checkIsRelatedTo) {
-      TimelineStorageUtils.readRelationship(
-          entity, result, EntityColumnPrefix.IS_RELATED_TO, true);
-      if (checkIsRelatedTo && !TimelineStorageUtils.matchIsRelatedTo(entity,
-          filters.getIsRelatedTo())) {
+        !isSingleEntityRead() && filters.getIsRelatedTo() != null
+            && filters.getIsRelatedTo().getFilterList().size() > 0;
+    if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO) || checkIsRelatedTo) {
+      readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true);
+      if (checkIsRelatedTo
+          && !TimelineStorageUtils.matchIsRelatedTo(entity,
+              filters.getIsRelatedTo())) {
         return null;
       }
-      if (!TimelineStorageUtils.hasField(fieldsToRetrieve,
-          Field.IS_RELATED_TO)) {
+      if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
         entity.getIsRelatedToEntities().clear();
       }
     }
@@ -556,31 +563,29 @@ protected TimelineEntity parseEntity(Result result) throws IOException {
     // locally as relevant HBase filters to filter out rows on the basis of
     // relatesTo are not set in HBase scan.
     boolean checkRelatesTo =
-        !isSingleEntityRead() && filters.getRelatesTo() != null &&
-        filters.getRelatesTo().getFilterList().size() > 0;
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO) ||
-        checkRelatesTo) {
-      TimelineStorageUtils.readRelationship(
-          entity, result, EntityColumnPrefix.RELATES_TO, false);
-      if (checkRelatesTo && !TimelineStorageUtils.matchRelatesTo(entity,
-          filters.getRelatesTo())) {
+        !isSingleEntityRead() && filters.getRelatesTo() != null
+            && filters.getRelatesTo().getFilterList().size() > 0;
+    if (hasField(fieldsToRetrieve, Field.RELATES_TO)
+        || checkRelatesTo) {
+      readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false);
+      if (checkRelatesTo
+          && !TimelineStorageUtils.matchRelatesTo(entity,
+              filters.getRelatesTo())) {
         return null;
       }
-      if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
+      if (!hasField(fieldsToRetrieve, Field.RELATES_TO)) {
         entity.getRelatesToEntities().clear();
       }
     }
 
     // fetch info if fieldsToRetrieve contains INFO or ALL.
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
-      TimelineStorageUtils.readKeyValuePairs(
-          entity, result, EntityColumnPrefix.INFO, false);
+    if (hasField(fieldsToRetrieve, Field.INFO)) {
+      readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false);
     }
 
     // fetch configs if fieldsToRetrieve contains CONFIGS or ALL.
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.CONFIGS)) {
-      TimelineStorageUtils.readKeyValuePairs(
-          entity, result, EntityColumnPrefix.CONFIG, true);
+    if (hasField(fieldsToRetrieve, Field.CONFIGS)) {
+      readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true);
     }
 
     // fetch events and match event filters if they exist. If event filters do
@@ -588,24 +593,109 @@ protected TimelineEntity parseEntity(Result result) throws IOException {
     // as relevant HBase filters to filter out rows on the basis of events
     // are not set in HBase scan.
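Aside: parseEntity() reads a relationship column set when the field was requested or when a filter must be evaluated locally, and clears it afterwards if it was only read for filtering. A self-contained sketch of that gating, with a plain map standing in for the entity and a single string standing in for the filter list (both illustrative):

import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;

public class FieldGatingSketch {
  enum Field { ALL, IS_RELATED_TO }

  // Same semantics as the patch's hasField(): ALL implies everything.
  static boolean hasField(EnumSet<Field> fields, Field required) {
    return fields.contains(Field.ALL) || fields.contains(required);
  }

  public static void main(String[] args) {
    EnumSet<Field> fieldsToRetrieve = EnumSet.noneOf(Field.class);
    String requiredRelation = "container_1"; // stand-in for a filter list
    boolean checkIsRelatedTo = requiredRelation != null;

    Map<String, String> isRelatedTo = new HashMap<>();
    if (hasField(fieldsToRetrieve, Field.IS_RELATED_TO) || checkIsRelatedTo) {
      isRelatedTo.put("container", "container_1"); // "read" the columns
      if (checkIsRelatedTo && !isRelatedTo.containsValue(requiredRelation)) {
        return; // row filtered out locally, as parseEntity() returns null
      }
      if (!hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) {
        isRelatedTo.clear(); // fetched only for matching; do not expose it
      }
    }
    System.out.println(isRelatedTo); // {} since the field was never requested
  }
}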
     boolean checkEvents =
-        !isSingleEntityRead() && filters.getEventFilters() != null &&
-        filters.getEventFilters().getFilterList().size() > 0;
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS) ||
-        checkEvents) {
+        !isSingleEntityRead() && filters.getEventFilters() != null
+            && filters.getEventFilters().getFilterList().size() > 0;
+    if (hasField(fieldsToRetrieve, Field.EVENTS)
+        || checkEvents) {
       TimelineStorageUtils.readEvents(entity, result, EntityColumnPrefix.EVENT);
-      if (checkEvents && !TimelineStorageUtils.matchEventFilters(entity,
-          filters.getEventFilters())) {
+      if (checkEvents
+          && !TimelineStorageUtils.matchEventFilters(entity,
+              filters.getEventFilters())) {
         return null;
       }
-      if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) {
+      if (!hasField(fieldsToRetrieve, Field.EVENTS)) {
         entity.getEvents().clear();
       }
     }
 
     // fetch metrics if fieldsToRetrieve contains METRICS or ALL.
-    if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.METRICS)) {
+    if (hasField(fieldsToRetrieve, Field.METRICS)) {
       readMetrics(entity, result, EntityColumnPrefix.METRIC);
     }
     return entity;
   }
+
+  /**
+   * Helper method for reading key-value pairs for either info or config.
+   *
+   * @param <T> Describes the type of column prefix.
+   * @param entity entity to fill.
+   * @param result result from HBase.
+   * @param prefix column prefix.
+   * @param isConfig if true, means we are reading configs, otherwise info.
+   * @throws IOException if any problem is encountered while reading result.
+   */
+  protected <T> void readKeyValuePairs(TimelineEntity entity, Result result,
+      ColumnPrefix<T> prefix, boolean isConfig) throws IOException {
+    // info and configuration are of type Map<String, Object>
+    Map<String, Object> columns =
+        prefix.readResults(result, stringKeyConverter);
+    if (isConfig) {
+      for (Map.Entry<String, Object> column : columns.entrySet()) {
+        entity.addConfig(column.getKey(), column.getValue().toString());
+      }
+    } else {
+      entity.addInfo(columns);
+    }
+  }
+
+  /**
+   * Helper method for reading relationship.
+   *
+   * @param <T> Describes the type of column prefix.
+   * @param entity entity to fill.
+   * @param result result from HBase.
+   * @param prefix column prefix.
+   * @param isRelatedTo if true, means relationship is to be added to
+   *          isRelatedTo, otherwise it's added to relatesTo.
+   * @throws IOException if any problem is encountered while reading result.
+   */
+  protected <T> void readRelationship(TimelineEntity entity, Result result,
+      ColumnPrefix<T> prefix, boolean isRelatedTo) throws IOException {
+    // isRelatedTo and relatesTo are of type Map<String, Set<String>>
+    Map<String, Object> columns =
+        prefix.readResults(result, stringKeyConverter);
+    for (Map.Entry<String, Object> column : columns.entrySet()) {
+      for (String id : Separator.VALUES.splitEncoded(column.getValue()
+          .toString())) {
+        if (isRelatedTo) {
+          entity.addIsRelatedToEntity(column.getKey(), id);
+        } else {
+          entity.addRelatesToEntity(column.getKey(), id);
+        }
+      }
+    }
+  }
+
+  public <T> byte[] createColQualifierPrefix(ColumnPrefix<T> colPrefix,
+      String column) {
+    if (colPrefix == ApplicationColumnPrefix.EVENT
+        || colPrefix == EntityColumnPrefix.EVENT) {
+      return new EventColumnName(column, null, null).getColumnQualifier();
+    } else {
+      return stringKeyConverter.encode(column);
+    }
+  }
+
+  /**
+   * Create a filter list of qualifier filters based on passed set of columns.
+   *
+   * @param <T> Describes the type of column prefix.
+   * @param colPrefix Column Prefix.
+   * @param columns set of column qualifiers.
+   * @return filter list.
+   */
+  protected <T> FilterList createFiltersFromColumnQualifiers(
+      ColumnPrefix<T> colPrefix, Set<String> columns) {
+    FilterList list = new FilterList(Operator.MUST_PASS_ONE);
+    for (String column : columns) {
+      // For columns which have compound column qualifiers (e.g. events), we
+      // need to include the required separator.
+      byte[] compoundColQual = createColQualifierPrefix(colPrefix, column);
+      list.addFilter(new QualifierFilter(CompareOp.EQUAL,
+          new BinaryPrefixComparator(colPrefix
+              .getColumnPrefixBytes(compoundColQual))));
+    }
+    return list;
+  }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
index 852834e..17b8206 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
 
 import java.io.IOException;
+import java.util.EnumSet;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.NavigableSet;
@@ -36,8 +37,11 @@
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters;
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
 
 /**
@@ -68,6 +72,17 @@
   private boolean sortedKeys = false;
 
   /**
+   * Used to convert string key components to and from storage format.
+   */
+  protected final KeyConverter<String> stringKeyConverter =
+      new StringKeyConverter();
+
+  /**
+   * Used to convert Long key components to and from storage format.
+   */
+  protected final KeyConverter<Long> longKeyConverter = new LongKeyConverter();
+
+  /**
    * Instantiates a reader for multiple-entity reads.
    *
    * @param ctxt Reader context which defines the scope in which query has to be
@@ -331,7 +346,7 @@ protected void readMetrics(TimelineEntity entity, Result result,
       ColumnPrefix<?> columnPrefix) throws IOException {
     NavigableMap<String, NavigableMap<Long, Number>> metricsResult =
         columnPrefix.readResultsWithTimestamps(
-            result, StringKeyConverter.getInstance());
+            result, stringKeyConverter);
     for (Map.Entry<String, NavigableMap<Long, Number>> metricResult:
         metricsResult.entrySet()) {
       TimelineMetric metric = new TimelineMetric();
@@ -359,4 +374,19 @@ public boolean isSingleEntityRead() {
   protected void setTable(BaseTable<?> baseTable) {
     this.table = baseTable;
   }
+
+  /**
+   * Check if we have a certain field amongst fields to retrieve. This method
+   * checks against {@link Field#ALL} as well because that would mean field
+   * passed needs to be matched.
+   *
+   * @param fieldsToRetrieve fields to be retrieved.
+   * @param requiredField fields to be checked in fieldsToRetrieve.
+   * @return true if has the required field, false otherwise.
+   */
+  protected boolean hasField(EnumSet<Field> fieldsToRetrieve,
+      Field requiredField) {
+    return fieldsToRetrieve.contains(Field.ALL) ||
+        fieldsToRetrieve.contains(requiredField);
+  }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java
index 74e4b5d..0f9f7a8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java
@@ -26,24 +26,26 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKeyPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKeyConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey;
-import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKeyPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKeyConverter;
 import org.junit.Test;
 
 public class TestKeyConverters {
   private final static String QUALIFIER_SEP = Separator.QUALIFIERS.getValue();
-  private final static byte[] QUALIFIER_SEP_BYTES =
-      Bytes.toBytes(QUALIFIER_SEP);
+  private final static byte[] QUALIFIER_SEP_BYTES = Bytes
+      .toBytes(QUALIFIER_SEP);
   private final static String CLUSTER = "cl" + QUALIFIER_SEP + "uster";
   private final static String USER = QUALIFIER_SEP + "user";
-  private final static String FLOW_NAME =
-      "dummy_" + QUALIFIER_SEP + "flow" + QUALIFIER_SEP;
+  private final static String FLOW_NAME = "dummy_" + QUALIFIER_SEP + "flow"
+      + QUALIFIER_SEP;
   private final static Long FLOW_RUN_ID;
   private final static String APPLICATION_ID;
   static {
@@ -53,7 +55,7 @@
     int sepByteLen = QUALIFIER_SEP_BYTES.length;
     if (sepByteLen <= byteArr.length) {
       for (int i = 0; i < sepByteLen; i++) {
-        byteArr[i] = (byte)(longMaxByteArr[i] - QUALIFIER_SEP_BYTES[i]);
+        byteArr[i] = (byte) (longMaxByteArr[i] - QUALIFIER_SEP_BYTES[i]);
       }
     }
     FLOW_RUN_ID = Bytes.toLong(byteArr);
@@ -62,8 +64,7 @@
     if (sepByteLen <= byteArr.length) {
       for (int i = 0; i < sepByteLen; i++) {
         byteArr[byteArr.length - sepByteLen + i] =
-            (byte)(longMaxByteArr[byteArr.length - sepByteLen + i] -
-                QUALIFIER_SEP_BYTES[i]);
+            (byte) (longMaxByteArr[byteArr.length - sepByteLen + i] - QUALIFIER_SEP_BYTES[i]);
       }
     }
     clusterTs = Bytes.toLong(byteArr);
@@ -74,63 +75,70 @@
   private static void verifyRowPrefixBytes(byte[] byteRowKeyPrefix) {
     int sepLen = QUALIFIER_SEP_BYTES.length;
     for (int i = 0; i < sepLen; i++) {
-      assertTrue("Row key prefix not encoded properly.",
-          byteRowKeyPrefix[byteRowKeyPrefix.length - sepLen + i] ==
-          QUALIFIER_SEP_BYTES[i]);
+      assertTrue(
+          "Row key prefix not encoded properly.",
+          byteRowKeyPrefix[byteRowKeyPrefix.length - sepLen + i] == QUALIFIER_SEP_BYTES[i]);
     }
   }
 
   @Test
   public void testFlowActivityRowKeyConverter() {
-    Long ts = TimelineStorageUtils.getTopOfTheDayTimestamp(1459900830000L);
-    byte[] byteRowKey = FlowActivityRowKeyConverter.getInstance().encode(
-        new FlowActivityRowKey(CLUSTER, ts, USER, FLOW_NAME));
-    FlowActivityRowKey rowKey =
-        FlowActivityRowKeyConverter.getInstance().decode(byteRowKey);
+    Long ts = 1459900830000L;
+    Long dayTimestamp = TimelineStorageUtils.getTopOfTheDayTimestamp(ts);
+    byte[] byteRowKey =
+        new FlowActivityRowKey(CLUSTER, ts, USER, FLOW_NAME).getRowKey();
+    FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(byteRowKey);
     assertEquals(CLUSTER, rowKey.getClusterId());
-    assertEquals(ts, rowKey.getDayTimestamp());
+    assertEquals(dayTimestamp, rowKey.getDayTimestamp());
     assertEquals(USER, rowKey.getUserId());
     assertEquals(FLOW_NAME, rowKey.getFlowName());
 
-    byte[] byteRowKeyPrefix = FlowActivityRowKeyConverter.getInstance().encode(
-        new FlowActivityRowKey(CLUSTER, null, null, null));
-    byte[][] splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
-        Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE });
+    byte[] byteRowKeyPrefix =
+        new FlowActivityRowKeyPrefix(CLUSTER).getRowKeyPrefix();
+    byte[][] splits =
+        Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
+            Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE });
     assertEquals(2, splits.length);
     assertEquals(0, splits[1].length);
     assertEquals(CLUSTER,
         Separator.QUALIFIERS.decode(Bytes.toString(splits[0])));
     verifyRowPrefixBytes(byteRowKeyPrefix);
 
-    byteRowKeyPrefix = FlowActivityRowKeyConverter.getInstance().encode(
-        new FlowActivityRowKey(CLUSTER, ts, null, null));
-    splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
-        Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE });
+    byteRowKeyPrefix =
+        new FlowActivityRowKeyPrefix(CLUSTER, ts).getRowKeyPrefix();
+    splits =
+        Separator.QUALIFIERS.split(byteRowKeyPrefix,
+            new int[] { Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
+                Separator.VARIABLE_SIZE });
    assertEquals(3, splits.length);
     assertEquals(0, splits[2].length);
     assertEquals(CLUSTER,
         Separator.QUALIFIERS.decode(Bytes.toString(splits[0])));
-    assertEquals(ts, (Long) TimelineStorageUtils.invertLong(
-        Bytes.toLong(splits[1])));
+    assertEquals(ts,
+        (Long) TimelineStorageUtils.invertLong(Bytes.toLong(splits[1])));
     verifyRowPrefixBytes(byteRowKeyPrefix);
   }
 
   @Test
   public void testFlowRunRowKeyConverter() {
-    byte[] byteRowKey = FlowRunRowKeyConverter.getInstance().encode(
-        new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID));
-    FlowRunRowKey rowKey =
-        FlowRunRowKeyConverter.getInstance().decode(byteRowKey);
+    FlowRunRowKeyConverter flowRunRowKeyConverter =
+        new FlowRunRowKeyConverter();
+    byte[] byteRowKey =
+        flowRunRowKeyConverter.encode(new FlowRunRowKey(CLUSTER, USER,
+            FLOW_NAME, FLOW_RUN_ID));
+    FlowRunRowKey rowKey = flowRunRowKeyConverter.decode(byteRowKey);
     assertEquals(CLUSTER, rowKey.getClusterId());
     assertEquals(USER, rowKey.getUserId());
     assertEquals(FLOW_NAME, rowKey.getFlowName());
     assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
 
-    byte[] byteRowKeyPrefix = FlowRunRowKeyConverter.getInstance().encode(
-        new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, null));
-    byte[][] splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
-        Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
-        Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE });
+    byte[] byteRowKeyPrefix =
+        flowRunRowKeyConverter.encode(new FlowRunRowKey(CLUSTER, USER,
+            FLOW_NAME, null));
+    byte[][] splits =
+        Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
+            Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+            Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE });
     assertEquals(4, splits.length);
     assertEquals(0, splits[3].length);
     assertEquals(FLOW_NAME,
@@ -140,37 +148,40 @@ public void testFlowRunRowKeyConverter() {
 
   @Test
   public void testApplicationRowKeyConverter() {
-    byte[] byteRowKey = ApplicationRowKeyConverter.getInstance().encode(
-        new ApplicationRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID,
-            APPLICATION_ID));
-    ApplicationRowKey rowKey =
-        ApplicationRowKeyConverter.getInstance().decode(byteRowKey);
+    final ApplicationRowKeyConverter appRowKeyConverter =
+        new ApplicationRowKeyConverter();
+    byte[] byteRowKey =
+        appRowKeyConverter.encode(new ApplicationRowKey(CLUSTER, USER,
+            FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID));
+    ApplicationRowKey rowKey = appRowKeyConverter.decode(byteRowKey);
     assertEquals(CLUSTER, rowKey.getClusterId());
     assertEquals(USER, rowKey.getUserId());
     assertEquals(FLOW_NAME, rowKey.getFlowName());
     assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId());
     assertEquals(APPLICATION_ID, rowKey.getAppId());
 
-    byte[] byteRowKeyPrefix = ApplicationRowKeyConverter.getInstance().encode(
-        new ApplicationRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, null));
+    byte[] byteRowKeyPrefix =
+        new ApplicationRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID)
+            .getRowKeyPrefix();
     byte[][] splits =
-        Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
-            Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
-            Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
-            Separator.VARIABLE_SIZE });
+        Separator.QUALIFIERS.split(byteRowKeyPrefix,
+            new int[] { Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+                Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
+                Separator.VARIABLE_SIZE });
     assertEquals(5, splits.length);
     assertEquals(0, splits[4].length);
     assertEquals(FLOW_NAME,
         Separator.QUALIFIERS.decode(Bytes.toString(splits[2])));
-    assertEquals(FLOW_RUN_ID, (Long)TimelineStorageUtils.invertLong(
-        Bytes.toLong(splits[3])));
+    assertEquals(FLOW_RUN_ID,
+        (Long) TimelineStorageUtils.invertLong(Bytes.toLong(splits[3])));
     verifyRowPrefixBytes(byteRowKeyPrefix);
 
-    byteRowKeyPrefix = ApplicationRowKeyConverter.getInstance().encode(
-        new ApplicationRowKey(CLUSTER, USER, FLOW_NAME, null, null));
-    splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
-        Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
-        Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE });
+    byteRowKeyPrefix =
+        new ApplicationRowKeyPrefix(CLUSTER, USER, FLOW_NAME).getRowKeyPrefix();
+    splits =
+        Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
+            Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+            Separator.VARIABLE_SIZE,
+            Separator.VARIABLE_SIZE });
     assertEquals(4, splits.length);
     assertEquals(0, splits[3].length);
     assertEquals(FLOW_NAME,
@@ -182,11 +193,12 @@ public void testApplicationRowKeyConverter() {
   public void testEntityRowKeyConverter() {
     String entityId = "!ent!ity!!id!";
     String entityType = "entity!Type";
-    byte[] byteRowKey = EntityRowKeyConverter.getInstance().encode(
+    byte[] byteRowKey =
         new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID,
-            entityType, entityId));
-    EntityRowKey rowKey =
-        EntityRowKeyConverter.getInstance().decode(byteRowKey);
+            entityType, entityId).getRowKey();
+    KeyConverter<EntityRowKey> entityRowKeyConverter =
+        new EntityRowKeyConverter();
+    EntityRowKey rowKey = entityRowKeyConverter.decode(byteRowKey);
     assertEquals(CLUSTER, rowKey.getClusterId());
     assertEquals(USER, rowKey.getUserId());
     assertEquals(FLOW_NAME, rowKey.getFlowName());
@@ -195,49 +207,54 @@ public void testEntityRowKeyConverter() {
     assertEquals(entityType, rowKey.getEntityType());
     assertEquals(entityId, rowKey.getEntityId());
 
-    byte[] byteRowKeyPrefix = EntityRowKeyConverter.getInstance().encode(
-        new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID,
-            entityType, null));
+    byte[] byteRowKeyPrefix =
+        new EntityRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID,
+            APPLICATION_ID, entityType).getRowKeyPrefix();
     byte[][] splits =
-        Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
-            Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
-            Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
-            AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE,
-            Separator.VARIABLE_SIZE });
+        Separator.QUALIFIERS.split(
+            byteRowKeyPrefix,
+            new int[] { Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+                Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
+                AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE,
+                Separator.VARIABLE_SIZE });
     assertEquals(7, splits.length);
     assertEquals(0, splits[6].length);
-    assertEquals(APPLICATION_ID,
-        AppIdKeyConverter.getInstance().decode(splits[4]));
-    assertEquals(entityType, Separator.QUALIFIERS.decode(
-        Bytes.toString(splits[5])));
+    assertEquals(APPLICATION_ID, new AppIdKeyConverter().decode(splits[4]));
+    assertEquals(entityType,
+        Separator.QUALIFIERS.decode(Bytes.toString(splits[5])));
     verifyRowPrefixBytes(byteRowKeyPrefix);
 
-    byteRowKeyPrefix = EntityRowKeyConverter.getInstance().encode(
-        new EntityRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID, APPLICATION_ID,
-            null, null));
-    splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] {
-        Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
-        Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
-        AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE });
+    byteRowKeyPrefix =
+        new EntityRowKeyPrefix(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID,
+            APPLICATION_ID).getRowKeyPrefix();
+    splits =
+        Separator.QUALIFIERS.split(
+            byteRowKeyPrefix,
+            new int[] { Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE,
+                Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
+                AppIdKeyConverter.getKeySize(), Separator.VARIABLE_SIZE });
     assertEquals(6, splits.length);
     assertEquals(0, splits[5].length);
-    assertEquals(APPLICATION_ID,
-        AppIdKeyConverter.getInstance().decode(splits[4]));
+    AppIdKeyConverter appIdKeyConverter = new AppIdKeyConverter();
+    assertEquals(APPLICATION_ID, appIdKeyConverter.decode(splits[4]));
     verifyRowPrefixBytes(byteRowKeyPrefix);
   }
 
   @Test
   public void testAppToFlowRowKeyConverter() {
-    byte[] byteRowKey = AppToFlowRowKeyConverter.getInstance().encode(
-        new AppToFlowRowKey(CLUSTER, APPLICATION_ID));
-    AppToFlowRowKey rowKey =
-        AppToFlowRowKeyConverter.getInstance().decode(byteRowKey);
+    AppToFlowRowKeyConverter appToFlowRowKeyConverter =
+        new AppToFlowRowKeyConverter();
+    byte[] byteRowKey =
+        appToFlowRowKeyConverter.encode(new AppToFlowRowKey(CLUSTER,
+            APPLICATION_ID));
+    AppToFlowRowKey rowKey = appToFlowRowKeyConverter.decode(byteRowKey);
     assertEquals(CLUSTER, rowKey.getClusterId());
     assertEquals(APPLICATION_ID, rowKey.getAppId());
   }
 
   @Test
   public void testAppIdKeyConverter() {
+    AppIdKeyConverter appIdKeyConverter = new AppIdKeyConverter();
     long currentTs = System.currentTimeMillis();
     ApplicationId appId1 = ApplicationId.newInstance(currentTs, 1);
     ApplicationId appId2 = ApplicationId.newInstance(currentTs, 2);
@@ -245,18 +262,19 @@
     String appIdStr1 = appId1.toString();
     String appIdStr2 = appId2.toString();
     String appIdStr3 = appId3.toString();
-    byte[] appIdBytes1 = AppIdKeyConverter.getInstance().encode(appIdStr1);
-    byte[] appIdBytes2 = AppIdKeyConverter.getInstance().encode(appIdStr2);
-    byte[] appIdBytes3 = AppIdKeyConverter.getInstance().encode(appIdStr3);
+    byte[] appIdBytes1 = appIdKeyConverter.encode(appIdStr1);
+    byte[] appIdBytes2 = appIdKeyConverter.encode(appIdStr2);
+    byte[] appIdBytes3 = appIdKeyConverter.encode(appIdStr3);
     // App ids' should be encoded in a manner wherein descending order
     // is maintained.
-    assertTrue("Ordering of app ids' is incorrect",
-        Bytes.compareTo(appIdBytes1, appIdBytes2) > 0 &&
-        Bytes.compareTo(appIdBytes1, appIdBytes3) > 0 &&
-        Bytes.compareTo(appIdBytes2, appIdBytes3) > 0);
-    String decodedAppId1 = AppIdKeyConverter.getInstance().decode(appIdBytes1);
-    String decodedAppId2 = AppIdKeyConverter.getInstance().decode(appIdBytes2);
-    String decodedAppId3 = AppIdKeyConverter.getInstance().decode(appIdBytes3);
+    assertTrue(
+        "Ordering of app ids' is incorrect",
+        Bytes.compareTo(appIdBytes1, appIdBytes2) > 0
+            && Bytes.compareTo(appIdBytes1, appIdBytes3) > 0
+            && Bytes.compareTo(appIdBytes2, appIdBytes3) > 0);
+    String decodedAppId1 = appIdKeyConverter.decode(appIdBytes1);
+    String decodedAppId2 = appIdKeyConverter.decode(appIdBytes2);
+    String decodedAppId3 = appIdKeyConverter.decode(appIdBytes3);
     assertTrue("Decoded app id is not same as the app id encoded",
         appIdStr1.equals(decodedAppId1));
     assertTrue("Decoded app id is not same as the app id encoded",
@@ -273,19 +291,21 @@ public void testEventColumnNameConverter() {
         Bytes.createMaxByteArray(Bytes.SIZEOF_LONG - valSepBytes.length);
     byte[] ts = Bytes.add(valSepBytes, maxByteArr);
     Long eventTs = Bytes.toLong(ts);
-    byte[] byteEventColName = EventColumnNameConverter.getInstance().encode(
-        new EventColumnName(eventId, eventTs, null));
+    byte[] byteEventColName =
+        new EventColumnName(eventId, eventTs, null).getColumnQualifier();
+    KeyConverter<EventColumnName> eventColumnNameConverter =
+        new EventColumnNameConverter();
     EventColumnName eventColName =
-        EventColumnNameConverter.getInstance().decode(byteEventColName);
+        eventColumnNameConverter.decode(byteEventColName);
     assertEquals(eventId, eventColName.getId());
     assertEquals(eventTs, eventColName.getTimestamp());
     assertNull(eventColName.getInfoKey());
 
     String infoKey = "f=oo_event_in=fo=_key";
-    byteEventColName = EventColumnNameConverter.getInstance().encode(
-        new EventColumnName(eventId, eventTs, infoKey));
-    eventColName =
-        EventColumnNameConverter.getInstance().decode(byteEventColName);
+    byteEventColName =
+        eventColumnNameConverter.encode(new EventColumnName(eventId, eventTs,
+            infoKey));
+    eventColName =
+        eventColumnNameConverter.decode(byteEventColName);
     assertEquals(eventId, eventColName.getId());
     assertEquals(eventTs, eventColName.getTimestamp());
     assertEquals(infoKey, eventColName.getInfoKey());
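Aside: all of these tests now construct converters directly instead of going through getInstance() singletons. A sketch of the encode/decode round-trip property they assert, with an illustrative KeyConverter implementation; the real StringKeyConverter additionally escapes separator characters inside the key:

import java.nio.charset.StandardCharsets;

public class KeyConverterRoundTripSketch {
  interface KeyConverter<T> {
    byte[] encode(T key);
    T decode(byte[] bytes);
  }

  // Simplified stand-in: a plain UTF-8 codec with no separator escaping.
  static final class DemoStringKeyConverter implements KeyConverter<String> {
    @Override
    public byte[] encode(String key) {
      return key.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String decode(byte[] bytes) {
      return new String(bytes, StandardCharsets.UTF_8);
    }
  }

  public static void main(String[] args) {
    // Each test now builds its own converter instead of sharing a singleton.
    KeyConverter<String> converter = new DemoStringKeyConverter();
    String key = "f=oo_event_in=fo=_key";
    byte[] encoded = converter.encode(key);
    // The round-trip property the tests assert: decode(encode(k)) equals k.
    System.out.println(key.equals(converter.decode(encoded))); // true
  }
}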