From deeac9eac4cd0078f79ba9ef08535b74c4826564 Mon Sep 17 00:00:00 2001 From: Umesh Agashe Date: Mon, 10 Apr 2017 15:32:43 -0700 Subject: [PATCH] HBASE-17888: Added generic methods for updating metrics on start and end of a procedure execution Change-Id: Ia90154befb1ac17fb921e0da56952ab2801eee77 --- .../master/MetricsAssignmentManagerSource.java | 5 + .../master/MetricsAssignmentManagerSourceImpl.java | 9 +- .../apache/hadoop/hbase/procedure2/Procedure.java | 12 + .../hadoop/hbase/procedure2/ProcedureExecutor.java | 11 + .../hbase/procedure2/TestProcedureMetrics.java | 254 +++++++++++++++++++++ .../procedure2/TestStateMachineProcedure.java | 1 - 6 files changed, 287 insertions(+), 5 deletions(-) create mode 100644 hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSource.java index f6c9cb8..9a291bb 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSource.java @@ -49,6 +49,11 @@ public interface MetricsAssignmentManagerSource extends BaseSource { String ASSIGN_TIME_NAME = "assign"; String BULK_ASSIGN_TIME_NAME = "bulkAssign"; + String RIT_COUNT_DESC = "Number of Regions In Transition."; + String RIT_COUNT_OVER_THRESHOLD_DESC = "Number of Regions In Transition over threshold time."; + String RIT_OLDEST_AGE_DESC = "Timestamp of an oldest Region In Transition."; + String RIT_DURATION_DESC = "Durations for Regions in Transition."; + void updateAssignmentTime(long time); void updateBulkAssignTime(long time); diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSourceImpl.java index ab504f5..faae044 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/master/MetricsAssignmentManagerSourceImpl.java @@ -46,12 +46,13 @@ public class MetricsAssignmentManagerSourceImpl } public void init() { - ritGauge = metricsRegistry.newGauge(RIT_COUNT_NAME, "", 0l); - ritCountOverThresholdGauge = metricsRegistry.newGauge(RIT_COUNT_OVER_THRESHOLD_NAME, "", 0l); - ritOldestAgeGauge = metricsRegistry.newGauge(RIT_OLDEST_AGE_NAME, "", 0l); + ritGauge = metricsRegistry.newGauge(RIT_COUNT_NAME, RIT_COUNT_DESC, 0l); + ritCountOverThresholdGauge = metricsRegistry.newGauge(RIT_COUNT_OVER_THRESHOLD_NAME, + RIT_COUNT_OVER_THRESHOLD_DESC,0l); + ritOldestAgeGauge = metricsRegistry.newGauge(RIT_OLDEST_AGE_NAME, RIT_OLDEST_AGE_DESC, 0l); assignTimeHisto = metricsRegistry.newTimeHistogram(ASSIGN_TIME_NAME); bulkAssignTimeHisto = metricsRegistry.newTimeHistogram(BULK_ASSIGN_TIME_NAME); - ritDurationHisto = metricsRegistry.newTimeHistogram(RIT_DURATION_NAME); + ritDurationHisto = metricsRegistry.newTimeHistogram(RIT_DURATION_NAME, RIT_DURATION_DESC); } @Override diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index 761ab3a..921875f 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -240,6 +240,18 @@ public abstract class Procedure implements Comparable { return true; } + /** + * This function will be called just when procedure is submitted for execution. Override this + * method to update the metrics at the beginning of the procedure + */ + protected void updateMetricsOnStart(final TEnvironment env) {} + + /** + * This function will be called just after procedure execution is finished. Override this method + * to update metrics at the end of the procedure + */ + protected void updateMetricsOnFinish(final TEnvironment env, final long time, boolean success) {} + @Override public String toString() { // Return the simple String presentation of the procedure. diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 43f5839..ca7d0d6 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -831,6 +831,9 @@ public class ProcedureExecutor { private long pushProcedure(final Procedure proc) { final long currentProcId = proc.getProcId(); + // Update metrics on start of a procedure + proc.updateMetricsOnStart(getEnvironment()); + // Create the rollback stack for the procedure RootProcedureState stack = new RootProcedureState(); rollbackStack.put(currentProcId, stack); @@ -1145,6 +1148,9 @@ public class ProcedureExecutor { } if (proc.isSuccess()) { + // update metrics on finishing the procedure + proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true); + if (LOG.isDebugEnabled()) { LOG.debug("Finished " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime())); } @@ -1276,6 +1282,10 @@ public class ProcedureExecutor { if (proc.removeStackIndex()) { proc.setState(ProcedureState.ROLLEDBACK); + + // update metrics on finishing the procedure + proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), false); + if (proc.hasParent()) { store.delete(proc.getProcId()); procedures.remove(proc.getProcId()); @@ -1444,6 +1454,7 @@ public class ProcedureExecutor { private void submitChildrenProcedures(final Procedure[] subprocs) { for (int i = 0; i < subprocs.length; ++i) { final Procedure subproc = subprocs[i]; + subproc.updateMetricsOnStart(getEnvironment()); assert !procedures.containsKey(subproc.getProcId()); procedures.put(subproc.getProcId(), subproc); scheduler.addFront(subproc); diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java new file mode 100644 index 0000000..0c9b7b8 --- /dev/null +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +@Category({MasterTests.class, SmallTests.class}) +public class TestProcedureMetrics { + private static final Log LOG = LogFactory.getLog(TestProcedureMetrics.class); + + private static final int PROCEDURE_EXECUTOR_SLOTS = 1; + + private TestProcEnv procEnv; + private static ProcedureExecutor procExecutor; + private ProcedureStore procStore; + + private HBaseCommonTestingUtility htu; + private FileSystem fs; + private Path testDir; + private Path logDir; + + private static int beginCount = 0; + private static int successCount = 0; + private static int failedCount = 0; + + + @Before + public void setUp() throws IOException { + htu = new HBaseCommonTestingUtility(); + testDir = htu.getDataTestDir(); + fs = testDir.getFileSystem(htu.getConfiguration()); + assertTrue(testDir.depth() > 1); + + logDir = new Path(testDir, "proc-logs"); + procEnv = new TestProcEnv(); + procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir); + procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); + procExecutor.testing = new ProcedureExecutor.Testing(); + procStore.start(PROCEDURE_EXECUTOR_SLOTS); + procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); + } + + @After + public void tearDown() throws IOException { + procExecutor.stop(); + procStore.stop(false); + fs.delete(logDir, true); + } + + @Test + public void testMetricForSimpleProcedure() throws Exception { + // procedure that executes successfully + ProcedureMetrics proc = new ProcedureMetrics(true); + long id = ProcedureTestingUtility.submitAndWait(procExecutor, proc); + assertNotEquals("ProcId zero!", 0, id); + beginCount++; + successCount++; + ProcedureTestingUtility.waitProcedure(procExecutor, proc); + assertEquals("beginCount doesn't match!", beginCount, proc.beginCount); + assertEquals("successCount doesn't match!", successCount, proc.successCount); + assertEquals("failedCont doesn't match!", failedCount, proc.failedCount); + } + + @Test + public void testMetricsForFailedProcedure() throws Exception { + // procedure that fails + ProcedureMetrics proc = new ProcedureMetrics(false); + long id = ProcedureTestingUtility.submitAndWait(procExecutor, proc); + assertNotEquals("ProcId zero!", 0, id); + beginCount++; + failedCount++; + ProcedureTestingUtility.waitProcedure(procExecutor, proc); + assertEquals("beginCount doesn't match!", beginCount, proc.beginCount); + assertEquals("successCount doesn't match!", successCount, proc.successCount); + assertEquals("failedCont doesn't match!", failedCount, proc.failedCount); + } + + @Test + public void testMetricForYieldProcedure() throws Exception { + // procedure that yields + ProcedureMetrics proc = new ProcedureMetrics(true, true); + long id = ProcedureTestingUtility.submitAndWait(procExecutor, proc); + assertNotEquals("ProcId zero!", 0, id); + beginCount++; + successCount++; + ProcedureTestingUtility.waitProcedure(procExecutor, proc); + assertEquals("beginCount doesn't match!", beginCount, proc.beginCount); + assertEquals("successCount doesn't match!", successCount, proc.successCount); + assertEquals("failedCont doesn't match!", failedCount, proc.failedCount); + } + + @Test + public void testMetricForFailedYiledProcedure() { + // procedure that yields and fails + ProcedureMetrics proc = new ProcedureMetrics(false, true); + long id = ProcedureTestingUtility.submitAndWait(procExecutor, proc); + assertNotEquals("ProcId zero!", 0, id); + beginCount++; + failedCount++; + ProcedureTestingUtility.waitProcedure(procExecutor, proc); + assertEquals("beginCount doesn't match!", beginCount, proc.beginCount); + assertEquals("successCount doesn't match!", successCount, proc.successCount); + assertEquals("failedCont doesn't match!", failedCount, proc.failedCount); + } + + @Test + public void testMetricForProcedureWithChildren() throws Exception { + // Procedure that yileds with one of the sub-procedures that fail + int subProcCount = 10; + int failChildIndex = 2; + int yiledChildIndex = -1; + ProcedureMetrics[] subprocs = new ProcedureMetrics[subProcCount]; + for (int i = 0; i < subProcCount; ++i) { + subprocs[i] = new ProcedureMetrics(failChildIndex != i, yiledChildIndex == i, 3); + } + + ProcedureMetrics proc = new ProcedureMetrics(true, true, 3, subprocs); + long id = ProcedureTestingUtility.submitAndWait(procExecutor, proc); + assertNotEquals("ProcId zero!", 0, id); + beginCount += subProcCount + 1; + successCount += subProcCount - (failChildIndex + 1); + if (failChildIndex >= 0) { + failedCount += subProcCount + 1; + } else { + successCount++; + } + ProcedureTestingUtility.waitProcedure(procExecutor, proc); + assertEquals("beginCount doesn't match!", beginCount, proc.beginCount); + assertEquals("successCount doesn't match!", successCount, proc.successCount); + assertEquals("failedCont doesn't match!", failedCount, proc.failedCount); + } + + private static class TestProcEnv { + public boolean toggleKillBeforeStoreUpdate = false; + public boolean triggerRollbackOnChild = false; + } + + public static class ProcedureMetrics extends SequentialProcedure { + public static long beginCount = 0; + public static long successCount = 0; + public static long failedCount = 0; + + private boolean success; + private boolean yield; + private int yieldCount; + private int yieldNum; + + private ProcedureMetrics[] subprocs = null; + + public ProcedureMetrics() { + this(true); + } + + public ProcedureMetrics(boolean success) { + this(success, true); + } + + public ProcedureMetrics(boolean success, boolean yield) { + this(success, yield, 1); + } + + public ProcedureMetrics(boolean success, boolean yield, int yieldCount) { + this(success, yield, yieldCount, null); + } + + public ProcedureMetrics(boolean success, ProcedureMetrics[] subprocs) { + this(success, false, 1, subprocs); + } + + public ProcedureMetrics(boolean success, boolean yield, int yieldCount, + ProcedureMetrics[] subprocs) { + this.success = success; + this.yield = yield; + this.yieldCount = yieldCount; + this.subprocs = subprocs; + yieldNum = 0; + } + + @Override + protected void updateMetricsOnStart(TestProcEnv env) { + beginCount++; + } + + @Override + protected Procedure[] execute(TestProcEnv env) throws ProcedureYieldException, + ProcedureSuspendedException, InterruptedException { + if (this.yield) { + if (yieldNum < yieldCount) { + yieldNum++; + throw new ProcedureYieldException(); + } + } + if (!this.success) { + setFailure("Failed", new InterruptedException("Failed")); + return null; + } + return subprocs; + } + + @Override + protected void rollback(TestProcEnv env) throws IOException, InterruptedException { + + } + + @Override + protected boolean abort(TestProcEnv env) { + return false; + } + + @Override + protected void updateMetricsOnFinish(final TestProcEnv env, final long time, + boolean success) { + if (success) { + successCount++; + } else { + failedCount++; + } + } + + } +} diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java index 4b36c76..82b767e 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.procedure2; import java.io.IOException; -import java.util.ArrayList; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; -- 2.10.1 (Apple Git-78)