diff --git hadoop-project/pom.xml hadoop-project/pom.xml
index f99e413..918533b 100644
--- hadoop-project/pom.xml
+++ hadoop-project/pom.xml
@@ -49,8 +49,8 @@
     2.11.0
     0.8.2.1
-    <hbase.version>1.2.6</hbase.version>
-    <hbase-compatible-hadoop.version>2.5.1</hbase-compatible-hadoop.version>
+    <hbase.version>2.0.0-beta-1.SNAPSHOT</hbase.version>
+    <hbase-compatible-hadoop.version>3.0.0-alpha4</hbase-compatible-hadoop.version>
     11.0.2
     ${project.version}
@@ -876,6 +876,11 @@
        <version>2.6</version>
      </dependency>
+     <dependency>
+       <groupId>commons-lang</groupId>
+       <artifactId>commons-lang3</artifactId>
+       <version>3.6</version>
+     </dependency>
      <dependency>
        <groupId>commons-collections</groupId>
        <artifactId>commons-collections</artifactId>
        <version>3.2.2</version>
@@ -1646,7 +1651,7 @@
-          true
+          false
@@ -1820,10 +1825,5 @@
-    <repository>
-      <id>dynamodb-local-oregon</id>
-      <name>DynamoDB Local Release Repository</name>
-      <url>https://s3-us-west-2.amazonaws.com/dynamodb-local/release</url>
-    </repository>
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml
index ffba1fe..cc7a11f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml
@@ -145,6 +145,12 @@
      <scope>test</scope>
    </dependency>
+   <dependency>
+     <groupId>org.eclipse.jetty</groupId>
+     <artifactId>jetty-http</artifactId>
+     <version>9.3.19.v20170502</version>
+   </dependency>
+
    <dependency>
      <groupId>com.sun.jersey</groupId>
      <artifactId>jersey-client</artifactId>
@@ -222,6 +228,10 @@
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-mapreduce-client-core</artifactId>
        </exclusion>
+       <exclusion>
+         <groupId>org.eclipse.jetty</groupId>
+         <artifactId>jetty-http</artifactId>
+       </exclusion>
@@ -255,6 +265,10 @@
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-mapreduce-client-core</artifactId>
        </exclusion>
+       <exclusion>
+         <groupId>org.eclipse.jetty</groupId>
+         <artifactId>jetty-http</artifactId>
+       </exclusion>
@@ -293,6 +307,25 @@
      <scope>test</scope>
    </dependency>
+
+   <dependency>
+     <groupId>org.apache.hadoop</groupId>
+     <artifactId>hadoop-hdfs-client</artifactId>
+     <version>${hbase-compatible-hadoop.version}</version>
+     <scope>test</scope>
+   </dependency>
+
+   <dependency>
+     <groupId>org.apache.hadoop</groupId>
+     <artifactId>hadoop-hdfs-client</artifactId>
+     <version>${hbase-compatible-hadoop.version}</version>
+     <scope>test</scope>
+     <type>test-jar</type>
+   </dependency>
+
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-testing-util</artifactId>
@@ -335,8 +368,26 @@
          <artifactId>jetty-webapp</artifactId>
        </exclusion>
      </exclusions>
      <scope>test</scope>
    </dependency>
+
+   <dependency>
+     <groupId>org.mockito</groupId>
+     <artifactId>mockito-all</artifactId>
+   </dependency>
+
+   <dependency>
+     <groupId>org.apache.commons</groupId>
+     <artifactId>commons-lang3</artifactId>
+   </dependency>
+
+   <dependency>
+     <groupId>io.netty</groupId>
+     <artifactId>netty-all</artifactId>
+     <version>4.1.5.Final</version>
+   </dependency>
+
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
index 1ad02e1..0877e38 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -40,6 +40,7 @@
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -129,8 +130,8 @@ public void checkCoProcessorOff() throws IOException, InterruptedException {
   private void checkCoprocessorExists(TableName table, boolean exists)
       throws IOException, InterruptedException {
     HRegionServer server = util.getRSForFirstRegionInTable(table);
-    List<Region> regions = server.getOnlineRegions(table);
-    for (Region region : regions) {
+    List<HRegion> regions = server.getRegions(table);
+    for (HRegion region : regions) {
       boolean found = false;
       Set<String> coprocs = region.getCoprocessorHost().getCoprocessors();
       for (String coprocName : coprocs) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
index 0ef8260..aa54e44 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
@@ -31,11 +31,13 @@
 import java.util.TreeSet;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ArrayBackedTag;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagUtil;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
@@ -44,6 +46,7 @@
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -326,12 +329,12 @@ public void testWriteFlowRunCompaction() throws Exception {
     HRegionServer server = util.getRSForFirstRegionInTable(
         BaseTable.getTableName(c1, FlowRunTable.TABLE_NAME_CONF_NAME,
             FlowRunTable.DEFAULT_TABLE_NAME));
-    List<Region> regions = server.getOnlineRegions(BaseTable.getTableName(c1,
+    List<HRegion> regions = server.getRegions(BaseTable.getTableName(c1,
         FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME));
     assertTrue("Didn't find any regions for primary table!",
         regions.size() > 0);
     // flush and compact all the regions of the primary table
-    for (Region region : regions) {
+    for (HRegion region : regions) {
       region.flush(true);
       region.compact(true);
     }
@@ -384,13 +387,10 @@ private void checkFlowRunTable(String cluster, String user, String flow,
   private FlowScanner getFlowScannerForTestingCompaction() {
     // create a FlowScanner object with the sole purpose of invoking a process
     // summation;
-    CompactionRequest request = new CompactionRequest();
-    request.setIsMajor(true, true);
     // okay to pass in nulls for the constructor arguments
     // because all we want to do is invoke the process summation
     FlowScanner fs = new FlowScanner(null, null,
-        (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION
-            : FlowScannerOperation.MINOR_COMPACTION));
+        FlowScannerOperation.MAJOR_COMPACTION);
     assertNotNull(fs);
     return fs;
   }
@@ -415,40 +415,40 @@ public void checkProcessSummationMoreCellsSumFinal2()
     SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
     List<Tag> tags = new ArrayList<>();
-    Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+    Tag t = new ArrayBackedTag(AggregationOperation.SUM_FINAL.getTagType(),
         "application_1234588888_91188");
     tags.add(t);
-    byte[] tagByteArray = Tag.fromList(tags);
+    byte[] tagByteArray = TagUtil.fromList(tags);
     // create a cell with a VERY old timestamp and attribute SUM_FINAL
     Cell c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
         aQualifier, cell1Ts, Bytes.toBytes(cellValue1), tagByteArray);
     currentColumnCells.add(c1);
 
     tags = new ArrayList<>();
-    t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+    t = new ArrayBackedTag(AggregationOperation.SUM_FINAL.getTagType(),
        "application_12700000001_29102");
     tags.add(t);
-    tagByteArray = Tag.fromList(tags);
+    tagByteArray = TagUtil.fromList(tags);
     // create a cell with a recent timestamp and attribute SUM_FINAL
     Cell c2 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
         aQualifier, cell2Ts, Bytes.toBytes(cellValue2), tagByteArray);
     currentColumnCells.add(c2);
 
     tags = new ArrayList<>();
-    t = new Tag(AggregationOperation.SUM.getTagType(),
+    t = new ArrayBackedTag(AggregationOperation.SUM.getTagType(),
        "application_191780000000001_8195");
     tags.add(t);
-    tagByteArray = Tag.fromList(tags);
+    tagByteArray = TagUtil.fromList(tags);
     // create a cell with a VERY old timestamp but has attribute SUM
     Cell c3 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
         aQualifier, cell3Ts, Bytes.toBytes(cellValue3), tagByteArray);
     currentColumnCells.add(c3);
 
     tags = new ArrayList<>();
-    t = new Tag(AggregationOperation.SUM.getTagType(),
+    t = new ArrayBackedTag(AggregationOperation.SUM.getTagType(),
        "application_191780000000001_98104");
     tags.add(t);
-    tagByteArray = Tag.fromList(tags);
+    tagByteArray = TagUtil.fromList(tags);
     // create a cell with a VERY old timestamp but has attribute SUM
     Cell c4 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
         aQualifier, cell4Ts, Bytes.toBytes(cellValue4), tagByteArray);
@@ -515,10 +515,10 @@ public void checkProcessSummationMoreCellsSumFinalMany() throws IOException {
     // insert SUM_FINAL cells
     for (int i = 0; i < count; i++) {
       tags = new ArrayList<>();
-      t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+      t = new ArrayBackedTag(AggregationOperation.SUM_FINAL.getTagType(),
          "application_123450000" + i + "01_19" + i);
       tags.add(t);
-      byte[] tagByteArray = Tag.fromList(tags);
+      byte[] tagByteArray = TagUtil.fromList(tags);
       // create a cell with a VERY old timestamp and attribute SUM_FINAL
       c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
          aQualifier, cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray);
@@ -529,10 +529,10 @@ public void checkProcessSummationMoreCellsSumFinalMany() throws IOException {
     // add SUM cells
     for (int i = 0; i < count; i++) {
       tags = new ArrayList<>();
-      t = new Tag(AggregationOperation.SUM.getTagType(),
+      t = new ArrayBackedTag(AggregationOperation.SUM.getTagType(),
          "application_1987650000" + i + "83_911" + i);
       tags.add(t);
-      byte[] tagByteArray = Tag.fromList(tags);
+      byte[] tagByteArray = TagUtil.fromList(tags);
       // create a cell with attribute SUM
       c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
          aQualifier, cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray);
@@ -606,10 +606,10 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags()
     // insert SUM_FINAL cells which will expire
     for (int i = 0; i < countFinal; i++) {
       tags = new ArrayList<>();
-      t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+      t = new ArrayBackedTag(AggregationOperation.SUM_FINAL.getTagType(),
          "application_123450000" + i + "01_19" + i);
       tags.add(t);
-      byte[] tagByteArray = Tag.fromList(tags);
+      byte[] tagByteArray = TagUtil.fromList(tags);
       // create a cell with a VERY old timestamp and attribute SUM_FINAL
       c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
          aQualifier, cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray);
@@ -620,10 +620,10 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags()
     // insert SUM_FINAL cells which will NOT expire
     for (int i = 0; i < countFinalNotExpire; i++) {
       tags = new ArrayList<>();
-      t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+      t = new ArrayBackedTag(AggregationOperation.SUM_FINAL.getTagType(),
          "application_123450000" + i + "01_19" + i);
       tags.add(t);
-      byte[] tagByteArray = Tag.fromList(tags);
+      byte[] tagByteArray = TagUtil.fromList(tags);
       // create a cell with a VERY old timestamp and attribute SUM_FINAL
       c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
          aQualifier, cellTsFinalNotExpire, Bytes.toBytes(cellValueFinal), tagByteArray);
@@ -634,10 +634,10 @@ public void checkProcessSummationMoreCellsSumFinalVariedTags()
     // add SUM cells
     for (int i = 0; i < countNotFinal; i++) {
       tags = new ArrayList<>();
-      t = new Tag(AggregationOperation.SUM.getTagType(),
+      t = new ArrayBackedTag(AggregationOperation.SUM.getTagType(),
          "application_1987650000" + i + "83_911" + i);
       tags.add(t);
-      byte[] tagByteArray = Tag.fromList(tags);
+      byte[] tagByteArray = TagUtil.fromList(tags);
       // create a cell with attribute SUM
       c1 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
          aQualifier, cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray);
@@ -689,10 +689,10 @@ public void testProcessSummationMoreCellsSumFinal() throws IOException {
     long cellValue2 = 28L;
 
     List<Tag> tags = new ArrayList<>();
-    Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+    Tag t = new ArrayBackedTag(AggregationOperation.SUM_FINAL.getTagType(),
        "application_1234588888_999888");
     tags.add(t);
-    byte[] tagByteArray = Tag.fromList(tags);
+    byte[] tagByteArray = TagUtil.fromList(tags);
     SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
     // create a cell with a VERY old timestamp and attribute SUM_FINAL
@@ -701,10 +701,10 @@ public void testProcessSummationMoreCellsSumFinal() throws IOException {
     currentColumnCells.add(c1);
 
     tags = new ArrayList<>();
-    t = new Tag(AggregationOperation.SUM.getTagType(),
+    t = new ArrayBackedTag(AggregationOperation.SUM.getTagType(),
        "application_100000000001_119101");
     tags.add(t);
-    tagByteArray = Tag.fromList(tags);
+    tagByteArray = TagUtil.fromList(tags);
 
     // create a cell with a VERY old timestamp but has attribute SUM
     Cell c2 = HBaseTimelineStorageUtils.createNewCell(aRowKey, aFamily,
@@ -747,10 +747,10 @@ public void testProcessSummationOneCellSumFinal() throws IOException {
     // note down the current timestamp
     long currentTimestamp = System.currentTimeMillis();
     List<Tag> tags = new ArrayList<>();
-    Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+    Tag t = new ArrayBackedTag(AggregationOperation.SUM_FINAL.getTagType(),
        "application_123458888888_999888");
     tags.add(t);
-    byte[] tagByteArray = Tag.fromList(tags);
+    byte[] tagByteArray = TagUtil.fromList(tags);
     SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
 
     // create a cell with a VERY old timestamp
@@ -785,10 +785,10 @@ public void testProcessSummationOneCell() throws IOException {
     // try for 1 cell with tag SUM
     List<Tag> tags = new ArrayList<>();
-    Tag t = new Tag(AggregationOperation.SUM.getTagType(),
+    Tag t = new ArrayBackedTag(AggregationOperation.SUM.getTagType(),
        "application_123458888888_999888");
     tags.add(t);
-    byte[] tagByteArray = Tag.fromList(tags);
+    byte[] tagByteArray = TagUtil.fromList(tags);
 
     SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/pom.xml
index f200e9a..4a32785 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/pom.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/pom.xml
@@ -115,8 +115,34 @@
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-mapreduce-client-core</artifactId>
        </exclusion>
+       <exclusion>
+         <groupId>org.jruby.jcodings</groupId>
+         <artifactId>jcodings</artifactId>
+       </exclusion>
      </exclusions>
    </dependency>
+   <dependency>
+     <groupId>org.glassfish.jersey.core</groupId>
+     <artifactId>jersey-common</artifactId>
+     <version>2.25.1</version>
+     <exclusions>
+       <exclusion>
+         <groupId>org.javassist</groupId>
+         <artifactId>javassist</artifactId>
+       </exclusion>
+     </exclusions>
+   </dependency>
+   <dependency>
+     <groupId>org.glassfish</groupId>
+     <artifactId>javax.el</artifactId>
+     <version>3.0.0</version>
+   </dependency>
+   <dependency>
+     <artifactId>javassist</artifactId>
+     <groupId>org.javassist</groupId>
+     <version>3.18.1-GA</version>
+   </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
@@ -150,6 +176,22 @@
        <exclusion>
          <groupId>org.mortbay.jetty</groupId>
          <artifactId>jetty-sslengine</artifactId>
        </exclusion>
+       <exclusion>
+         <groupId>org.glassfish.jersey.core</groupId>
+         <artifactId>jersey-common</artifactId>
+       </exclusion>
+       <exclusion>
+         <groupId>org.eclipse.jetty</groupId>
+         <artifactId>jetty-security</artifactId>
+       </exclusion>
+       <exclusion>
+         <groupId>org.eclipse.jetty</groupId>
+         <artifactId>jetty-http</artifactId>
+       </exclusion>
+       <exclusion>
+         <groupId>org.glassfish</groupId>
+         <artifactId>javax.el</artifactId>
+       </exclusion>
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BufferedMutatorDelegator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BufferedMutatorDelegator.java
index cf469a5..c4aaee6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BufferedMutatorDelegator.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/BufferedMutatorDelegator.java
@@ -70,4 +70,14 @@ public long getWriteBufferSize() {
     return bufferedMutator.getWriteBufferSize();
   }
 
+  @Override
+  public void setRpcTimeout(int i) {
+    bufferedMutator.setRpcTimeout(i);
+  }
+
+  @Override
+  public void setOperationTimeout(int i) {
+    bufferedMutator.setOperationTimeout(i);
+  }
+
 }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java
index c115b18..f1aabaf 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineStorageUtils.java
@@ -25,12 +25,14 @@
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ArrayBackedTag;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagUtil;
 import org.apache.hadoop.hbase.client.Query;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -141,7 +143,7 @@ public static Tag getTagFromAttribute(Map.Entry<String, byte[]> attribute) {
     AggregationOperation aggOp = AggregationOperation
         .getAggregationOperation(attribute.getKey());
     if (aggOp != null) {
-      Tag t = new Tag(aggOp.getTagType(), attribute.getValue());
+      Tag t = new ArrayBackedTag(aggOp.getTagType(), attribute.getValue());
       return t;
     }
@@ -149,7 +151,7 @@ public static Tag getTagFromAttribute(Map.Entry<String, byte[]> attribute) {
         AggregationCompactionDimension.getAggregationCompactionDimension(
             attribute.getKey());
     if (aggCompactDim != null) {
-      Tag t = new Tag(aggCompactDim.getTagType(), attribute.getValue());
+      Tag t = new ArrayBackedTag(aggCompactDim.getTagType(), attribute.getValue());
       return t;
     }
     return null;
@@ -199,7 +201,7 @@ public static String getAggregationCompactionDimension(List<Tag> tags) {
     for (Tag t : tags) {
       if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t
           .getType()) {
-        appId = Bytes.toString(t.getValue());
+        appId = Bytes.toString(TagUtil.cloneValue(t));
         return appId;
       }
     }
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
index 359eec9..b50ac62 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
@@ -22,6 +22,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Optional;
 import java.util.TreeMap;
 
 import org.apache.hadoop.hbase.Cell;
@@ -30,22 +31,25 @@
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagUtil;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils;
 import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator;
 import org.slf4j.Logger;
@@ -54,7 +58,7 @@
 /**
  * Coprocessor for flow run table.
  */
-public class FlowRunCoprocessor extends BaseRegionObserver {
+public class FlowRunCoprocessor implements RegionObserver, RegionCoprocessor {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(FlowRunCoprocessor.class);
@@ -74,6 +78,12 @@ public void start(CoprocessorEnvironment e) throws IOException {
     }
   }
 
+  @Override
+  public Optional<RegionObserver> getRegionObserver() {
+    return Optional.of(this);
+  }
+
+
   /*
    * (non-Javadoc)
    *
@@ -91,7 +101,7 @@
    */
   @Override
   public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put,
-      WALEdit edit, Durability durability) throws IOException {
+      WALEdit edit, Durability durability) throws IOException {
     Map<String, byte[]> attributes = put.getAttributesMap();
     // Assumption is that all the cells in a put are the same operation.
     List<Tag> tags = new ArrayList<>();
@@ -100,7 +110,7 @@ public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put,
       Tag t = HBaseTimelineStorageUtils.getTagFromAttribute(attribute);
       tags.add(t);
     }
-    byte[] tagByteArray = Tag.fromList(tags);
+    byte[] tagByteArray = TagUtil.fromList(tags);
     NavigableMap<byte[], List<Cell>> newFamilyMap = new TreeMap<>(
         Bytes.BYTES_COMPARATOR);
     for (Map.Entry<byte[], List<Cell>> entry : put.getFamilyCellMap()
         .entrySet()) {
@@ -188,13 +198,12 @@ public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e,
    * org.apache.hadoop.hbase.regionserver.RegionScanner)
    */
   @Override
-  public RegionScanner preScannerOpen(
-      ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
-      RegionScanner scanner) throws IOException {
+  public void preScannerOpen(
+      ObserverContext<RegionCoprocessorEnvironment> e, Scan scan)
+      throws IOException {
     // set max versions for scan to see all
     // versions to aggregate for metrics
     scan.setMaxVersions();
-    return scanner;
   }
 
   /*
@@ -220,7 +229,8 @@ public RegionScanner postScannerOpen(
   @Override
   public InternalScanner preFlush(
       ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-      InternalScanner scanner) throws IOException {
+      InternalScanner scanner, FlushLifeCycleTracker cycleTracker)
+      throws IOException {
     if (LOG.isDebugEnabled()) {
       if (store != null) {
         LOG.debug("preFlush store = " + store.getColumnFamilyName()
@@ -228,8 +238,7 @@ public InternalScanner preFlush(
             + " flushedCellsCount=" + store.getFlushedCellsCount()
             + " compactedCellsCount=" + store.getCompactedCellsCount()
             + " majorCompactedCellsCount="
-            + store.getMajorCompactedCellsCount() + " memstoreFlushSize="
-            + store.getMemstoreFlushSize() + " memstoreSize="
+            + store.getMajorCompactedCellsCount() + " memstoreSize="
             + store.getMemStoreSize() + " size=" + store.getSize()
             + " storeFilesCount=" + store.getStorefilesCount());
       }
@@ -239,27 +248,10 @@ public InternalScanner preFlush(
   }
 
   @Override
-  public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
-      Store store, StoreFile resultFile) {
-    if (LOG.isDebugEnabled()) {
-      if (store != null) {
-        LOG.debug("postFlush store = " + store.getColumnFamilyName()
-            + " flushableSize=" + store.getFlushableSize()
-            + " flushedCellsCount=" + store.getFlushedCellsCount()
-            + " compactedCellsCount=" + store.getCompactedCellsCount()
-            + " majorCompactedCellsCount="
-            + store.getMajorCompactedCellsCount() + " memstoreFlushSize="
-            + store.getMemstoreFlushSize() + " memstoreSize="
-            + store.getMemStoreSize() + " size=" + store.getSize()
-            + " storeFilesCount=" + store.getStorefilesCount());
-      }
-    }
-  }
-
-  @Override
   public InternalScanner preCompact(
       ObserverContext<RegionCoprocessorEnvironment> e, Store store,
-      InternalScanner scanner, ScanType scanType, CompactionRequest request)
+      InternalScanner scanner, ScanType scanType,
+      CompactionLifeCycleTracker tracker, CompactionRequest request)
       throws IOException {
 
     FlowScannerOperation requestOp = FlowScannerOperation.MINOR_COMPACTION;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
index dbd0484..fc801b4 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -28,11 +28,13 @@
 import java.util.TreeSet;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ArrayBackedTag;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagUtil;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -120,7 +122,7 @@
    */
   @Override
   public HRegionInfo getRegionInfo() {
-    return region.getRegionInfo();
+    return new HRegionInfo(region.getRegionInfo());
   }
 
   @Override
@@ -247,7 +249,7 @@ private boolean nextInternal(List<Cell> cells, ScannerContext scannerContext)
   private AggregationOperation getCurrentAggOp(Cell cell) {
-    List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+    List<Tag> tags = TagUtil.asList(cell.getTagsArray(), cell.getTagsOffset(),
         cell.getTagsLength());
     // We assume that all the operations for a particular column are the same
     return HBaseTimelineStorageUtils.getAggregationOperationFromTagsList(tags);
@@ -322,7 +324,7 @@ private void collectCells(SortedSet<Cell> currentColumnCells,
     }
 
     // only if this app has not been seen yet, add to current column cells
-    List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+    List<Tag> tags = TagUtil.asList(cell.getTagsArray(), cell.getTagsOffset(),
         cell.getTagsLength());
     String aggDim = HBaseTimelineStorageUtils
         .getAggregationCompactionDimension(tags);
@@ -460,7 +462,7 @@ private Cell processSummation(SortedSet<Cell> currentColumnCells,
     for (Cell cell : currentColumnCells) {
       AggregationOperation cellAggOp = getCurrentAggOp(cell);
       // if this is the existing flow sum cell
-      List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
+      List<Tag> tags = TagUtil.asList(cell.getTagsArray(), cell.getTagsOffset(),
          cell.getTagsLength());
       String appId = HBaseTimelineStorageUtils
           .getAggregationCompactionDimension(tags);
@@ -497,13 +499,13 @@ private Cell processSummation(SortedSet<Cell> currentColumnCells,
     if (summationDone) {
       Cell anyCell = currentColumnCells.first();
       List<Tag> tags = new ArrayList<Tag>();
-      Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(),
+      Tag t = new ArrayBackedTag(AggregationOperation.SUM_FINAL.getTagType(),
          Bytes.toBytes(FLOW_APP_ID));
       tags.add(t);
-      t = new Tag(AggregationCompactionDimension.APPLICATION_ID.getTagType(),
+      t = new ArrayBackedTag(AggregationCompactionDimension.APPLICATION_ID.getTagType(),
          Bytes.toBytes(FLOW_APP_ID));
       tags.add(t);
-      byte[] tagByteArray = Tag.fromList(tags);
+      byte[] tagByteArray = TagUtil.fromList(tags);
       Cell sumCell = HBaseTimelineStorageUtils.createNewCell(
           CellUtil.cloneRow(anyCell), CellUtil.cloneFamily(anyCell),
diff --git pom.xml pom.xml
index f0b3c8e..2b2db43 100644
--- pom.xml
+++ pom.xml
@@ -56,12 +56,13 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
       <id>${distMgmtSnapshotsId}</id>
       <name>${distMgmtSnapshotsName}</name>
       <url>${distMgmtSnapshotsUrl}</url>
+      <snapshots><enabled>true</enabled></snapshots>
     </repository>
     <repository>
       <id>repository.jboss.org</id>
       <url>http://repository.jboss.org/nexus/content/groups/public/</url>
       <snapshots>
-        <enabled>false</enabled>
+        <enabled>true</enabled>
       </snapshots>
     </repository>
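---

Reviewer note, not part of the patch: almost all of the churn above is two mechanical HBase 2.0 API migrations. First, Tag became an interface, so tags are now constructed as ArrayBackedTag and the static helpers that used to live on Tag (fromList, asList, value access) moved to TagUtil. Second, BaseRegionObserver was removed, so a coprocessor implements RegionCoprocessor and hands its RegionObserver back through getRegionObserver(). The sketch below is illustrative only — the class name, tag-type byte, and application id are made up, not taken from the patch:

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;

/** Illustrative sketch of the two migration patterns; not a class from this patch. */
public class MigrationSketchCoprocessor implements RegionObserver, RegionCoprocessor {

  // HBase 2.0: coprocessors no longer extend BaseRegionObserver (removed).
  // They implement RegionCoprocessor and expose their observer through an
  // Optional; returning Optional.empty() would disable all observer hooks.
  @Override
  public Optional<RegionObserver> getRegionObserver() {
    return Optional.of(this);
  }

  // HBase 2.0: Tag is an interface, so "new Tag(type, value)" no longer
  // compiles. Construct an ArrayBackedTag and serialize the list via TagUtil.
  static byte[] buildTagBytes() {
    List<Tag> tags = new ArrayList<>();
    byte tagType = (byte) 70; // hypothetical tag type, not from the patch
    tags.add(new ArrayBackedTag(tagType, "application_0000000000000_0001"));
    return TagUtil.fromList(tags); // HBase 1.x equivalent: Tag.fromList(tags)
  }
}

The region-side renames in the tests follow the same shape: server.getOnlineRegions(table) becomes server.getRegions(table), now returning List of HRegion, and the flush/compaction hooks gain the new FlushLifeCycleTracker/CompactionLifeCycleTracker parameters, as seen in FlowRunCoprocessor above.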