diff --git hadoop-project/pom.xml hadoop-project/pom.xml
index 66d2eb4..e40c93d 100644
--- hadoop-project/pom.xml
+++ hadoop-project/pom.xml
@@ -49,8 +49,8 @@
2.11.0
0.8.2.1
- 1.2.6
- 2.5.1
+ 2.0.0-alpha3
+ 3.0.0-alpha4
11.0.2
${project.version}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml
index ffba1fe..21283c6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml
@@ -145,6 +145,12 @@
test
+
+ org.eclipse.jetty
+ jetty-http
+ 9.3.19.v20170502
+
+
com.sun.jersey
jersey-client
@@ -222,6 +228,10 @@
org.apache.hadoop
hadoop-mapreduce-client-core
+
+ org.eclipse.jetty
+ jetty-http
+
@@ -255,6 +265,10 @@
org.apache.hadoop
hadoop-mapreduce-client-core
+
+ org.eclipse.jetty
+ jetty-http
+
@@ -293,6 +307,25 @@
test
+
+
+ org.apache.hadoop
+ hadoop-hdfs-client
+ ${hbase-compatible-hadoop.version}
+ test
+
+
+
+
+ org.apache.hadoop
+ hadoop-hdfs-client
+ ${hbase-compatible-hadoop.version}
+ test
+ test-jar
+
+
org.apache.hbase
hbase-testing-util
@@ -335,6 +368,12 @@
jetty-webapp
test
+
+
+
+ org.mockito
+ mockito-all
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
index 0ef8260..d9f5359 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
@@ -31,11 +31,13 @@
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
@@ -384,13 +386,10 @@ private void checkFlowRunTable(String cluster, String user, String flow,
private FlowScanner getFlowScannerForTestingCompaction() {
// create a FlowScanner object with the sole purpose of invoking a process
// summation;
- CompactionRequest request = new CompactionRequest();
- request.setIsMajor(true, true);
// okay to pass in nulls for the constructor arguments
// because all we want to do is invoke the process summation
FlowScanner fs = new FlowScanner(null, null,
- (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION
- : FlowScannerOperation.MINOR_COMPACTION));
+ FlowScannerOperation.MAJOR_COMPACTION);
assertNotNull(fs);
return fs;
}
@@ -415,40 +414,40 @@ public void checkProcessSummationMoreCellsSumFinal2()
SortedSet currentColumnCells = new TreeSet(KeyValue.COMPARATOR);
List tags = new ArrayList<>();
- Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ Tag t = new ArrayBackedTag(AggregationOperation.SUM_FINAL.getTagType(),
"application_1234588888_91188");
tags.add(t);
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
Cell c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
aQualifier, cell1Ts, Bytes.toBytes(cellValue1), tagByteArray);
currentColumnCells.add(c1);
tags = new ArrayList<>();
- t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ t = new ArrayBackedTag(AggregationOperation.SUM_FINAL.getTagType(),
"application_12700000001_29102");
tags.add(t);
- tagByteArray = Tag.fromList(tags);
+ tagByteArray = TagUtil.fromList(tags);
// create a cell with a recent timestamp and attribute SUM_FINAL
Cell c2 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
aQualifier, cell2Ts, Bytes.toBytes(cellValue2), tagByteArray);
currentColumnCells.add(c2);
tags = new ArrayList<>();
- t = new Tag(AggregationOperation.SUM.getTagType(),
+ t = new ArrayBackedTag(AggregationOperation.SUM.getTagType(),
"application_191780000000001_8195");
tags.add(t);
- tagByteArray = Tag.fromList(tags);
+ tagByteArray = TagUtil.fromList(tags);
// create a cell with a VERY old timestamp but has attribute SUM
Cell c3 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
aQualifier, cell3Ts, Bytes.toBytes(cellValue3), tagByteArray);
currentColumnCells.add(c3);
tags = new ArrayList<>();
- t = new Tag(AggregationOperation.SUM.getTagType(),
+ t = new ArrayBackedTag(AggregationOperation.SUM.getTagType(),
"application_191780000000001_98104");
tags.add(t);
- tagByteArray = Tag.fromList(tags);
+ tagByteArray = TagUtil.fromList(tags);
// create a cell with a VERY old timestamp but has attribute SUM
Cell c4 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
aQualifier, cell4Ts, Bytes.toBytes(cellValue4), tagByteArray);
@@ -515,10 +514,10 @@ public void checkProcessSummationMoreCellsSumFinalMany() throws IOException {
// insert SUM_FINAL cells
for (int i = 0; i < count; i++) {
tags = new ArrayList<>();
- t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ t = new ArrayBackedTag(AggregationOperation.SUM_FINAL.getTagType(),
"application_123450000" + i + "01_19" + i);
tags.add(t);
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray);
@@ -529,10 +528,10 @@ public void checkProcessSummationMoreCellsSumFinalMany() throws IOException {
// add SUM cells
for (int i = 0; i < count; i++) {
tags = new ArrayList<>();
- t = new Tag(AggregationOperation.SUM.getTagType(),
+ t = new ArrayBackedTag(AggregationOperation.SUM.getTagType(),
"application_1987650000" + i + "83_911" + i);
tags.add(t);
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
// create a cell with attribute SUM
c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray);
@@ -606,10 +605,10 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags()
// insert SUM_FINAL cells which will expire
for (int i = 0; i < countFinal; i++) {
tags = new ArrayList<>();
- t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ t = new ArrayBackedTag(AggregationOperation.SUM_FINAL.getTagType(),
"application_123450000" + i + "01_19" + i);
tags.add(t);
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray);
@@ -620,10 +619,10 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags()
// insert SUM_FINAL cells which will NOT expire
for (int i = 0; i < countFinalNotExpire; i++) {
tags = new ArrayList<>();
- t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ t = new ArrayBackedTag(AggregationOperation.SUM_FINAL.getTagType(),
"application_123450000" + i + "01_19" + i);
tags.add(t);
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsFinalNotExpire, Bytes.toBytes(cellValueFinal), tagByteArray);
@@ -634,10 +633,10 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags()
// add SUM cells
for (int i = 0; i < countNotFinal; i++) {
tags = new ArrayList<>();
- t = new Tag(AggregationOperation.SUM.getTagType(),
+ t = new ArrayBackedTag(AggregationOperation.SUM.getTagType(),
"application_1987650000" + i + "83_911" + i);
tags.add(t);
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
// create a cell with attribute SUM
c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray);
@@ -689,10 +688,10 @@ public void testProcessSummationMoreCellsSumFinal() throws IOException {
long cellValue2 = 28L;
List tags = new ArrayList<>();
- Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ Tag t = new ArrayBackedTag(AggregationOperation.SUM_FINAL.getTagType(),
"application_1234588888_999888");
tags.add(t);
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
SortedSet currentColumnCells = new TreeSet(KeyValue.COMPARATOR);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
@@ -701,10 +700,10 @@ public void testProcessSummationMoreCellsSumFinal() throws IOException {
currentColumnCells.add(c1);
tags = new ArrayList<>();
- t = new Tag(AggregationOperation.SUM.getTagType(),
+ t = new ArrayBackedTag(AggregationOperation.SUM.getTagType(),
"application_100000000001_119101");
tags.add(t);
- tagByteArray = Tag.fromList(tags);
+ tagByteArray = TagUtil.fromList(tags);
// create a cell with a VERY old timestamp but has attribute SUM
Cell c2 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
@@ -747,10 +746,10 @@ public void testProcessSummationOneCellSumFinal() throws IOException {
// note down the current timestamp
long currentTimestamp = System.currentTimeMillis();
List tags = new ArrayList<>();
- Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ Tag t = new ArrayBackedTag(AggregationOperation.SUM_FINAL.getTagType(),
"application_123458888888_999888");
tags.add(t);
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
SortedSet currentColumnCells = new TreeSet(KeyValue.COMPARATOR);
// create a cell with a VERY old timestamp
@@ -785,10 +784,10 @@ public void testProcessSummationOneCell() throws IOException {
// try for 1 cell with tag SUM
List tags = new ArrayList<>();
- Tag t = new Tag(AggregationOperation.SUM.getTagType(),
+ Tag t = new ArrayBackedTag(AggregationOperation.SUM.getTagType(),
"application_123458888888_999888");
tags.add(t);
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
SortedSet currentColumnCells = new TreeSet(KeyValue.COMPARATOR);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/pom.xml
index f200e9a..944fdeb 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/pom.xml
@@ -115,7 +115,12 @@
org.apache.hadoop
hadoop-mapreduce-client-core
+
+ org.jruby.jcodings
+ jcodings
+
+
@@ -150,6 +155,18 @@
org.mortbay.jetty
jetty-sslengine
+
+ org.glassfish.jersey.core
+ jersey-common
+
+
+ org.eclipse.jetty
+ jetty-security
+
+
+ org.eclipse.jetty
+ jetty-http
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BufferedMutatorDelegator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BufferedMutatorDelegator.java
index cf469a5..c4aaee6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BufferedMutatorDelegator.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BufferedMutatorDelegator.java
@@ -70,4 +70,14 @@ public long getWriteBufferSize() {
return bufferedMutator.getWriteBufferSize();
}
+ @Override
+ public void setRpcTimeout(int i) {
+ bufferedMutator.setRpcTimeout(i);
+ }
+
+ @Override
+ public void setOperationTimeout(int i) {
+ bufferedMutator.setOperationTimeout(i);
+ }
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java
index c115b18..f1aabaf 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java
@@ -25,12 +25,14 @@
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.client.Query;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -141,7 +143,7 @@ public static Tag getTagFromAttribute(Map.Entry attribute) {
AggregationOperation aggOp = AggregationOperation
.getAggregationOperation(attribute.getKey());
if (aggOp != null) {
- Tag t = new Tag(aggOp.getTagType(), attribute.getValue());
+ Tag t = new ArrayBackedTag(aggOp.getTagType(), attribute.getValue());
return t;
}
@@ -149,7 +151,7 @@ public static Tag getTagFromAttribute(Map.Entry attribute) {
AggregationCompactionDimension.getAggregationCompactionDimension(
attribute.getKey());
if (aggCompactDim != null) {
- Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue());
+ Tag t = new ArrayBackedTag(aggCompactDim.getTagType(), attribute.getValue());
return t;
}
return null;
@@ -199,7 +201,7 @@ public static String getAggregationCompactionDimension(List tags) {
for (Tag t : tags) {
if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
.getType()) {
- appId = Bytes.toString(t.getValue());
+ appId = Bytes.toString(TagUtil.cloneValue(t));
return appId;
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
index 359eec9..e504036 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
@@ -30,22 +30,23 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
import org.slf4j.Logger;
@@ -54,7 +55,7 @@
/**
* Coprocessor for flow run table.
*/
-public class FlowRunCoprocessor extends BaseRegionObserver {
+public class FlowRunCoprocessor implements RegionObserver {
private static final Logger LOG =
LoggerFactory.getLogger(FlowRunCoprocessor.class);
@@ -100,7 +101,7 @@ public void prePut(ObserverContext e, Put put,
Tag t = HBaseTimelineStorageUtils.getTagFromAttribute(attribute);
tags.add(t);
}
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
NavigableMap> newFamilyMap = new TreeMap<>(
Bytes.BYTES_COMPARATOR);
for (Map.Entry> entry : put.getFamilyCellMap()
@@ -259,17 +260,16 @@ public void postFlush(ObserverContext c,
@Override
public InternalScanner preCompact(
ObserverContext e, Store store,
- InternalScanner scanner, ScanType scanType, CompactionRequest request)
+ InternalScanner scanner, ScanType scanType,
+ CompactionLifeCycleTracker tracker)
throws IOException {
- FlowScannerOperation requestOp = FlowScannerOperation.MINOR_COMPACTION;
- if (request != null) {
- requestOp = (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION
- : FlowScannerOperation.MINOR_COMPACTION);
- LOG.info("Compactionrequest= " + request.toString() + " "
- + requestOp.toString() + " RegionName=" + e.getEnvironment()
- .getRegion().getRegionInfo().getRegionNameAsString());
- }
+ FlowScannerOperation requestOp = scanType == ScanType.COMPACT_RETAIN_DELETES ?
+ FlowScannerOperation.MINOR_COMPACTION :
+ FlowScannerOperation.MAJOR_COMPACTION;
+ LOG.info("Compactionrequest= " + scanType + " "
+ + requestOp.toString() + " RegionName=" + e.getEnvironment()
+ .getRegion().getRegionInfo().getRegionNameAsString());
return new FlowScanner(e.getEnvironment(), scanner, requestOp);
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
index dbd0484..e2a26e4 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -28,11 +28,13 @@
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -247,7 +249,7 @@ private boolean nextInternal(List cells, ScannerContext scannerContext)
}
private AggregationOperation getCurrentAggOp(Cell cell) {
- List tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+ List tags = TagUtil.asList(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
// We assume that all the operations for a particular column are the same
return HBaseTimelineStorageUtils.getAggregationOperationFromTagsList(tags);
@@ -322,7 +324,7 @@ private void collectCells(SortedSet currentColumnCells,
}
// only if this app has not been seen yet, add to current column cells
- List tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+ List tags = TagUtil.asList(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
String aggDim = HBaseTimelineStorageUtils
.getAggregationCompactionDimension(tags);
@@ -460,7 +462,7 @@ private Cell processSummation(SortedSet currentColumnCells,
for (Cell cell : currentColumnCells) {
AggregationOperation cellAggOp = getCurrentAggOp(cell);
// if this is the existing flow sum cell
- List tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+ List tags = TagUtil.asList(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
String appId = HBaseTimelineStorageUtils
.getAggregationCompactionDimension(tags);
@@ -497,13 +499,13 @@ private Cell processSummation(SortedSet currentColumnCells,
if (summationDone) {
Cell anyCell = currentColumnCells.first();
List tags = new ArrayList();
- Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ Tag t = new ArrayBackedTag(AggregationOperation.SUM_FINAL.getTagType(),
Bytes.toBytes(FLOW_APP_ID));
tags.add(t);
- t = new Tag(AggregationCompactionDimension.APPLICATION_ID.getTagType(),
+ t = new ArrayBackedTag(AggregationCompactionDimension.APPLICATION_ID.getTagType(),
Bytes.toBytes(FLOW_APP_ID));
tags.add(t);
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
Cell sumCell = HBaseTimelineStorageUtils.createNewCell(
CellUtil.cloneRow(anyCell),
CellUtil.cloneFamily(anyCell),