diff --git hadoop-project/pom.xml hadoop-project/pom.xml
index a1edb17..d52bf6d 100644
--- hadoop-project/pom.xml
+++ hadoop-project/pom.xml
@@ -49,8 +49,8 @@
     <xerces.jdiff.version>2.11.0</xerces.jdiff.version>
 
     <kafka.version>0.8.2.1</kafka.version>
-    <hbase.version>1.0.1</hbase.version>
-    <phoenix.version>4.5.0-SNAPSHOT</phoenix.version>
+    <hbase.version>1.1.3</hbase.version>
+    <phoenix.version>4.7.0-HBase-1.1</phoenix.version>
     <hbase-compatible-hadoop.version>2.5.1</hbase-compatible-hadoop.version>
 
     <hadoop.assemblies.version>${project.version}</hadoop.assemblies.version>
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
index a443b50..26aab47 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -43,6 +43,7 @@
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
@@ -107,8 +108,8 @@ public void checkCoProcessorOff() throws IOException, InterruptedException {
     // check in flow run table
     util.waitUntilAllRegionsAssigned(table);
     HRegionServer server = util.getRSForFirstRegionInTable(table);
-    List<HRegion> regions = server.getOnlineRegions(table);
-    for (HRegion region : regions) {
+    List<Region> regions = server.getOnlineRegions(table);
+    for (Region region : regions) {
       assertTrue(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
           hbaseConf));
     }
@@ -122,8 +123,8 @@ public void checkCoProcessorOff() throws IOException, InterruptedException {
     // check in flow activity table
     util.waitUntilAllRegionsAssigned(table);
     HRegionServer server = util.getRSForFirstRegionInTable(table);
-    List<HRegion> regions = server.getOnlineRegions(table);
-    for (HRegion region : regions) {
+    List<Region> regions = server.getOnlineRegions(table);
+    for (Region region : regions) {
       assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
           hbaseConf));
     }
@@ -137,8 +138,8 @@ public void checkCoProcessorOff() throws IOException, InterruptedException {
     // check in entity run table
     util.waitUntilAllRegionsAssigned(table);
     HRegionServer server = util.getRSForFirstRegionInTable(table);
-    List<HRegion> regions = server.getOnlineRegions(table);
-    for (HRegion region : regions) {
+    List<Region> regions = server.getOnlineRegions(table);
+    for (Region region : regions) {
       assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
           hbaseConf));
     }
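Note on the test changes above: as of HBase 1.1 (the Region interface introduced by HBASE-12972), coprocessor- and test-facing code is expected to use org.apache.hadoop.hbase.regionserver.Region rather than the internal HRegion class, and HRegionServer#getOnlineRegions(TableName) accordingly returns List<Region>. A minimal sketch of the new pattern; the class and method names here are illustrative only, not part of the patch:

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.regionserver.HRegionServer;
    import org.apache.hadoop.hbase.regionserver.Region;

    final class RegionListingSketch {
      /** Lists a table's online regions the HBase 1.1 way. */
      static void printOnlineRegions(HRegionServer server, TableName table)
          throws IOException {
        // Returns List<Region> in HBase 1.1; it was List<HRegion> in 1.0.
        List<Region> regions = server.getOnlineRegions(table);
        for (Region region : regions) {
          // Region exposes its name only via its HRegionInfo.
          System.out.println(region.getRegionInfo().getRegionNameAsString());
        }
      }
    }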
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
index 6b0ee5c..7d1195e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
@@ -60,6 +60,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 
 /**
@@ -176,13 +177,13 @@ public void testWriteFlowRunCompaction() throws Exception {
     // check in flow run table
     HRegionServer server = util.getRSForFirstRegionInTable(TableName
         .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
-    List<HRegion> regions = server.getOnlineRegions(TableName
+    List<Region> regions = server.getOnlineRegions(TableName
        .valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
     assertTrue("Didn't find any regions for primary table!",
         regions.size() > 0);
     // flush and compact all the regions of the primary table
-    for (HRegion region : regions) {
-      region.flushcache();
-      region.compactStores(true);
+    for (Region region : regions) {
+      region.flush(true);
+      region.compact(true);
     }
 
     // check flow run for one flow many apps
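The flush/compact calls track the same interface move: the HBase 1.0 calls HRegion#flushcache() and HRegion#compactStores(boolean) are replaced on the Region interface by flush(boolean force) and compact(boolean majorCompaction). A hedged sketch of the equivalent maintenance step, assuming the HBase 1.1.x signatures the patch uses; the helper name is illustrative:

    import java.io.IOException;

    import org.apache.hadoop.hbase.regionserver.Region;

    final class RegionMaintenanceSketch {
      /** Mirrors the loop body in testWriteFlowRunCompaction above. */
      static void flushAndMajorCompact(Region region) throws IOException {
        region.flush(true);   // HBase 1.0: region.flushcache()
        region.compact(true); // HBase 1.0: region.compactStores(true); true = major
      }
    }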
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
index 8ea51a1..84632d5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
@@ -39,8 +39,8 @@
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
@@ -59,7 +59,7 @@
   private static final Log LOG = LogFactory.getLog(FlowRunCoprocessor.class);
 
   private boolean isFlowRunRegion = false;
-  private HRegion region;
+  private Region region;
   /**
    * generate a timestamp that is unique per row in a region this is per region.
    */
@@ -296,8 +296,8 @@ public InternalScanner preCompact(
       requestOp = (request.isMajor() ?
           FlowScannerOperation.MAJOR_COMPACTION :
           FlowScannerOperation.MINOR_COMPACTION);
       LOG.info("Compactionrequest= " + request.toString() + " "
-          + requestOp.toString() + " RegionName="
-          + e.getEnvironment().getRegion().getRegionNameAsString());
+          + requestOp.toString() + " RegionName=" + e.getEnvironment()
+              .getRegion().getRegionInfo().getRegionNameAsString());
     }
     return new FlowScanner(e.getEnvironment(), -1, scanner, requestOp);
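The logging change in preCompact() is also a consequence of the interface move: Region does not expose getRegionNameAsString() directly, so the name is reached through the region's HRegionInfo. The same call in isolation (helper name illustrative):

    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;

    final class RegionNameSketch {
      static String regionNameOf(RegionCoprocessorEnvironment env) {
        // HBase 1.0: env.getRegion().getRegionNameAsString()
        return env.getRegion().getRegionInfo().getRegionNameAsString();
      }
    }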
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
index 648c77b..c1f9465 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -20,6 +20,8 @@
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -36,9 +38,10 @@
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -67,10 +70,32 @@
    * TimestampGenerator parses the app id to generate a cell timestamp.
    */
   private static final String FLOW_APP_ID = "application_00000000000_0000";
+  private static final Field SCANNER_CONTEXT_LIMITS_FIELD;
+  private static final Method GET_BATCH_METHOD;
 
-  private final HRegion region;
+  // The ScannerContext class has a package-private field holding its limits,
+  // and that field's class has a package-private getBatch() method to read
+  // the batch limit, so we use reflection to access these.
+  static {
+    try {
+      Field limitsField = ScannerContext.class.getDeclaredField("limits");
+      limitsField.setAccessible(true);
+      SCANNER_CONTEXT_LIMITS_FIELD = limitsField;
+
+      Class<?> limitFieldsClass = limitsField.getType();
+      Method getBatchMethod = limitFieldsClass.getDeclaredMethod("getBatch");
+      getBatchMethod.setAccessible(true);
+      GET_BATCH_METHOD = getBatchMethod;
+    } catch (Exception e) {
+      LOG.error("Failed to get ScannerContext.LimitFields.getBatch method "
+          + "through reflection.");
+      throw new IllegalStateException(e);
+    }
+  }
+
+  private final Region region;
   private final InternalScanner flowRunScanner;
-  private final int limit;
+  private final int batchSize;
   private final long appFinalValueRetentionThreshold;
   private RegionScanner regionScanner;
   private boolean hasMore;
@@ -79,9 +104,9 @@
   private int currentIndex;
   private FlowScannerOperation action = FlowScannerOperation.READ;
 
-  FlowScanner(RegionCoprocessorEnvironment env, int limit,
+  FlowScanner(RegionCoprocessorEnvironment env, int batchSize,
       InternalScanner internalScanner, FlowScannerOperation action) {
-    this.limit = limit;
+    this.batchSize = batchSize;
     this.flowRunScanner = internalScanner;
     if (internalScanner instanceof RegionScanner) {
       this.regionScanner = (RegionScanner) internalScanner;
@@ -112,22 +137,26 @@ public HRegionInfo getRegionInfo() {
 
   @Override
   public boolean nextRaw(List<Cell> cells) throws IOException {
-    return nextRaw(cells, limit);
+    return nextRaw(cells,
+        ScannerContext.newBuilder().setBatchLimit(batchSize).build());
   }
 
   @Override
-  public boolean nextRaw(List<Cell> cells, int cellLimit) throws IOException {
-    return nextInternal(cells, cellLimit);
+  public boolean nextRaw(List<Cell> cells, ScannerContext scannerContext)
+      throws IOException {
+    return nextInternal(cells, scannerContext);
   }
 
   @Override
   public boolean next(List<Cell> cells) throws IOException {
-    return next(cells, limit);
+    return next(cells,
+        ScannerContext.newBuilder().setBatchLimit(batchSize).build());
   }
 
   @Override
-  public boolean next(List<Cell> cells, int cellLimit) throws IOException {
-    return nextInternal(cells, cellLimit);
+  public boolean next(List<Cell> cells, ScannerContext scannerContext)
+      throws IOException {
+    return nextInternal(cells, scannerContext);
   }
 
   /**
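From here on the patch adapts FlowScanner to the HBase 1.1 scanner contract: the InternalScanner/RegionScanner overloads that took an int cell limit now take a ScannerContext, so a fixed batch size is wrapped in a context at each delegation point. A minimal standalone sketch of that pattern; the class and field names are illustrative:

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.regionserver.InternalScanner;
    import org.apache.hadoop.hbase.regionserver.ScannerContext;

    final class BatchingDelegateSketch {
      private final InternalScanner wrapped;
      private final int batchSize; // max cells per next() call; <= 0 means no limit

      BatchingDelegateSketch(InternalScanner wrapped, int batchSize) {
        this.wrapped = wrapped;
        this.batchSize = batchSize;
      }

      boolean next(List<Cell> cells) throws IOException {
        // The no-argument form synthesizes a context carrying the batch limit.
        return wrapped.next(cells,
            ScannerContext.newBuilder().setBatchLimit(batchSize).build());
      }
    }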
@@ -176,12 +205,12 @@ private static boolean isNumericConverter(ValueConverter converter) {
    * column or returns the cell as is.
    *
    * @param cells
-   * @param cellLimit
+   * @param scannerContext
    * @return true if next row is available for the scanner, false otherwise
    * @throws IOException
    */
   @SuppressWarnings("deprecation")
-  private boolean nextInternal(List<Cell> cells, int cellLimit)
+  private boolean nextInternal(List<Cell> cells, ScannerContext scannerContext)
       throws IOException {
     Cell cell = null;
     startNext();
@@ -201,9 +230,10 @@
     int addedCnt = 0;
     long currentTimestamp = System.currentTimeMillis();
     ValueConverter converter = null;
+    int limit = getBatchLimit(scannerContext);
 
-    while (cellLimit <= 0 || addedCnt < cellLimit) {
-      cell = peekAtNextCell(cellLimit);
+    while (limit <= 0 || addedCnt < limit) {
+      cell = peekAtNextCell(scannerContext);
       if (cell == null) {
         break;
       }
@@ -221,12 +251,12 @@
       // No operation needs to be performed on non numeric converters.
       if (!isNumericConverter(converter)) {
         currentColumnCells.add(cell);
-        nextCell(cellLimit);
+        nextCell(scannerContext);
         continue;
       }
       collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim,
-          (NumericValueConverter)converter);
-      nextCell(cellLimit);
+          (NumericValueConverter)converter, scannerContext);
+      nextCell(scannerContext);
     }
     if (!currentColumnCells.isEmpty()) {
       addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
@@ -268,12 +298,14 @@ private void resetState(SortedSet<Cell> currentColumnCells,
 
   private void collectCells(SortedSet<Cell> currentColumnCells,
       AggregationOperation currentAggOp, Cell cell,
-      Set<String> alreadySeenAggDim, NumericValueConverter converter)
+      Set<String> alreadySeenAggDim, NumericValueConverter converter,
+      ScannerContext scannerContext)
       throws IOException {
+
     if (currentAggOp == null) {
       // not a min/max/metric cell, so just return it as is
       currentColumnCells.add(cell);
-      nextCell(limit);
+      nextCell(scannerContext);
       return;
     }
 
@@ -610,15 +642,14 @@ public boolean hasMore() {
    * pointer to the next cell. This method can be called multiple times in a row
    * to advance through all the available cells.
    *
-   * @param cellLimit
-   *          the limit of number of cells to return if the next batch must be
-   *          fetched by the wrapped scanner
+   * @param scannerContext
+   *          context information for the batch of cells under consideration
    * @return the next available cell or null if no more cells are available for
    *         the current row
    * @throws IOException
    */
-  public Cell nextCell(int cellLimit) throws IOException {
-    Cell cell = peekAtNextCell(cellLimit);
+  public Cell nextCell(ScannerContext scannerContext) throws IOException {
+    Cell cell = peekAtNextCell(scannerContext);
     if (cell != null) {
       currentIndex++;
     }
@@ -630,20 +661,19 @@ public Cell nextCell(int cellLimit) throws IOException {
    * pointer. Calling this method multiple times in a row will continue to
    * return the same cell.
    *
-   * @param cellLimit
-   *          the limit of number of cells to return if the next batch must be
-   *          fetched by the wrapped scanner
+   * @param scannerContext
+   *          context information for the batch of cells under consideration
    * @return the next available cell or null if no more cells are available for
    *         the current row
    * @throws IOException if any problem is encountered while grabbing the next
    *           cell.
    */
-  public Cell peekAtNextCell(int cellLimit) throws IOException {
+  public Cell peekAtNextCell(ScannerContext scannerContext) throws IOException {
     if (currentIndex >= availableCells.size()) {
       // done with current batch
       availableCells.clear();
       currentIndex = 0;
-      hasMore = flowRunScanner.next(availableCells, cellLimit);
+      hasMore = flowRunScanner.next(availableCells, scannerContext);
     }
     Cell cell = null;
     if (currentIndex < availableCells.size()) {
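nextInternal() keeps HBase's convention that a non-positive batch limit means unlimited (FlowScanner is constructed with -1 when invoked during flushes and compactions). The loop guard above is equivalent to this predicate, extracted here purely for illustration:

    final class BatchLimitSketch {
      /** True while another cell may be added under the given batch limit. */
      static boolean underBatchLimit(int addedCnt, int limit) {
        return limit <= 0         // non-positive limit means no limit at all
            || addedCnt < limit;  // otherwise stop once the batch is full
      }
    }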
@@ -720,4 +750,31 @@ public boolean reseek(byte[] bytes) throws IOException {
     }
     return regionScanner.reseek(bytes);
   }
+
+  /**
+   * @return The limit on the number of cells to retrieve on each call to
+   *         next(). See {@link Scan#setBatch(int)}.
+   */
+  @Override
+  public int getBatch() {
+    return batchSize;
+  }
+
+  /**
+   * Gets the batch limit from the given {@link ScannerContext} through
+   * reflection.
+   */
+  private int getBatchLimit(ScannerContext scannerContext) {
+    // We need to access the batch limit (the limit on the number of columns)
+    // in the ScannerContext. ScannerContext does not expose a method to get
+    // this limit and holds it as an integer inside its LimitFields class, so
+    // we read it through reflection.
+    try {
+      return (Integer) GET_BATCH_METHOD
+          .invoke(SCANNER_CONTEXT_LIMITS_FIELD.get(scannerContext));
+    } catch (Exception e) {
+      throw e instanceof RuntimeException ? (RuntimeException) e
+          : new RuntimeException(e);
+    }
+  }
 }
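For reference, the reflective read in getBatchLimit() can be exercised on its own. This sketch assumes the HBase 1.1.x ScannerContext internals the patch relies on (a package-private limits field whose LimitFields class has a getBatch() accessor) and will break on versions that rename those internals:

    import java.lang.reflect.Field;
    import java.lang.reflect.Method;

    import org.apache.hadoop.hbase.regionserver.ScannerContext;

    final class ScannerContextBatchSketch {
      static int readBatchLimit(ScannerContext ctx) throws Exception {
        // Same lookup as the static initializer above, done per call for brevity.
        Field limits = ScannerContext.class.getDeclaredField("limits");
        limits.setAccessible(true);
        Object limitFields = limits.get(ctx);
        Method getBatch = limitFields.getClass().getDeclaredMethod("getBatch");
        getBatch.setAccessible(true);
        return (Integer) getBatch.invoke(limitFields);
      }

      public static void main(String[] args) throws Exception {
        ScannerContext ctx = ScannerContext.newBuilder().setBatchLimit(42).build();
        System.out.println(readBatchLimit(ctx)); // expected: 42
      }
    }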