diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 0fd346f..5c433bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -74,6 +74,8 @@ import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; @Private @Unstable @@ -102,6 +104,8 @@ protected ResourceManager resourceManager; private final ReadLock readLock; private final WriteLock writeLock; + private static RMStateStoreOpDurations rmStateStoreOpDurations = + RMStateStoreOpDurations.getInstance(true); public static final Log LOG = LogFactory.getLog(RMStateStore.class); @@ -216,7 +220,10 @@ public RMStateStoreState transition(RMStateStore store, appState.getApplicationSubmissionContext().getApplicationId(); LOG.info("Storing info for app: " + appId); try { + long start = getClock().getTime(); store.storeApplicationStateInternal(appId, appState); + rmStateStoreOpDurations.addStoreAppCallDuration( + getClock().getTime() - start); store.notifyApplication(new RMAppEvent(appId, RMAppEventType.APP_NEW_SAVED)); } catch (Exception e) { @@ -247,7 +254,10 @@ public RMStateStoreState transition(RMStateStore store, appState.getApplicationSubmissionContext().getApplicationId(); LOG.info("Updating info for app: " + appId); try { + long start = getClock().getTime(); store.updateApplicationStateInternal(appId, appState); + rmStateStoreOpDurations.addUpdateStoreAppCallDuration( + getClock().getTime() - start); if (((RMStateUpdateAppEvent) event).isNotifyApplication()) { store.notifyApplication(new RMAppEvent(appId, RMAppEventType.APP_UPDATE_SAVED)); @@ -287,7 +297,10 @@ public RMStateStoreState transition(RMStateStore store, appState.getApplicationSubmissionContext().getApplicationId(); LOG.info("Removing info for app: " + appId); try { + long start = getClock().getTime(); store.removeApplicationStateInternal(appState); + rmStateStoreOpDurations.addRemoveStoreAppCallDuration( + getClock().getTime() - start); } catch (Exception e) { LOG.error("Error removing app: " + appId, e); isFenced = store.notifyStoreOperationFailedInternal(e); @@ -314,11 +327,13 @@ public RMStateStoreState transition(RMStateStore store, if (LOG.isDebugEnabled()) { LOG.debug("Storing info for attempt: " + attemptState.getAttemptId()); } + long start = getClock().getTime(); store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(), attemptState); - store.notifyApplicationAttempt(new RMAppAttemptEvent - (attemptState.getAttemptId(), - RMAppAttemptEventType.ATTEMPT_NEW_SAVED)); + rmStateStoreOpDurations.addStoreAppAttemptCallDuration( + getClock().getTime() - start); + store.notifyApplicationAttempt(new RMAppAttemptEvent(attemptState.getAttemptId(), + RMAppAttemptEventType.ATTEMPT_NEW_SAVED)); } catch (Exception e) { LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e); isFenced = store.notifyStoreOperationFailedInternal(e); @@ -345,11 +360,13 @@ public RMStateStoreState transition(RMStateStore store, if (LOG.isDebugEnabled()) { LOG.debug("Updating info for attempt: " + attemptState.getAttemptId()); } + long start = getClock().getTime(); store.updateApplicationAttemptStateInternal(attemptState.getAttemptId(), attemptState); - store.notifyApplicationAttempt(new RMAppAttemptEvent - (attemptState.getAttemptId(), - RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED)); + rmStateStoreOpDurations.addUpdateAppAttemptCallDuration( + getClock().getTime() - start); + store.notifyApplicationAttempt(new RMAppAttemptEvent(attemptState.getAttemptId(), + RMAppAttemptEventType.ATTEMPT_UPDATE_SAVED)); } catch (Exception e) { LOG.error("Error updating appAttempt: " + attemptState.getAttemptId(), e); isFenced = store.notifyStoreOperationFailedInternal(e); @@ -373,8 +390,10 @@ public RMStateStoreState transition(RMStateStore store, RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event; try { LOG.info("Storing RMDelegationToken and SequenceNumber"); - store.storeRMDelegationTokenState( - dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate()); + long start = getClock().getTime(); + store.storeRMDelegationTokenState(dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate()); + rmStateStoreOpDurations.addStoreDelegationTokenCallDuration( + getClock().getTime() - start); } catch (Exception e) { LOG.error("Error While Storing RMDelegationToken and SequenceNumber ", e); @@ -399,7 +418,10 @@ public RMStateStoreState transition(RMStateStore store, RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event; try { LOG.info("Removing RMDelegationToken and SequenceNumber"); + long start = getClock().getTime(); store.removeRMDelegationTokenState(dtEvent.getRmDTIdentifier()); + rmStateStoreOpDurations.addRemoveDelegationTokenCalllDuration( + getClock().getTime() - start); } catch (Exception e) { LOG.error("Error While Removing RMDelegationToken and SequenceNumber ", e); @@ -424,8 +446,10 @@ public RMStateStoreState transition(RMStateStore store, RMStateStoreRMDTEvent dtEvent = (RMStateStoreRMDTEvent) event; try { LOG.info("Updating RMDelegationToken and SequenceNumber"); - store.updateRMDelegationTokenState( - dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate()); + long start = getClock().getTime(); + store.updateRMDelegationTokenState(dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate()); + rmStateStoreOpDurations.addUpdateDelegationTokenCallDuration( + getClock().getTime() - start); } catch (Exception e) { LOG.error("Error While Updating RMDelegationToken and SequenceNumber ", e); @@ -451,7 +475,10 @@ public RMStateStoreState transition(RMStateStore store, (RMStateStoreRMDTMasterKeyEvent) event; try { LOG.info("Storing RMDTMasterKey."); + long start = getClock().getTime(); store.storeRMDTMasterKeyState(dtEvent.getDelegationKey()); + rmStateStoreOpDurations.addStoreMasterKeyCallDuration( + getClock().getTime() - start); } catch (Exception e) { LOG.error("Error While Storing RMDTMasterKey.", e); isFenced = store.notifyStoreOperationFailedInternal(e); @@ -476,7 +503,10 @@ public RMStateStoreState transition(RMStateStore store, (RMStateStoreRMDTMasterKeyEvent) event; try { LOG.info("Removing RMDTMasterKey."); + long start = getClock().getTime(); store.removeRMDTMasterKeyState(dtEvent.getDelegationKey()); + rmStateStoreOpDurations.addRemoveMasterKeyCallDuration( + getClock().getTime() - start); } catch (Exception e) { LOG.error("Error While Removing RMDTMasterKey.", e); isFenced = store.notifyStoreOperationFailedInternal(e); @@ -500,8 +530,11 @@ public RMStateStoreState transition(RMStateStore store, boolean isFenced = false; try { LOG.info("Updating AMRMToken"); - store.storeOrUpdateAMRMTokenSecretManagerState( - amrmEvent.getAmrmTokenSecretManagerState(), amrmEvent.isUpdate()); + long start = getClock().getTime(); + store.storeOrUpdateAMRMTokenSecretManagerState(amrmEvent.getAmrmTokenSecretManagerState(), + amrmEvent.isUpdate()); + rmStateStoreOpDurations.addUpdateAMRMTokenCallDuration( + getClock().getTime() - start); } catch (Exception e) { LOG.error("Error storing info for AMRMTokenSecretManager", e); isFenced = store.notifyStoreOperationFailedInternal(e); @@ -527,10 +560,11 @@ public RMStateStoreState transition(RMStateStore store, try { LOG.info("Storing reservation allocation." + reservationEvent .getReservationIdName()); - store.storeReservationState( - reservationEvent.getReservationAllocation(), - reservationEvent.getPlanName(), - reservationEvent.getReservationIdName()); + long start = getClock().getTime(); + store.storeReservationState(reservationEvent.getReservationAllocation(), + reservationEvent.getPlanName(), reservationEvent.getReservationIdName()); + rmStateStoreOpDurations.addStoreReservationAllocationCallDuration( + getClock().getTime() - start); } catch (Exception e) { LOG.error("Error while storing reservation allocation.", e); isFenced = store.notifyStoreOperationFailedInternal(e); @@ -556,9 +590,11 @@ public RMStateStoreState transition(RMStateStore store, try { LOG.info("Removing reservation allocation." + reservationEvent .getReservationIdName()); - store.removeReservationState( - reservationEvent.getPlanName(), - reservationEvent.getReservationIdName()); + long start = getClock().getTime(); + store.removeReservationState(reservationEvent.getPlanName(), + reservationEvent.getReservationIdName()); + rmStateStoreOpDurations.addRemoveReservationAllocationCallDuration( + getClock().getTime() - start); } catch (Exception e) { LOG.error("Error while removing reservation allocation.", e); isFenced = store.notifyStoreOperationFailedInternal(e); @@ -605,6 +641,10 @@ public RMStateStore() { stateMachine = stateMachineFactory.make(this); } + public static Clock getClock() { + return SystemClock.getInstance(); + } + public static class RMDTSecretManagerState { // DTIdentifier -> renewDate Map delegationTokenState = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreOpDurations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreOpDurations.java new file mode 100644 index 0000000..3b52782 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreOpDurations.java @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableRate; + +import static org.apache.hadoop.metrics2.lib.Interns.info; + +/** + * Class to capture the performance metrics of RMStateStore. + * This should be a singleton. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +@Metrics(context="rmstatestore-op-durations") +public class RMStateStoreOpDurations implements MetricsSource { + + @Metric("Duration for a storeApp call") + MutableRate storeAppCall; + + @Metric("Duration for a updateApp call") + MutableRate updateAppCall; + + @Metric("Duration for a removeApp call") + MutableRate removeAppCall; + + @Metric("Duration for a storeAppAttempt call") + MutableRate storeAppAttemptCall; + + @Metric("Duration for a updateAppAttempt call") + MutableRate updateAppAttemptCall; + + @Metric("Duration for a storeMasterKey call") + MutableRate storeMasterKeyCall; + + @Metric("Duration for a removeMasterKey call") + MutableRate removeMasterKeyCall; + + @Metric("Duration for a storeDelegationToken call") + MutableRate storeDelegationTokenCall; + + @Metric("Duration for a removeDelegationToken call") + MutableRate removeDelegationTokenCall; + + @Metric("Duration for a updateDelegationToken call") + MutableRate updateDelegationTokenCall; + + @Metric("Duration for a updateAMRMToken call") + MutableRate updateAMRMTokenCall; + + @Metric("Duration for a storeReservationAllocation call") + MutableRate storeReservationAllocationCall; + + @Metric("Duration for a storeReservationAllocation call") + MutableRate removeReservationAllocationCall; + + private static final RMStateStoreOpDurations INSTANCE = new RMStateStoreOpDurations(); + + private static final MetricsInfo RECORD_INFO = + info("RMStateStoreOpDurations", "Durations of RMStateStore calls"); + + private final MetricsRegistry registry; + + private boolean isExtended = false; + + + public static RMStateStoreOpDurations getInstance(boolean isExtended) { + INSTANCE.setExtended(isExtended); + return INSTANCE; + } + + private synchronized void setExtended(boolean isExtended) { + if (isExtended == INSTANCE.isExtended) { + return; + } + + storeAppCall.setExtended(isExtended); + updateAppCall.setExtended(isExtended); + removeAppCall.setExtended(isExtended); + storeAppAttemptCall.setExtended(isExtended); + updateAppAttemptCall.setExtended(isExtended); + storeMasterKeyCall.setExtended(isExtended); + removeMasterKeyCall.setExtended(isExtended); + storeDelegationTokenCall.setExtended(isExtended); + removeDelegationTokenCall.setExtended(isExtended); + updateDelegationTokenCall.setExtended(isExtended); + updateAMRMTokenCall.setExtended(isExtended); + storeReservationAllocationCall.setExtended(isExtended); + removeReservationAllocationCall.setExtended(isExtended); + + INSTANCE.isExtended = isExtended; + } + + private RMStateStoreOpDurations() { + registry = new MetricsRegistry(RECORD_INFO); + registry.tag(RECORD_INFO, "RMStateStoreOpDurations"); + + MetricsSystem ms = DefaultMetricsSystem.instance(); + if (ms != null) { + ms.register(RECORD_INFO.name(), RECORD_INFO.description(), this); + } + } + + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + registry.snapshot(collector.addRecord(registry.info()), all); + } + + public void addStoreAppCallDuration(long value) { + storeAppCall.add(value); + } + + public void addUpdateStoreAppCallDuration(long value) { + updateAppCall.add(value); + } + + public void addRemoveStoreAppCallDuration(long value) { + removeAppCall.add(value); + } + + public void addStoreAppAttemptCallDuration(long value) { + storeAppAttemptCall.add(value); + } + + public void addUpdateAppAttemptCallDuration(long value) { + updateAppAttemptCall.add(value); + } + + public void addStoreMasterKeyCallDuration(long value) { + storeMasterKeyCall.add(value); + } + + public void addRemoveMasterKeyCallDuration(long value) { + removeMasterKeyCall.add(value); + } + + public void addStoreDelegationTokenCallDuration(long value) { + storeDelegationTokenCall.add(value); + } + + public void addRemoveDelegationTokenCalllDuration(long value) { + removeDelegationTokenCall.add(value); + } + + public void addUpdateDelegationTokenCallDuration(long value) { + updateDelegationTokenCall.add(value); + } + + public void addUpdateAMRMTokenCallDuration(long value) { + updateAMRMTokenCall.add(value); + } + + public void addStoreReservationAllocationCallDuration(long value) { + storeReservationAllocationCall.add(value); + } + + public void addRemoveReservationAllocationCallDuration(long value) { + storeReservationAllocationCall.add(value); + } +}