diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 00ef39f..46ac691 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; +import java.time.Clock; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -32,7 +33,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.SettableFuture; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; +import org.apache.hadoop.yarn.util.SystemClock; @Private @Unstable @@ -103,6 +104,8 @@ protected ResourceManager resourceManager; private final ReadLock readLock; private final WriteLock writeLock; + private static RMStateStoreOpDurations rmStateStoreOpDurations = + RMStateStoreOpDurations.getInstance(true); public static final Log LOG = LogFactory.getLog(RMStateStore.class); @@ -217,11 +220,14 @@ public RMStateStoreState transition(RMStateStore store, appState.getApplicationSubmissionContext().getApplicationId(); LOG.info("Storing info for app: " + appId); try { + long start = getClock().getTime(); store.storeApplicationStateInternal(appId, appState); + rmStateStoreOpDurations.addWriteOpsCallDuration(getClock().getTime() - start); store.notifyApplication( new RMAppEvent(appId, RMAppEventType.APP_NEW_SAVED)); } catch (Exception e) { LOG.error("Error storing app: " + appId, e); + rmStateStoreOpDurations.incrFailedOps(); if (e instanceof StoreLimitException) { store.notifyApplication( new RMAppEvent(appId, RMAppEventType.APP_SAVE_FAILED, @@ -255,7 +261,9 @@ public RMStateStoreState transition(RMStateStore store, appState.getApplicationSubmissionContext().getApplicationId(); LOG.info("Updating info for app: " + appId); try { + long start = getClock().getTime(); store.updateApplicationStateInternal(appId, appState); + rmStateStoreOpDurations.addWriteOpsCallDuration(getClock().getTime() - start); if (((RMStateUpdateAppEvent) event).isNotifyApplication()) { store.notifyApplication(new RMAppEvent(appId, RMAppEventType.APP_UPDATE_SAVED)); @@ -268,6 +276,7 @@ public RMStateStoreState transition(RMStateStore store, } catch (Exception e) { String msg = "Error updating app: " + appId; LOG.error(msg, e); + rmStateStoreOpDurations.incrFailedOps(); isFenced = store.notifyStoreOperationFailedInternal(e); if (result != null) { result.setException(new YarnException(msg, e)); @@ -295,9 +304,12 @@ public RMStateStoreState transition(RMStateStore store, appState.getApplicationSubmissionContext().getApplicationId(); LOG.info("Removing info for app: " + appId); try { + long start = getClock().getTime(); store.removeApplicationStateInternal(appState); + rmStateStoreOpDurations.addWriteOpsCallDuration(getClock().getTime() - start); } catch (Exception e) { LOG.error("Error removing app: " + appId, e); + rmStateStoreOpDurations.incrFailedOps(); isFenced = store.notifyStoreOperationFailedInternal(e); } return finalState(isFenced); @@ -322,13 +334,16 @@ public RMStateStoreState transition(RMStateStore store, if (LOG.isDebugEnabled()) { LOG.debug("Storing info for attempt: " + attemptState.getAttemptId()); } + long start = getClock().getTime(); store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(), attemptState); + rmStateStoreOpDurations.addWriteOpsCallDuration(getClock().getTime() - start); store.notifyApplicationAttempt(new RMAppAttemptEvent (attemptState.getAttemptId(), RMAppAttemptEventType.ATTEMPT_NEW_SAVED)); } catch (Exception e) { LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e); + rmStateStoreOpDurations.incrFailedOps(); isFenced = store.notifyStoreOperationFailedInternal(e); } return finalState(isFenced); @@ -353,13 +368,16 @@ public RMStateStoreState transition(RMStateStore store, if (LOG.isDebugEnabled()) { LOG.debug("Updating info for attempt: " + attemptState.getAttemptId()); } + long start = getClock().getTime(); store.updateApplicationAttemptStateInternal(attemptState.getAttemptId(), attemptState); + rmStateStoreOpDurations.addWriteOpsCallDuration(getClock().getTime() - start); store.notifyApplicationAttempt(new RMAppAttemptEvent (attemptState.getAttemptId(), RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED)); } catch (Exception e) { LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e); + rmStateStoreOpDurations.incrFailedOps(); isFenced = store.notifyStoreOperationFailedInternal(e); } return finalState(isFenced); @@ -381,11 +399,14 @@ public RMStateStoreState transition(RMStateStore store, RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event; try { LOG.info("Storing RMDelegationToken and SequenceNumber"); + long start = getClock().getTime(); store.storeRMDelegationTokenState( dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate()); + rmStateStoreOpDurations.addWriteOpsCallDuration(getClock().getTime() - start); } catch (Exception e) { LOG.error("Error While Storing RMDelegationToken and SequenceNumber ", e); + rmStateStoreOpDurations.incrFailedOps(); isFenced = store.notifyStoreOperationFailedInternal(e); } return finalState(isFenced); @@ -407,10 +428,13 @@ public RMStateStoreState transition(RMStateStore store, RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event; try { LOG.info("Removing RMDelegationToken and SequenceNumber"); + long start = getClock().getTime(); store.removeRMDelegationTokenState(dtEvent.getRmDTIdentifier()); + rmStateStoreOpDurations.addWriteOpsCallDuration(getClock().getTime() - start); } catch (Exception e) { LOG.error("Error While Removing RMDelegationToken and SequenceNumber ", e); + rmStateStoreOpDurations.incrFailedOps(); isFenced = store.notifyStoreOperationFailedInternal(e); } return finalState(isFenced); @@ -432,11 +456,14 @@ public RMStateStoreState transition(RMStateStore store, RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event; try { LOG.info("Updating RMDelegationToken and SequenceNumber"); + long start = getClock().getTime(); store.updateRMDelegationTokenState( dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate()); + rmStateStoreOpDurations.addWriteOpsCallDuration(getClock().getTime() - start); } catch (Exception e) { LOG.error("Error While Updating RMDelegationToken and SequenceNumber ", e); + rmStateStoreOpDurations.incrFailedOps(); isFenced = store.notifyStoreOperationFailedInternal(e); } return finalState(isFenced); @@ -459,9 +486,12 @@ public RMStateStoreState transition(RMStateStore store, (RMStateStoreRMDTMasterKeyEvent) event; try { LOG.info("Storing RMDTMasterKey."); + long start = getClock().getTime(); store.storeRMDTMasterKeyState(dtEvent.getDelegationKey()); + rmStateStoreOpDurations.addWriteOpsCallDuration(getClock().getTime() - start); } catch (Exception e) { LOG.error("Error While Storing RMDTMasterKey.", e); + rmStateStoreOpDurations.incrFailedOps(); isFenced = store.notifyStoreOperationFailedInternal(e); } return finalState(isFenced); @@ -484,9 +514,12 @@ public RMStateStoreState transition(RMStateStore store, (RMStateStoreRMDTMasterKeyEvent) event; try { LOG.info("Removing RMDTMasterKey."); + long start = getClock().getTime(); store.removeRMDTMasterKeyState(dtEvent.getDelegationKey()); + rmStateStoreOpDurations.addWriteOpsCallDuration(getClock().getTime() - start); } catch (Exception e) { LOG.error("Error While Removing RMDTMasterKey.", e); + rmStateStoreOpDurations.incrFailedOps(); isFenced = store.notifyStoreOperationFailedInternal(e); } return finalState(isFenced); @@ -508,10 +541,13 @@ public RMStateStoreState transition(RMStateStore store, boolean isFenced = false; try { LOG.info("Updating AMRMToken"); + long start = getClock().getTime(); store.storeOrUpdateAMRMTokenSecretManagerState( amrmEvent.getAmrmTokenSecretManagerState(), amrmEvent.isUpdate()); + rmStateStoreOpDurations.addWriteOpsCallDuration(getClock().getTime() - start); } catch (Exception e) { LOG.error("Error storing info for AMRMTokenSecretManager", e); + rmStateStoreOpDurations.incrFailedOps(); isFenced = store.notifyStoreOperationFailedInternal(e); } return finalState(isFenced); @@ -535,12 +571,15 @@ public RMStateStoreState transition(RMStateStore store, try { LOG.info("Storing reservation allocation." + reservationEvent .getReservationIdName()); + long start = getClock().getTime(); store.storeReservationState( reservationEvent.getReservationAllocation(), reservationEvent.getPlanName(), reservationEvent.getReservationIdName()); + rmStateStoreOpDurations.addWriteOpsCallDuration(getClock().getTime() - start); } catch (Exception e) { LOG.error("Error while storing reservation allocation.", e); + rmStateStoreOpDurations.incrFailedOps(); isFenced = store.notifyStoreOperationFailedInternal(e); } return finalState(isFenced); @@ -564,11 +603,14 @@ public RMStateStoreState transition(RMStateStore store, try { LOG.info("Removing reservation allocation." + reservationEvent .getReservationIdName()); + long start = getClock().getTime(); store.removeReservationState( reservationEvent.getPlanName(), reservationEvent.getReservationIdName()); + rmStateStoreOpDurations.addWriteOpsCallDuration(getClock().getTime() - start); } catch (Exception e) { LOG.error("Error while removing reservation allocation.", e); + rmStateStoreOpDurations.incrFailedOps(); isFenced = store.notifyStoreOperationFailedInternal(e); } return finalState(isFenced); @@ -613,6 +655,10 @@ public RMStateStore() { stateMachine = stateMachineFactory.make(this); } + public static SystemClock getClock() { + return SystemClock.getInstance(); + } + public static class RMDTSecretManagerState { // DTIdentifier -> renewDate Map delegationTokenState = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreOpDurations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreOpDurations.java new file mode 100644 index 0000000..8e621d0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreOpDurations.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableRate; + +import static org.apache.hadoop.metrics2.lib.Interns.info; + +/** + * Class to capture the performance metrics of RMStateStore. + * This should be a singleton. + */ +@InterfaceAudience.Private @InterfaceStability.Unstable +@Metrics(context = "rmstatestore-op-durations") +public class RMStateStoreOpDurations + implements MetricsSource { + + @Metric("Duration for a writeOps call") MutableRate writeOpsCallDuration; + + @Metric("Duration for a failedOps call") MutableCounterLong failedOps; + + + private static final MetricsInfo RECORD_INFO = + info("RMStateStoreOpDurations", "Durations of RMStateStore calls"); + + private final MetricsRegistry registry; + + private boolean isExtended = false; + + private static final RMStateStoreOpDurations INSTANCE = new RMStateStoreOpDurations(); + + + public static RMStateStoreOpDurations getInstance(boolean isExtended) { + INSTANCE.setExtended(isExtended); + return INSTANCE; + } + + private synchronized void setExtended(boolean isExtended) { + if (isExtended == INSTANCE.isExtended) { + return; + } + writeOpsCallDuration.setExtended(isExtended); + INSTANCE.isExtended = isExtended; + } + + private RMStateStoreOpDurations() { + registry = new MetricsRegistry(RECORD_INFO); + registry.tag(RECORD_INFO, "RMStateStoreOpDurations"); + + MetricsSystem ms = DefaultMetricsSystem.instance(); + if (ms != null) { + ms.register(RECORD_INFO.name(), RECORD_INFO.description(), this); + } + } + + @Override public void getMetrics(MetricsCollector collector, boolean all) { + registry.snapshot(collector.addRecord(registry.info()), all); + } + + public void addWriteOpsCallDuration(long value) { + writeOpsCallDuration.add(value); + } + + public void incrFailedOps() { + failedOps.incr(); + } + +} \ No newline at end of file