diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
index bcf2d2c..ea8aed7 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java
@@ -674,7 +674,7 @@ public void testWriteApplicationToHBase() throws Exception {
       Map<String, Object> infoColumns =
           ApplicationColumnPrefix.INFO.readResults(result,
-              StringKeyConverter.getInstance());
+              new StringKeyConverter());
       assertEquals(infoMap, infoColumns);
 
       // Remember isRelatedTo is of type Map<String, Set<String>>
@@ -875,7 +875,7 @@ public void testWriteEntityToHBase() throws Exception {
       Map<String, Object> infoColumns =
           EntityColumnPrefix.INFO.readResults(result,
-              StringKeyConverter.getInstance());
+              new StringKeyConverter());
       assertEquals(infoMap, infoColumns);
 
       // Remember isRelatedTo is of type Map<String, Set<String>>
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
index 801d43c..677e374 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -59,8 +59,8 @@
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
-import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable;
@@ -222,7 +222,7 @@ public void testWriteFlowRunMinMax() throws Exception {
         .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
     // scan the table and see that we get back the right min and max
     // timestamps
-    byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
+    byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey();
     Get g = new Get(startRow);
     g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
         FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes());
@@ -346,10 +346,11 @@ private void checkFlowRunTable(String cluster, String user, String flow,
       long runid, Configuration c1) throws IOException {
     Scan s = new Scan();
     s.addFamily(FlowRunColumnFamily.INFO.getBytes());
-    byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
+    byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey();
     s.setStartRow(startRow);
     String clusterStop = cluster + "1";
-    byte[] stopRow = FlowRunRowKey.getRowKey(clusterStop, user, flow, runid);
+    byte[] stopRow =
+        new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey();
     s.setStopRow(stopRow);
     Connection conn = ConnectionFactory.createConnection(c1);
     Table table1 = conn.getTable(TableName
@@ -611,7 +612,7 @@ private void checkMinMaxFlush(Configuration c1, long minTS, long startTs,
         .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
     // scan the table and see that we get back the right min and max
     // timestamps
-    byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
+    byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey();
     Get g = new Get(startRow);
     g.addColumn(FlowRunColumnFamily.INFO.getBytes(),
         FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes());
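Both test classes in this patch exercise the same migration: the static FlowRunRowKey.getRowKey(cluster, user, flow, runid) helper is gone, and callers construct a FlowRunRowKey and ask it for its bytes. A minimal sketch of the resulting scan setup, with illustrative values (the cluster/user/flow names and run id below are assumptions, not taken from the patch):

    // Start row: the exact run. Stop row: append to the cluster id so the
    // scan covers every row whose key starts with cluster!user!flow.
    FlowRunRowKey startKey =
        new FlowRunRowKey("cluster1", "user1", "flow_name", 1002345678919L);
    byte[] startRow = startKey.getRowKey();
    byte[] stopRow = new FlowRunRowKey("cluster1" + "1", "user1", "flow_name",
        1002345678919L).getRowKey();
    Scan s = new Scan();
    s.addFamily(FlowRunColumnFamily.INFO.getBytes());
    s.setStartRow(startRow);
    s.setStopRow(stopRow);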
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
index e7e7ba4..5196a8c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
@@ -19,24 +19,24 @@
 package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertNotEquals;
 
 import java.io.IOException;
-import java.util.Map;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.SortedSet;
 import java.util.TreeSet;
-import java.util.ArrayList;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
@@ -46,21 +46,21 @@
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 
 /**
  * Tests the FlowRun and FlowActivity Tables
@@ -192,10 +192,11 @@ private void checkFlowRunTable(String cluster, String user, String flow,
     long runid, Configuration c1, int valueCount) throws IOException {
     Scan s = new Scan();
     s.addFamily(FlowRunColumnFamily.INFO.getBytes());
-    byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid);
+    byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey();
     s.setStartRow(startRow);
     String clusterStop = cluster + "1";
-    byte[] stopRow = FlowRunRowKey.getRowKey(clusterStop, user, flow, runid);
+    byte[] stopRow =
+        new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey();
     s.setStopRow(stopRow);
     Connection conn = ConnectionFactory.createConnection(c1);
     Table table1 = conn.getTable(TableName
@@ -300,8 +301,9 @@ public void checkProcessSummationMoreCellsSumFinal2()
         cell4Ts, Bytes.toBytes(cellValue4), tagByteArray);
     currentColumnCells.add(c4);
 
-    List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
-        LongConverter.getInstance(), currentTimestamp);
+    List<Cell> cells =
+        fs.processSummationMajorCompaction(currentColumnCells,
+            new LongConverter(), currentTimestamp);
     assertNotNull(cells);
 
     // we should be getting back 4 cells
@@ -385,8 +387,9 @@ public void checkProcessSummationMoreCellsSumFinalMany() throws IOException {
       cellTsNotFinal++;
     }
 
-    List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
-        LongConverter.getInstance(), currentTimestamp);
+    List<Cell> cells =
+        fs.processSummationMajorCompaction(currentColumnCells,
+            new LongConverter(), currentTimestamp);
     assertNotNull(cells);
 
     // we should be getting back count + 1 cells
@@ -487,8 +490,9 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags() throws IOException {
       cellTsNotFinal++;
     }
 
-    List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
-        LongConverter.getInstance(), currentTimestamp);
+    List<Cell> cells =
+        fs.processSummationMajorCompaction(currentColumnCells,
+            new LongConverter(), currentTimestamp);
     assertNotNull(cells);
 
     // we should be getting back
@@ -552,7 +556,7 @@ public void testProcessSummationMoreCellsSumFinal() throws IOException {
         130L, Bytes.toBytes(cellValue2), tagByteArray);
     currentColumnCells.add(c2);
     List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
-        LongConverter.getInstance(), currentTimestamp);
+        new LongConverter(), currentTimestamp);
     assertNotNull(cells);
 
     // we should be getting back two cells
@@ -600,7 +604,7 @@ public void testProcessSummationOneCellSumFinal() throws IOException {
     currentColumnCells.add(c1);
     List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
-        LongConverter.getInstance(), currentTimestamp);
+        new LongConverter(), currentTimestamp);
     assertNotNull(cells);
     // we should not get the same cell back
     // but we get back the flow cell
@@ -637,7 +641,7 @@ public void testProcessSummationOneCell() throws IOException {
         currentTimestamp, Bytes.toBytes(1110L), tagByteArray);
     currentColumnCells.add(c1);
     List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
-        LongConverter.getInstance(), currentTimestamp);
+        new LongConverter(), currentTimestamp);
     assertNotNull(cells);
     // we expect the same cell back
     assertEquals(1, cells.size());
@@ -651,15 +655,19 @@ public void testProcessSummationEmpty() throws IOException {
     FlowScanner fs = getFlowScannerForTestingCompaction();
     long currentTimestamp = System.currentTimeMillis();
 
+    LongConverter longConverter = new LongConverter();
+
     SortedSet<Cell> currentColumnCells = null;
-    List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells,
-        LongConverter.getInstance(), currentTimestamp);
+    List<Cell> cells =
+        fs.processSummationMajorCompaction(currentColumnCells, longConverter,
+            currentTimestamp);
     assertNotNull(cells);
     assertEquals(0, cells.size());
 
     currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
-    cells = fs.processSummationMajorCompaction(currentColumnCells,
-        LongConverter.getInstance(), currentTimestamp);
+    cells =
+        fs.processSummationMajorCompaction(currentColumnCells, longConverter,
+            currentTimestamp);
     assertNotNull(cells);
     assertEquals(0, cells.size());
   }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
index 036746b..cccae26 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java
@@ -31,14 +31,9 @@
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.FilterList.Operator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter;
-import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix;
 import org.apache.hadoop.hbase.filter.QualifierFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 
@@ -209,39 +204,6 @@ private static SingleColumnValueFilter createHBaseSingleColValueFilter(
     return singleColValFilter;
   }
 
-  private static <T> byte[] createColQualifierPrefix(ColumnPrefix<T> colPrefix,
-      String column) {
-    if (colPrefix == ApplicationColumnPrefix.EVENT ||
-        colPrefix == EntityColumnPrefix.EVENT) {
-      return EventColumnNameConverter.getInstance().encode(
-          new EventColumnName(column, null, null));
-    } else {
-      return StringKeyConverter.getInstance().encode(column);
-    }
-  }
-
-  /**
-   * Create a filter list of qualifier filters based on passed set of columns.
-   *
-   * @param <T> Describes the type of column prefix.
-   * @param colPrefix Column Prefix.
-   * @param columns set of column qualifiers.
-   * @return filter list.
-   */
-  public static <T> FilterList createFiltersFromColumnQualifiers(
-      ColumnPrefix<T> colPrefix, Set<String> columns) {
-    FilterList list = new FilterList(Operator.MUST_PASS_ONE);
-    for (String column : columns) {
-      // For columns which have compound column qualifiers (eg. events), we need
-      // to include the required separator.
-      byte[] compoundColQual = createColQualifierPrefix(colPrefix, column);
-      list.addFilter(new QualifierFilter(CompareOp.EQUAL,
-          new BinaryPrefixComparator(
-              colPrefix.getColumnPrefixBytes(compoundColQual))));
-    }
-    return list;
-  }
-
   /**
    * Fetch columns from filter list containing exists and multivalue equality
    * filters. This is done to fetch only required columns from back-end and
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
index f8b5a65..67b7895 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java
@@ -86,6 +86,17 @@
   private TypedBufferedMutator<FlowActivityTable> flowActivityTable;
   private TypedBufferedMutator<FlowRunTable> flowRunTable;
 
+  /**
+   * Used to convert string key components to and from storage format.
+   */
+  private final StringKeyConverter stringKeyConverter =
+      new StringKeyConverter();
+
+  /**
+   * Used to convert Long key components to and from storage format.
+   */
+  protected final LongKeyConverter longKeyConverter = new LongKeyConverter();
+
   public HBaseTimelineWriterImpl() {
     super(HBaseTimelineWriterImpl.class.getName());
   }
@@ -155,8 +166,9 @@ public TimelineWriteResponse write(String clusterId, String userId,
       TimelineEvent event = TimelineStorageUtils.getApplicationEvent(te,
           ApplicationMetricsConstants.CREATED_EVENT_TYPE);
       if (event != null) {
-        onApplicationCreated(clusterId, userId, flowName, flowVersion,
-            flowRunId, appId, te, event.getTimestamp());
+        FlowRunRowKey flowRunRowKey = new FlowRunRowKey(clusterId, userId, flowName, flowRunId);
+        AppToFlowRowKey appToFlowRowKey = new AppToFlowRowKey(clusterId, appId);
+        onApplicationCreated(flowRunRowKey, appToFlowRowKey, appId, userId, flowVersion, te, event.getTimestamp());
       }
       // if it's an application entity, store metrics
       storeFlowMetricsAppRunning(clusterId, userId, flowName, flowRunId,
@@ -174,55 +186,43 @@ public TimelineWriteResponse write(String clusterId, String userId,
     return putStatus;
   }
 
-  private void onApplicationCreated(String clusterId, String userId,
-      String flowName, String flowVersion, long flowRunId, String appId,
-      TimelineEntity te, long appCreatedTimeStamp) throws IOException {
+  private void onApplicationCreated(FlowRunRowKey flowRunRowKey,
+      AppToFlowRowKey appToFlowRowKey, String appId, String userId,
+      String flowVersion, TimelineEntity te, long appCreatedTimeStamp) throws IOException {
+
+    String flowName = flowRunRowKey.getFlowName();
+    Long flowRunId = flowRunRowKey.getFlowRunId();
+
     // store in App to flow table
-    storeInAppToFlowTable(clusterId, userId, flowName, flowRunId, appId, te);
+    byte[] rowKey = appToFlowRowKey.getRowKey();
+    AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTable, null, flowName);
+    AppToFlowColumn.FLOW_RUN_ID.store(rowKey, appToFlowTable, null, flowRunId);
+    AppToFlowColumn.USER_ID.store(rowKey, appToFlowTable, null, userId);
+
     // store in flow run table
-    storeAppCreatedInFlowRunTable(clusterId, userId, flowName, flowVersion,
-        flowRunId, appId, te);
-    // store in flow activity table
-    storeInFlowActivityTable(clusterId, userId, flowName, flowVersion,
-        flowRunId, appId, appCreatedTimeStamp);
-  }
+    storeAppCreatedInFlowRunTable(flowRunRowKey, appId, te);
 
-  /*
-   * updates the {@link FlowActivityTable} with the Application TimelineEntity
-   * information
-   */
-  private void storeInFlowActivityTable(String clusterId, String userId,
-      String flowName, String flowVersion, long flowRunId, String appId,
-      long activityTimeStamp) throws IOException {
-    byte[] rowKey = FlowActivityRowKey.getRowKey(clusterId, activityTimeStamp,
-        userId, flowName);
-    byte[] qualifier = LongKeyConverter.getInstance().encode(flowRunId);
-    FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier,
-        null, flowVersion,
+    // store in flow activity table
+    byte[] flowActivityRowKeyBytes =
+        FlowActivityRowKey.getRowKey(flowRunRowKey.getClusterId(),
+            appCreatedTimeStamp, flowRunRowKey.getUserId(), flowName);
+    byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId());
+    FlowActivityColumnPrefix.RUN_ID.store(flowActivityRowKeyBytes,
+        flowActivityTable, qualifier, null, flowVersion,
         AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
   }
 
   /*
    * updates the {@link FlowRunTable} with Application Created information
   */
-  private void storeAppCreatedInFlowRunTable(String clusterId, String userId,
-      String flowName, String flowVersion, long flowRunId, String appId,
+  private void storeAppCreatedInFlowRunTable(FlowRunRowKey flowRunRowKey, String appId,
       TimelineEntity te) throws IOException {
-    byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName,
-        flowRunId);
+    byte[] rowKey = flowRunRowKey.getRowKey();
     FlowRunColumn.MIN_START_TIME.store(rowKey, flowRunTable, null,
         te.getCreatedTime(),
         AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
   }
 
-  private void storeInAppToFlowTable(String clusterId, String userId,
-      String flowName, long flowRunId, String appId, TimelineEntity te)
-      throws IOException {
-    byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId);
-    AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTable, null, flowName);
-    AppToFlowColumn.FLOW_RUN_ID.store(rowKey, appToFlowTable, null, flowRunId);
-    AppToFlowColumn.USER_ID.store(rowKey, appToFlowTable, null, userId);
-  }
 
   /*
    * updates the {@link FlowRunTable} and {@link FlowActivityTable} when an
@@ -236,8 +236,12 @@ private void onApplicationFinished(String clusterId, String userId,
         appId, te, appFinishedTimeStamp);
 
     // indicate in the flow activity table that the app has finished
-    storeInFlowActivityTable(clusterId, userId, flowName, flowVersion,
-        flowRunId, appId, appFinishedTimeStamp);
+    byte[] rowKey = FlowActivityRowKey.getRowKey(clusterId, appFinishedTimeStamp,
+        userId, flowName);
+    byte[] qualifier = longKeyConverter.encode(flowRunId);
+    FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier,
+        null, flowVersion,
+        AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId));
   }
 
   /*
@@ -247,7 +251,7 @@ private void storeAppFinishedInFlowRunTable(String clusterId, String userId,
       String flowName, long flowRunId, String appId, TimelineEntity te,
       long appFinishedTimeStamp) throws IOException {
     byte[] rowKey =
-        FlowRunRowKey.getRowKey(clusterId, userId, flowName, flowRunId);
+        new FlowRunRowKey(clusterId, userId, flowName, flowRunId).getRowKey();
     Attribute attributeAppId =
         AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId);
     FlowRunColumn.MAX_END_TIME.store(rowKey, flowRunTable, null,
@@ -269,8 +273,8 @@ private void storeFlowMetricsAppRunning(String clusterId, String userId,
       throws IOException {
     Set<TimelineMetric> metrics = te.getMetrics();
     if (metrics != null) {
-      byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName,
-          flowRunId);
+      byte[] rowKey = new FlowRunRowKey(clusterId, userId, flowName,
+          flowRunId).getRowKey();
       storeFlowMetrics(rowKey, metrics,
           AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId),
           AggregationOperation.SUM.getAttribute());
@@ -280,8 +284,7 @@ private void storeFlowMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
       Attribute... attributes) throws IOException {
     for (TimelineMetric metric : metrics) {
-      byte[] metricColumnQualifier =
-          StringKeyConverter.getInstance().encode(metric.getId());
+      byte[] metricColumnQualifier = stringKeyConverter.encode(metric.getId());
       Map<Long, Number> timeseries = metric.getValues();
       for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
         Long timestamp = timeseriesEntry.getKey();
@@ -320,8 +323,8 @@ private void storeRelations(byte[] rowKey, TimelineEntity te,
         String compoundValue =
             Separator.VALUES.joinEncoded(connectedEntity.getValue());
         columnPrefix.store(rowKey, table,
-            StringKeyConverter.getInstance().encode(connectedEntity.getKey()),
-            null, compoundValue);
+            stringKeyConverter.encode(connectedEntity.getKey()), null,
+            compoundValue);
       }
     }
 
@@ -341,7 +344,7 @@ private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion,
       if (info != null) {
         for (Map.Entry<String, Object> entry : info.entrySet()) {
           ApplicationColumnPrefix.INFO.store(rowKey, applicationTable,
-              StringKeyConverter.getInstance().encode(entry.getKey()), null,
+              stringKeyConverter.encode(entry.getKey()), null,
               entry.getValue());
         }
       }
@@ -355,7 +358,7 @@ private void storeInfo(byte[] rowKey, TimelineEntity te, String flowVersion,
       if (info != null) {
         for (Map.Entry<String, Object> entry : info.entrySet()) {
           EntityColumnPrefix.INFO.store(rowKey, entityTable,
-              StringKeyConverter.getInstance().encode(entry.getKey()), null,
+              stringKeyConverter.encode(entry.getKey()), null,
               entry.getValue());
         }
       }
@@ -371,8 +374,7 @@ private void storeConfig(byte[] rowKey, Map<String, String> config,
       return;
     }
     for (Map.Entry<String, String> entry : config.entrySet()) {
-      byte[] configKey =
-          StringKeyConverter.getInstance().encode(entry.getKey());
+      byte[] configKey = stringKeyConverter.encode(entry.getKey());
       if (isApplication) {
         ApplicationColumnPrefix.CONFIG.store(rowKey, applicationTable,
             configKey, null, entry.getValue());
@@ -392,7 +394,7 @@ private void storeMetrics(byte[] rowKey, Set<TimelineMetric> metrics,
     if (metrics != null) {
       for (TimelineMetric metric : metrics) {
         byte[] metricColumnQualifier =
-            StringKeyConverter.getInstance().encode(metric.getId());
+            stringKeyConverter.encode(metric.getId());
         Map<Long, Number> timeseries = metric.getValues();
         for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) {
           Long timestamp = timeseriesEntry.getKey();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
index 80fcf8c..dde3911 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java
@@ -45,7 +45,7 @@
   /**
    * When the application was created.
    */
   CREATED_TIME(ApplicationColumnFamily.INFO, "created_time",
-      LongConverter.getInstance()),
+      new LongConverter()),
 
   /**
    * The version of the flow that this app belongs to.
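The converter swap repeated throughout this file (LongConverter.getInstance() to new LongConverter()) is safe because the converters are stateless: a per-writer or per-column instance behaves identically to the old singleton. A round-trip sketch of the ValueConverter contract as it is used here (the metric value is made up; encodeValue/decodeValue declare IOException, as the LongKeyConverter hunks further down show):

    NumericValueConverter converter = new LongConverter();
    byte[] bytes = converter.encodeValue(1425016501000L); // stored cell value
    Long restored = (Long) converter.decodeValue(bytes);  // 1425016501000L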
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
index 0febc67..42488f4 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java
@@ -67,8 +67,7 @@
   /**
    * Metrics are stored with the metric name as the column name.
    */
-  METRIC(ApplicationColumnFamily.METRICS, null,
-      LongConverter.getInstance());
+  METRIC(ApplicationColumnFamily.METRICS, null, new LongConverter());
 
   private final ColumnHelper<ApplicationTable> column;
   private final ColumnFamily<ApplicationTable> columnFamily;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyConverter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyConverter.java
index 3b054a5..b7fc727 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyConverter.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyConverter.java
@@ -39,6 +39,9 @@
   public static ApplicationRowKeyConverter getInstance() {
     return INSTANCE;
   }
 
+  private final static AppIdKeyConverter appIdKeyConverter =
+      new AppIdKeyConverter();
+
   private ApplicationRowKeyConverter() {
   }
 
@@ -92,7 +95,7 @@
     if (rowKey.getAppId() == null || rowKey.getAppId().isEmpty()) {
       return Separator.QUALIFIERS.join(first, second, Separator.EMPTY_BYTES);
     }
-    byte[] third = AppIdKeyConverter.getInstance().encode(rowKey.getAppId());
+    byte[] third = appIdKeyConverter.encode(rowKey.getAppId());
     return Separator.QUALIFIERS.join(first, second, third);
   }
 
@@ -124,7 +127,7 @@ public ApplicationRowKey decode(byte[] rowKey) {
         Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
     Long flowRunId =
         TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
-    String appId = AppIdKeyConverter.getInstance().decode(rowKeyComponents[4]);
+    String appId = appIdKeyConverter.decode(rowKeyComponents[4]);
     return new ApplicationRowKey(clusterId, userId, flowName, flowRunId,
         appId);
   }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
index 6a38e32..beaf693 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java
@@ -23,6 +23,8 @@
 public class AppToFlowRowKey {
   private final String clusterId;
   private final String appId;
+  private final AppToFlowRowKeyConverter appToFlowRowKeyConverter =
+      new AppToFlowRowKeyConverter();
 
   public AppToFlowRowKey(String clusterId, String appId) {
     this.clusterId = clusterId;
@@ -45,9 +47,8 @@ public String getAppId() {
    * @param appId Application Id.
    * @return byte array with the row key
    */
-  public static byte[] getRowKey(String clusterId, String appId) {
-    return AppToFlowRowKeyConverter.getInstance().encode(
-        new AppToFlowRowKey(clusterId, appId));
+  public byte[] getRowKey() {
+    return appToFlowRowKeyConverter.encode(this);
   }
 
   /**
@@ -57,6 +58,6 @@ public String getAppId() {
    * @return an AppToFlowRowKey object.
    */
   public static AppToFlowRowKey parseRowKey(byte[] rowKey) {
-    return AppToFlowRowKeyConverter.getInstance().decode(rowKey);
+    return new AppToFlowRowKeyConverter().decode(rowKey);
   }
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKeyConverter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKeyConverter.java
index 0f0b879d..5d34e00 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKeyConverter.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKeyConverter.java
@@ -31,14 +31,11 @@
  */
 public final class AppToFlowRowKeyConverter
     implements KeyConverter<AppToFlowRowKey> {
-  private static final AppToFlowRowKeyConverter INSTANCE =
-      new AppToFlowRowKeyConverter();
 
-  public static AppToFlowRowKeyConverter getInstance() {
-    return INSTANCE;
-  }
+  private final static AppIdKeyConverter appIdKeyConverter =
+      new AppIdKeyConverter();
 
-  private AppToFlowRowKeyConverter() {
+  public AppToFlowRowKeyConverter() {
   }
 
   // App to flow row key is of the form clusterId!appId with the 2 segments
@@ -66,7 +63,7 @@
   public byte[] encode(AppToFlowRowKey rowKey) {
     byte[] first = Separator.encode(rowKey.getClusterId(), Separator.SPACE,
         Separator.TAB, Separator.QUALIFIERS);
-    byte[] second = AppIdKeyConverter.getInstance().encode(rowKey.getAppId());
+    byte[] second = appIdKeyConverter.encode(rowKey.getAppId());
     return Separator.QUALIFIERS.join(first, second);
   }
 
@@ -90,7 +87,7 @@ public AppToFlowRowKey decode(byte[] rowKey) {
     }
     String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[0]),
         Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
-    String appId = AppIdKeyConverter.getInstance().decode(rowKeyComponents[1]);
+    String appId = appIdKeyConverter.decode(rowKeyComponents[1]);
     return new AppToFlowRowKey(clusterId, appId);
   }
 }
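AppToFlowRowKey now owns its encoding: getRowKey() is an instance method backed by a private AppToFlowRowKeyConverter, while parseRowKey(byte[]) builds a fresh converter for decoding. A round-trip sketch using only the methods shown above (the application id is illustrative):

    AppToFlowRowKey key =
        new AppToFlowRowKey("cluster1", "application_1458600690155_0001");
    byte[] rowKeyBytes = key.getRowKey();           // clusterId!appId, encoded
    AppToFlowRowKey parsed = AppToFlowRowKey.parseRowKey(rowKeyBytes);
    // parsed.getClusterId() and parsed.getAppId() round-trip the inputs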
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java
index a173b0f..3cd837e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java
@@ -28,13 +28,8 @@
  * (long - 8 bytes) followed by sequence id section of app id (int - 4 bytes).
  */
 public final class AppIdKeyConverter implements KeyConverter<String> {
-  private static final AppIdKeyConverter INSTANCE = new AppIdKeyConverter();
 
-  public static AppIdKeyConverter getInstance() {
-    return INSTANCE;
-  }
-
-  private AppIdKeyConverter() {
+  public AppIdKeyConverter() {
   }
 
   /*
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java
index 48c56f9..85ad66f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongConverter.java
@@ -27,13 +27,8 @@
  * decodes a set of bytes as a Long.
  */
 public final class LongConverter implements NumericValueConverter {
-  private static final LongConverter INSTANCE = new LongConverter();
 
-  private LongConverter() {
-  }
-
-  public static LongConverter getInstance() {
-    return INSTANCE;
+  public LongConverter() {
   }
 
   @Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java
index 3954145..23f50d4 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/LongKeyConverter.java
@@ -23,13 +23,13 @@
  * Encodes and decodes column names / row keys which are long.
  */
 public final class LongKeyConverter implements KeyConverter<Long> {
-  private static final LongKeyConverter INSTANCE = new LongKeyConverter();
 
-  public static LongKeyConverter getInstance() {
-    return INSTANCE;
-  }
+  /**
+   * To delegate the actual work to.
+   */
+  private static final LongConverter longConverter = new LongConverter();
 
-  private LongKeyConverter() {
+  public LongKeyConverter() {
   }
 
   /*
@@ -44,7 +44,7 @@
     try {
       // IOException will not be thrown here as we are explicitly passing
       // Long.
-      return LongConverter.getInstance().encodeValue(key);
+      return longConverter.encodeValue(key);
     } catch (IOException e) {
       return null;
     }
@@ -60,7 +60,7 @@
   @Override
   public Long decode(byte[] bytes) {
     try {
-      return (Long) LongConverter.getInstance().decodeValue(bytes);
+      return (Long) longConverter.decodeValue(bytes);
     } catch (IOException e) {
       return null;
     }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java
index b0f6d55..282848e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/StringKeyConverter.java
@@ -24,13 +24,8 @@
 * added later, if required in the associated ColumnPrefix implementations.
 */
 public final class StringKeyConverter implements KeyConverter<String> {
-  private static final StringKeyConverter INSTANCE = new StringKeyConverter();
 
-  public static StringKeyConverter getInstance() {
-    return INSTANCE;
-  }
-
-  private StringKeyConverter() {
+  public StringKeyConverter() {
   }
 
   /*
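LongKeyConverter keeps its null-on-IOException behavior but now delegates to a shared LongConverter field instead of a singleton. Usage is unchanged for callers; a sketch with an illustrative run id:

    LongKeyConverter keyConverter = new LongKeyConverter();
    byte[] qualifier = keyConverter.encode(1002345678919L); // null only on failure
    Long runId = keyConverter.decode(qualifier);            // 1002345678919L again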
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
index d52a5d7..6e14c9d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java
@@ -647,60 +647,6 @@ public static String getAggregationCompactionDimension(List<Tag> tags) {
   }
 
   /**
-   * Helper method for reading relationship.
-   *
-   * @param <T> Describes the type of column prefix.
-   * @param entity entity to fill.
-   * @param result result from HBase.
-   * @param prefix column prefix.
-   * @param isRelatedTo if true, means relationship is to be added to
-   *     isRelatedTo, otherwise its added to relatesTo.
-   * @throws IOException if any problem is encountered while reading result.
-   */
-  public static <T> void readRelationship(
-      TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
-      boolean isRelatedTo) throws IOException {
-    // isRelatedTo and relatesTo are of type Map<String, Set<String>>
-    Map<String, Object> columns =
-        prefix.readResults(result, StringKeyConverter.getInstance());
-    for (Map.Entry<String, Object> column : columns.entrySet()) {
-      for (String id : Separator.VALUES.splitEncoded(
-          column.getValue().toString())) {
-        if (isRelatedTo) {
-          entity.addIsRelatedToEntity(column.getKey(), id);
-        } else {
-          entity.addRelatesToEntity(column.getKey(), id);
-        }
-      }
-    }
-  }
-
-  /**
-   * Helper method for reading key-value pairs for either info or config.
-   *
-   * @param <T> Describes the type of column prefix.
-   * @param entity entity to fill.
-   * @param result result from HBase.
-   * @param prefix column prefix.
-   * @param isConfig if true, means we are reading configs, otherwise info.
-   * @throws IOException if any problem is encountered while reading result.
-   */
-  public static <T> void readKeyValuePairs(
-      TimelineEntity entity, Result result, ColumnPrefix<T> prefix,
-      boolean isConfig) throws IOException {
-    // info and configuration are of type Map<String, Object>
-    Map<String, Object> columns =
-        prefix.readResults(result, StringKeyConverter.getInstance());
-    if (isConfig) {
-      for (Map.Entry<String, Object> column : columns.entrySet()) {
-        entity.addConfig(column.getKey(), column.getValue().toString());
-      }
-    } else {
-      entity.addInfo(columns);
-    }
-  }
-
-  /**
    * Read events from the entity table or the application table. The column name
    * is of the form "eventId=timestamp=infoKey" where "infoKey" may be omitted
    * if there is no info associated with the event.
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
index 775879a..93b4b36 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumn.java
@@ -49,8 +49,7 @@
   /**
    * When the entity was created.
    */
-  CREATED_TIME(EntityColumnFamily.INFO, "created_time",
-      LongConverter.getInstance()),
+  CREATED_TIME(EntityColumnFamily.INFO, "created_time", new LongConverter()),
 
   /**
    * The version of the flow that this entity belongs to.
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
index 02a4bb3..e410549 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityColumnPrefix.java
@@ -67,8 +67,7 @@
   /**
    * Metrics are stored with the metric name as the column name.
   */
-  METRIC(EntityColumnFamily.METRICS, null,
-      LongConverter.getInstance());
+  METRIC(EntityColumnFamily.METRICS, null, new LongConverter());
 
   private final ColumnHelper<EntityTable> column;
   private final ColumnFamily<EntityTable> columnFamily;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyConverter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyConverter.java
index 43c0569..f8216c8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyConverter.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityRowKeyConverter.java
@@ -39,7 +39,10 @@
   public static EntityRowKeyConverter getInstance() {
     return INSTANCE;
   }
 
-  private EntityRowKeyConverter() {
+  private final static AppIdKeyConverter appIdKeyConverter =
+      new AppIdKeyConverter();
+
+  public EntityRowKeyConverter() {
   }
 
   // Entity row key is of the form
@@ -90,7 +93,7 @@
     // time.
     byte[] second = Bytes.toBytes(TimelineStorageUtils.invertLong(
         rowKey.getFlowRunId()));
-    byte[] third = AppIdKeyConverter.getInstance().encode(rowKey.getAppId());
+    byte[] third = appIdKeyConverter.encode(rowKey.getAppId());
     if (rowKey.getEntityType() == null) {
       return Separator.QUALIFIERS.join(
           first, second, third, Separator.EMPTY_BYTES);
@@ -132,7 +135,7 @@ public EntityRowKey decode(byte[] rowKey) {
         Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
     Long flowRunId =
         TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3]));
-    String appId = AppIdKeyConverter.getInstance().decode(rowKeyComponents[4]);
+    String appId = appIdKeyConverter.decode(rowKeyComponents[4]);
     String entityType = Separator.decode(Bytes.toString(rowKeyComponents[5]),
         Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
     String entityId = Separator.decode(Bytes.toString(rowKeyComponents[6]),
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
index f1553b8..2e7a9d8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java
@@ -25,10 +25,10 @@
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
+import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
-import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
 
 /**
@@ -41,14 +41,14 @@
    * application start times.
    */
   MIN_START_TIME(FlowRunColumnFamily.INFO, "min_start_time",
-      AggregationOperation.GLOBAL_MIN, LongConverter.getInstance()),
+      AggregationOperation.GLOBAL_MIN, new LongConverter()),
 
   /**
    * When the flow ended. This is the maximum of currently known application end
    * times.
    */
   MAX_END_TIME(FlowRunColumnFamily.INFO, "max_end_time",
-      AggregationOperation.GLOBAL_MAX, LongConverter.getInstance()),
+      AggregationOperation.GLOBAL_MAX, new LongConverter()),
 
   /**
    * The version of the flow that this flow belongs to.
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
index 0f14c89..e74282a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java
@@ -41,7 +41,7 @@
   /**
    * To store flow run info values.
   */
-  METRIC(FlowRunColumnFamily.INFO, "m", null, LongConverter.getInstance());
+  METRIC(FlowRunColumnFamily.INFO, "m", null, new LongConverter());
 
   private final ColumnHelper<FlowRunTable> column;
   private final ColumnFamily<FlowRunTable> columnFamily;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
index 925242b..fb68815 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKey.java
@@ -25,6 +25,8 @@
   private final String userId;
   private final String flowName;
   private final Long flowRunId;
+  private final FlowRunRowKeyConverter flowRunRowKeyConverter =
+      new FlowRunRowKeyConverter();
 
   public FlowRunRowKey(String clusterId, String userId, String flowName,
       Long flowRunId) {
@@ -51,21 +53,6 @@ public Long getFlowRunId() {
   }
 
   /**
-   * Constructs a row key prefix for the flow run table as follows: {
-   * clusterId!userI!flowName!}.
-   *
-   * @param clusterId Cluster Id.
-   * @param userId User Id.
-   * @param flowName Flow Name.
-   * @return byte array with the row key prefix
-   */
-  public static byte[] getRowKeyPrefix(String clusterId, String userId,
-      String flowName) {
-    return FlowRunRowKeyConverter.getInstance().encode(new FlowRunRowKey(
-        clusterId, userId, flowName, null));
-  }
-
-  /**
    * Constructs a row key for the entity table as follows: {
    * clusterId!userId!flowName!Inverted Flow Run Id}.
    *
@@ -75,12 +62,11 @@ public Long getFlowRunId() {
    * @param flowRunId Run Id for the flow name.
    * @return byte array with the row key
    */
-  public static byte[] getRowKey(String clusterId, String userId,
-      String flowName, Long flowRunId) {
-    return FlowRunRowKeyConverter.getInstance().encode(new FlowRunRowKey(
-        clusterId, userId, flowName, flowRunId));
+  public byte[] getRowKey() {
+    return flowRunRowKeyConverter.encode(this);
   }
 
+
   /**
    * Given the raw row key as bytes, returns the row key as an object.
    *
@@ -88,7 +74,7 @@ public Long getFlowRunId() {
    * @return A FlowRunRowKey object.
    */
   public static FlowRunRowKey parseRowKey(byte[] rowKey) {
-    return FlowRunRowKeyConverter.getInstance().decode(rowKey);
+    return new FlowRunRowKeyConverter().decode(rowKey);
   }
 
   /**
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyConverter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyConverter.java
index 642f065..1b9126b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyConverter.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyConverter.java
@@ -30,14 +30,8 @@
  */
 public final class FlowRunRowKeyConverter
     implements KeyConverter<FlowRunRowKey> {
-  private static final FlowRunRowKeyConverter INSTANCE =
-      new FlowRunRowKeyConverter();
 
-  public static FlowRunRowKeyConverter getInstance() {
-    return INSTANCE;
-  }
-
-  private FlowRunRowKeyConverter() {
+  public FlowRunRowKeyConverter() {
   }
 
   // Flow run row key is of the form
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java
new file mode 100644
index 0000000..9866f51
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunRowKeyPrefix.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timelineservice.storage.flow;
+
+/**
+ * Represents a partial rowkey (without the flowRunId) for the flow run table.
+ */
+public class FlowRunRowKeyPrefix extends FlowRunRowKey {
+
+  /**
+   * @param clusterId Cluster Id.
+   * @param userId User Id.
+   * @param flowName Flow Name.
+   */
+  public FlowRunRowKeyPrefix(String clusterId, String userId,
+      String flowName) {
+    super(clusterId, userId, flowName, null);
+  }
+
+  /**
+   * Constructs a row key prefix for the flow run table as follows: {
+   * clusterId!userId!flowName!}.
+   *
+   * @return byte array with the row key prefix
+   */
+  public byte[] getRowKeyPrefix() {
+    // We know we're a FlowRunRowKey with a null flowRunId, so we can simply
+    // delegate.
+    return super.getRowKey();
+  }
+
+}
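FlowRunRowKeyPrefix replaces the deleted static FlowRunRowKey.getRowKeyPrefix(clusterId, userId, flowName): a prefix is just a FlowRunRowKey with a null flowRunId, so encoding it yields the clusterId!userId!flowName! prefix. A sketch of the intended use for range scans, mirroring the setStartRow pattern in the tests above (values are illustrative):

    byte[] prefix =
        new FlowRunRowKeyPrefix("cluster1", "user1", "flow_name").getRowKeyPrefix();
    Scan scan = new Scan();
    scan.setStartRow(prefix); // the caller still chooses a matching stop row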
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
index 71c90fb..fb6fada 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
@@ -47,6 +47,7 @@
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable;
+import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils;
 
@@ -91,32 +92,29 @@ protected FilterList constructFilterListBasedOnFilters() throws IOException {
     long createdTimeBegin = filters.getCreatedTimeBegin();
     long createdTimeEnd = filters.getCreatedTimeEnd();
     if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) {
-      listBasedOnFilters.addFilter(
-          TimelineFilterUtils.createSingleColValueFiltersByRange(
-              ApplicationColumn.CREATED_TIME, createdTimeBegin, createdTimeEnd));
+      listBasedOnFilters.addFilter(TimelineFilterUtils
+          .createSingleColValueFiltersByRange(ApplicationColumn.CREATED_TIME,
+              createdTimeBegin, createdTimeEnd));
     }
     // Create filter list based on metric filters and add it to
     // listBasedOnFilters.
     TimelineFilterList metricFilters = filters.getMetricFilters();
     if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) {
-      listBasedOnFilters.addFilter(
-          TimelineFilterUtils.createHBaseFilterList(
-              ApplicationColumnPrefix.METRIC, metricFilters));
+      listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList(
+          ApplicationColumnPrefix.METRIC, metricFilters));
     }
     // Create filter list based on config filters and add it to
     // listBasedOnFilters.
     TimelineFilterList configFilters = filters.getConfigFilters();
     if (configFilters != null && !configFilters.getFilterList().isEmpty()) {
-      listBasedOnFilters.addFilter(
-          TimelineFilterUtils.createHBaseFilterList(
-              ApplicationColumnPrefix.CONFIG, configFilters));
+      listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList(
+          ApplicationColumnPrefix.CONFIG, configFilters));
     }
     // Create filter list based on info filters and add it to listBasedOnFilters
     TimelineFilterList infoFilters = filters.getInfoFilters();
     if (infoFilters != null && !infoFilters.getFilterList().isEmpty()) {
-      listBasedOnFilters.addFilter(
-          TimelineFilterUtils.createHBaseFilterList(
-              ApplicationColumnPrefix.INFO, infoFilters));
+      listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList(
+          ApplicationColumnPrefix.INFO, infoFilters));
     }
     return listBasedOnFilters;
   }
@@ -130,8 +128,8 @@ protected FilterList constructFilterListBasedOnFilters() throws IOException {
   @Override
   protected void updateFixedColumns(FilterList list) {
     for (ApplicationColumn column : ApplicationColumn.values()) {
-      list.addFilter(new QualifierFilter(CompareOp.EQUAL,
-          new BinaryComparator(column.getColumnQualifierBytes())));
+      list.addFilter(new QualifierFilter(CompareOp.EQUAL, new BinaryComparator(
+          column.getColumnQualifierBytes())));
     }
   }
 
@@ -142,8 +140,7 @@ protected void updateFixedColumns(FilterList list) {
    * @return filter list.
    * @throws IOException if any problem occurs while creating filter list.
    */
-  private FilterList createFilterListForColsOfInfoFamily()
-      throws IOException {
+  private FilterList createFilterListForColsOfInfoFamily() throws IOException {
     FilterList infoFamilyColsFilter = new FilterList(Operator.MUST_PASS_ONE);
     // Add filters for each column in entity table.
     updateFixedColumns(infoFamilyColsFilter);
@@ -151,17 +148,17 @@ private FilterList createFilterListForColsOfInfoFamily()
     // If INFO field has to be retrieved, add a filter for fetching columns
     // with INFO column prefix.
     if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) {
-      infoFamilyColsFilter.addFilter(
-          TimelineFilterUtils.createHBaseQualifierFilter(
-              CompareOp.EQUAL, ApplicationColumnPrefix.INFO));
+      infoFamilyColsFilter.addFilter(TimelineFilterUtils
+          .createHBaseQualifierFilter(CompareOp.EQUAL,
+              ApplicationColumnPrefix.INFO));
     }
     TimelineFilterList relatesTo = getFilters().getRelatesTo();
     if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) {
       // If RELATES_TO field has to be retrieved, add a filter for fetching
       // columns with RELATES_TO column prefix.
- infoFamilyColsFilter.addFilter( - TimelineFilterUtils.createHBaseQualifierFilter( - CompareOp.EQUAL, ApplicationColumnPrefix.RELATES_TO)); + infoFamilyColsFilter.addFilter(TimelineFilterUtils + .createHBaseQualifierFilter(CompareOp.EQUAL, + ApplicationColumnPrefix.RELATES_TO)); } else if (relatesTo != null && !relatesTo.getFilterList().isEmpty()) { // Even if fields to retrieve does not contain RELATES_TO, we still // need to have a filter to fetch some of the column qualifiers if @@ -169,17 +166,16 @@ private FilterList createFilterListForColsOfInfoFamily() // matched after fetching rows from HBase. Set relatesToCols = TimelineFilterUtils.fetchColumnsFromFilterList(relatesTo); - infoFamilyColsFilter.addFilter( - TimelineFilterUtils.createFiltersFromColumnQualifiers( - ApplicationColumnPrefix.RELATES_TO, relatesToCols)); + infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers( + ApplicationColumnPrefix.RELATES_TO, relatesToCols)); } TimelineFilterList isRelatedTo = getFilters().getIsRelatedTo(); if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { // If IS_RELATED_TO field has to be retrieved, add a filter for fetching // columns with IS_RELATED_TO column prefix. - infoFamilyColsFilter.addFilter( - TimelineFilterUtils.createHBaseQualifierFilter( - CompareOp.EQUAL, ApplicationColumnPrefix.IS_RELATED_TO)); + infoFamilyColsFilter.addFilter(TimelineFilterUtils + .createHBaseQualifierFilter(CompareOp.EQUAL, + ApplicationColumnPrefix.IS_RELATED_TO)); } else if (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty()) { // Even if fields to retrieve does not contain IS_RELATED_TO, we still // need to have a filter to fetch some of the column qualifiers if @@ -187,27 +183,25 @@ private FilterList createFilterListForColsOfInfoFamily() // matched after fetching rows from HBase. Set isRelatedToCols = TimelineFilterUtils.fetchColumnsFromFilterList(isRelatedTo); - infoFamilyColsFilter.addFilter( - TimelineFilterUtils.createFiltersFromColumnQualifiers( - ApplicationColumnPrefix.IS_RELATED_TO, isRelatedToCols)); + infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers( + ApplicationColumnPrefix.IS_RELATED_TO, isRelatedToCols)); } TimelineFilterList eventFilters = getFilters().getEventFilters(); if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) { // If EVENTS field has to be retrieved, add a filter for fetching columns // with EVENT column prefix. - infoFamilyColsFilter.addFilter( - TimelineFilterUtils.createHBaseQualifierFilter( - CompareOp.EQUAL, ApplicationColumnPrefix.EVENT)); - } else if (eventFilters != null && !eventFilters.getFilterList().isEmpty()){ + infoFamilyColsFilter.addFilter(TimelineFilterUtils + .createHBaseQualifierFilter(CompareOp.EQUAL, + ApplicationColumnPrefix.EVENT)); + } else if (eventFilters != null && !eventFilters.getFilterList().isEmpty()) { // Even if fields to retrieve does not contain EVENTS, we still need to // have a filter to fetch some of the column qualifiers on the basis of // event filters specified. Event filters will then be matched after // fetching rows from HBase. 
Set eventCols = TimelineFilterUtils.fetchColumnsFromFilterList(eventFilters); - infoFamilyColsFilter.addFilter( - TimelineFilterUtils.createFiltersFromColumnQualifiers( - ApplicationColumnPrefix.EVENT, eventCols)); + infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers( + ApplicationColumnPrefix.EVENT, eventCols)); } return infoFamilyColsFilter; } @@ -223,27 +217,27 @@ private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) { EnumSet fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); // Events not required. if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) { - infoColFamilyList.addFilter( - TimelineFilterUtils.createHBaseQualifierFilter( - CompareOp.NOT_EQUAL, ApplicationColumnPrefix.EVENT)); + infoColFamilyList.addFilter(TimelineFilterUtils + .createHBaseQualifierFilter(CompareOp.NOT_EQUAL, + ApplicationColumnPrefix.EVENT)); } // info not required. if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) { - infoColFamilyList.addFilter( - TimelineFilterUtils.createHBaseQualifierFilter( - CompareOp.NOT_EQUAL, ApplicationColumnPrefix.INFO)); + infoColFamilyList.addFilter(TimelineFilterUtils + .createHBaseQualifierFilter(CompareOp.NOT_EQUAL, + ApplicationColumnPrefix.INFO)); } // is related to not required. if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { - infoColFamilyList.addFilter( - TimelineFilterUtils.createHBaseQualifierFilter( - CompareOp.NOT_EQUAL, ApplicationColumnPrefix.IS_RELATED_TO)); + infoColFamilyList.addFilter(TimelineFilterUtils + .createHBaseQualifierFilter(CompareOp.NOT_EQUAL, + ApplicationColumnPrefix.IS_RELATED_TO)); } // relates to not required. if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) { - infoColFamilyList.addFilter( - TimelineFilterUtils.createHBaseQualifierFilter( - CompareOp.NOT_EQUAL, ApplicationColumnPrefix.RELATES_TO)); + infoColFamilyList.addFilter(TimelineFilterUtils + .createHBaseQualifierFilter(CompareOp.NOT_EQUAL, + ApplicationColumnPrefix.RELATES_TO)); } } @@ -260,8 +254,8 @@ private void updateFilterForConfsAndMetricsToRetrieve( // CONFS to fields to retrieve in augmentParams() even if not specified. if (dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS)) { // Create a filter list for configs. - listBasedOnFields.addFilter(TimelineFilterUtils. - createFilterForConfsOrMetricsToRetrieve( + listBasedOnFields.addFilter(TimelineFilterUtils + .createFilterForConfsOrMetricsToRetrieve( dataToRetrieve.getConfsToRetrieve(), ApplicationColumnFamily.CONFIGS, ApplicationColumnPrefix.CONFIG)); } @@ -270,8 +264,8 @@ private void updateFilterForConfsAndMetricsToRetrieve( // METRICS to fields to retrieve in augmentParams() even if not specified. if (dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS)) { // Create a filter list for metrics. - listBasedOnFields.addFilter(TimelineFilterUtils. - createFilterForConfsOrMetricsToRetrieve( + listBasedOnFields.addFilter(TimelineFilterUtils + .createFilterForConfsOrMetricsToRetrieve( dataToRetrieve.getMetricsToRetrieve(), ApplicationColumnFamily.METRICS, ApplicationColumnPrefix.METRIC)); } @@ -287,8 +281,8 @@ protected FilterList constructFilterListBasedOnFields() throws IOException { FilterList infoColFamilyList = new FilterList(); // By default fetch everything in INFO column family. 
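Aside: the code below pins the scan to the INFO column family with a FamilyFilter and, when fields are excluded, ANDs in NOT_EQUAL qualifier filters (a FilterList constructed without an operator defaults to MUST_PASS_ALL). A sketch of that combination; the literal "i" merely stands in for ApplicationColumnFamily.INFO.getBytes() and is an assumption of this illustration:

    import org.apache.hadoop.hbase.filter.BinaryComparator;
    import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
    import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
    import org.apache.hadoop.hbase.filter.FamilyFilter;
    import org.apache.hadoop.hbase.filter.FilterList;
    import org.apache.hadoop.hbase.filter.QualifierFilter;
    import org.apache.hadoop.hbase.util.Bytes;

    public class InfoFamilySketch {
      public static FilterList infoFamilyWithout(String excludedPrefix) {
        FilterList list = new FilterList(); // defaults to MUST_PASS_ALL (AND)
        // Keep only cells from the info column family ("i" is a stand-in).
        list.addFilter(new FamilyFilter(CompareOp.EQUAL,
            new BinaryComparator(Bytes.toBytes("i"))));
        // Drop any qualifier that starts with the excluded field's prefix.
        list.addFilter(new QualifierFilter(CompareOp.NOT_EQUAL,
            new BinaryPrefixComparator(Bytes.toBytes(excludedPrefix))));
        return list;
      }
    }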
FamilyFilter infoColumnFamily = - new FamilyFilter(CompareOp.EQUAL, - new BinaryComparator(ApplicationColumnFamily.INFO.getBytes())); + new FamilyFilter(CompareOp.EQUAL, new BinaryComparator( + ApplicationColumnFamily.INFO.getBytes())); infoColFamilyList.addFilter(infoColumnFamily); if (!isSingleEntityRead() && fetchPartialColsFromInfoFamily()) { // We can fetch only some of the columns from info family. @@ -309,8 +303,9 @@ protected Result getResult(Configuration hbaseConf, Connection conn, FilterList filterList) throws IOException { TimelineReaderContext context = getContext(); byte[] rowKey = - ApplicationRowKey.getRowKey(context.getClusterId(), context.getUserId(), - context.getFlowName(), context.getFlowRunId(), context.getAppId()); + ApplicationRowKey.getRowKey(context.getClusterId(), + context.getUserId(), context.getFlowName(), context.getFlowRunId(), + context.getAppId()); Get get = new Get(rowKey); get.setMaxVersions(Integer.MAX_VALUE); if (filterList != null && !filterList.getFilters().isEmpty()) { @@ -322,8 +317,8 @@ protected Result getResult(Configuration hbaseConf, Connection conn, @Override protected void validateParams() { Preconditions.checkNotNull(getContext(), "context shouldn't be null"); - Preconditions.checkNotNull( - getDataToRetrieve(), "data to retrieve shouldn't be null"); + Preconditions.checkNotNull(getDataToRetrieve(), + "data to retrieve shouldn't be null"); Preconditions.checkNotNull(getContext().getClusterId(), "clusterId shouldn't be null"); Preconditions.checkNotNull(getContext().getEntityType(), @@ -345,10 +340,13 @@ protected void augmentParams(Configuration hbaseConf, Connection conn) TimelineReaderContext context = getContext(); if (isSingleEntityRead()) { // Get flow context information from AppToFlow table. - if (context.getFlowName() == null || context.getFlowRunId() == null || - context.getUserId() == null) { - FlowContext flowContext = lookupFlowContext( - context.getClusterId(), context.getAppId(), hbaseConf, conn); + if (context.getFlowName() == null || context.getFlowRunId() == null + || context.getUserId() == null) { + AppToFlowRowKey appToFlowRowKey = + new AppToFlowRowKey(context.getClusterId(), context.getAppId()); + FlowContext flowContext = + lookupFlowContext(appToFlowRowKey, + hbaseConf, conn); context.setFlowName(flowContext.getFlowName()); context.setFlowRunId(flowContext.getFlowRunId()); context.setUserId(flowContext.getUserId()); @@ -363,18 +361,17 @@ protected void augmentParams(Configuration hbaseConf, Connection conn) } @Override - protected ResultScanner getResults(Configuration hbaseConf, - Connection conn, FilterList filterList) throws IOException { + protected ResultScanner getResults(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { Scan scan = new Scan(); TimelineReaderContext context = getContext(); if (context.getFlowRunId() != null) { - scan.setRowPrefixFilter(ApplicationRowKey. - getRowKeyPrefix(context.getClusterId(), context.getUserId(), - context.getFlowName(), context.getFlowRunId())); + scan.setRowPrefixFilter(ApplicationRowKey.getRowKeyPrefix( + context.getClusterId(), context.getUserId(), context.getFlowName(), + context.getFlowRunId())); } else { - scan.setRowPrefixFilter(ApplicationRowKey. 
- getRowKeyPrefix(context.getClusterId(), context.getUserId(), - context.getFlowName())); + scan.setRowPrefixFilter(ApplicationRowKey.getRowKeyPrefix( + context.getClusterId(), context.getUserId(), context.getFlowName())); } FilterList newList = new FilterList(); newList.addFilter(new PageFilter(getFilters().getLimit())); @@ -406,18 +403,18 @@ protected TimelineEntity parseEntity(Result result) throws IOException { // locally as relevant HBase filters to filter out rows on the basis of // isRelatedTo are not set in HBase scan. boolean checkIsRelatedTo = - !isSingleEntityRead() && filters.getIsRelatedTo() != null && - filters.getIsRelatedTo().getFilterList().size() > 0; - if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO) || - checkIsRelatedTo) { - TimelineStorageUtils.readRelationship( - entity, result, ApplicationColumnPrefix.IS_RELATED_TO, true); - if (checkIsRelatedTo && !TimelineStorageUtils.matchIsRelatedTo(entity, - filters.getIsRelatedTo())) { + !isSingleEntityRead() && filters.getIsRelatedTo() != null + && filters.getIsRelatedTo().getFilterList().size() > 0; + if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO) + || checkIsRelatedTo) { + readRelationship(entity, result, ApplicationColumnPrefix.IS_RELATED_TO, + true); + if (checkIsRelatedTo + && !TimelineStorageUtils.matchIsRelatedTo(entity, + filters.getIsRelatedTo())) { return null; } - if (!TimelineStorageUtils.hasField(fieldsToRetrieve, - Field.IS_RELATED_TO)) { + if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { entity.getIsRelatedToEntities().clear(); } } @@ -427,14 +424,15 @@ protected TimelineEntity parseEntity(Result result) throws IOException { // locally as relevant HBase filters to filter out rows on the basis of // relatesTo are not set in HBase scan. boolean checkRelatesTo = - !isSingleEntityRead() && filters.getRelatesTo() != null && - filters.getRelatesTo().getFilterList().size() > 0; - if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO) || - checkRelatesTo) { - TimelineStorageUtils.readRelationship( - entity, result, ApplicationColumnPrefix.RELATES_TO, false); - if (checkRelatesTo && !TimelineStorageUtils.matchRelatesTo(entity, - filters.getRelatesTo())) { + !isSingleEntityRead() && filters.getRelatesTo() != null + && filters.getRelatesTo().getFilterList().size() > 0; + if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO) + || checkRelatesTo) { + readRelationship(entity, result, ApplicationColumnPrefix.RELATES_TO, + false); + if (checkRelatesTo + && !TimelineStorageUtils.matchRelatesTo(entity, + filters.getRelatesTo())) { return null; } if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) { @@ -444,14 +442,12 @@ protected TimelineEntity parseEntity(Result result) throws IOException { // fetch info if fieldsToRetrieve contains INFO or ALL. if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) { - TimelineStorageUtils.readKeyValuePairs( - entity, result, ApplicationColumnPrefix.INFO, false); + readKeyValuePairs(entity, result, ApplicationColumnPrefix.INFO, false); } // fetch configs if fieldsToRetrieve contains CONFIGS or ALL. if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.CONFIGS)) { - TimelineStorageUtils.readKeyValuePairs( - entity, result, ApplicationColumnPrefix.CONFIG, true); + readKeyValuePairs(entity, result, ApplicationColumnPrefix.CONFIG, true); } // fetch events and match event filters if they exist. 
If event filters do @@ -459,14 +455,15 @@ protected TimelineEntity parseEntity(Result result) throws IOException { // as relevant HBase filters to filter out rows on the basis of events // are not set in HBase scan. boolean checkEvents = - !isSingleEntityRead() && filters.getEventFilters() != null && - filters.getEventFilters().getFilterList().size() > 0; - if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS) || - checkEvents) { - TimelineStorageUtils.readEvents( - entity, result, ApplicationColumnPrefix.EVENT); - if (checkEvents && !TimelineStorageUtils.matchEventFilters(entity, - filters.getEventFilters())) { + !isSingleEntityRead() && filters.getEventFilters() != null + && filters.getEventFilters().getFilterList().size() > 0; + if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS) + || checkEvents) { + TimelineStorageUtils.readEvents(entity, result, + ApplicationColumnPrefix.EVENT); + if (checkEvents + && !TimelineStorageUtils.matchEventFilters(entity, + filters.getEventFilters())) { return null; } if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java index faecd14..9e86cde 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java @@ -35,7 +35,6 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; @@ -137,8 +136,7 @@ protected TimelineEntity parseEntity(Result result) throws IOException { // get the list of run ids along with the version that are associated with // this flow on this day Map runIdsMap = - FlowActivityColumnPrefix.RUN_ID.readResults(result, - LongKeyConverter.getInstance()); + FlowActivityColumnPrefix.RUN_ID.readResults(result, longKeyConverter); for (Map.Entry e : runIdsMap.entrySet()) { Long runId = e.getKey(); String version = (String)e.getValue(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java index b2de2e5..3f7b28b 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java @@ -27,12 +27,12 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.FamilyFilter; import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FilterList.Operator; import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.hbase.filter.QualifierFilter; -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; -import org.apache.hadoop.hbase.filter.FilterList.Operator; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKeyPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; import com.google.common.base.Preconditions; @@ -184,9 +185,10 @@ protected FilterList constructFilterListBasedOnFields() throws IOException { protected Result getResult(Configuration hbaseConf, Connection conn, FilterList filterList) throws IOException { TimelineReaderContext context = getContext(); - byte[] rowKey = - FlowRunRowKey.getRowKey(context.getClusterId(), context.getUserId(), + FlowRunRowKey flowRunRowKey = + new FlowRunRowKey(context.getClusterId(), context.getUserId(), context.getFlowName(), context.getFlowRunId()); + byte[] rowKey = flowRunRowKey.getRowKey(); Get get = new Get(rowKey); get.setMaxVersions(Integer.MAX_VALUE); if (filterList != null && !filterList.getFilters().isEmpty()) { @@ -200,9 +202,10 @@ protected ResultScanner getResults(Configuration hbaseConf, Connection conn, FilterList filterList) throws IOException { Scan scan = new Scan(); TimelineReaderContext context = getContext(); - scan.setRowPrefixFilter( - FlowRunRowKey.getRowKeyPrefix(context.getClusterId(), - context.getUserId(), context.getFlowName())); + FlowRunRowKeyPrefix flowRunRowKeyPrefix = + new FlowRunRowKeyPrefix(context.getClusterId(), context.getUserId(), + context.getFlowName()); + scan.setRowPrefixFilter(flowRunRowKeyPrefix.getRowKeyPrefix()); FilterList newList = new FilterList(); newList.addFilter(new PageFilter(getFilters().getLimit())); if (filterList != null && !filterList.getFilters().isEmpty()) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java index 878695c..b1dc2d4 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.EnumSet; +import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; @@ -28,11 +29,12 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.FamilyFilter; import org.apache.hadoop.hbase.filter.FilterList; -import org.apache.hadoop.hbase.filter.QualifierFilter; -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.FilterList.Operator; +import org.apache.hadoop.hbase.filter.QualifierFilter; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; @@ -40,10 +42,15 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily; @@ -95,32 +102,29 @@ protected FilterList constructFilterListBasedOnFilters() throws IOException { long createdTimeBegin = filters.getCreatedTimeBegin(); long createdTimeEnd = filters.getCreatedTimeEnd(); if (createdTimeBegin != 0 || createdTimeEnd != Long.MAX_VALUE) { - listBasedOnFilters.addFilter( - TimelineFilterUtils.createSingleColValueFiltersByRange( - EntityColumn.CREATED_TIME, createdTimeBegin, createdTimeEnd)); + listBasedOnFilters.addFilter(TimelineFilterUtils + .createSingleColValueFiltersByRange(EntityColumn.CREATED_TIME, + createdTimeBegin, createdTimeEnd)); } // Create filter list based on metric filters and add it to // listBasedOnFilters. 
TimelineFilterList metricFilters = filters.getMetricFilters(); if (metricFilters != null && !metricFilters.getFilterList().isEmpty()) { - listBasedOnFilters.addFilter( - TimelineFilterUtils.createHBaseFilterList( - EntityColumnPrefix.METRIC, metricFilters)); + listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList( + EntityColumnPrefix.METRIC, metricFilters)); } // Create filter list based on config filters and add it to // listBasedOnFilters. TimelineFilterList configFilters = filters.getConfigFilters(); if (configFilters != null && !configFilters.getFilterList().isEmpty()) { - listBasedOnFilters.addFilter( - TimelineFilterUtils.createHBaseFilterList( - EntityColumnPrefix.CONFIG, configFilters)); + listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList( + EntityColumnPrefix.CONFIG, configFilters)); } // Create filter list based on info filters and add it to listBasedOnFilters TimelineFilterList infoFilters = filters.getInfoFilters(); if (infoFilters != null && !infoFilters.getFilterList().isEmpty()) { - listBasedOnFilters.addFilter( - TimelineFilterUtils.createHBaseFilterList( - EntityColumnPrefix.INFO, infoFilters)); + listBasedOnFilters.addFilter(TimelineFilterUtils.createHBaseFilterList( + EntityColumnPrefix.INFO, infoFilters)); } return listBasedOnFilters; } @@ -132,8 +136,8 @@ protected FilterList constructFilterListBasedOnFilters() throws IOException { */ private static boolean fetchPartialEventCols(TimelineFilterList eventFilters, EnumSet fieldsToRetrieve) { - return (eventFilters != null && !eventFilters.getFilterList().isEmpty() && - !TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)); + return (eventFilters != null && !eventFilters.getFilterList().isEmpty() && !TimelineStorageUtils + .hasField(fieldsToRetrieve, Field.EVENTS)); } /** @@ -141,10 +145,10 @@ private static boolean fetchPartialEventCols(TimelineFilterList eventFilters, * * @return true if we need to fetch some of the columns, false otherwise. */ - private static boolean fetchPartialRelatesToCols(TimelineFilterList relatesTo, - EnumSet fieldsToRetrieve) { - return (relatesTo != null && !relatesTo.getFilterList().isEmpty() && - !TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)); + private static boolean fetchPartialRelatesToCols( + TimelineFilterList relatesTo, EnumSet fieldsToRetrieve) { + return (relatesTo != null && !relatesTo.getFilterList().isEmpty() && !TimelineStorageUtils + .hasField(fieldsToRetrieve, Field.RELATES_TO)); } /** @@ -154,8 +158,8 @@ private static boolean fetchPartialRelatesToCols(TimelineFilterList relatesTo, */ private static boolean fetchPartialIsRelatedToCols( TimelineFilterList isRelatedTo, EnumSet fieldsToRetrieve) { - return (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty() && - !TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)); + return (isRelatedTo != null && !isRelatedTo.getFilterList().isEmpty() && !TimelineStorageUtils + .hasField(fieldsToRetrieve, Field.IS_RELATED_TO)); } /** @@ -163,19 +167,20 @@ private static boolean fetchPartialIsRelatedToCols( * relatesto and isrelatedto from info family. * * @return true, if we need to fetch only some of the columns, false if we - * need to fetch all the columns under info column family. + * need to fetch all the columns under info column family. 
*/ protected boolean fetchPartialColsFromInfoFamily() { EnumSet fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); TimelineEntityFilters filters = getFilters(); - return fetchPartialEventCols(filters.getEventFilters(), fieldsToRetrieve) || - fetchPartialRelatesToCols(filters.getRelatesTo(), fieldsToRetrieve) || - fetchPartialIsRelatedToCols(filters.getIsRelatedTo(), fieldsToRetrieve); + return fetchPartialEventCols(filters.getEventFilters(), fieldsToRetrieve) + || fetchPartialRelatesToCols(filters.getRelatesTo(), fieldsToRetrieve) + || fetchPartialIsRelatedToCols(filters.getIsRelatedTo(), + fieldsToRetrieve); } /** - * Check if we need to create filter list based on fields. We need to create - * a filter list iff all fields need not be retrieved or we have some specific + * Check if we need to create filter list based on fields. We need to create a + * filter list iff all fields need not be retrieved or we have some specific * fields or metrics to retrieve. We also need to create a filter list if we * have relationships(relatesTo/isRelatedTo) and event filters specified for * the query. @@ -188,22 +193,24 @@ protected boolean needCreateFilterListBasedOnFields() { // be retrieved, also check if we have some metrics or configs to // retrieve specified for the query because then a filter list will have // to be created. - boolean flag = !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) || - (dataToRetrieve.getConfsToRetrieve() != null && - !dataToRetrieve.getConfsToRetrieve().getFilterList().isEmpty()) || - (dataToRetrieve.getMetricsToRetrieve() != null && - !dataToRetrieve.getMetricsToRetrieve().getFilterList().isEmpty()); + boolean flag = + !dataToRetrieve.getFieldsToRetrieve().contains(Field.ALL) + || (dataToRetrieve.getConfsToRetrieve() != null && !dataToRetrieve + .getConfsToRetrieve().getFilterList().isEmpty()) + || (dataToRetrieve.getMetricsToRetrieve() != null && !dataToRetrieve + .getMetricsToRetrieve().getFilterList().isEmpty()); // Filters need to be checked only if we are reading multiple entities. If // condition above is false, we check if there are relationships(relatesTo/ // isRelatedTo) and event filters specified for the query. if (!flag && !isSingleEntityRead()) { TimelineEntityFilters filters = getFilters(); - flag = (filters.getEventFilters() != null && - !filters.getEventFilters().getFilterList().isEmpty()) || - (filters.getIsRelatedTo() != null && - !filters.getIsRelatedTo().getFilterList().isEmpty()) || - (filters.getRelatesTo() != null && - !filters.getRelatesTo().getFilterList().isEmpty()); + flag = + (filters.getEventFilters() != null && !filters.getEventFilters() + .getFilterList().isEmpty()) + || (filters.getIsRelatedTo() != null && !filters.getIsRelatedTo() + .getFilterList().isEmpty()) + || (filters.getRelatesTo() != null && !filters.getRelatesTo() + .getFilterList().isEmpty()); } return flag; } @@ -216,8 +223,8 @@ protected boolean needCreateFilterListBasedOnFields() { */ protected void updateFixedColumns(FilterList list) { for (EntityColumn column : EntityColumn.values()) { - list.addFilter(new QualifierFilter(CompareOp.EQUAL, - new BinaryComparator(column.getColumnQualifierBytes()))); + list.addFilter(new QualifierFilter(CompareOp.EQUAL, new BinaryComparator( + column.getColumnQualifierBytes()))); } } @@ -226,12 +233,11 @@ protected void updateFixedColumns(FilterList list) { * qualifiers in the info column family will be returned in result. 
* * @return filter list. * @throws IOException if any problem occurs while creating filter list. */ - private FilterList createFilterListForColsOfInfoFamily() - throws IOException { + private FilterList createFilterListForColsOfInfoFamily() throws IOException { FilterList infoFamilyColsFilter = new FilterList(Operator.MUST_PASS_ONE); // Add filters for each column in entity table. updateFixedColumns(infoFamilyColsFilter); @@ -239,17 +245,17 @@ private FilterList createFilterListForColsOfInfoFamily() // If INFO field has to be retrieved, add a filter for fetching columns // with INFO column prefix. if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) { - infoFamilyColsFilter.addFilter( - TimelineFilterUtils.createHBaseQualifierFilter( + infoFamilyColsFilter + .addFilter(TimelineFilterUtils.createHBaseQualifierFilter( CompareOp.EQUAL, EntityColumnPrefix.INFO)); } TimelineFilterList relatesTo = getFilters().getRelatesTo(); if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) { // If RELATES_TO field has to be retrieved, add a filter for fetching // columns with RELATES_TO column prefix. - infoFamilyColsFilter.addFilter( - TimelineFilterUtils.createHBaseQualifierFilter( - CompareOp.EQUAL, EntityColumnPrefix.RELATES_TO)); + infoFamilyColsFilter.addFilter(TimelineFilterUtils + .createHBaseQualifierFilter(CompareOp.EQUAL, + EntityColumnPrefix.RELATES_TO)); } else if (relatesTo != null && !relatesTo.getFilterList().isEmpty()) { // Even if fields to retrieve does not contain RELATES_TO, we still // need to have a filter to fetch some of the column qualifiers if @@ -257,17 +263,16 @@ private FilterList createFilterListForColsOfInfoFamily() // matched after fetching rows from HBase.
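Aside: the relatesTo/isRelatedTo (and events) branches all follow a two-phase strategy: the server-side filter only narrows the fetched qualifiers to a superset via prefix matching, and the authoritative predicate (matchRelatesTo, matchIsRelatedTo, matchEventFilters) runs client-side in parseEntity. Schematically, with invented names:

    import java.util.Set;

    public class TwoPhaseMatchSketch {
      // Phase 1 stand-in (server side): cheap prefix check that may keep
      // more qualifiers than strictly needed.
      static boolean coarseKeep(String qualifier, Set<String> wantedPrefixes) {
        return wantedPrefixes.stream().anyMatch(qualifier::startsWith);
      }

      // Phase 2 stand-in (client side): the exact predicate decides whether
      // the whole row survives, mirroring the match*() calls in parseEntity.
      static boolean fineMatch(Set<String> fetched, Set<String> required) {
        return fetched.containsAll(required);
      }
    }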
Set isRelatedToCols = TimelineFilterUtils.fetchColumnsFromFilterList(isRelatedTo); - infoFamilyColsFilter.addFilter( - TimelineFilterUtils.createFiltersFromColumnQualifiers( - EntityColumnPrefix.IS_RELATED_TO, isRelatedToCols)); + infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers( + EntityColumnPrefix.IS_RELATED_TO, isRelatedToCols)); } TimelineFilterList eventFilters = getFilters().getEventFilters(); if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) { // If EVENTS field has to be retrieved, add a filter for fetching columns // with EVENT column prefix. - infoFamilyColsFilter.addFilter( - TimelineFilterUtils.createHBaseQualifierFilter( + infoFamilyColsFilter + .addFilter(TimelineFilterUtils.createHBaseQualifierFilter( CompareOp.EQUAL, EntityColumnPrefix.EVENT)); - } else if (eventFilters != null && !eventFilters.getFilterList().isEmpty()){ + } else if (eventFilters != null && !eventFilters.getFilterList().isEmpty()) { // Even if fields to retrieve does not contain EVENTS, we still need to // have a filter to fetch some of the column qualifiers on the basis of // event filters specified. Event filters will then be matched after // fetching rows from HBase. Set eventCols = TimelineFilterUtils.fetchColumnsFromFilterList(eventFilters); - infoFamilyColsFilter.addFilter( - TimelineFilterUtils.createFiltersFromColumnQualifiers( - EntityColumnPrefix.EVENT, eventCols)); + infoFamilyColsFilter.addFilter(createFiltersFromColumnQualifiers( + EntityColumnPrefix.EVENT, eventCols)); } return infoFamilyColsFilter; } @@ -311,27 +314,27 @@ private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) { EnumSet fieldsToRetrieve = getDataToRetrieve().getFieldsToRetrieve(); // Events not required. if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) { - infoColFamilyList.addFilter( - TimelineFilterUtils.createHBaseQualifierFilter( - CompareOp.NOT_EQUAL, EntityColumnPrefix.EVENT)); + infoColFamilyList.addFilter(TimelineFilterUtils + .createHBaseQualifierFilter(CompareOp.NOT_EQUAL, + EntityColumnPrefix.EVENT)); } // info not required. if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) { - infoColFamilyList.addFilter( - TimelineFilterUtils.createHBaseQualifierFilter( - CompareOp.NOT_EQUAL, EntityColumnPrefix.INFO)); + infoColFamilyList.addFilter(TimelineFilterUtils + .createHBaseQualifierFilter(CompareOp.NOT_EQUAL, + EntityColumnPrefix.INFO)); } // is related to not required. if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { - infoColFamilyList.addFilter( - TimelineFilterUtils.createHBaseQualifierFilter( - CompareOp.NOT_EQUAL, EntityColumnPrefix.IS_RELATED_TO)); + infoColFamilyList.addFilter(TimelineFilterUtils + .createHBaseQualifierFilter(CompareOp.NOT_EQUAL, + EntityColumnPrefix.IS_RELATED_TO)); } // relates to not required. if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) { - infoColFamilyList.addFilter( - TimelineFilterUtils.createHBaseQualifierFilter( - CompareOp.NOT_EQUAL, EntityColumnPrefix.RELATES_TO)); + infoColFamilyList.addFilter(TimelineFilterUtils + .createHBaseQualifierFilter(CompareOp.NOT_EQUAL, + EntityColumnPrefix.RELATES_TO)); } } @@ -348,18 +351,18 @@ private void updateFilterForConfsAndMetricsToRetrieve( // CONFS to fields to retrieve in augmentParams() even if not specified. if (dataToRetrieve.getFieldsToRetrieve().contains(Field.CONFIGS)) { // Create a filter list for configs. - listBasedOnFields.addFilter(TimelineFilterUtils. 
- createFilterForConfsOrMetricsToRetrieve( - dataToRetrieve.getConfsToRetrieve(), - EntityColumnFamily.CONFIGS, EntityColumnPrefix.CONFIG)); + listBasedOnFields.addFilter(TimelineFilterUtils + .createFilterForConfsOrMetricsToRetrieve( + dataToRetrieve.getConfsToRetrieve(), EntityColumnFamily.CONFIGS, + EntityColumnPrefix.CONFIG)); } // Please note that if metricsToRetrieve is specified, we would have added // METRICS to fields to retrieve in augmentParams() even if not specified. if (dataToRetrieve.getFieldsToRetrieve().contains(Field.METRICS)) { // Create a filter list for metrics. - listBasedOnFields.addFilter(TimelineFilterUtils. - createFilterForConfsOrMetricsToRetrieve( + listBasedOnFields.addFilter(TimelineFilterUtils + .createFilterForConfsOrMetricsToRetrieve( dataToRetrieve.getMetricsToRetrieve(), EntityColumnFamily.METRICS, EntityColumnPrefix.METRIC)); } @@ -375,8 +378,8 @@ protected FilterList constructFilterListBasedOnFields() throws IOException { FilterList infoColFamilyList = new FilterList(); // By default fetch everything in INFO column family. FamilyFilter infoColumnFamily = - new FamilyFilter(CompareOp.EQUAL, - new BinaryComparator(EntityColumnFamily.INFO.getBytes())); + new FamilyFilter(CompareOp.EQUAL, new BinaryComparator( + EntityColumnFamily.INFO.getBytes())); infoColFamilyList.addFilter(infoColumnFamily); if (!isSingleEntityRead() && fetchPartialColsFromInfoFamily()) { // We can fetch only some of the columns from info family. @@ -394,27 +397,26 @@ protected FilterList constructFilterListBasedOnFields() throws IOException { /** * Looks up flow context from AppToFlow table. * - * @param clusterId Cluster Id. - * @param appId App Id. + * @param appToFlowRowKey to identify Cluster and App Ids. * @param hbaseConf HBase configuration. * @param conn HBase Connection. * @return flow context information. * @throws IOException if any problem occurs while fetching flow information. 
*/ - protected FlowContext lookupFlowContext(String clusterId, String appId, + protected FlowContext lookupFlowContext(AppToFlowRowKey appToFlowRowKey, Configuration hbaseConf, Connection conn) throws IOException { - byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId); + byte[] rowKey = appToFlowRowKey.getRowKey(); Get get = new Get(rowKey); Result result = appToFlowTable.getResult(hbaseConf, conn, get); if (result != null && !result.isEmpty()) { - return new FlowContext( - AppToFlowColumn.USER_ID.readResult(result).toString(), - AppToFlowColumn.FLOW_ID.readResult(result).toString(), - ((Number)AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue()); + return new FlowContext(AppToFlowColumn.USER_ID.readResult(result) + .toString(), AppToFlowColumn.FLOW_ID.readResult(result).toString(), + ((Number) AppToFlowColumn.FLOW_RUN_ID.readResult(result)).longValue()); } else { throw new NotFoundException( - "Unable to find the context flow ID and flow run ID for clusterId=" + - clusterId + ", appId=" + appId); + "Unable to find the context flow ID and flow run ID for clusterId=" + + appToFlowRowKey.getClusterId() + ", appId=" + + appToFlowRowKey.getAppId()); } } @@ -425,17 +427,21 @@ protected FlowContext lookupFlowContext(String clusterId, String appId, private final String userId; private final String flowName; private final Long flowRunId; + public FlowContext(String user, String flowName, Long flowRunId) { this.userId = user; this.flowName = flowName; this.flowRunId = flowRunId; } + protected String getUserId() { return userId; } + protected String getFlowName() { return flowName; } + protected Long getFlowRunId() { return flowRunId; } @@ -444,8 +450,8 @@ protected Long getFlowRunId() { @Override protected void validateParams() { Preconditions.checkNotNull(getContext(), "context shouldn't be null"); - Preconditions.checkNotNull( - getDataToRetrieve(), "data to retrieve shouldn't be null"); + Preconditions.checkNotNull(getDataToRetrieve(), + "data to retrieve shouldn't be null"); Preconditions.checkNotNull(getContext().getClusterId(), "clusterId shouldn't be null"); Preconditions.checkNotNull(getContext().getAppId(), @@ -463,11 +469,13 @@ protected void augmentParams(Configuration hbaseConf, Connection conn) throws IOException { TimelineReaderContext context = getContext(); // In reality all three should be null or neither should be null - if (context.getFlowName() == null || context.getFlowRunId() == null || - context.getUserId() == null) { + if (context.getFlowName() == null || context.getFlowRunId() == null + || context.getUserId() == null) { // Get flow context information from AppToFlow table. 
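Aside: lookupFlowContext now receives an AppToFlowRowKey instead of raw (clusterId, appId) strings, matching the instance-based row key style introduced across this patch. The call shape, sketched with placeholder ids:

    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey;

    public class AppToFlowLookupSketch {
      public static Get lookupGet() {
        AppToFlowRowKey rowKey =
            new AppToFlowRowKey("test-cluster", "application_1465246237936_0001");
        // The encoded key (clusterId + appId) addresses one row in the
        // app-to-flow table; lookupFlowContext wraps it in a Get like this.
        return new Get(rowKey.getRowKey());
      }
    }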
- FlowContext flowContext = lookupFlowContext( - context.getClusterId(), context.getAppId(), hbaseConf, conn); + AppToFlowRowKey appToFlowRowKey = + new AppToFlowRowKey(context.getClusterId(), context.getAppId()); + FlowContext flowContext = + lookupFlowContext(appToFlowRowKey, hbaseConf, conn); context.setFlowName(flowContext.flowName); context.setFlowRunId(flowContext.flowRunId); context.setUserId(flowContext.userId); @@ -497,8 +505,8 @@ protected Result getResult(Configuration hbaseConf, Connection conn, } @Override - protected ResultScanner getResults(Configuration hbaseConf, - Connection conn, FilterList filterList) throws IOException { + protected ResultScanner getResults(Configuration hbaseConf, Connection conn, + FilterList filterList) throws IOException { // Scan through part of the table to find the entities belong to one app // and one type Scan scan = new Scan(); @@ -535,18 +543,17 @@ protected TimelineEntity parseEntity(Result result) throws IOException { // locally as relevant HBase filters to filter out rows on the basis of // isRelatedTo are not set in HBase scan. boolean checkIsRelatedTo = - !isSingleEntityRead() && filters.getIsRelatedTo() != null && - filters.getIsRelatedTo().getFilterList().size() > 0; - if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO) || - checkIsRelatedTo) { - TimelineStorageUtils.readRelationship( - entity, result, EntityColumnPrefix.IS_RELATED_TO, true); - if (checkIsRelatedTo && !TimelineStorageUtils.matchIsRelatedTo(entity, - filters.getIsRelatedTo())) { + !isSingleEntityRead() && filters.getIsRelatedTo() != null + && filters.getIsRelatedTo().getFilterList().size() > 0; + if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO) + || checkIsRelatedTo) { + readRelationship(entity, result, EntityColumnPrefix.IS_RELATED_TO, true); + if (checkIsRelatedTo + && !TimelineStorageUtils.matchIsRelatedTo(entity, + filters.getIsRelatedTo())) { return null; } - if (!TimelineStorageUtils.hasField(fieldsToRetrieve, - Field.IS_RELATED_TO)) { + if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.IS_RELATED_TO)) { entity.getIsRelatedToEntities().clear(); } } @@ -556,14 +563,14 @@ protected TimelineEntity parseEntity(Result result) throws IOException { // locally as relevant HBase filters to filter out rows on the basis of // relatesTo are not set in HBase scan. boolean checkRelatesTo = - !isSingleEntityRead() && filters.getRelatesTo() != null && - filters.getRelatesTo().getFilterList().size() > 0; - if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO) || - checkRelatesTo) { - TimelineStorageUtils.readRelationship( - entity, result, EntityColumnPrefix.RELATES_TO, false); - if (checkRelatesTo && !TimelineStorageUtils.matchRelatesTo(entity, - filters.getRelatesTo())) { + !isSingleEntityRead() && filters.getRelatesTo() != null + && filters.getRelatesTo().getFilterList().size() > 0; + if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO) + || checkRelatesTo) { + readRelationship(entity, result, EntityColumnPrefix.RELATES_TO, false); + if (checkRelatesTo + && !TimelineStorageUtils.matchRelatesTo(entity, + filters.getRelatesTo())) { return null; } if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.RELATES_TO)) { @@ -573,14 +580,12 @@ protected TimelineEntity parseEntity(Result result) throws IOException { // fetch info if fieldsToRetrieve contains INFO or ALL. 
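Aside: the hasField guards here and below treat Field.ALL as implying every field; this stand-in illustrates the assumed semantics (the real check lives in TimelineStorageUtils.hasField):

    import java.util.EnumSet;
    import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;

    public class FieldCheckSketch {
      // Simplified stand-in: a field counts as requested when present
      // explicitly or via Field.ALL.
      static boolean hasField(EnumSet<Field> fields, Field field) {
        return fields.contains(Field.ALL) || fields.contains(field);
      }

      public static void main(String[] args) {
        EnumSet<Field> requested = EnumSet.of(Field.INFO, Field.CONFIGS);
        System.out.println(hasField(requested, Field.INFO));   // true
        System.out.println(hasField(requested, Field.EVENTS)); // false
        System.out.println(hasField(EnumSet.of(Field.ALL), Field.EVENTS)); // true
      }
    }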
if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.INFO)) { - TimelineStorageUtils.readKeyValuePairs( - entity, result, EntityColumnPrefix.INFO, false); + readKeyValuePairs(entity, result, EntityColumnPrefix.INFO, false); } // fetch configs if fieldsToRetrieve contains CONFIGS or ALL. if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.CONFIGS)) { - TimelineStorageUtils.readKeyValuePairs( - entity, result, EntityColumnPrefix.CONFIG, true); + readKeyValuePairs(entity, result, EntityColumnPrefix.CONFIG, true); } // fetch events and match event filters if they exist. If event filters do @@ -588,13 +593,14 @@ protected TimelineEntity parseEntity(Result result) throws IOException { // as relevant HBase filters to filter out rows on the basis of events // are not set in HBase scan. boolean checkEvents = - !isSingleEntityRead() && filters.getEventFilters() != null && - filters.getEventFilters().getFilterList().size() > 0; - if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS) || - checkEvents) { + !isSingleEntityRead() && filters.getEventFilters() != null + && filters.getEventFilters().getFilterList().size() > 0; + if (TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS) + || checkEvents) { TimelineStorageUtils.readEvents(entity, result, EntityColumnPrefix.EVENT); - if (checkEvents && !TimelineStorageUtils.matchEventFilters(entity, - filters.getEventFilters())) { + if (checkEvents + && !TimelineStorageUtils.matchEventFilters(entity, + filters.getEventFilters())) { return null; } if (!TimelineStorageUtils.hasField(fieldsToRetrieve, Field.EVENTS)) { @@ -608,4 +614,89 @@ protected TimelineEntity parseEntity(Result result) throws IOException { } return entity; } + + /** + * Helper method for reading key-value pairs for either info or config. + * + * @param <T> Describes the type of column prefix. + * @param entity entity to fill. + * @param result result from HBase. + * @param prefix column prefix. + * @param isConfig if true, means we are reading configs, otherwise info. + * @throws IOException if any problem is encountered while reading result. + */ + protected <T> void readKeyValuePairs(TimelineEntity entity, Result result, + ColumnPrefix<T> prefix, boolean isConfig) throws IOException { + // info and configuration are of type Map<String, Object> + Map<String, Object> columns = + prefix.readResults(result, stringKeyConverter); + if (isConfig) { + for (Map.Entry<String, Object> column : columns.entrySet()) { + entity.addConfig(column.getKey(), column.getValue().toString()); + } + } else { + entity.addInfo(columns); + } + } + + /** + * Helper method for reading relationship. + * + * @param <T> Describes the type of column prefix. + * @param entity entity to fill. + * @param result result from HBase. + * @param prefix column prefix. + * @param isRelatedTo if true, means relationship is to be added to + * isRelatedTo, otherwise it's added to relatesTo. + * @throws IOException if any problem is encountered while reading result.
+ */ + protected <T> void readRelationship(TimelineEntity entity, Result result, + ColumnPrefix<T> prefix, boolean isRelatedTo) throws IOException { + // isRelatedTo and relatesTo are of type Map<String, Set<String>> + Map<String, Object> columns = + prefix.readResults(result, stringKeyConverter); + for (Map.Entry<String, Object> column : columns.entrySet()) { + for (String id : Separator.VALUES.splitEncoded(column.getValue() + .toString())) { + if (isRelatedTo) { + entity.addIsRelatedToEntity(column.getKey(), id); + } else { + entity.addRelatesToEntity(column.getKey(), id); + } + } + } + } + + public <T> byte[] createColQualifierPrefix(ColumnPrefix<T> colPrefix, + String column) { + if (colPrefix == ApplicationColumnPrefix.EVENT + || colPrefix == EntityColumnPrefix.EVENT) { + return EventColumnNameConverter.getInstance().encode( + new EventColumnName(column, null, null)); + } else { + return stringKeyConverter.encode(column); + } + } + + /** + * Create a filter list of qualifier filters based on passed set of columns. + * + * @param <T> Describes the type of column prefix. + * @param colPrefix Column Prefix. + * @param columns set of column qualifiers. + * @return filter list. + */ + protected <T> FilterList createFiltersFromColumnQualifiers( + ColumnPrefix<T> colPrefix, Set<String> columns) { + FilterList list = new FilterList(Operator.MUST_PASS_ONE); + for (String column : columns) { + // For columns which have compound column qualifiers (e.g. events), we need + // to include the required separator. + byte[] compoundColQual = createColQualifierPrefix(colPrefix, column); + list.addFilter(new QualifierFilter(CompareOp.EQUAL, + new BinaryPrefixComparator(colPrefix + .getColumnPrefixBytes(compoundColQual)))); + } + return list; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java index be27643..ca40965 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter; /** @@ -68,6 +69,17 @@ private boolean sortedKeys = false; /** + * Used to convert string key components to and from storage format. + */ + protected final StringKeyConverter stringKeyConverter = + new StringKeyConverter(); + + /** + * Used to convert Long key components to and from storage format. + */ + protected final LongKeyConverter longKeyConverter = new LongKeyConverter(); + + /** + * Instantiates a reader for multiple-entity reads.
* * @param ctxt Reader context which defines the scope in which query has to be @@ -331,7 +343,7 @@ protected void readMetrics(TimelineEntity entity, Result result, ColumnPrefix columnPrefix) throws IOException { NavigableMap> metricsResult = columnPrefix.readResultsWithTimestamps( - result, StringKeyConverter.getInstance()); + result, stringKeyConverter); for (Map.Entry> metricResult: metricsResult.entrySet()) { TimelineMetric metric = new TimelineMetric(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java index 74e4b5d..644d674 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestKeyConverters.java @@ -117,19 +117,22 @@ public void testFlowActivityRowKeyConverter() { @Test public void testFlowRunRowKeyConverter() { - byte[] byteRowKey = FlowRunRowKeyConverter.getInstance().encode( - new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, FLOW_RUN_ID)); - FlowRunRowKey rowKey = - FlowRunRowKeyConverter.getInstance().decode(byteRowKey); + FlowRunRowKeyConverter flowRunRowKeyConverter = new FlowRunRowKeyConverter(); + byte[] byteRowKey = + flowRunRowKeyConverter.encode(new FlowRunRowKey(CLUSTER, USER, + FLOW_NAME, FLOW_RUN_ID)); + FlowRunRowKey rowKey = flowRunRowKeyConverter.decode(byteRowKey); assertEquals(CLUSTER, rowKey.getClusterId()); assertEquals(USER, rowKey.getUserId()); assertEquals(FLOW_NAME, rowKey.getFlowName()); assertEquals(FLOW_RUN_ID, rowKey.getFlowRunId()); - byte[] byteRowKeyPrefix = FlowRunRowKeyConverter.getInstance().encode( - new FlowRunRowKey(CLUSTER, USER, FLOW_NAME, null)); - byte[][] splits = Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] { - Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, + byte[] byteRowKeyPrefix = + flowRunRowKeyConverter.encode(new FlowRunRowKey(CLUSTER, USER, + FLOW_NAME, null)); + byte[][] splits = + Separator.QUALIFIERS.split(byteRowKeyPrefix, new int[] { + Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE }); assertEquals(4, splits.length); assertEquals(0, splits[3].length); @@ -206,8 +209,7 @@ public void testEntityRowKeyConverter() { Separator.VARIABLE_SIZE }); assertEquals(7, splits.length); assertEquals(0, splits[6].length); - assertEquals(APPLICATION_ID, - AppIdKeyConverter.getInstance().decode(splits[4])); + assertEquals(APPLICATION_ID, new AppIdKeyConverter().decode(splits[4])); assertEquals(entityType, Separator.QUALIFIERS.decode( Bytes.toString(splits[5]))); verifyRowPrefixBytes(byteRowKeyPrefix); @@ -228,16 +230,19 @@ public void testEntityRowKeyConverter() { @Test public void testAppToFlowRowKeyConverter() { - byte[] byteRowKey = AppToFlowRowKeyConverter.getInstance().encode( - new AppToFlowRowKey(CLUSTER, APPLICATION_ID)); - AppToFlowRowKey rowKey = - AppToFlowRowKeyConverter.getInstance().decode(byteRowKey); + AppToFlowRowKeyConverter appToFlowRowKeyConverter = + new AppToFlowRowKeyConverter(); + byte[] byteRowKey = + appToFlowRowKeyConverter.encode(new 
AppToFlowRowKey(CLUSTER, + APPLICATION_ID)); + AppToFlowRowKey rowKey = appToFlowRowKeyConverter.decode(byteRowKey); assertEquals(CLUSTER, rowKey.getClusterId()); assertEquals(APPLICATION_ID, rowKey.getAppId()); } @Test public void testAppIdKeyConverter() { + AppIdKeyConverter appIdKeyConverter = new AppIdKeyConverter(); long currentTs = System.currentTimeMillis(); ApplicationId appId1 = ApplicationId.newInstance(currentTs, 1); ApplicationId appId2 = ApplicationId.newInstance(currentTs, 2); @@ -245,9 +250,9 @@ public void testAppIdKeyConverter() { String appIdStr1 = appId1.toString(); String appIdStr2 = appId2.toString(); String appIdStr3 = appId3.toString(); - byte[] appIdBytes1 = AppIdKeyConverter.getInstance().encode(appIdStr1); - byte[] appIdBytes2 = AppIdKeyConverter.getInstance().encode(appIdStr2); - byte[] appIdBytes3 = AppIdKeyConverter.getInstance().encode(appIdStr3); + byte[] appIdBytes1 = appIdKeyConverter.encode(appIdStr1); + byte[] appIdBytes2 = appIdKeyConverter.encode(appIdStr2); + byte[] appIdBytes3 = appIdKeyConverter.encode(appIdStr3); // App ids should be encoded in a manner wherein descending order // is maintained. assertTrue("Ordering of app ids is incorrect",