diff --git hadoop-project/pom.xml hadoop-project/pom.xml
index 04b93c4..d44abc8 100644
--- hadoop-project/pom.xml
+++ hadoop-project/pom.xml
@@ -49,8 +49,8 @@
     <xerces.jdiff.version>2.11.0</xerces.jdiff.version>
     <kafka.version>0.8.2.1</kafka.version>
-    <hbase.version>1.2.6</hbase.version>
-    <hbase-compatible-hadoop.version>2.5.1</hbase-compatible-hadoop.version>
+    <hbase.version>2.0.0-alpha4</hbase.version>
+    <hbase-compatible-hadoop.version>3.0.0-alpha4</hbase-compatible-hadoop.version>
     <hbase-compatible-guava.version>11.0.2</hbase-compatible-guava.version>
     <hadoop.assemblies.version>${project.version}</hadoop.assemblies.version>
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml
index ffba1fe..ab4bd45 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml
@@ -145,6 +145,15 @@
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-http</artifactId>
+      <version>9.3.19.v20170502</version>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>com.sun.jersey</groupId>
       <artifactId>jersey-client</artifactId>
@@ -190,6 +199,10 @@
         <exclusion>
           <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-mapreduce-client-core</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.jruby.jcodings</groupId>
+          <artifactId>jcodings</artifactId>
+        </exclusion>
       </exclusions>
@@ -222,6 +235,10 @@
         <exclusion>
           <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-mapreduce-client-core</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>jetty-http</artifactId>
+        </exclusion>
       </exclusions>
@@ -255,6 +272,14 @@
         <exclusion>
           <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-mapreduce-client-core</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>jetty-http</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.glassfish</groupId>
+          <artifactId>javax.el</artifactId>
+        </exclusion>
       </exclusions>
