Index: src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java @@ -46,4 +46,8 @@ return HConstants.DEFAULT_CLUSTER_ID; } + @Override + public String getName() { + return this.getClass().getSimpleName().toLowerCase(); + } } Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -4264,6 +4264,9 @@ public void processRowsWithLocks(RowProcessor processor, long timeout) throws IOException { + final long startNanoTime = System.nanoTime(); + String metricsName = "rowprocessor." + processor.getName(); + for (byte[] row : processor.getRowsToLock()) { checkRow(row, "processRowsWithLocks"); } @@ -4285,12 +4288,21 @@ doProcessRowWithTimeout( processor, now, this, null, null, timeout); processor.postProcess(this, walEdit); + } catch (IOException e) { + long endNanoTime = System.nanoTime(); + HRegion.incrTimeVaryingMetric(metricsName + ".error.nano", + endNanoTime - startNanoTime); + throw e; } finally { closeRegionOperation(); } + final long endNanoTime = System.nanoTime(); + HRegion.incrTimeVaryingMetric(metricsName + ".nano", + endNanoTime - startNanoTime); return; } + long lockedNanoTime, processDoneNanoTime, unlockedNanoTime = 0; MultiVersionConsistencyControl.WriteEntry writeEntry = null; boolean locked = false; boolean walSyncSuccessful = false; @@ -4313,13 +4325,15 @@ // 3. Region lock this.updatesLock.readLock().lock(); locked = true; + lockedNanoTime = System.nanoTime(); long now = EnvironmentEdgeManager.currentTimeMillis(); try { // 4. Let the processor scan the rows, generate mutations and add // waledits doProcessRowWithTimeout( processor, now, this, mutations, walEdit, timeout); + processDoneNanoTime = System.nanoTime(); if (!mutations.isEmpty()) { // 5. Get a mvcc write number @@ -4344,6 +4358,8 @@ this.updatesLock.readLock().unlock(); locked = false; } + unlockedNanoTime = System.nanoTime(); + // 9. Release row lock(s) if (acquiredLocks != null) { for (Integer lid : acquiredLocks) { @@ -4382,18 +4398,41 @@ releaseRowLock(lid); } } + unlockedNanoTime = unlockedNanoTime == 0 ? + System.nanoTime() : unlockedNanoTime; } // 12. Run post-process hook processor.postProcess(this, walEdit); + } catch (IOException e) { + long endNanoTime = System.nanoTime(); + HRegion.incrTimeVaryingMetric(metricsName + ".error.nano", + endNanoTime - startNanoTime); + throw e; } finally { closeRegionOperation(); if (!mutations.isEmpty() && isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) { requestFlush(); } } + // Populate all metrics + long endNanoTime = System.nanoTime(); + HRegion.incrTimeVaryingMetric(metricsName + ".nano", + endNanoTime - startNanoTime); + + HRegion.incrTimeVaryingMetric(metricsName + ".acquirelock.nano", + lockedNanoTime - startNanoTime); + + HRegion.incrTimeVaryingMetric(metricsName + ".process.nano", + processDoneNanoTime - lockedNanoTime); + + HRegion.incrTimeVaryingMetric(metricsName + ".occupylock.nano", + unlockedNanoTime - lockedNanoTime); + + HRegion.incrTimeVaryingMetric(metricsName + ".sync.nano", + endNanoTime - unlockedNanoTime); } private void doProcessRowWithTimeout(final RowProcessor processor, @@ -4795,8 +4834,9 @@ // Request a cache flush. Do it outside update lock. requestFlush(); } - if(wrongLength){ - throw new IOException("Attempted to increment field that isn't 64 bits wide"); + if (wrongLength) { + throw new IOException( + "Attempted to increment field that isn't 64 bits wide"); } return result; } @@ -4812,7 +4852,7 @@ throw new NoSuchColumnFamilyException("Column family " + Bytes.toString(family) + " does not exist in region " + this + " in table " + this.htableDescriptor); - } + } } public static final long FIXED_OVERHEAD = ClassSize.align( Index: src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java @@ -64,7 +64,7 @@ /** * HRegion handles the locks and MVCC and invokes this method properly. - * + * * You should override this to create your own RowProcessor. * * If you are doing read-modify-write here, you should consider using @@ -103,4 +103,9 @@ */ UUID getClusterId(); + /** + * Human readable name of the processor + * @return The name of the processor + */ + String getName(); } Index: src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java +++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java @@ -138,7 +138,7 @@ prepareTestData(); RowProcessorProtocol protocol = table.coprocessorProxy(RowProcessorProtocol.class, ROW); - RowProcessorEndpoint.FriendsOfFriendsProcessor processor = + RowProcessorEndpoint.FriendsOfFriendsProcessor processor = new RowProcessorEndpoint.FriendsOfFriendsProcessor(ROW, A); Set result = protocol.process(processor); @@ -176,7 +176,7 @@ private int incrementCounter(HTable table) throws Throwable { RowProcessorProtocol protocol = table.coprocessorProxy(RowProcessorProtocol.class, ROW); - RowProcessorEndpoint.IncrementCounterProcessor processor = + RowProcessorEndpoint.IncrementCounterProcessor processor = new RowProcessorEndpoint.IncrementCounterProcessor(ROW); int counterValue = protocol.process(processor); return counterValue; @@ -234,7 +234,7 @@ private void swapRows(HTable table) throws Throwable { RowProcessorProtocol protocol = table.coprocessorProxy(RowProcessorProtocol.class, ROW); - RowProcessorEndpoint.RowSwapProcessor processor = + RowProcessorEndpoint.RowSwapProcessor processor = new RowProcessorEndpoint.RowSwapProcessor(ROW, ROW2); protocol.process(processor); } @@ -244,7 +244,7 @@ prepareTestData(); RowProcessorProtocol protocol = table.coprocessorProxy(RowProcessorProtocol.class, ROW); - RowProcessorEndpoint.TimeoutProcessor processor = + RowProcessorEndpoint.TimeoutProcessor processor = new RowProcessorEndpoint.TimeoutProcessor(ROW); boolean exceptionCaught = false; try { @@ -510,13 +510,18 @@ Bytes.writeByteArray(out, row1); Bytes.writeByteArray(out, row2); } + + @Override + public String getName() { + return "swap"; + } } public static class TimeoutProcessor extends BaseRowProcessor implements Writable { byte[] row = new byte[0]; - + /** * Empty constructor for Writable */ @@ -556,6 +561,11 @@ public void write(DataOutput out) throws IOException { Bytes.writeByteArray(out, row); } + + @Override + public String getName() { + return "timeout"; + } } public static void doScan(