diff --git hadoop-project/pom.xml hadoop-project/pom.xml
index c4dc1bf..3e897d7 100644
--- hadoop-project/pom.xml
+++ hadoop-project/pom.xml
@@ -49,8 +49,8 @@
     <xerces.jdiff.version>2.11.0</xerces.jdiff.version>
     <kafka.version>0.8.2.1</kafka.version>
-    <hbase.version>1.2.6</hbase.version>
-    <hbase-compatible-hadoop.version>2.5.1</hbase-compatible-hadoop.version>
+    <hbase.version>2.0.0-beta-1.SNAPSHOT</hbase.version>
+    <hbase-compatible-hadoop.version>3.0.0-alpha4</hbase-compatible-hadoop.version>
     <guava.version>11.0.2</guava.version>
     <hadoop.assemblies.version>${project.version}</hadoop.assemblies.version>
@@ -876,6 +876,11 @@
         <version>2.6</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-lang3</artifactId>
+        <version>3.6</version>
+      </dependency>
       <dependency>
         <groupId>commons-collections</groupId>
         <artifactId>commons-collections</artifactId>
         <version>3.2.2</version>
@@ -1647,7 +1652,7 @@
- true
+ false
@@ -1821,10 +1826,5 @@
-    <repository>
-      <id>dynamodb-local-oregon</id>
-      <name>DynamoDB Local Release Repository</name>
-      <url>https://s3-us-west-2.amazonaws.com/dynamodb-local/release</url>
-    </repository>
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml
index ffba1fe..cc7a11f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml
@@ -145,6 +145,12 @@
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-http</artifactId>
+      <version>9.3.19.v20170502</version>
+    </dependency>
     <dependency>
       <groupId>com.sun.jersey</groupId>
       <artifactId>jersey-client</artifactId>
@@ -222,6 +228,10 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-core</artifactId>
+        <exclusion>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>jetty-http</artifactId>
+        </exclusion>
@@ -255,6 +265,10 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-core</artifactId>
+        <exclusion>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>jetty-http</artifactId>
+        </exclusion>
@@ -293,6 +307,25 @@
       <scope>test</scope>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs-client</artifactId>
+      <version>${hbase-compatible-hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs-client</artifactId>
+      <version>${hbase-compatible-hadoop.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-testing-util</artifactId>
@@ -335,8 +368,26 @@
       <artifactId>jetty-webapp</artifactId>
       <scope>test</scope>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-all</artifactId>
+      <version>4.1.5.Final</version>
+    </dependency>
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
index 1ad02e1..0877e38 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -40,6 +40,7 @@
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.util.Bytes;
@@ -129,8 +130,8 @@ public void checkCoProcessorOff() throws IOException, InterruptedException {
private void checkCoprocessorExists(TableName table, boolean exists)
throws IOException, InterruptedException {
HRegionServer server = util.getRSForFirstRegionInTable(table);
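+ // HRegionServer#getOnlineRegions(TableName) was replaced by getRegions(TableName) in HBase 2.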
- List<Region> regions = server.getOnlineRegions(table);
- for (Region region : regions) {
+ List<HRegion> regions = server.getRegions(table);
+ for (HRegion region : regions) {
boolean found = false;
Set<String> coprocs = region.getCoprocessorHost().getCoprocessors();
for (String coprocName : coprocs) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
index 0ef8260..aa54e44 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
@@ -31,11 +31,13 @@
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
@@ -44,6 +46,7 @@
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -326,12 +329,12 @@ public void testWriteFlowRunCompaction() throws Exception {
HRegionServer server = util.getRSForFirstRegionInTable(
BaseTable.getTableName(c1, FlowRunTable.TABLE_NAME_CONF_NAME,
FlowRunTable.DEFAULT_TABLE_NAME));
- List<Region> regions = server.getOnlineRegions(BaseTable.getTableName(c1,
+ List<HRegion> regions = server.getRegions(BaseTable.getTableName(c1,
FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
assertTrue("Didn't find any regions for primary table!",
regions.size() > 0);
// flush and compact all the regions of the primary table
- for (Region region : regions) {
+ for (HRegion region : regions) {
region.flush(true);
region.compact(true);
}
@@ -384,13 +387,10 @@ private void checkFlowRunTable(String cluster, String user, String flow,
private FlowScanner getFlowScannerForTestingCompaction() {
// create a FlowScanner object with the sole purpose of invoking a process
// summation;
- CompactionRequest request = new CompactionRequest();
- request.setIsMajor(true, true);
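+ // CompactionRequest can no longer be constructed directly in HBase 2, so the
+ // scanner is simply created in MAJOR_COMPACTION mode.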
// okay to pass in nulls for the constructor arguments
// because all we want to do is invoke the process summation
FlowScanner fs = new FlowScanner(null, null,
- (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION
- : FlowScannerOperation.MINOR_COMPACTION));
+ FlowScannerOperation.MAJOR_COMPACTION);
assertNotNull(fs);
return fs;
}
@@ -415,40 +415,40 @@ public void checkProcessSummationMoreCellsSumFinal2()
SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
List<Tag> tags = new ArrayList<>();
- Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ Tag t = new ArrayBackedTag(AggregationOperation.SUM_FINAL.getTagType(),
"application_1234588888_91188");
tags.add(t);
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
Cell c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
aQualifier, cell1Ts, Bytes.toBytes(cellValue1), tagByteArray);
currentColumnCells.add(c1);
tags = new ArrayList<>();
- t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ t = new ArrayBackedTag(AggregationOperation.SUM_FINAL.getTagType(),
"application_12700000001_29102");
tags.add(t);
- tagByteArray = Tag.fromList(tags);
+ tagByteArray = TagUtil.fromList(tags);
// create a cell with a recent timestamp and attribute SUM_FINAL
Cell c2 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
aQualifier, cell2Ts, Bytes.toBytes(cellValue2), tagByteArray);
currentColumnCells.add(c2);
tags = new ArrayList<>();
- t = new Tag(AggregationOperation.SUM.getTagType(),
+ t = new ArrayBackedTag(AggregationOperation.SUM.getTagType(),
"application_191780000000001_8195");
tags.add(t);
- tagByteArray = Tag.fromList(tags);
+ tagByteArray = TagUtil.fromList(tags);
// create a cell with a VERY old timestamp but has attribute SUM
Cell c3 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
aQualifier, cell3Ts, Bytes.toBytes(cellValue3), tagByteArray);
currentColumnCells.add(c3);
tags = new ArrayList<>();
- t = new Tag(AggregationOperation.SUM.getTagType(),
+ t = new ArrayBackedTag(AggregationOperation.SUM.getTagType(),
"application_191780000000001_98104");
tags.add(t);
- tagByteArray = Tag.fromList(tags);
+ tagByteArray = TagUtil.fromList(tags);
// create a cell with a VERY old timestamp but has attribute SUM
Cell c4 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
aQualifier, cell4Ts, Bytes.toBytes(cellValue4), tagByteArray);
@@ -515,10 +515,10 @@ public void checkProcessSummationMoreCellsSumFinalMany() throws IOException {
// insert SUM_FINAL cells
for (int i = 0; i < count; i++) {
tags = new ArrayList<>();
- t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ t = new ArrayBackedTag(AggregationOperation.SUM_FINAL.getTagType(),
"application_123450000" + i + "01_19" + i);
tags.add(t);
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray);
@@ -529,10 +529,10 @@ public void checkProcessSummationMoreCellsSumFinalMany() throws IOException {
// add SUM cells
for (int i = 0; i < count; i++) {
tags = new ArrayList<>();
- t = new Tag(AggregationOperation.SUM.getTagType(),
+ t = new ArrayBackedTag(AggregationOperation.SUM.getTagType(),
"application_1987650000" + i + "83_911" + i);
tags.add(t);
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
// create a cell with attribute SUM
c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray);
@@ -606,10 +606,10 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags()
// insert SUM_FINAL cells which will expire
for (int i = 0; i < countFinal; i++) {
tags = new ArrayList<>();
- t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ t = new ArrayBackedTag(AggregationOperation.SUM_FINAL.getTagType(),
"application_123450000" + i + "01_19" + i);
tags.add(t);
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray);
@@ -620,10 +620,10 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags()
// insert SUM_FINAL cells which will NOT expire
for (int i = 0; i < countFinalNotExpire; i++) {
tags = new ArrayList<>();
- t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ t = new ArrayBackedTag(AggregationOperation.SUM_FINAL.getTagType(),
"application_123450000" + i + "01_19" + i);
tags.add(t);
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsFinalNotExpire, Bytes.toBytes(cellValueFinal), tagByteArray);
@@ -634,10 +634,10 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags()
// add SUM cells
for (int i = 0; i < countNotFinal; i++) {
tags = new ArrayList<>();
- t = new Tag(AggregationOperation.SUM.getTagType(),
+ t = new ArrayBackedTag(AggregationOperation.SUM.getTagType(),
"application_1987650000" + i + "83_911" + i);
tags.add(t);
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
// create a cell with attribute SUM
c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily, aQualifier,
cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray);
@@ -689,10 +689,10 @@ public void testProcessSummationMoreCellsSumFinal() throws IOException {
long cellValue2 = 28L;
List<Tag> tags = new ArrayList<>();
- Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ Tag t = new ArrayBackedTag(AggregationOperation.SUM_FINAL.getTagType(),
"application_1234588888_999888");
tags.add(t);
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
// create a cell with a VERY old timestamp and attribute SUM_FINAL
@@ -701,10 +701,10 @@ public void testProcessSummationMoreCellsSumFinal() throws IOException {
currentColumnCells.add(c1);
tags = new ArrayList<>();
- t = new Tag(AggregationOperation.SUM.getTagType(),
+ t = new ArrayBackedTag(AggregationOperation.SUM.getTagType(),
"application_100000000001_119101");
tags.add(t);
- tagByteArray = Tag.fromList(tags);
+ tagByteArray = TagUtil.fromList(tags);
// create a cell with a VERY old timestamp but has attribute SUM
Cell c2 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
@@ -747,10 +747,10 @@ public void testProcessSummationOneCellSumFinal() throws IOException {
// note down the current timestamp
long currentTimestamp = System.currentTimeMillis();
List<Tag> tags = new ArrayList<>();
- Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ Tag t = new ArrayBackedTag(AggregationOperation.SUM_FINAL.getTagType(),
"application_123458888888_999888");
tags.add(t);
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
// create a cell with a VERY old timestamp
@@ -785,10 +785,10 @@ public void testProcessSummationOneCell() throws IOException {
// try for 1 cell with tag SUM
List<Tag> tags = new ArrayList<>();
- Tag t = new Tag(AggregationOperation.SUM.getTagType(),
+ Tag t = new ArrayBackedTag(AggregationOperation.SUM.getTagType(),
"application_123458888888_999888");
tags.add(t);
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
SortedSet<Cell> currentColumnCells = new TreeSet<>(KeyValue.COMPARATOR);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/pom.xml
index f200e9a..4a32785 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/pom.xml
@@ -115,8 +115,34 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-core</artifactId>
       <exclusions>
+        <exclusion>
+          <groupId>org.jruby.jcodings</groupId>
+          <artifactId>jcodings</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
+
+    <dependency>
+      <groupId>org.glassfish.jersey.core</groupId>
+      <artifactId>jersey-common</artifactId>
+      <version>2.25.1</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.javassist</groupId>
+          <artifactId>javassist</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.glassfish</groupId>
+      <artifactId>javax.el</artifactId>
+      <version>3.0.0</version>
+    </dependency>
+
+    <dependency>
+      <artifactId>javassist</artifactId>
+      <groupId>org.javassist</groupId>
+      <version>3.18.1-GA</version>
+    </dependency>
       <groupId>org.apache.hbase</groupId>
@@ -150,6 +176,22 @@
           <groupId>org.mortbay.jetty</groupId>
           <artifactId>jetty-sslengine</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.glassfish.jersey.core</groupId>
+          <artifactId>jersey-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>jetty-security</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>jetty-http</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.glassfish</groupId>
+          <artifactId>javax.el</artifactId>
+        </exclusion>
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BufferedMutatorDelegator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BufferedMutatorDelegator.java
index cf469a5..c4aaee6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BufferedMutatorDelegator.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BufferedMutatorDelegator.java
@@ -70,4 +70,14 @@ public long getWriteBufferSize() {
return bufferedMutator.getWriteBufferSize();
}
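+ // HBase 2's BufferedMutator interface added these timeout setters; delegate
+ // them to the wrapped mutator.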
+ @Override
+ public void setRpcTimeout(int i) {
+ bufferedMutator.setRpcTimeout(i);
+ }
+
+ @Override
+ public void setOperationTimeout(int i) {
+ bufferedMutator.setOperationTimeout(i);
+ }
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java
index c115b18..f1aabaf 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java
@@ -25,12 +25,14 @@
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.client.Query;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -141,7 +143,7 @@ public static Tag getTagFromAttribute(Map.Entry<String, byte[]> attribute) {
AggregationOperation aggOp = AggregationOperation
.getAggregationOperation(attribute.getKey());
if (aggOp != null) {
- Tag t = new Tag(aggOp.getTagType(), attribute.getValue());
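+ // Tag became an interface in HBase 2; ArrayBackedTag is the array-backed impl.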
+ Tag t = new ArrayBackedTag(aggOp.getTagType(), attribute.getValue());
return t;
}
@@ -149,7 +151,7 @@ public static Tag getTagFromAttribute(Map.Entry<String, byte[]> attribute) {
AggregationCompactionDimension.getAggregationCompactionDimension(
attribute.getKey());
if (aggCompactDim != null) {
- Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue());
+ Tag t = new ArrayBackedTag(aggCompactDim.getTagType(), attribute.getValue());
return t;
}
return null;
@@ -199,7 +201,7 @@ public static String getAggregationCompactionDimension(List<Tag> tags) {
for (Tag t : tags) {
if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
.getType()) {
- appId = Bytes.toString(t.getValue());
+ appId = Bytes.toString(TagUtil.cloneValue(t));
return appId;
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
index 359eec9..b50ac62 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
@@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
+import java.util.Optional;
import java.util.TreeMap;
import org.apache.hadoop.hbase.Cell;
@@ -30,22 +31,25 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
import org.slf4j.Logger;
@@ -54,7 +58,7 @@
/**
* Coprocessor for flow run table.
*/
-public class FlowRunCoprocessor extends BaseRegionObserver {
+public class FlowRunCoprocessor implements RegionObserver, RegionCoprocessor {
private static final Logger LOG =
LoggerFactory.getLogger(FlowRunCoprocessor.class);
@@ -74,6 +78,12 @@ public void start(CoprocessorEnvironment e) throws IOException {
}
}
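+ // BaseRegionObserver is gone in HBase 2; the coprocessor host now discovers
+ // the observer through RegionCoprocessor#getRegionObserver.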
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+
+
/*
* (non-Javadoc)
*
@@ -91,7 +101,7 @@ public void start(CoprocessorEnvironment e) throws IOException {
*/
@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put,
- WALEdit edit, Durability durability) throws IOException {
+ WALEdit edit, Durability durability) throws IOException {
Map<String, byte[]> attributes = put.getAttributesMap();
// Assumption is that all the cells in a put are the same operation.
List<Tag> tags = new ArrayList<>();
@@ -100,7 +110,7 @@ public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put,
Tag t = HBaseTimelineStorageUtils.getTagFromAttribute(attribute);
tags.add(t);
}
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
NavigableMap<byte[], List<Cell>> newFamilyMap = new TreeMap<>(
Bytes.BYTES_COMPARATOR);
for (Map.Entry<byte[], List<Cell>> entry : put.getFamilyCellMap()
@@ -188,13 +198,12 @@ public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e,
* org.apache.hadoop.hbase.regionserver.RegionScanner)
*/
@Override
- public RegionScanner preScannerOpen(
- ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
- RegionScanner scanner) throws IOException {
+ public void preScannerOpen(
+ ObserverContext<RegionCoprocessorEnvironment> e, Scan scan)
+ throws IOException {
// set max versions for scan to see all
// versions to aggregate for metrics
scan.setMaxVersions();
- return scanner;
}
/*
@@ -220,7 +229,8 @@ public RegionScanner postScannerOpen(
@Override
public InternalScanner preFlush(
ObserverContext<RegionCoprocessorEnvironment> c, Store store,
- InternalScanner scanner) throws IOException {
+ InternalScanner scanner, FlushLifeCycleTracker cycleTracker)
+ throws IOException {
if (LOG.isDebugEnabled()) {
if (store != null) {
LOG.debug("preFlush store = " + store.getColumnFamilyName()
@@ -228,8 +238,7 @@ public InternalScanner preFlush(
+ " flushedCellsCount=" + store.getFlushedCellsCount()
+ " compactedCellsCount=" + store.getCompactedCellsCount()
+ " majorCompactedCellsCount="
- + store.getMajorCompactedCellsCount() + " memstoreFlushSize="
- + store.getMemstoreFlushSize() + " memstoreSize="
+ + store.getMajorCompactedCellsCount() + " memstoreSize="
+ store.getMemStoreSize() + " size=" + store.getSize()
+ " storeFilesCount=" + store.getStorefilesCount());
}
@@ -239,27 +248,10 @@ public InternalScanner preFlush(
}
@Override
- public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
- Store store, StoreFile resultFile) {
- if (LOG.isDebugEnabled()) {
- if (store != null) {
- LOG.debug("postFlush store = " + store.getColumnFamilyName()
- + " flushableSize=" + store.getFlushableSize()
- + " flushedCellsCount=" + store.getFlushedCellsCount()
- + " compactedCellsCount=" + store.getCompactedCellsCount()
- + " majorCompactedCellsCount="
- + store.getMajorCompactedCellsCount() + " memstoreFlushSize="
- + store.getMemstoreFlushSize() + " memstoreSize="
- + store.getMemStoreSize() + " size=" + store.getSize()
- + " storeFilesCount=" + store.getStorefilesCount());
- }
- }
- }
-
- @Override
public InternalScanner preCompact(
ObserverContext<RegionCoprocessorEnvironment> e, Store store,
- InternalScanner scanner, ScanType scanType, CompactionRequest request)
+ InternalScanner scanner, ScanType scanType,
+ CompactionLifeCycleTracker tracker, CompactionRequest request)
throws IOException {
FlowScannerOperation requestOp = FlowScannerOperation.MINOR_COMPACTION;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
index dbd0484..fc801b4 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -28,11 +28,13 @@
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -120,7 +122,7 @@
*/
@Override
public HRegionInfo getRegionInfo() {
- return region.getRegionInfo();
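+ // Region#getRegionInfo() now returns the RegionInfo interface; wrap it in
+ // the HRegionInfo that this method is still declared to return.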
+ return new HRegionInfo(region.getRegionInfo());
}
@Override
@@ -247,7 +249,7 @@ private boolean nextInternal(List<Cell> cells, ScannerContext scannerContext)
}
private AggregationOperation getCurrentAggOp(Cell cell) {
- List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+ List<Tag> tags = TagUtil.asList(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
// We assume that all the operations for a particular column are the same
return HBaseTimelineStorageUtils.getAggregationOperationFromTagsList(tags);
@@ -322,7 +324,7 @@ private void collectCells(SortedSet<Cell> currentColumnCells,
}
// only if this app has not been seen yet, add to current column cells
- List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+ List<Tag> tags = TagUtil.asList(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
String aggDim = HBaseTimelineStorageUtils
.getAggregationCompactionDimension(tags);
@@ -460,7 +462,7 @@ private Cell processSummation(SortedSet<Cell> currentColumnCells,
for (Cell cell : currentColumnCells) {
AggregationOperation cellAggOp = getCurrentAggOp(cell);
// if this is the existing flow sum cell
- List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+ List<Tag> tags = TagUtil.asList(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
String appId = HBaseTimelineStorageUtils
.getAggregationCompactionDimension(tags);
@@ -497,13 +499,13 @@ private Cell processSummation(SortedSet<Cell> currentColumnCells,
if (summationDone) {
Cell anyCell = currentColumnCells.first();
List<Tag> tags = new ArrayList<>();
- Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+ Tag t = new ArrayBackedTag(AggregationOperation.SUM_FINAL.getTagType(),
Bytes.toBytes(FLOW_APP_ID));
tags.add(t);
- t = new Tag(AggregationCompactionDimension.APPLICATION_ID.getTagType(),
+ t = new ArrayBackedTag(AggregationCompactionDimension.APPLICATION_ID.getTagType(),
Bytes.toBytes(FLOW_APP_ID));
tags.add(t);
- byte[] tagByteArray = Tag.fromList(tags);
+ byte[] tagByteArray = TagUtil.fromList(tags);
Cell sumCell = HBaseTimelineStorageUtils.createNewCell(
CellUtil.cloneRow(anyCell),
CellUtil.cloneFamily(anyCell),
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
index 0edd6a5..9a78b8d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/ApplicationEntityReader.java
@@ -257,7 +257,8 @@ private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) {
* @throws IOException if any problem occurs while updating filter list.
*/
private void updateFilterForConfsAndMetricsToRetrieve(
- FilterList listBasedOnFields) throws IOException {
+ FilterList listBasedOnFields, Set<String> cfsInFields)
+ throws IOException {
TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
// Please note that if confsToRetrieve is specified, we would have added
// CONFS to fields to retrieve in augmentParams() even if not specified.
@@ -267,6 +268,7 @@ private void updateFilterForConfsAndMetricsToRetrieve(
createFilterForConfsOrMetricsToRetrieve(
dataToRetrieve.getConfsToRetrieve(),
ApplicationColumnFamily.CONFIGS, ApplicationColumnPrefix.CONFIG));
+ cfsInFields.add(new String(ApplicationColumnFamily.CONFIGS.getBytes()));
}
// Please note that if metricsToRetrieve is specified, we would have added
@@ -277,11 +279,13 @@ private void updateFilterForConfsAndMetricsToRetrieve(
createFilterForConfsOrMetricsToRetrieve(
dataToRetrieve.getMetricsToRetrieve(),
ApplicationColumnFamily.METRICS, ApplicationColumnPrefix.METRIC));
+ cfsInFields.add(new String(ApplicationColumnFamily.METRICS.getBytes()));
}
}
@Override
- protected FilterList constructFilterListBasedOnFields() throws IOException {
+ protected FilterList constructFilterListBasedOnFields(Set<String> cfsInFields)
+ throws IOException {
if (!needCreateFilterListBasedOnFields()) {
// Fetch all the columns. No need of a filter.
return null;
@@ -302,8 +306,9 @@ protected FilterList constructFilterListBasedOnFields() throws IOException {
excludeFieldsFromInfoColFamily(infoColFamilyList);
}
listBasedOnFields.addFilter(infoColFamilyList);
+ cfsInFields.add(new String(ApplicationColumnFamily.INFO.getBytes()));
- updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields);
+ updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields, cfsInFields);
return listBasedOnFields;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
index a1cdb29..f60fcf5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowActivityEntityReader.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
@@ -96,7 +97,8 @@ protected FilterList constructFilterListBasedOnFilters() throws IOException {
}
@Override
- protected FilterList constructFilterListBasedOnFields() {
+ protected FilterList constructFilterListBasedOnFields(
+ Set<String> cfsInFields) {
return null;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
index af043b3..fad80f3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/FlowRunEntityReader.java
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.util.EnumSet;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
@@ -151,7 +152,8 @@ private FilterList updateFixedColumns() {
}
@Override
- protected FilterList constructFilterListBasedOnFields() throws IOException {
+ protected FilterList constructFilterListBasedOnFields(Set<String> cfsInFields)
+ throws IOException {
FilterList list = new FilterList(Operator.MUST_PASS_ONE);
// By default fetch everything in INFO column family.
FamilyFilter infoColumnFamily =
@@ -165,6 +167,7 @@ protected FilterList constructFilterListBasedOnFields() throws IOException {
&& !hasField(dataToRetrieve.getFieldsToRetrieve(), Field.METRICS)) {
FilterList infoColFamilyList = new FilterList(Operator.MUST_PASS_ONE);
infoColFamilyList.addFilter(infoColumnFamily);
+ cfsInFields.add(new String(FlowRunColumnFamily.INFO.getBytes()));
infoColFamilyList.addFilter(new QualifierFilter(CompareOp.NOT_EQUAL,
new BinaryPrefixComparator(FlowRunColumnPrefix.METRIC
.getColumnPrefixBytes(""))));
@@ -181,6 +184,7 @@ protected FilterList constructFilterListBasedOnFields() throws IOException {
&& !metricsToRetrieve.getFilterList().isEmpty()) {
FilterList infoColFamilyList = new FilterList();
infoColFamilyList.addFilter(infoColumnFamily);
+ cfsInFields.add(new String(FlowRunColumnFamily.INFO.getBytes()));
FilterList columnsList = updateFixedColumns();
columnsList.addFilter(TimelineFilterUtils.createHBaseFilterList(
FlowRunColumnPrefix.METRIC, metricsToRetrieve));
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
index 3a44445..674d8d3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/GenericEntityReader.java
@@ -347,7 +347,8 @@ private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) {
* @throws IOException if any problem occurs while updating filter list.
*/
private void updateFilterForConfsAndMetricsToRetrieve(
- FilterList listBasedOnFields) throws IOException {
+ FilterList listBasedOnFields, Set<String> cfsInFields)
+ throws IOException {
TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
// Please note that if confsToRetrieve is specified, we would have added
// CONFS to fields to retrieve in augmentParams() even if not specified.
@@ -357,6 +358,7 @@ private void updateFilterForConfsAndMetricsToRetrieve(
.createFilterForConfsOrMetricsToRetrieve(
dataToRetrieve.getConfsToRetrieve(), EntityColumnFamily.CONFIGS,
EntityColumnPrefix.CONFIG));
+ cfsInFields.add(new String(EntityColumnFamily.CONFIGS.getBytes()));
}
// Please note that if metricsToRetrieve is specified, we would have added
@@ -367,11 +369,13 @@ private void updateFilterForConfsAndMetricsToRetrieve(
.createFilterForConfsOrMetricsToRetrieve(
dataToRetrieve.getMetricsToRetrieve(),
EntityColumnFamily.METRICS, EntityColumnPrefix.METRIC));
+ cfsInFields.add(new String(EntityColumnFamily.METRICS.getBytes()));
}
}
@Override
- protected FilterList constructFilterListBasedOnFields() throws IOException {
+ protected FilterList constructFilterListBasedOnFields(Set<String> cfsInFields)
+ throws IOException {
if (!needCreateFilterListBasedOnFields()) {
// Fetch all the columns. No need of a filter.
return null;
@@ -392,7 +396,8 @@ protected FilterList constructFilterListBasedOnFields() throws IOException {
excludeFieldsFromInfoColFamily(infoColFamilyList);
}
listBasedOnFields.addFilter(infoColFamilyList);
- updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields);
+ cfsInFields.add(new String(EntityColumnFamily.INFO.getBytes()));
+ updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields, cfsInFields);
return listBasedOnFields;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/SubApplicationEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/SubApplicationEntityReader.java
index e780dcc..29904df 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/SubApplicationEntityReader.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/SubApplicationEntityReader.java
@@ -246,7 +246,8 @@ private void excludeFieldsFromInfoColFamily(FilterList infoColFamilyList) {
* @throws IOException if any problem occurs while updating filter list.
*/
private void updateFilterForConfsAndMetricsToRetrieve(
- FilterList listBasedOnFields) throws IOException {
+ FilterList listBasedOnFields, Set<String> cfsInFields)
+ throws IOException {
TimelineDataToRetrieve dataToRetrieve = getDataToRetrieve();
// Please note that if confsToRetrieve is specified, we would have added
// CONFS to fields to retrieve in augmentParams() even if not specified.
@@ -257,6 +258,8 @@ private void updateFilterForConfsAndMetricsToRetrieve(
dataToRetrieve.getConfsToRetrieve(),
SubApplicationColumnFamily.CONFIGS,
SubApplicationColumnPrefix.CONFIG));
+ cfsInFields.add(
+ new String(SubApplicationColumnFamily.CONFIGS.getBytes()));
}
// Please note that if metricsToRetrieve is specified, we would have added
@@ -268,11 +271,14 @@ private void updateFilterForConfsAndMetricsToRetrieve(
dataToRetrieve.getMetricsToRetrieve(),
SubApplicationColumnFamily.METRICS,
SubApplicationColumnPrefix.METRIC));
+ cfsInFields.add(
+ new String(SubApplicationColumnFamily.METRICS.getBytes()));
}
}
@Override
- protected FilterList constructFilterListBasedOnFields() throws IOException {
+ protected FilterList constructFilterListBasedOnFields(Set<String> cfsInFields)
+ throws IOException {
if (!needCreateFilterListBasedOnFields()) {
// Fetch all the columns. No need of a filter.
return null;
@@ -292,7 +298,8 @@ protected FilterList constructFilterListBasedOnFields() throws IOException {
excludeFieldsFromInfoColFamily(infoColFamilyList);
}
listBasedOnFields.addFilter(infoColFamilyList);
- updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields);
+ cfsInFields.add(new String(SubApplicationColumnFamily.INFO.getBytes()));
+ updateFilterForConfsAndMetricsToRetrieve(listBasedOnFields, cfsInFields);
return listBasedOnFields;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
index 07e8423..e7aa5f8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/reader/TimelineEntityReader.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.timelineservice.storage.reader;
import java.io.IOException;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@@ -30,11 +31,17 @@
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.hbase.filter.FilterListBase;
import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
@@ -123,8 +130,8 @@ protected TimelineEntityReader(TimelineReaderContext ctxt,
* @return a {@link FilterList} object.
* @throws IOException if any problem occurs while creating filter list.
*/
- protected abstract FilterList constructFilterListBasedOnFields()
- throws IOException;
+ protected abstract FilterList constructFilterListBasedOnFields(
+ Set<String> cfsInFields) throws IOException;
/**
* Creates a {@link FilterList} based on info, config and metric filters. This
@@ -149,7 +156,10 @@ private FilterList createFilterList() throws IOException {
FilterList listBasedOnFilters = constructFilterListBasedOnFilters();
boolean hasListBasedOnFilters = listBasedOnFilters != null &&
!listBasedOnFilters.getFilters().isEmpty();
- FilterList listBasedOnFields = constructFilterListBasedOnFields();
+
+ Set<String> cfsInListBasedOnFields = new HashSet<>(0);
+ FilterList listBasedOnFields =
+ constructFilterListBasedOnFields(cfsInListBasedOnFields);
boolean hasListBasedOnFields = listBasedOnFields != null &&
!listBasedOnFields.getFilters().isEmpty();
// If filter lists based on both filters and fields can be created,
@@ -162,6 +172,21 @@ private FilterList createFilterList() throws IOException {
if (hasListBasedOnFilters && hasListBasedOnFields) {
FilterList list = new FilterList();
list.addFilter(listBasedOnFilters);
+
+ Set<String> cfsInListBasedOnFilters = new HashSet<>(0);
+ extractColumnFamiliesFromFiltersBasedOnFilters(
+ listBasedOnFilters, cfsInListBasedOnFilters);
+
+ // must exclude cfs that are already covered in fields-based filters
+ // otherwise we will return the whole cf
+ cfsInListBasedOnFilters.removeAll(cfsInListBasedOnFields);
+
+ if (!cfsInListBasedOnFilters.isEmpty()) {
+ for (String cf: cfsInListBasedOnFilters) {
+ listBasedOnFields.addFilter(new FamilyFilter(CompareOp.EQUAL,
+ new BinaryComparator(Bytes.toBytes(cf))));
+ }
+ }
list.addFilter(listBasedOnFields);
return list;
} else if (hasListBasedOnFilters) {
@@ -172,6 +197,24 @@ private FilterList createFilterList() throws IOException {
return null;
}
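+ // Walks a filter tree and collects every column family referenced by a
+ // SingleColumnValueFilter, recursing into nested filter lists.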
+ private static void extractColumnFamiliesFromFiltersBasedOnFilters(
+ Filter filter, Set<String> columnFamilies) {
+ if (filter instanceof SingleColumnValueFilter) {
+ byte[] cf = ((SingleColumnValueFilter) filter).getFamily();
+ columnFamilies.add(new String(cf));
+ } else if (filter instanceof FilterListBase) {
+ FilterListBase filterListBase = (FilterListBase) filter;
+ for (Filter fs: filterListBase.getFilters()) {
+ extractColumnFamiliesFromFiltersBasedOnFilters(fs, columnFamilies);
+ }
+ } else if (filter instanceof FilterList) {
+ FilterList filterList = (FilterList) filter;
+ for (Filter fs: filterList.getFilters()) {
+ extractColumnFamiliesFromFiltersBasedOnFilters(fs, columnFamilies);
+ }
+ }
+ }
+
protected TimelineDataToRetrieve getDataToRetrieve() {
return dataToRetrieve;
}
@@ -204,7 +247,7 @@ public TimelineEntity readEntity(Configuration hbaseConf, Connection conn)
validateParams();
augmentParams(hbaseConf, conn);
- FilterList filterList = constructFilterListBasedOnFields();
+ FilterList filterList = constructFilterListBasedOnFields(new HashSet<>(0));
if (LOG.isDebugEnabled() && filterList != null) {
LOG.debug("FilterList created for get is - " + filterList);
}
diff --git pom.xml pom.xml
index f0b3c8e..2b2db43 100644
--- pom.xml
+++ pom.xml
@@ -56,12 +56,13 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
       <id>${distMgmtSnapshotsId}</id>
       <name>${distMgmtSnapshotsName}</name>
       <url>${distMgmtSnapshotsUrl}</url>
+      <snapshots><enabled>true</enabled></snapshots>
     </repository>
     <repository>
       <id>repository.jboss.org</id>
       <url>http://repository.jboss.org/nexus/content/groups/public/</url>
       <snapshots>
-        <enabled>false</enabled>
+        <enabled>true</enabled>
       </snapshots>