@@ -293,6 +318,25 @@
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs-client</artifactId>
+      <version>${hbase-compatible-hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs-client</artifactId>
+      <version>${hbase-compatible-hadoop.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-testing-util</artifactId>
@@ -335,6 +379,31 @@
       <artifactId>jetty-webapp</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.glassfish</groupId>
+      <artifactId>javax.el</artifactId>
+      <version>3.0.0</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-all</artifactId>
+      <version>4.1.5.Final</version>
+    </dependency>
@@ -347,6 +416,20 @@
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <systemPropertyVariables>
+            <org.apache.hadoop.hbase.shaded.io.netty.packagePrefix>
+              org.apache.hadoop.hbase.shaded.
+            </org.apache.hadoop.hbase.shaded.io.netty.packagePrefix>
+          </systemPropertyVariables>
+        </configuration>
+      </plugin>
+
       <plugin>
         <artifactId>maven-jar-plugin</artifactId>
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
index 1ad02e1..b8dc87a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -40,8 +40,8 @@
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
@@ -129,8 +129,8 @@ public void checkCoProcessorOff() throws IOException, InterruptedException {
private void checkCoprocessorExists(TableName table, boolean exists)
throws IOException, InterruptedException {
HRegionServer server = util.getRSForFirstRegionInTable(table);
-    List<Region> regions = server.getOnlineRegions(table);
-    for (Region region : regions) {
+    List<HRegion> regions = server.getRegions(table);
+    for (HRegion region : regions) {
       boolean found = false;
       Set<String> coprocs = region.getCoprocessorHost().getCoprocessors();
for (String coprocName : coprocs) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
index 0ef8260..d0fe571 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
@@ -31,11 +31,13 @@
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
@@ -44,9 +46,8 @@
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
@@ -326,12 +327,12 @@ public void testWriteFlowRunCompaction() throws Exception {
HRegionServer server = util.getRSForFirstRegionInTable(
BaseTable.getTableName(c1, FlowRunTable.TABLE_NAME_CONF_NAME,
FlowRunTable.DEFAULT_TABLE_NAME));
-    List<Region> regions = server.getOnlineRegions(BaseTable.getTableName(c1,
+    List<HRegion> regions = server.getRegions(BaseTable.getTableName(c1,
FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
assertTrue("Didn't find any regions for primary table!",
regions.size() > 0);
// flush and compact all the regions of the primary table
- for (Region region : regions) {
+ for (HRegion region : regions) {
region.flush(true);
region.compact(true);
}
@@ -384,13 +385,10 @@ private void checkFlowRunTable(String cluster, String user, String flow,
private FlowScanner getFlowScannerForTestingCompaction() {
// create a FlowScanner object with the sole purpose of invoking a process
// summation;
- CompactionRequest request = new CompactionRequest();
- request.setIsMajor(true, true);
// okay to pass in nulls for the constructor arguments
// because all we want to do is invoke the process summation
FlowScanner fs = new FlowScanner(null, null,
- (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION
- : FlowScannerOperation.MINOR_COMPACTION));
+ FlowScannerOperation.MAJOR_COMPACTION);
assertNotNull(fs);
return fs;
}
@@ -415,40 +413,40 @@ public void checkProcessSummationMoreCellsSumFinal2()
     SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
     List<Tag> tags = new ArrayList<>();
- Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ Tag t = new ArrayBackedTag(AggregationOperation.SUM_FINAL.getTagType(),
"application_1234588888_91188");
tags.add(t);
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
Cell c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
aQualifier, cell1Ts, Bytes.toBytes(cellValue1), tagByteArray);
currentColumnCells.add(c1);
tags = new ArrayList<>();
- t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ t = new ArrayBackedTag(AggregationOperation.SUM_FINAL.getTagType(),
"application_12700000001_29102");
tags.add(t);
- tagByteArray = Tag.fromList(tags);
+ tagByteArray = TagUtil.fromList(tags);
// create a cell with a recent timestamp and attribute SUM_FINAL
Cell c2 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
aQualifier, cell2Ts, Bytes.toBytes(cellValue2), tagByteArray);
currentColumnCells.add(c2);
tags = new ArrayList<>();
- t = new Tag(AggregationOperation.SUM.getTagType(),
+ t = new ArrayBackedTag(AggregationOperation.SUM.getTagType(),
"application_191780000000001_8195");
tags.add(t);
- tagByteArray = Tag.fromList(tags);
+ tagByteArray = TagUtil.fromList(tags);
// create a cell with a VERY old timestamp but has attribute SUM
Cell c3 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
aQualifier, cell3Ts, Bytes.toBytes(cellValue3), tagByteArray);
currentColumnCells.add(c3);
tags = new ArrayList<>();
- t = new Tag(AggregationOperation.SUM.getTagType(),
+ t = new ArrayBackedTag(AggregationOperation.SUM.getTagType(),
"application_191780000000001_98104");
tags.add(t);
- tagByteArray = Tag.fromList(tags);
+ tagByteArray = TagUtil.fromList(tags);
// create a cell with a VERY old timestamp but has attribute SUM
Cell c4 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
aQualifier, cell4Ts, Bytes.toBytes(cellValue4), tagByteArray);
@@ -515,10 +513,10 @@ public void checkProcessSummationMoreCellsSumFinalMany() throws IOException {
// insert SUM_FINAL cells
for (int i = 0; i < count; i++) {
tags = new ArrayList<>();
- t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ t = new ArrayBackedTag(AggregationOperation.SUM_FINAL.getTagType(),
"application_123450000" + i + "01_19" + i);
tags.add(t);
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray);
@@ -529,10 +527,10 @@ public void checkProcessSummationMoreCellsSumFinalMany() throws IOException {
// add SUM cells
for (int i = 0; i < count; i++) {
tags = new ArrayList<>();
- t = new Tag(AggregationOperation.SUM.getTagType(),
+ t = new ArrayBackedTag(AggregationOperation.SUM.getTagType(),
"application_1987650000" + i + "83_911" + i);
tags.add(t);
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
// create a cell with attribute SUM
c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray);
@@ -606,10 +604,10 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags()
// insert SUM_FINAL cells which will expire
for (int i = 0; i < countFinal; i++) {
tags = new ArrayList<>();
- t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ t = new ArrayBackedTag(AggregationOperation.SUM_FINAL.getTagType(),
"application_123450000" + i + "01_19" + i);
tags.add(t);
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray);
@@ -620,10 +618,10 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags()
// insert SUM_FINAL cells which will NOT expire
for (int i = 0; i < countFinalNotExpire; i++) {
tags = new ArrayList<>();
- t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ t = new ArrayBackedTag(AggregationOperation.SUM_FINAL.getTagType(),
"application_123450000" + i + "01_19" + i);
tags.add(t);
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsFinalNotExpire, Bytes.toBytes(cellValueFinal), tagByteArray);
@@ -634,10 +632,10 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags()
// add SUM cells
for (int i = 0; i < countNotFinal; i++) {
tags = new ArrayList<>();
- t = new Tag(AggregationOperation.SUM.getTagType(),
+ t = new ArrayBackedTag(AggregationOperation.SUM.getTagType(),
"application_1987650000" + i + "83_911" + i);
tags.add(t);
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
// create a cell with attribute SUM
c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray);
@@ -689,10 +687,10 @@ public void testProcessSummationMoreCellsSumFinal() throws IOException {
long cellValue2 = 28L;
     List<Tag> tags = new ArrayList<>();
- Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ Tag t = new ArrayBackedTag(AggregationOperation.SUM_FINAL.getTagType(),
"application_1234588888_999888");
tags.add(t);
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
     SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
@@ -701,10 +699,10 @@ public void testProcessSummationMoreCellsSumFinal() throws IOException {
currentColumnCells.add(c1);
tags = new ArrayList<>();
- t = new Tag(AggregationOperation.SUM.getTagType(),
+ t = new ArrayBackedTag(AggregationOperation.SUM.getTagType(),
"application_100000000001_119101");
tags.add(t);
- tagByteArray = Tag.fromList(tags);
+ tagByteArray = TagUtil.fromList(tags);
// create a cell with a VERY old timestamp but has attribute SUM
Cell c2 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
@@ -747,10 +745,10 @@ public void testProcessSummationOneCellSumFinal() throws IOException {
// note down the current timestamp
long currentTimestamp = System.currentTimeMillis();
     List<Tag> tags = new ArrayList<>();
- Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ Tag t = new ArrayBackedTag(AggregationOperation.SUM_FINAL.getTagType(),
"application_123458888888_999888");
tags.add(t);
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
     SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
// create a cell with a VERY old timestamp
@@ -785,10 +783,10 @@ public void testProcessSummationOneCell() throws IOException {
// try for 1 cell with tag SUM
     List<Tag> tags = new ArrayList<>();
- Tag t = new Tag(AggregationOperation.SUM.getTagType(),
+ Tag t = new ArrayBackedTag(AggregationOperation.SUM.getTagType(),
"application_123458888888_999888");
tags.add(t);
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
     SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/pom.xml
index f200e9a..2ccd8ad 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/pom.xml
@@ -115,6 +115,10 @@
         <exclusion>
           <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-mapreduce-client-core</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.jruby.jcodings</groupId>
+          <artifactId>jcodings</artifactId>
+        </exclusion>
       </exclusions>
@@ -150,9 +154,40 @@
         <exclusion>
           <groupId>org.mortbay.jetty</groupId>
           <artifactId>jetty-sslengine</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>jetty-http</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>jetty-security</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.glassfish.jersey.core</groupId>
+          <artifactId>jersey-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.glassfish</groupId>
+          <artifactId>javax.el</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
+
+    <dependency>
+      <groupId>org.glassfish.jersey.core</groupId>
+      <artifactId>jersey-common</artifactId>
+      <version>2.25.1</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.glassfish</groupId>
+      <artifactId>javax.el</artifactId>
+      <version>3.0.0</version>
+    </dependency>
+
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BufferedMutatorDelegator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BufferedMutatorDelegator.java
index cf469a5..f53aac9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BufferedMutatorDelegator.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BufferedMutatorDelegator.java
@@ -42,32 +42,49 @@ public BufferedMutatorDelegator(BufferedMutator bufferedMutator) {
this.bufferedMutator = bufferedMutator;
}
+ @Override
public TableName getName() {
return bufferedMutator.getName();
}
+ @Override
public Configuration getConfiguration() {
return bufferedMutator.getConfiguration();
}
+ @Override
public void mutate(Mutation mutation) throws IOException {
bufferedMutator.mutate(mutation);
}
+ @Override
   public void mutate(List<? extends Mutation> mutations) throws IOException {
bufferedMutator.mutate(mutations);
}
+ @Override
public void close() throws IOException {
bufferedMutator.close();
}
+ @Override
public void flush() throws IOException {
bufferedMutator.flush();
}
+ @Override
public long getWriteBufferSize() {
return bufferedMutator.getWriteBufferSize();
}
+ @Override
+ public void setRpcTimeout(int i) {
+ bufferedMutator.setRpcTimeout(i);
+ }
+
+ @Override
+ public void setOperationTimeout(int i) {
+ bufferedMutator.setOperationTimeout(i);
+ }
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java
index c115b18..b69ef5f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java
@@ -25,12 +25,14 @@
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.client.Query;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -141,7 +143,7 @@ public static Tag getTagFromAttribute(Map.Entry<String, byte[]> attribute) {
AggregationOperation aggOp = AggregationOperation
.getAggregationOperation(attribute.getKey());
if (aggOp != null) {
- Tag t = new Tag(aggOp.getTagType(), attribute.getValue());
+ Tag t = new ArrayBackedTag(aggOp.getTagType(), attribute.getValue());
return t;
}
@@ -149,7 +151,8 @@ public static Tag getTagFromAttribute(Map.Entry<String, byte[]> attribute) {
AggregationCompactionDimension.getAggregationCompactionDimension(
attribute.getKey());
if (aggCompactDim != null) {
- Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue());
+ Tag t = new ArrayBackedTag(aggCompactDim.getTagType(),
+ attribute.getValue());
return t;
}
return null;
@@ -199,7 +202,7 @@ public static String getAggregationCompactionDimension(List<Tag> tags) {
for (Tag t : tags) {
if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
.getType()) {
- appId = Bytes.toString(t.getValue());
+ appId = Bytes.toString(TagUtil.cloneValue(t));
return appId;
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
index 359eec9..f320c55 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
@@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
+import java.util.Optional;
import java.util.TreeMap;
import org.apache.hadoop.hbase.Cell;
@@ -30,22 +31,25 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
import org.slf4j.Logger;
@@ -54,7 +58,7 @@
/**
* Coprocessor for flow run table.
*/
-public class FlowRunCoprocessor extends BaseRegionObserver {
+public class FlowRunCoprocessor implements RegionObserver, RegionCoprocessor {
private static final Logger LOG =
LoggerFactory.getLogger(FlowRunCoprocessor.class);
@@ -74,6 +78,11 @@ public void start(CoprocessorEnvironment e) throws IOException {
}
}
+ @Override
+  public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+
/*
* (non-Javadoc)
*
@@ -86,12 +95,12 @@ public void start(CoprocessorEnvironment e) throws IOException {
* org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#prePut(org.apache
* .hadoop.hbase.coprocessor.ObserverContext,
* org.apache.hadoop.hbase.client.Put,
- * org.apache.hadoop.hbase.regionserver.wal.WALEdit,
+ * org.apache.hadoop.hbase.wal.WALEdit,
* org.apache.hadoop.hbase.client.Durability)
*/
@Override
   public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put,
-      WALEdit edit, Durability durability) throws IOException {
+      WALEdit edit, Durability durability) throws IOException {
     Map<String, byte[]> attributes = put.getAttributesMap();
     // Assumption is that all the cells in a put are the same operation.
     List<Tag> tags = new ArrayList<>();
@@ -100,7 +109,7 @@ public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put,
Tag t = HBaseTimelineStorageUtils.getTagFromAttribute(attribute);
tags.add(t);
}
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
     NavigableMap<byte[], List<Cell>> newFamilyMap = new TreeMap<>(
         Bytes.BYTES_COMPARATOR);
     for (Map.Entry<byte[], List<Cell>> entry : put.getFamilyCellMap()
@@ -184,17 +193,15 @@ public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e,
* @see
* org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preScannerOpen(org
* .apache.hadoop.hbase.coprocessor.ObserverContext,
- * org.apache.hadoop.hbase.client.Scan,
- * org.apache.hadoop.hbase.regionserver.RegionScanner)
+ * org.apache.hadoop.hbase.client.Scan)
*/
@Override
-  public RegionScanner preScannerOpen(
-      ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
-      RegionScanner scanner) throws IOException {
+  public void preScannerOpen(
+      ObserverContext<RegionCoprocessorEnvironment> e, Scan scan)
+      throws IOException {
// set max versions for scan to see all
// versions to aggregate for metrics
scan.setMaxVersions();
- return scanner;
}
/*
@@ -220,7 +227,8 @@ public RegionScanner postScannerOpen(
@Override
public InternalScanner preFlush(
       ObserverContext<RegionCoprocessorEnvironment> c, Store store,
- InternalScanner scanner) throws IOException {
+ InternalScanner scanner, FlushLifeCycleTracker cycleTracker)
+ throws IOException {
if (LOG.isDebugEnabled()) {
if (store != null) {
LOG.debug("preFlush store = " + store.getColumnFamilyName()
@@ -228,8 +236,7 @@ public InternalScanner preFlush(
+ " flushedCellsCount=" + store.getFlushedCellsCount()
+ " compactedCellsCount=" + store.getCompactedCellsCount()
+ " majorCompactedCellsCount="
- + store.getMajorCompactedCellsCount() + " memstoreFlushSize="
- + store.getMemstoreFlushSize() + " memstoreSize="
+ + store.getMajorCompactedCellsCount() + " memstoreSize="
+ store.getMemStoreSize() + " size=" + store.getSize()
+ " storeFilesCount=" + store.getStorefilesCount());
}
@@ -239,27 +246,10 @@ public InternalScanner preFlush(
}
@Override
-  public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
-      Store store, StoreFile resultFile) {
- if (LOG.isDebugEnabled()) {
- if (store != null) {
- LOG.debug("postFlush store = " + store.getColumnFamilyName()
- + " flushableSize=" + store.getFlushableSize()
- + " flushedCellsCount=" + store.getFlushedCellsCount()
- + " compactedCellsCount=" + store.getCompactedCellsCount()
- + " majorCompactedCellsCount="
- + store.getMajorCompactedCellsCount() + " memstoreFlushSize="
- + store.getMemstoreFlushSize() + " memstoreSize="
- + store.getMemStoreSize() + " size=" + store.getSize()
- + " storeFilesCount=" + store.getStorefilesCount());
- }
- }
- }
-
- @Override
public InternalScanner preCompact(
       ObserverContext<RegionCoprocessorEnvironment> e, Store store,
- InternalScanner scanner, ScanType scanType, CompactionRequest request)
+ InternalScanner scanner, ScanType scanType,
+ CompactionLifeCycleTracker tracker, CompactionRequest request)
throws IOException {
FlowScannerOperation requestOp = FlowScannerOperation.MINOR_COMPACTION;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
index dbd0484..080a356 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -28,11 +28,13 @@
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -120,7 +122,7 @@
*/
@Override
public HRegionInfo getRegionInfo() {
- return region.getRegionInfo();
+ return new HRegionInfo(region.getRegionInfo());
}
@Override
@@ -247,7 +249,7 @@ private boolean nextInternal(List<Cell> cells, ScannerContext scannerContext)
}
private AggregationOperation getCurrentAggOp(Cell cell) {
-    List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+    List<Tag> tags = TagUtil.asList(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
// We assume that all the operations for a particular column are the same
return HBaseTimelineStorageUtils.getAggregationOperationFromTagsList(tags);
@@ -322,7 +324,7 @@ private void collectCells(SortedSet<Cell> currentColumnCells,
}
// only if this app has not been seen yet, add to current column cells
-    List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+    List<Tag> tags = TagUtil.asList(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
String aggDim = HBaseTimelineStorageUtils
.getAggregationCompactionDimension(tags);
@@ -460,7 +462,7 @@ private Cell processSummation(SortedSet<Cell> currentColumnCells,
for (Cell cell : currentColumnCells) {
AggregationOperation cellAggOp = getCurrentAggOp(cell);
// if this is the existing flow sum cell
-    List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+    List<Tag> tags = TagUtil.asList(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
String appId = HBaseTimelineStorageUtils
.getAggregationCompactionDimension(tags);
@@ -497,13 +499,14 @@ private Cell processSummation(SortedSet<Cell> currentColumnCells,
if (summationDone) {
Cell anyCell = currentColumnCells.first();
       List<Tag> tags = new ArrayList<>();
- Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ Tag t = new ArrayBackedTag(AggregationOperation.SUM_FINAL.getTagType(),
Bytes.toBytes(FLOW_APP_ID));
tags.add(t);
- t = new Tag(AggregationCompactionDimension.APPLICATION_ID.getTagType(),
+ t = new ArrayBackedTag(
+ AggregationCompactionDimension.APPLICATION_ID.getTagType(),
Bytes.toBytes(FLOW_APP_ID));
tags.add(t);
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
Cell sumCell = HBaseTimelineStorageUtils.createNewCell(
CellUtil.cloneRow(anyCell),
CellUtil.cloneFamily(anyCell),