diff --git hadoop-project/pom.xml hadoop-project/pom.xml
index a1edb17..d52bf6d 100644
--- hadoop-project/pom.xml
+++ hadoop-project/pom.xml
@@ -49,8 +49,8 @@
2.11.0
0.8.2.1
- <hbase.version>1.0.1</hbase.version>
- <phoenix.version>4.5.0-SNAPSHOT</phoenix.version>
+ <hbase.version>1.1.3</hbase.version>
+ <phoenix.version>4.7.0-HBase-1.1</phoenix.version>
2.5.1
${project.version}
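
The hunk above pins the build to HBase 1.1.3 and the matching Phoenix artifact built against HBase 1.1. Since the rest of this patch compiles against 1.1-only types (Region, ScannerContext), a runtime guard such as the hypothetical sketch below (not part of this patch; HBaseVersionGuard and requireHBase11 are illustrative names) could catch a mismatched client classpath early, using HBase's own VersionInfo:

    import org.apache.hadoop.hbase.util.VersionInfo;

    final class HBaseVersionGuard {
      // Fail fast if the HBase jars on the classpath are not from the 1.1
      // line; the coprocessor in this patch uses 1.1-only APIs.
      static void requireHBase11() {
        String version = VersionInfo.getVersion(); // e.g. "1.1.3"
        if (!version.startsWith("1.1.")) {
          throw new IllegalStateException(
              "Expected HBase 1.1.x on the classpath, found " + version);
        }
      }
    }
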
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
index a443b50..26aab47 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java
@@ -43,6 +43,7 @@
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
@@ -107,8 +108,8 @@ public void checkCoProcessorOff() throws IOException, InterruptedException {
// check in flow run table
util.waitUntilAllRegionsAssigned(table);
HRegionServer server = util.getRSForFirstRegionInTable(table);
- List<HRegion> regions = server.getOnlineRegions(table);
- for (HRegion region : regions) {
+ List<Region> regions = server.getOnlineRegions(table);
+ for (Region region : regions) {
assertTrue(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
hbaseConf));
}
@@ -122,8 +123,8 @@ public void checkCoProcessorOff() throws IOException, InterruptedException {
// check in flow activity table
util.waitUntilAllRegionsAssigned(table);
HRegionServer server = util.getRSForFirstRegionInTable(table);
- List<HRegion> regions = server.getOnlineRegions(table);
- for (HRegion region : regions) {
+ List<Region> regions = server.getOnlineRegions(table);
+ for (Region region : regions) {
assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
hbaseConf));
}
@@ -137,8 +138,8 @@ public void checkCoProcessorOff() throws IOException, InterruptedException {
// check in entity run table
util.waitUntilAllRegionsAssigned(table);
HRegionServer server = util.getRSForFirstRegionInTable(table);
- List<HRegion> regions = server.getOnlineRegions(table);
- for (HRegion region : regions) {
+ List<Region> regions = server.getOnlineRegions(table);
+ for (Region region : regions) {
assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(),
hbaseConf));
}
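
All three assertions change the same way: in HBase 1.1, HRegionServer#getOnlineRegions(TableName) returns List<Region>, where Region is the interface 1.1 introduced as the supported view of HRegion. A minimal sketch of the pattern, assuming an HBase 1.1.x classpath (class and method names here are illustrative):

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.regionserver.HRegionServer;
    import org.apache.hadoop.hbase.regionserver.Region;

    final class RegionListingSketch {
      // Iterate a table's online regions through the Region interface,
      // the way the updated tests do.
      static void listRegions(HRegionServer server, TableName table)
          throws IOException {
        List<Region> regions = server.getOnlineRegions(table);
        for (Region region : regions) {
          // Region metadata is reached via HRegionInfo.
          System.out.println(region.getRegionInfo().getRegionNameAsString());
        }
      }
    }
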
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
index 6b0ee5c..7d1195e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java
@@ -60,6 +60,7 @@
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
/**
@@ -176,13 +177,13 @@ public void testWriteFlowRunCompaction() throws Exception {
// check in flow run table
HRegionServer server = util.getRSForFirstRegionInTable(TableName
.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
- List<HRegion> regions = server.getOnlineRegions(TableName
+ List<Region> regions = server.getOnlineRegions(TableName
.valueOf(FlowRunTable.DEFAULT_TABLE_NAME));
assertTrue("Didn't find any regions for primary table!", regions.size() > 0);
// flush and compact all the regions of the primary table
- for (HRegion region : regions) {
- region.flushcache();
- region.compactStores(true);
+ for (Region region : regions) {
+ region.flush(true);
+ region.compact(true);
}
// check flow run for one flow many apps
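
Besides the HRegion-to-Region switch, two methods were renamed between 1.0 and 1.1: flushcache() became flush(boolean) and compactStores(boolean) became compact(boolean), where the boolean still selects a major compaction. A hedged sketch of the new calls (the helper name is illustrative):

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hbase.regionserver.Region;

    final class FlushCompactSketch {
      // Flush each region's memstore, then request a major compaction,
      // mirroring what the test does before verifying compacted cells.
      static void flushAndMajorCompact(List<Region> regions)
          throws IOException {
        for (Region region : regions) {
          region.flush(true);   // flush the memstore to store files
          region.compact(true); // true requests a major compaction
        }
      }
    }
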
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
index 8ea51a1..84632d5 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
@@ -39,8 +39,8 @@
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
@@ -59,7 +59,7 @@
private static final Log LOG = LogFactory.getLog(FlowRunCoprocessor.class);
private boolean isFlowRunRegion = false;
- private HRegion region;
+ private Region region;
/**
* generate a timestamp that is unique per row in a region this is per region.
*/
@@ -296,8 +296,8 @@ public InternalScanner preCompact(
requestOp = (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION
: FlowScannerOperation.MINOR_COMPACTION);
LOG.info("Compactionrequest= " + request.toString() + " "
- + requestOp.toString() + " RegionName="
- + e.getEnvironment().getRegion().getRegionNameAsString());
+ + requestOp.toString() + " RegionName=" + e.getEnvironment()
+ .getRegion().getRegionInfo().getRegionNameAsString());
}
return new FlowScanner(e.getEnvironment(), -1, scanner, requestOp);
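
The log statement changes because RegionCoprocessorEnvironment#getRegion() now returns a Region, and the Region interface does not expose getRegionNameAsString() directly; the readable name lives on HRegionInfo. A small sketch of the lookup (the helper class is illustrative):

    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;

    final class RegionNameSketch {
      // In HBase 1.1 the region name is reached via HRegionInfo.
      static String regionName(
          ObserverContext<RegionCoprocessorEnvironment> e) {
        return e.getEnvironment().getRegion().getRegionInfo()
            .getRegionNameAsString();
      }
    }
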
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
index 648c77b..c1f9465 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -20,6 +20,8 @@
import java.io.Closeable;
import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -36,9 +38,10 @@
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -67,10 +70,32 @@
* TimestampGenerator parses the app id to generate a cell timestamp.
*/
private static final String FLOW_APP_ID = "application_00000000000_0000";
+ private static final Field SCANNER_CONTEXT_LIMITS_FIELD;
+ private static final Method GET_BATCH_METHOD;
- private final HRegion region;
+ // ScannerContext keeps its limits in a package-private LimitFields member,
+ // and the method that reads the batch limit (getBatch) is package-private
+ // too, so we use reflection to access both.
+ static {
+ try {
+ Field limitsField = ScannerContext.class.getDeclaredField("limits");
+ limitsField.setAccessible(true);
+ SCANNER_CONTEXT_LIMITS_FIELD = limitsField;
+
+ Class<?> limitFieldsClass = limitsField.getType();
+ Method getBatchMethod = limitFieldsClass.getDeclaredMethod("getBatch");
+ getBatchMethod.setAccessible(true);
+ GET_BATCH_METHOD = getBatchMethod;
+ } catch (Exception e) {
+ LOG.error("Failed to get ScannerContext.LimitFields.getBatch method "
+ + "through reflection.");
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private final Region region;
private final InternalScanner flowRunScanner;
- private final int limit;
+ private final int batchSize;
private final long appFinalValueRetentionThreshold;
private RegionScanner regionScanner;
private boolean hasMore;
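
The static block above resolves both reflective handles once per class load, so each later getBatchLimit() call costs only a field read and a Method#invoke(). The same pattern on toy classes, to make the mechanics concrete (everything below is illustrative, not HBase code):

    import java.lang.reflect.Field;
    import java.lang.reflect.Method;

    final class Holder {
      private final Limits limits = new Limits(); // like ScannerContext.limits
    }

    final class Limits {
      int getBatch() { return 42; } // package-private, like LimitFields#getBatch
    }

    public final class ReflectionPatternDemo {
      public static void main(String[] args) throws Exception {
        Field f = Holder.class.getDeclaredField("limits");
        f.setAccessible(true); // bypass access checks on the private field
        Method m = f.getType().getDeclaredMethod("getBatch");
        m.setAccessible(true); // and on the package-private method
        System.out.println(m.invoke(f.get(new Holder()))); // prints 42
      }
    }
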
@@ -79,9 +104,9 @@
private int currentIndex;
private FlowScannerOperation action = FlowScannerOperation.READ;
- FlowScanner(RegionCoprocessorEnvironment env, int limit,
+ FlowScanner(RegionCoprocessorEnvironment env, int batchSize,
InternalScanner internalScanner, FlowScannerOperation action) {
- this.limit = limit;
+ this.batchSize = batchSize;
this.flowRunScanner = internalScanner;
if (internalScanner instanceof RegionScanner) {
this.regionScanner = (RegionScanner) internalScanner;
@@ -112,22 +137,26 @@ public HRegionInfo getRegionInfo() {
@Override
public boolean nextRaw(List<Cell> cells) throws IOException {
- return nextRaw(cells, limit);
+ return nextRaw(cells,
+ ScannerContext.newBuilder().setBatchLimit(batchSize).build());
}
@Override
- public boolean nextRaw(List<Cell> cells, int cellLimit) throws IOException {
- return nextInternal(cells, cellLimit);
+ public boolean nextRaw(List<Cell> cells, ScannerContext scannerContext)
+ throws IOException {
+ return nextInternal(cells, scannerContext);
}
@Override
public boolean next(List<Cell> cells) throws IOException {
- return next(cells, limit);
+ return next(cells,
+ ScannerContext.newBuilder().setBatchLimit(batchSize).build());
}
@Override
- public boolean next(List<Cell> cells, int cellLimit) throws IOException {
- return nextInternal(cells, cellLimit);
+ public boolean next(List<Cell> cells, ScannerContext scannerContext)
+ throws IOException {
+ return nextInternal(cells, scannerContext);
}
/**
@@ -176,12 +205,12 @@ private static boolean isNumericConverter(ValueConverter converter) {
* column or returns the cell as is.
*
* @param cells
- * @param cellLimit
+ * @param scannerContext
* @return true if next row is available for the scanner, false otherwise
* @throws IOException
*/
@SuppressWarnings("deprecation")
- private boolean nextInternal(List<Cell> cells, int cellLimit)
+ private boolean nextInternal(List<Cell> cells, ScannerContext scannerContext)
throws IOException {
Cell cell = null;
startNext();
@@ -201,9 +230,10 @@ private boolean nextInternal(List<Cell> cells, int cellLimit)
int addedCnt = 0;
long currentTimestamp = System.currentTimeMillis();
ValueConverter converter = null;
+ int limit = getBatchLimit(scannerContext);
- while (cellLimit <= 0 || addedCnt < cellLimit) {
- cell = peekAtNextCell(cellLimit);
+ while (limit <= 0 || addedCnt < limit) {
+ cell = peekAtNextCell(scannerContext);
if (cell == null) {
break;
}
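
With this change, the batch limit rides inside a ScannerContext end to end: the next()/nextRaw() adapters above wrap batchSize via ScannerContext.newBuilder().setBatchLimit(...), and nextInternal() unwraps it again through getBatchLimit(). A sketch of the builder-based calling convention against an InternalScanner, assuming HBase 1.1.x (the drain helper is illustrative):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.regionserver.InternalScanner;
    import org.apache.hadoop.hbase.regionserver.ScannerContext;

    final class BatchedScanSketch {
      // Pull cells from a scanner, at most batchSize per next() call;
      // as in the loop above, a non-positive limit means "no limit".
      static void drain(InternalScanner scanner, int batchSize)
          throws IOException {
        ScannerContext ctx =
            ScannerContext.newBuilder().setBatchLimit(batchSize).build();
        List<Cell> cells = new ArrayList<>();
        boolean more = true;
        while (more) {
          cells.clear();
          more = scanner.next(cells, ctx);
          // ... process this batch of cells ...
        }
      }
    }
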
@@ -221,12 +251,12 @@ private boolean nextInternal(List<Cell> cells, int cellLimit)
// No operation needs to be performed on non numeric converters.
if (!isNumericConverter(converter)) {
currentColumnCells.add(cell);
- nextCell(cellLimit);
+ nextCell(scannerContext);
continue;
}
collectCells(currentColumnCells, currentAggOp, cell, alreadySeenAggDim,
- (NumericValueConverter)converter);
- nextCell(cellLimit);
+ (NumericValueConverter)converter, scannerContext);
+ nextCell(scannerContext);
}
if (!currentColumnCells.isEmpty()) {
addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
@@ -268,12 +298,14 @@ private void resetState(SortedSet<Cell> currentColumnCells,
private void collectCells(SortedSet<Cell> currentColumnCells,
AggregationOperation currentAggOp, Cell cell,
- Set<String> alreadySeenAggDim, NumericValueConverter converter)
+ Set<String> alreadySeenAggDim, NumericValueConverter converter,
+ ScannerContext scannerContext)
throws IOException {
+
if (currentAggOp == null) {
// not a min/max/metric cell, so just return it as is
currentColumnCells.add(cell);
- nextCell(limit);
+ nextCell(scannerContext);
return;
}
@@ -610,15 +642,14 @@ public boolean hasMore() {
* pointer to the next cell. This method can be called multiple times in a row
* to advance through all the available cells.
*
- * @param cellLimit
- * the limit of number of cells to return if the next batch must be
- * fetched by the wrapped scanner
+ * @param scannerContext
+ * context information for the batch of cells under consideration
* @return the next available cell or null if no more cells are available for
* the current row
* @throws IOException
*/
- public Cell nextCell(int cellLimit) throws IOException {
- Cell cell = peekAtNextCell(cellLimit);
+ public Cell nextCell(ScannerContext scannerContext) throws IOException {
+ Cell cell = peekAtNextCell(scannerContext);
if (cell != null) {
currentIndex++;
}
@@ -630,20 +661,19 @@ public Cell nextCell(int cellLimit) throws IOException {
* pointer. Calling this method multiple times in a row will continue to
* return the same cell.
*
- * @param cellLimit
- * the limit of number of cells to return if the next batch must be
- * fetched by the wrapped scanner
+ * @param scannerContext
+ * context information for the batch of cells under consideration
* @return the next available cell or null if no more cells are available for
* the current row
* @throws IOException if any problem is encountered while grabbing the next
* cell.
*/
- public Cell peekAtNextCell(int cellLimit) throws IOException {
+ public Cell peekAtNextCell(ScannerContext scannerContext) throws IOException {
if (currentIndex >= availableCells.size()) {
// done with current batch
availableCells.clear();
currentIndex = 0;
- hasMore = flowRunScanner.next(availableCells, cellLimit);
+ hasMore = flowRunScanner.next(availableCells, scannerContext);
}
Cell cell = null;
if (currentIndex < availableCells.size()) {
@@ -720,4 +750,31 @@ public boolean reseek(byte[] bytes) throws IOException {
}
return regionScanner.reseek(bytes);
}
+
+ /**
+ * @return The limit on the number of cells to retrieve on each call to
+ * next(). See {@link Scan#setBatch(int)}
+ */
+ @Override
+ public int getBatch() {
+ return batchSize;
+ }
+
+ /**
+ * Gets the batch limit from the given {@link ScannerContext} through
+ * reflection.
+ */
+ private int getBatchLimit(ScannerContext scannerContext) {
+ // We need to access the batch limit (the limit on the number of columns)
+ // in the ScannerContext. ScannerContext does not expose a getter for its
+ // limits; the value sits inside the package-private LimitFields class, so
+ // we read it through reflection.
+ try {
+ return (Integer) GET_BATCH_METHOD
+ .invoke(SCANNER_CONTEXT_LIMITS_FIELD.get(scannerContext));
+ } catch (Exception e) {
+ throw e instanceof RuntimeException ? (RuntimeException) e
+ : new RuntimeException(e);
+ }
+ }
}
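
Because LimitFields and its getBatch() are package-private, a one-off check like the hypothetical class below (not part of the patch; assumes HBase 1.1.x jars) confirms that the reflective path recovers exactly the limit the builder stored:

    import java.lang.reflect.Field;
    import java.lang.reflect.Method;

    import org.apache.hadoop.hbase.regionserver.ScannerContext;

    public final class GetBatchLimitCheck {
      public static void main(String[] args) throws Exception {
        ScannerContext ctx =
            ScannerContext.newBuilder().setBatchLimit(10).build();
        Field limits = ScannerContext.class.getDeclaredField("limits");
        limits.setAccessible(true);
        Method getBatch = limits.getType().getDeclaredMethod("getBatch");
        getBatch.setAccessible(true);
        System.out.println(getBatch.invoke(limits.get(ctx))); // expect 10
      }
    }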