diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 5e3cf22..88b6284 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -74,6 +74,8 @@ import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; @Private @Unstable @@ -102,6 +104,8 @@ protected ResourceManager resourceManager; private final ReadLock readLock; private final WriteLock writeLock; + private static RMStateStoreOpDurations rmStateStoreOpDurations = + RMStateStoreOpDurations.getInstance(true); public static final Log LOG = LogFactory.getLog(RMStateStore.class); @@ -216,7 +220,9 @@ public RMStateStoreState transition(RMStateStore store, appState.getApplicationSubmissionContext().getApplicationId(); LOG.info("Storing info for app: " + appId); try { + long start = getClock().getTime(); store.storeApplicationStateInternal(appId, appState); + rmStateStoreOpDurations.addWriteOpsCallDuration(getClock().getTime() - start); store.notifyApplication(new RMAppEvent(appId, RMAppEventType.APP_NEW_SAVED)); } catch (Exception e) { @@ -247,7 +253,9 @@ public RMStateStoreState transition(RMStateStore store, appState.getApplicationSubmissionContext().getApplicationId(); LOG.info("Updating info for app: " + appId); try { + long start = getClock().getTime(); store.updateApplicationStateInternal(appId, appState); + rmStateStoreOpDurations.addWriteOpsCallDuration(getClock().getTime() - start); if (((RMStateUpdateAppEvent) event).isNotifyApplication()) { store.notifyApplication(new RMAppEvent(appId, RMAppEventType.APP_UPDATE_SAVED)); @@ -287,9 +295,12 @@ public RMStateStoreState transition(RMStateStore store, appState.getApplicationSubmissionContext().getApplicationId(); LOG.info("Removing info for app: " + appId); try { + long start = getClock().getTime(); store.removeApplicationStateInternal(appState); + rmStateStoreOpDurations.addWriteOpsCallDuration(getClock().getTime() - start); } catch (Exception e) { LOG.error("Error removing app: " + appId, e); + rmStateStoreOpDurations.incrFailedOps(); isFenced = store.notifyStoreOperationFailedInternal(e); } return finalState(isFenced); @@ -314,13 +325,15 @@ public RMStateStoreState transition(RMStateStore store, if (LOG.isDebugEnabled()) { LOG.debug("Storing info for attempt: " + attemptState.getAttemptId()); } + long start = getClock().getTime(); store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(), attemptState); - store.notifyApplicationAttempt(new RMAppAttemptEvent - (attemptState.getAttemptId(), - RMAppAttemptEventType.ATTEMPT_NEW_SAVED)); + rmStateStoreOpDurations.addWriteOpsCallDuration(getClock().getTime() - start); + store.notifyApplicationAttempt(new RMAppAttemptEvent(attemptState.getAttemptId(), + RMAppAttemptEventType.ATTEMPT_NEW_SAVED)); } catch (Exception e) { LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e); + rmStateStoreOpDurations.incrFailedOps(); isFenced = store.notifyStoreOperationFailedInternal(e); } return finalState(isFenced); @@ -345,13 +358,15 @@ public RMStateStoreState transition(RMStateStore store, if (LOG.isDebugEnabled()) { LOG.debug("Updating info for attempt: " + attemptState.getAttemptId()); } + long start = getClock().getTime(); store.updateApplicationAttemptStateInternal(attemptState.getAttemptId(), attemptState); - store.notifyApplicationAttempt(new RMAppAttemptEvent - (attemptState.getAttemptId(), - RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED)); + rmStateStoreOpDurations.addWriteOpsCallDuration(getClock().getTime() - start); + store.notifyApplicationAttempt(new RMAppAttemptEvent(attemptState.getAttemptId(), + RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED)); } catch (Exception e) { LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e); + rmStateStoreOpDurations.incrFailedOps(); isFenced = store.notifyStoreOperationFailedInternal(e); } return finalState(isFenced); @@ -373,11 +388,13 @@ public RMStateStoreState transition(RMStateStore store, RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event; try { LOG.info("Storing RMDelegationToken and SequenceNumber"); - store.storeRMDelegationTokenState( - dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate()); + long start = getClock().getTime(); + store.storeRMDelegationTokenState(dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate()); + rmStateStoreOpDurations.addWriteOpsCallDuration(getClock().getTime() - start); } catch (Exception e) { LOG.error("Error While Storing RMDelegationToken and SequenceNumber ", e); + rmStateStoreOpDurations.incrFailedOps(); isFenced = store.notifyStoreOperationFailedInternal(e); } return finalState(isFenced); @@ -399,10 +416,13 @@ public RMStateStoreState transition(RMStateStore store, RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event; try { LOG.info("Removing RMDelegationToken and SequenceNumber"); + long start = getClock().getTime(); store.removeRMDelegationTokenState(dtEvent.getRmDTIdentifier()); + rmStateStoreOpDurations.addWriteOpsCallDuration(getClock().getTime() - start); } catch (Exception e) { LOG.error("Error While Removing RMDelegationToken and SequenceNumber ", e); + rmStateStoreOpDurations.incrFailedOps(); isFenced = store.notifyStoreOperationFailedInternal(e); } return finalState(isFenced); @@ -424,11 +444,13 @@ public RMStateStoreState transition(RMStateStore store, RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event; try { LOG.info("Updating RMDelegationToken and SequenceNumber"); - store.updateRMDelegationTokenState( - dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate()); + long start = getClock().getTime(); + store.updateRMDelegationTokenState(dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate()); + rmStateStoreOpDurations.addWriteOpsCallDuration(getClock().getTime() - start); } catch (Exception e) { LOG.error("Error While Updating RMDelegationToken and SequenceNumber ", e); + rmStateStoreOpDurations.incrFailedOps(); isFenced = store.notifyStoreOperationFailedInternal(e); } return finalState(isFenced); @@ -451,9 +473,12 @@ public RMStateStoreState transition(RMStateStore store, (RMStateStoreRMDTMasterKeyEvent) event; try { LOG.info("Storing RMDTMasterKey."); + long start = getClock().getTime(); store.storeRMDTMasterKeyState(dtEvent.getDelegationKey()); + rmStateStoreOpDurations.addWriteOpsCallDuration(getClock().getTime() - start); } catch (Exception e) { LOG.error("Error While Storing RMDTMasterKey.", e); + rmStateStoreOpDurations.incrFailedOps(); isFenced = store.notifyStoreOperationFailedInternal(e); } return finalState(isFenced); @@ -476,9 +501,12 @@ public RMStateStoreState transition(RMStateStore store, (RMStateStoreRMDTMasterKeyEvent) event; try { LOG.info("Removing RMDTMasterKey."); + long start = getClock().getTime(); store.removeRMDTMasterKeyState(dtEvent.getDelegationKey()); + rmStateStoreOpDurations.addWriteOpsCallDuration(getClock().getTime() - start); } catch (Exception e) { LOG.error("Error While Removing RMDTMasterKey.", e); + rmStateStoreOpDurations.incrFailedOps(); isFenced = store.notifyStoreOperationFailedInternal(e); } return finalState(isFenced); @@ -500,10 +528,13 @@ public RMStateStoreState transition(RMStateStore store, boolean isFenced = false; try { LOG.info("Updating AMRMToken"); - store.storeOrUpdateAMRMTokenSecretManagerState( - amrmEvent.getAmrmTokenSecretManagerState(), amrmEvent.isUpdate()); + long start = getClock().getTime(); + store.storeOrUpdateAMRMTokenSecretManagerState(amrmEvent.getAmrmTokenSecretManagerState(), + amrmEvent.isUpdate()); + rmStateStoreOpDurations.addWriteOpsCallDuration(getClock().getTime() - start); } catch (Exception e) { LOG.error("Error storing info for AMRMTokenSecretManager", e); + rmStateStoreOpDurations.incrFailedOps(); isFenced = store.notifyStoreOperationFailedInternal(e); } return finalState(isFenced); @@ -527,12 +558,13 @@ public RMStateStoreState transition(RMStateStore store, try { LOG.info("Storing reservation allocation." + reservationEvent .getReservationIdName()); - store.storeReservationState( - reservationEvent.getReservationAllocation(), - reservationEvent.getPlanName(), - reservationEvent.getReservationIdName()); + long start = getClock().getTime(); + store.storeReservationState(reservationEvent.getReservationAllocation(), + reservationEvent.getPlanName(), reservationEvent.getReservationIdName()); + rmStateStoreOpDurations.addWriteOpsCallDuration(getClock().getTime() - start); } catch (Exception e) { LOG.error("Error while storing reservation allocation.", e); + rmStateStoreOpDurations.incrFailedOps(); isFenced = store.notifyStoreOperationFailedInternal(e); } return finalState(isFenced); @@ -556,11 +588,13 @@ public RMStateStoreState transition(RMStateStore store, try { LOG.info("Removing reservation allocation." + reservationEvent .getReservationIdName()); - store.removeReservationState( - reservationEvent.getPlanName(), - reservationEvent.getReservationIdName()); + long start = getClock().getTime(); + store.removeReservationState(reservationEvent.getPlanName(), + reservationEvent.getReservationIdName()); + rmStateStoreOpDurations.addWriteOpsCallDuration(getClock().getTime() - start); } catch (Exception e) { LOG.error("Error while removing reservation allocation.", e); + rmStateStoreOpDurations.incrFailedOps(); isFenced = store.notifyStoreOperationFailedInternal(e); } return finalState(isFenced); @@ -588,9 +622,12 @@ public RMStateStoreState transition(RMStateStore store, ApplicationId appId = attemptId.getApplicationId(); LOG.info("Removing attempt " + attemptId + " from app: " + appId); try { + long start = getClock().getTime(); store.removeApplicationAttemptInternal(attemptId); + rmStateStoreOpDurations.addWriteOpsCallDuration(getClock().getTime() - start); } catch (Exception e) { LOG.error("Error removing attempt: " + attemptId, e); + rmStateStoreOpDurations.incrFailedOps(); isFenced = store.notifyStoreOperationFailedInternal(e); } return finalState(isFenced); @@ -605,6 +642,10 @@ public RMStateStore() { stateMachine = stateMachineFactory.make(this); } + public static Clock getClock() { + return SystemClock.getInstance(); + } + public static class RMDTSecretManagerState { // DTIdentifier -> renewDate Map delegationTokenState = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreOpDurations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreOpDurations.java new file mode 100644 index 0000000..e19026f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreOpDurations.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableRate; + +import static org.apache.hadoop.metrics2.lib.Interns.info; + +/** + * Class to capture the performance metrics of RMStateStore. + * This should be a singleton. + */ +@InterfaceAudience.Private @InterfaceStability.Unstable +@Metrics(context = "rmstatestore-op-durations") public class RMStateStoreOpDurations + implements MetricsSource { + + @Metric("Duration for a writeOps call") MutableRate writeOpsCallDuration; + + @Metric("Duration for a failedOps call") MutableCounterLong failedOps; + + + private static final MetricsInfo RECORD_INFO = + info("RMStateStoreOpDurations", "Durations of RMStateStore calls"); + + private final MetricsRegistry registry; + + private boolean isExtended = false; + + private static final RMStateStoreOpDurations INSTANCE = new RMStateStoreOpDurations(); + + + public static RMStateStoreOpDurations getInstance(boolean isExtended) { + INSTANCE.setExtended(isExtended); + return INSTANCE; + } + + private synchronized void setExtended(boolean isExtended) { + if (isExtended == INSTANCE.isExtended) { + return; + } + writeOpsCallDuration.setExtended(isExtended); + INSTANCE.isExtended = isExtended; + } + + private RMStateStoreOpDurations() { + registry = new MetricsRegistry(RECORD_INFO); + registry.tag(RECORD_INFO, "RMStateStoreOpDurations"); + + MetricsSystem ms = DefaultMetricsSystem.instance(); + if (ms != null) { + ms.register(RECORD_INFO.name(), RECORD_INFO.description(), this); + } + } + + @Override public void getMetrics(MetricsCollector collector, boolean all) { + registry.snapshot(collector.addRecord(registry.info()), all); + } + + public void addWriteOpsCallDuration(long value) { + writeOpsCallDuration.add(value); + } + + public void incrFailedOps() { + failedOps.incr(); + } + +}