diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java index b185448..f5865ce 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServicesHBaseStorage.java @@ -52,7 +52,7 @@ import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; import org.junit.After; import org.junit.AfterClass; @@ -78,7 +78,7 @@ private static HBaseTestingUtility util; private static long ts = System.currentTimeMillis(); private static long dayTs = - TimelineStorageUtils.getTopOfTheDayTimestamp(ts); + HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(ts); @BeforeClass public static void setup() throws Exception { @@ -963,7 +963,7 @@ public void testGetFlows() throws Exception { assertEquals(1, entities.size()); long firstFlowActivity = - TimelineStorageUtils.getTopOfTheDayTimestamp(1425016501000L); + 
HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(1425016501000L); DateFormat fmt = TimelineReaderWebServices.DATE_FORMAT.get(); uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java index 1906574..0dcc357 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java @@ -52,7 +52,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -172,7 +172,7 @@ public void testWriteFlowRunMinMax() throws Exception { assertEquals(cluster, flowActivityRowKey.getClusterId()); assertEquals(user, flowActivityRowKey.getUserId()); assertEquals(flow, flowActivityRowKey.getFlowName()); - Long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(minStartTs); + Long dayTs = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(minStartTs); 
assertEquals(dayTs, flowActivityRowKey.getDayTimestamp()); assertEquals(1, values.size()); checkFlowActivityRunId(runid, flowVersion, values); @@ -303,7 +303,7 @@ private void checkFlowActivityTable(String cluster, String user, String flow, assertEquals(cluster, flowActivityRowKey.getClusterId()); assertEquals(user, flowActivityRowKey.getUserId()); assertEquals(flow, flowActivityRowKey.getFlowName()); - Long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime); + Long dayTs = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime); assertEquals(dayTs, flowActivityRowKey.getDayTimestamp()); assertEquals(1, values.size()); checkFlowActivityRunId(runid, flowVersion, values); @@ -388,7 +388,7 @@ public void testFlowActivityTableOneFlowMultipleRunIds() throws IOException { assertEquals(user, flowActivity.getUser()); assertEquals(flow, flowActivity.getFlowName()); long dayTs = - TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime); + HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime); assertEquals(dayTs, flowActivity.getDate().getTime()); Set flowRuns = flowActivity.getFlowRuns(); assertEquals(3, flowRuns.size()); @@ -443,7 +443,7 @@ private void checkFlowActivityTableSeveralRuns(String cluster, String user, assertEquals(cluster, flowActivityRowKey.getClusterId()); assertEquals(user, flowActivityRowKey.getUserId()); assertEquals(flow, flowActivityRowKey.getFlowName()); - Long dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime); + Long dayTs = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(appCreatedTime); assertEquals(dayTs, flowActivityRowKey.getDayTimestamp()); Map values = result diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java index 74b9e50..dff1b33 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java @@ -62,7 +62,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -109,7 +109,7 @@ public void checkCoProcessorOff() throws IOException, InterruptedException { HRegionServer server = util.getRSForFirstRegionInTable(table); List regions = server.getOnlineRegions(table); for (Region region : regions) { - assertTrue(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), + assertTrue(HBaseTimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), hbaseConf)); } } @@ -124,7 +124,7 @@ public void checkCoProcessorOff() throws IOException, InterruptedException { HRegionServer server = util.getRSForFirstRegionInTable(table); List regions = server.getOnlineRegions(table); for (Region region : regions) { - assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), + 
assertFalse(HBaseTimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), hbaseConf)); } } @@ -139,7 +139,7 @@ public void checkCoProcessorOff() throws IOException, InterruptedException { HRegionServer server = util.getRSForFirstRegionInTable(table); List regions = server.getOnlineRegions(table); for (Region region : regions) { - assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), + assertFalse(HBaseTimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), hbaseConf)); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java index 30940886..f4aa958 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java @@ -55,7 +55,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator; import org.junit.AfterClass; import org.junit.Assert; @@ -417,7 
+417,7 @@ public void checkProcessSummationMoreCellsSumFinal2() tags.add(t); byte[] tagByteArray = Tag.fromList(tags); // create a cell with a VERY old timestamp and attribute SUM_FINAL - Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, + Cell c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, cell1Ts, Bytes.toBytes(cellValue1), tagByteArray); currentColumnCells.add(c1); @@ -427,7 +427,7 @@ public void checkProcessSummationMoreCellsSumFinal2() tags.add(t); tagByteArray = Tag.fromList(tags); // create a cell with a recent timestamp and attribute SUM_FINAL - Cell c2 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, + Cell c2 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, cell2Ts, Bytes.toBytes(cellValue2), tagByteArray); currentColumnCells.add(c2); @@ -437,7 +437,7 @@ public void checkProcessSummationMoreCellsSumFinal2() tags.add(t); tagByteArray = Tag.fromList(tags); // create a cell with a VERY old timestamp but has attribute SUM - Cell c3 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, + Cell c3 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, cell3Ts, Bytes.toBytes(cellValue3), tagByteArray); currentColumnCells.add(c3); @@ -447,7 +447,7 @@ public void checkProcessSummationMoreCellsSumFinal2() tags.add(t); tagByteArray = Tag.fromList(tags); // create a cell with a VERY old timestamp but has attribute SUM - Cell c4 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, + Cell c4 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, cell4Ts, Bytes.toBytes(cellValue4), tagByteArray); currentColumnCells.add(c4); @@ -517,7 +517,7 @@ public void checkProcessSummationMoreCellsSumFinalMany() throws IOException { tags.add(t); byte[] tagByteArray = Tag.fromList(tags); // create a cell with a VERY old timestamp and attribute SUM_FINAL - c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, + c1 = 
HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray); currentColumnCells.add(c1); cellTsFinal++; @@ -531,7 +531,7 @@ public void checkProcessSummationMoreCellsSumFinalMany() throws IOException { tags.add(t); byte[] tagByteArray = Tag.fromList(tags); // create a cell with attribute SUM - c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, + c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray); currentColumnCells.add(c1); cellTsNotFinal++; @@ -608,7 +608,7 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags() tags.add(t); byte[] tagByteArray = Tag.fromList(tags); // create a cell with a VERY old timestamp and attribute SUM_FINAL - c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, + c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray); currentColumnCells.add(c1); cellTsFinal++; @@ -622,7 +622,7 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags() tags.add(t); byte[] tagByteArray = Tag.fromList(tags); // create a cell with a VERY old timestamp and attribute SUM_FINAL - c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, + c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, cellTsFinalNotExpire, Bytes.toBytes(cellValueFinal), tagByteArray); currentColumnCells.add(c1); cellTsFinalNotExpire++; @@ -636,7 +636,7 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags() tags.add(t); byte[] tagByteArray = Tag.fromList(tags); // create a cell with attribute SUM - c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, + c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray); currentColumnCells.add(c1); cellTsNotFinal++; @@ -693,7 
+693,7 @@ public void testProcessSummationMoreCellsSumFinal() throws IOException { SortedSet currentColumnCells = new TreeSet(KeyValue.COMPARATOR); // create a cell with a VERY old timestamp and attribute SUM_FINAL - Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, + Cell c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, 120L, Bytes.toBytes(cellValue1), tagByteArray); currentColumnCells.add(c1); @@ -704,7 +704,7 @@ public void testProcessSummationMoreCellsSumFinal() throws IOException { tagByteArray = Tag.fromList(tags); // create a cell with a VERY old timestamp but has attribute SUM - Cell c2 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, + Cell c2 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, 130L, Bytes.toBytes(cellValue2), tagByteArray); currentColumnCells.add(c2); List cells = fs.processSummationMajorCompaction(currentColumnCells, @@ -751,7 +751,7 @@ public void testProcessSummationOneCellSumFinal() throws IOException { SortedSet currentColumnCells = new TreeSet(KeyValue.COMPARATOR); // create a cell with a VERY old timestamp - Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, + Cell c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, 120L, Bytes.toBytes(1110L), tagByteArray); currentColumnCells.add(c1); @@ -789,7 +789,7 @@ public void testProcessSummationOneCell() throws IOException { SortedSet currentColumnCells = new TreeSet(KeyValue.COMPARATOR); - Cell c1 = TimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, + Cell c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier, currentTimestamp, Bytes.toBytes(1110L), tagByteArray); currentColumnCells.add(c1); List cells = fs.processSummationMajorCompaction(currentColumnCells, diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java index 4cb46e6..dd402bc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java @@ -54,7 +54,7 @@ public AppIdKeyConverter() { byte[] clusterTs = Bytes.toBytes( LongConverter.invertLong(appId.getClusterTimestamp())); System.arraycopy(clusterTs, 0, appIdBytes, 0, Bytes.SIZEOF_LONG); - byte[] seqId = Bytes.toBytes(TimelineStorageUtils.invertInt(appId.getId())); + byte[] seqId = Bytes.toBytes(HBaseTimelineStorageUtils.invertInt(appId.getId())); System.arraycopy(seqId, 0, appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT); return appIdBytes; } @@ -79,7 +79,7 @@ public String decode(byte[] appIdBytes) { } long clusterTs = LongConverter.invertLong( Bytes.toLong(appIdBytes, 0, Bytes.SIZEOF_LONG)); - int seqId = TimelineStorageUtils.invertInt( + int seqId = HBaseTimelineStorageUtils.invertInt( Bytes.toInt(appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT)); return ApplicationId.newInstance(clusterTs, seqId).toString(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java new file mode 100644 index 0000000..a9bf76b --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java @@ -0,0 +1,222 @@ +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public final class HBaseTimelineStorageUtils { + + private static final Log LOG = LogFactory.getLog(HBaseTimelineStorageUtils.class); + + /** milliseconds in one day. */ + public static final long MILLIS_ONE_DAY = 86400000L; + + private HBaseTimelineStorageUtils() { + } + + /** + * Combines the input array of attributes and the input aggregation operation + * into a new array of attributes. + * + * @param attributes Attributes to be combined. + * @param aggOp Aggregation operation. + * @return array of combined attributes.
+ */ + public static Attribute[] combineAttributes(Attribute[] attributes, + AggregationOperation aggOp) { + int newLength = getNewLengthCombinedAttributes(attributes, aggOp); + Attribute[] combinedAttributes = new Attribute[newLength]; + + if (attributes != null) { + System.arraycopy(attributes, 0, combinedAttributes, 0, attributes.length); + } + + if (aggOp != null) { + Attribute a2 = aggOp.getAttribute(); + combinedAttributes[newLength - 1] = a2; + } + return combinedAttributes; + } + + /** + * Returns a number for the new array size. The new array is the combination + * of input array of attributes and the input aggregation operation. + * + * @param attributes Attributes. + * @param aggOp Aggregation operation. + * @return the size for the new array + */ + private static int getNewLengthCombinedAttributes(Attribute[] attributes, + AggregationOperation aggOp) { + int oldLength = getAttributesLength(attributes); + int aggLength = getAppOpLength(aggOp); + return oldLength + aggLength; + } + + private static int getAppOpLength(AggregationOperation aggOp) { + if (aggOp != null) { + return 1; + } + return 0; + } + + private static int getAttributesLength(Attribute[] attributes) { + if (attributes != null) { + return attributes.length; + } + return 0; + } + + /** + * Returns the first seen aggregation operation as seen in the list of input + * tags or null otherwise. + * + * @param tags list of HBase tags. + * @return AggregationOperation + */ + public static AggregationOperation getAggregationOperationFromTagsList( + List tags) { + for (AggregationOperation aggOp : AggregationOperation.values()) { + for (Tag tag : tags) { + if (tag.getType() == aggOp.getTagType()) { + return aggOp; + } + } + } + return null; + } + + /** + * Creates a {@link Tag} from the input attribute. + * + * @param attribute Attribute from which tag has to be fetched. + * @return a HBase Tag. 
+ */ + public static Tag getTagFromAttribute(Map.Entry attribute) { + // attribute could be either an Aggregation Operation or + // an Aggregation Dimension + // Get the Tag type from either + AggregationOperation aggOp = AggregationOperation + .getAggregationOperation(attribute.getKey()); + if (aggOp != null) { + Tag t = new Tag(aggOp.getTagType(), attribute.getValue()); + return t; + } + + AggregationCompactionDimension aggCompactDim = + AggregationCompactionDimension.getAggregationCompactionDimension( + attribute.getKey()); + if (aggCompactDim != null) { + Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue()); + return t; + } + return null; + } + + /** + * creates a new cell based on the input cell but with the new value. + * + * @param origCell Original cell + * @param newValue new cell value + * @return cell + * @throws IOException while creating new cell. + */ + public static Cell createNewCell(Cell origCell, byte[] newValue) + throws IOException { + return CellUtil.createCell(CellUtil.cloneRow(origCell), + CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell), + origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue); + } + + /** + * creates a cell with the given inputs. + * + * @param row row of the cell to be created + * @param family column family name of the new cell + * @param qualifier qualifier for the new cell + * @param ts timestamp of the new cell + * @param newValue value of the new cell + * @param tags tags in the new cell + * @return cell + * @throws IOException while creating the cell. + */ + public static Cell createNewCell(byte[] row, byte[] family, byte[] qualifier, + long ts, byte[] newValue, byte[] tags) throws IOException { + return CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put, + newValue, tags); + } + + /** + * returns app id from the list of tags. 
+ * + * @param tags cell tags to be looked into + * @return App Id as the AggregationCompactionDimension + */ + public static String getAggregationCompactionDimension(List tags) { + String appId = null; + for (Tag t : tags) { + if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t + .getType()) { + appId = Bytes.toString(t.getValue()); + return appId; + } + } + return appId; + } + + public static boolean isFlowRunTable(HRegionInfo hRegionInfo, + Configuration conf) { + String regionTableName = hRegionInfo.getTable().getNameAsString(); + String flowRunTableName = conf.get(FlowRunTable.TABLE_NAME_CONF_NAME, + FlowRunTable.DEFAULT_TABLE_NAME); + if (LOG.isDebugEnabled()) { + LOG.debug("regionTableName=" + regionTableName); + } + if (flowRunTableName.equalsIgnoreCase(regionTableName)) { + if (LOG.isDebugEnabled()) { + LOG.debug(" table is the flow run table!! " + flowRunTableName); + } + return true; + } + return false; + } + + /** + * Converts an int into it's inverse int to be used in (row) keys + * where we want to have the largest int value in the top of the table + * (scans start at the largest int first). + * + * @param key value to be inverted so that the latest version will be first in + * a scan. + * @return inverted int + */ + public static int invertInt(int key) { + return Integer.MAX_VALUE - key; + } + + /** + * returns the timestamp of that day's start (which is midnight 00:00:00 AM) + * for a given input timestamp. + * + * @param ts Timestamp. 
+ * @return timestamp of that day's beginning (midnight) + */ + public static long getTopOfTheDayTimestamp(long ts) { + long dayTimestamp = ts - (ts % MILLIS_ONE_DAY); + return dayTimestamp; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java index aa9a793..7f7d640 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java @@ -20,22 +20,11 @@ import java.io.IOException; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.Tag; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; @@ -47,10 +36,6 @@ import 
org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter; import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; /** * A bunch of utility functions used across TimelineReader and TimelineWriter. @@ -61,135 +46,6 @@ private TimelineStorageUtils() { } - private static final Log LOG = LogFactory.getLog(TimelineStorageUtils.class); - - /** milliseconds in one day. */ - public static final long MILLIS_ONE_DAY = 86400000L; - - /** - * Converts an int into it's inverse int to be used in (row) keys - * where we want to have the largest int value in the top of the table - * (scans start at the largest int first). - * - * @param key value to be inverted so that the latest version will be first in - * a scan. - * @return inverted int - */ - public static int invertInt(int key) { - return Integer.MAX_VALUE - key; - } - - /** - * returns the timestamp of that day's start (which is midnight 00:00:00 AM) - * for a given input timestamp. - * - * @param ts Timestamp. - * @return timestamp of that day's beginning (midnight) - */ - public static long getTopOfTheDayTimestamp(long ts) { - long dayTimestamp = ts - (ts % MILLIS_ONE_DAY); - return dayTimestamp; - } - - /** - * Combines the input array of attributes and the input aggregation operation - * into a new array of attributes. - * - * @param attributes Attributes to be combined. - * @param aggOp Aggregation operation. - * @return array of combined attributes. 
- */ - public static Attribute[] combineAttributes(Attribute[] attributes, - AggregationOperation aggOp) { - int newLength = getNewLengthCombinedAttributes(attributes, aggOp); - Attribute[] combinedAttributes = new Attribute[newLength]; - - if (attributes != null) { - System.arraycopy(attributes, 0, combinedAttributes, 0, attributes.length); - } - - if (aggOp != null) { - Attribute a2 = aggOp.getAttribute(); - combinedAttributes[newLength - 1] = a2; - } - return combinedAttributes; - } - - /** - * Returns a number for the new array size. The new array is the combination - * of input array of attributes and the input aggregation operation. - * - * @param attributes Attributes. - * @param aggOp Aggregation operation. - * @return the size for the new array - */ - private static int getNewLengthCombinedAttributes(Attribute[] attributes, - AggregationOperation aggOp) { - int oldLength = getAttributesLength(attributes); - int aggLength = getAppOpLength(aggOp); - return oldLength + aggLength; - } - - private static int getAppOpLength(AggregationOperation aggOp) { - if (aggOp != null) { - return 1; - } - return 0; - } - - private static int getAttributesLength(Attribute[] attributes) { - if (attributes != null) { - return attributes.length; - } - return 0; - } - - /** - * Returns the first seen aggregation operation as seen in the list of input - * tags or null otherwise. - * - * @param tags list of HBase tags. - * @return AggregationOperation - */ - public static AggregationOperation getAggregationOperationFromTagsList( - List tags) { - for (AggregationOperation aggOp : AggregationOperation.values()) { - for (Tag tag : tags) { - if (tag.getType() == aggOp.getTagType()) { - return aggOp; - } - } - } - return null; - } - - /** - * Creates a {@link Tag} from the input attribute. - * - * @param attribute Attribute from which tag has to be fetched. - * @return a HBase Tag. 
- */ - public static Tag getTagFromAttribute(Entry attribute) { - // attribute could be either an Aggregation Operation or - // an Aggregation Dimension - // Get the Tag type from either - AggregationOperation aggOp = AggregationOperation - .getAggregationOperation(attribute.getKey()); - if (aggOp != null) { - Tag t = new Tag(aggOp.getTagType(), attribute.getValue()); - return t; - } - - AggregationCompactionDimension aggCompactDim = - AggregationCompactionDimension.getAggregationCompactionDimension( - attribute.getKey()); - if (aggCompactDim != null) { - Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue()); - return t; - } - return null; - } - /** * Matches key-values filter. Used for relatesTo/isRelatedTo filters. * @@ -516,71 +372,4 @@ public static boolean isIntegralValue(Object obj) { (obj instanceof Long); } - /** - * creates a new cell based on the input cell but with the new value. - * - * @param origCell Original cell - * @param newValue new cell value - * @return cell - * @throws IOException while creating new cell. - */ - public static Cell createNewCell(Cell origCell, byte[] newValue) - throws IOException { - return CellUtil.createCell(CellUtil.cloneRow(origCell), - CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell), - origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue); - } - - /** - * creates a cell with the given inputs. - * - * @param row row of the cell to be created - * @param family column family name of the new cell - * @param qualifier qualifier for the new cell - * @param ts timestamp of the new cell - * @param newValue value of the new cell - * @param tags tags in the new cell - * @return cell - * @throws IOException while creating the cell. 
- */ - public static Cell createNewCell(byte[] row, byte[] family, byte[] qualifier, - long ts, byte[] newValue, byte[] tags) throws IOException { - return CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put, - newValue, tags); - } - - /** - * returns app id from the list of tags. - * - * @param tags cell tags to be looked into - * @return App Id as the AggregationCompactionDimension - */ - public static String getAggregationCompactionDimension(List tags) { - String appId = null; - for (Tag t : tags) { - if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t - .getType()) { - appId = Bytes.toString(t.getValue()); - return appId; - } - } - return appId; - } - - public static boolean isFlowRunTable(HRegionInfo hRegionInfo, - Configuration conf) { - String regionTableName = hRegionInfo.getTable().getNameAsString(); - String flowRunTableName = conf.get(FlowRunTable.TABLE_NAME_CONF_NAME, - FlowRunTable.DEFAULT_TABLE_NAME); - if (LOG.isDebugEnabled()) { - LOG.debug("regionTableName=" + regionTableName); - } - if (flowRunTableName.equalsIgnoreCase(regionTableName)) { - if (LOG.isDebugEnabled()) { - LOG.debug(" table is the flow run table!! 
" + flowRunTableName); - } - return true; - } - return false; - } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java index 71c3d90..5251afc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityColumnPrefix.java @@ -26,9 +26,9 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; @@ -144,7 +144,7 @@ public void store(byte[] rowKey, } byte[] columnQualifier = getColumnPrefixBytes(qualifier); - Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes( + Attribute[] combinedAttributes = HBaseTimelineStorageUtils.combineAttributes( attributes, this.aggOp); column.store(rowKey, tableMutator, 
columnQualifier, timestamp, inputValue, combinedAttributes); @@ -269,7 +269,7 @@ public void store(byte[] rowKey, } byte[] columnQualifier = getColumnPrefixBytes(qualifier); - Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes( + Attribute[] combinedAttributes = HBaseTimelineStorageUtils.combineAttributes( attributes, this.aggOp); column.store(rowKey, tableMutator, columnQualifier, null, inputValue, combinedAttributes); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java index d10608a..bb77e36 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowActivityRowKey.java @@ -18,10 +18,10 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.flow; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; /** * Represents a rowkey for the flow activity table. 
@@ -59,7 +59,7 @@ protected FlowActivityRowKey(String clusterId, Long timestamp, String userId, String flowName, boolean convertDayTsToTopOfDay) { this.clusterId = clusterId; if (convertDayTsToTopOfDay && (timestamp != null)) { - this.dayTs = TimelineStorageUtils.getTopOfTheDayTimestamp(timestamp); + this.dayTs = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(timestamp); } else { this.dayTs = timestamp; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java index 2e7a9d8..5a7fb90 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumn.java @@ -25,9 +25,9 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; @@ -113,7 +113,7 @@ public void 
store(byte[] rowKey, TypedBufferedMutator tableMutator, Long timestamp, Object inputValue, Attribute... attributes) throws IOException { - Attribute[] combinedAttributes = TimelineStorageUtils.combineAttributes( + Attribute[] combinedAttributes = HBaseTimelineStorageUtils.combineAttributes( attributes, aggOp); column.store(rowKey, tableMutator, columnQualifierBytes, timestamp, inputValue, combinedAttributes); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java index e74282a..278d18e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunColumnPrefix.java @@ -26,10 +26,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; @@ -136,7 +136,7 @@ public void store(byte[] rowKey, byte[] columnQualifier = getColumnPrefixBytes(qualifier); Attribute[] combinedAttributes = - TimelineStorageUtils.combineAttributes(attributes, this.aggOp); + HBaseTimelineStorageUtils.combineAttributes(attributes, this.aggOp); column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, combinedAttributes); } @@ -163,7 +163,7 @@ public void store(byte[] rowKey, byte[] columnQualifier = getColumnPrefixBytes(qualifier); Attribute[] combinedAttributes = - TimelineStorageUtils.combineAttributes(attributes, this.aggOp); + HBaseTimelineStorageUtils.combineAttributes(attributes, this.aggOp); column.store(rowKey, tableMutator, columnQualifier, timestamp, inputValue, combinedAttributes); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java index a9dcfaa..2be6ef8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java @@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; 
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator; /** @@ -71,7 +71,7 @@ public void start(CoprocessorEnvironment e) throws IOException { if (e instanceof RegionCoprocessorEnvironment) { RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e; this.region = env.getRegion(); - isFlowRunRegion = TimelineStorageUtils.isFlowRunTable( + isFlowRunRegion = HBaseTimelineStorageUtils.isFlowRunTable( region.getRegionInfo(), env.getConfiguration()); } } @@ -107,7 +107,7 @@ public void prePut(ObserverContext e, Put put, List tags = new ArrayList<>(); if ((attributes != null) && (attributes.size() > 0)) { for (Map.Entry attribute : attributes.entrySet()) { - Tag t = TimelineStorageUtils.getTagFromAttribute(attribute); + Tag t = HBaseTimelineStorageUtils.getTagFromAttribute(attribute); tags.add(t); } byte[] tagByteArray = Tag.fromList(tags); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java index 6e67722..527043e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java @@ -45,9 +45,9 @@ import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.NumericValueConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; @@ -249,7 +249,7 @@ private AggregationOperation getCurrentAggOp(Cell cell) { List tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()); // We assume that all the operations for a particular column are the same - return TimelineStorageUtils.getAggregationOperationFromTagsList(tags); + return HBaseTimelineStorageUtils.getAggregationOperationFromTagsList(tags); } /** @@ -323,7 +323,7 @@ private void collectCells(SortedSet currentColumnCells, // only if this app has not been seen yet, add to current column cells List tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()); - String aggDim = TimelineStorageUtils + String aggDim = HBaseTimelineStorageUtils .getAggregationCompactionDimension(tags); if (!alreadySeenAggDim.contains(aggDim)) { // if this agg dimension has already been seen, @@ -418,7 +418,7 @@ private Cell processSummation(SortedSet currentColumnCells, sum = converter.add(sum, currentValue); } byte[] sumBytes = converter.encodeValue(sum); - Cell sumCell = TimelineStorageUtils.createNewCell(mostRecentCell, sumBytes); + Cell sumCell = HBaseTimelineStorageUtils.createNewCell(mostRecentCell, sumBytes); return sumCell; } @@ -460,7 +460,7 @@ private Cell processSummation(SortedSet currentColumnCells, // if this is the existing flow sum cell List tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()); - String appId = TimelineStorageUtils + String appId = HBaseTimelineStorageUtils .getAggregationCompactionDimension(tags); if (appId == FLOW_APP_ID) { sum = 
converter.add(sum, currentValue); @@ -502,7 +502,7 @@ private Cell processSummation(SortedSet currentColumnCells, Bytes.toBytes(FLOW_APP_ID)); tags.add(t); byte[] tagByteArray = Tag.fromList(tags); - Cell sumCell = TimelineStorageUtils.createNewCell( + Cell sumCell = HBaseTimelineStorageUtils.createNewCell( CellUtil.cloneRow(anyCell), CellUtil.cloneFamily(anyCell), CellUtil.cloneQualifier(anyCell), diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java index 368b060..5beb189 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TestRowKeys.java @@ -185,7 +185,7 @@ public void testEntityRowKey() { @Test public void testFlowActivityRowKey() { Long ts = 1459900830000L; - Long dayTimestamp = TimelineStorageUtils.getTopOfTheDayTimestamp(ts); + Long dayTimestamp = HBaseTimelineStorageUtils.getTopOfTheDayTimestamp(ts); byte[] byteRowKey = new FlowActivityRowKey(CLUSTER, ts, USER, FLOW_NAME).getRowKey(); FlowActivityRowKey rowKey = FlowActivityRowKey.parseRowKey(byteRowKey);