diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
index 450640a..1dcf7f6 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java
@@ -26,6 +26,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
@@ -57,6 +58,7 @@ public class FlowRunCoprocessor extends BaseRegionObserver {
 
   private static final Log LOG = LogFactory.getLog(FlowRunCoprocessor.class);
 
+  private boolean IS_FLOW_RUN_REGION = false;
   private HRegion region;
 
   /**
@@ -70,6 +72,15 @@ public void start(CoprocessorEnvironment e) throws IOException {
     if (e instanceof RegionCoprocessorEnvironment) {
       RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
       this.region = env.getRegion();
+      String regionTableName = this.region.getRegionInfo().getTable().getNameAsString();
+      Configuration hbaseConf = env.getConfiguration();
+      String flowRunTableName = hbaseConf.get(FlowRunTable.TABLE_NAME_CONF_NAME,
+          FlowRunTable.DEFAULT_TABLE_NAME);
+      LOG.debug("regionTableName=" + regionTableName);
+      if (flowRunTableName.equalsIgnoreCase(regionTableName)) {
+        LOG.debug(" table is the flow run table!! " + flowRunTableName);
+        IS_FLOW_RUN_REGION = true;
+      }
     }
   }
 
@@ -93,6 +104,9 @@
   public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put,
       WALEdit edit, Durability durability) throws IOException {
     Map<String, byte[]> attributes = put.getAttributesMap();
+    if(!IS_FLOW_RUN_REGION) {
+      return;
+    }
     // Assumption is that all the cells in a put are the same operation.
     List<Tag> tags = new ArrayList<>();
     if ((attributes != null) && (attributes.size() > 0)) {
@@ -160,6 +174,10 @@ private long getCellTimestamp(long timestamp, List<Tag> tags) {
   @Override
   public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e,
       Get get, List<Cell> results) throws IOException {
+    if(!IS_FLOW_RUN_REGION) {
+      return;
+    }
+
     Scan scan = new Scan(get);
     scan.setMaxVersions();
     RegionScanner scanner = null;
@@ -190,11 +208,14 @@ public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e,
   @Override
   public RegionScanner preScannerOpen(
       ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
-      RegionScanner s) throws IOException {
-    // set max versions for scan to see all
-    // versions to aggregate for metrics
-    scan.setMaxVersions();
-    return s;
+      RegionScanner scanner) throws IOException {
+
+    if(IS_FLOW_RUN_REGION) {
+      // set max versions for scan to see all
+      // versions to aggregate for metrics
+      scan.setMaxVersions();
+    }
+    return scanner;
   }
 
   /*
@@ -213,6 +234,9 @@ public RegionScanner postScannerOpen(
       ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
       RegionScanner scanner) throws IOException {
+    if(!IS_FLOW_RUN_REGION) {
+      return scanner;
+    }
     return new FlowScanner(e.getEnvironment(), scan.getBatch(),
         scanner, FlowScannerOperation.READ);
   }
 
@@ -221,6 +245,9 @@ public RegionScanner postScannerOpen(
   public InternalScanner preFlush(
       ObserverContext<RegionCoprocessorEnvironment> c, Store store,
       InternalScanner scanner) throws IOException {
+    if(!IS_FLOW_RUN_REGION) {
+      return scanner;
+    }
     if (LOG.isDebugEnabled()) {
       if (store != null) {
         LOG.debug("preFlush store = " + store.getColumnFamilyName()
@@ -241,6 +268,9 @@ public InternalScanner preFlush(
   @Override
   public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
       Store store, StoreFile resultFile) {
+    if(!IS_FLOW_RUN_REGION) {
+      return;
+    }
     if (LOG.isDebugEnabled()) {
       if (store != null) {
         LOG.debug("postFlush store = " + store.getColumnFamilyName()
@@ -262,6 +292,9 @@ public InternalScanner preCompact(
       InternalScanner scanner, ScanType scanType,
       CompactionRequest request)
      throws IOException {
+    if(!IS_FLOW_RUN_REGION) {
+      return scanner;
+    }
     FlowScannerOperation requestOp = FlowScannerOperation.MINOR_COMPACTION;
     if (request != null) {
       requestOp = (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
index 0ace529..6b5b55f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java
@@ -210,7 +210,7 @@ private boolean nextInternal(List<Cell> cells, int cellLimit)
       if (comp.compare(currentColumnQualifier, newColumnQualifier) != 0) {
         if (converter != null && isNumericConverter(converter)) {
           addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
-              (NumericValueConverter)converter, currentTimestamp);
+              converter, currentTimestamp);
         }
         resetState(currentColumnCells, alreadySeenAggDim);
         currentColumnQualifier = newColumnQualifier;
@@ -219,6 +219,7 @@ private boolean nextInternal(List<Cell> cells, int cellLimit)
       }
       // No operation needs to be performed on non numeric converters.
       if (!isNumericConverter(converter)) {
+        currentColumnCells.add(cell);
         nextCell(cellLimit);
         continue;
       }
@@ -228,7 +229,7 @@
     }
     if (!currentColumnCells.isEmpty()) {
       addedCnt += emitCells(cells, currentColumnCells, currentAggOp,
-          (NumericValueConverter)converter, currentTimestamp);
+          converter, currentTimestamp);
       if (LOG.isDebugEnabled()) {
         if (addedCnt > 0) {
           LOG.debug("emitted cells. " + addedCnt + " for " + this.action
@@ -345,7 +346,7 @@ private void collectCells(SortedSet<Cell> currentColumnCells,
    * parameter.
    */
   private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells,
-      AggregationOperation currentAggOp, NumericValueConverter converter,
+      AggregationOperation currentAggOp, ValueConverter converter,
       long currentTimestamp) throws IOException {
     if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) {
       return 0;
     }
@@ -372,12 +373,13 @@ private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells,
       cells.addAll(currentColumnCells);
       return currentColumnCells.size();
     case READ:
-      Cell sumCell = processSummation(currentColumnCells, converter);
+      Cell sumCell = processSummation(currentColumnCells,
+          (NumericValueConverter) converter);
       cells.add(sumCell);
       return 1;
     case MAJOR_COMPACTION:
       List<Cell> finalCells = processSummationMajorCompaction(
-          currentColumnCells, converter, currentTimestamp);
+          currentColumnCells, (NumericValueConverter) converter, currentTimestamp);
       cells.addAll(finalCells);
       return finalCells.size();
     default: