From 03a055ad7fcb423dd8122823b86166aef81b2610 Mon Sep 17 00:00:00 2001 From: Rohith Sharma K S Date: Fri, 4 Nov 2016 20:14:30 +0530 Subject: [PATCH] YARN-5611 --- .../apache/hadoop/mapred/TestClientRedirect.java | 9 + .../hadoop/yarn/api/ApplicationClientProtocol.java | 21 ++ .../UpdateApplicationTimeoutsRequest.java | 83 ++++++++ .../UpdateApplicationTimeoutsResponse.java | 69 +++++++ .../api/records/ApplicationSubmissionContext.java | 4 + .../apache/hadoop/yarn/conf/YarnConfiguration.java | 4 +- .../yarn/exceptions/StateStoreFailedException.java | 41 ++++ .../main/proto/applicationclient_protocol.proto | 1 + .../src/main/proto/yarn_protos.proto | 5 + .../src/main/proto/yarn_service_protos.proto | 9 + .../ApplicationClientProtocolPBClientImpl.java | 21 +- .../ApplicationClientProtocolPBServiceImpl.java | 22 +++ .../pb/UpdateApplicationTimeoutsRequestPBImpl.java | 220 +++++++++++++++++++++ .../UpdateApplicationTimeoutsResponsePBImpl.java | 181 +++++++++++++++++ .../yarn/util/AbstractLivelinessMonitor.java | 17 +- .../java/org/apache/hadoop/yarn/util/Times.java | 35 ++++ .../src/main/resources/yarn-default.xml | 4 +- .../amrmproxy/MockResourceManagerFacade.java | 9 + .../server/resourcemanager/ClientRMService.java | 131 +++++++++--- .../yarn/server/resourcemanager/RMAppManager.java | 34 ++++ .../yarn/server/resourcemanager/RMAuditLogger.java | 4 +- .../yarn/server/resourcemanager/RMServerUtils.java | 47 ++++- .../resourcemanager/recovery/RMStateStore.java | 28 ++- .../recovery/RMStateUpdateAppEvent.java | 15 +- .../recovery/records/ApplicationStateData.java | 27 +++ .../impl/pb/ApplicationStateDataPBImpl.java | 86 ++++++++ .../yarn/server/resourcemanager/rmapp/RMApp.java | 11 ++ .../server/resourcemanager/rmapp/RMAppImpl.java | 103 ++++++++-- .../resourcemanager/rmapp/RMAppUpdateType.java | 26 +++ .../rmapp/monitor/RMAppLifetimeMonitor.java | 66 +++---- .../scheduler/capacity/CapacityScheduler.java | 1 + .../yarn_server_resourcemanager_recovery.proto | 1 + .../applicationsmanager/MockAsm.java | 16 ++ .../server/resourcemanager/rmapp/MockRMApp.java | 16 ++ .../rmapp/TestApplicationLifetimeMonitor.java | 57 +++++- 35 files changed, 1316 insertions(+), 108 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsRequest.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsResponse.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/StateStoreFailedException.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsRequestPBImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsResponsePBImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppUpdateType.java diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java index 255f998..65eac65 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java @@ -124,6 +124,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -485,6 +487,13 @@ public SignalContainerResponse signalToContainer( SignalContainerRequest request) throws IOException { return null; } + + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + return null; + } } class HistoryService extends AMService implements HSClientProtocol { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java index 8ee43fb..0e377c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java @@ -59,6 +59,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; @@ -566,4 +568,23 @@ public UpdateApplicationPriorityResponse updateApplicationPriority( SignalContainerResponse signalToContainer( SignalContainerRequest request) throws YarnException, IOException; + + /** + *

+ * The interface used by client to set ApplicationTimeouts of an application. + * The UpdateApplicationTimeoutsRequest should have timeout value with + * absolute time with ISO8601 format yyyy-MM-dd'T'HH:mm:ss.SSSZ. + *

+ * @param request to set ApplicationTimeouts of an application + * @return an empty response that the update has completed successfully. + * @throws YarnException if update request has empty values or applicatioin is + * in completing states. + * @throws IOException on IO failures + */ + @Public + @Unstable + @Idempotent + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsRequest.java new file mode 100644 index 0000000..f0ed388 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsRequest.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.util.Records; + +/** + *

+ * The request sent by the client to the ResourceManager to set or + * update the application timeout. + *

+ *

+ * The request includes the {@link ApplicationId} of the application and timeout + * to be set for an application + *

+ * @see ApplicationClientProtocol#updateApplicationTimeouts(UpdateApplicationTimeoutsRequest) + */ +@Public +@Unstable +public abstract class UpdateApplicationTimeoutsRequest { + public static UpdateApplicationTimeoutsRequest newInstance( + ApplicationId applicationId, + Map applicationTimeouts) { + UpdateApplicationTimeoutsRequest request = + Records.newRecord(UpdateApplicationTimeoutsRequest.class); + request.setApplicationId(applicationId); + request.setApplicationTimeouts(applicationTimeouts); + return request; + } + + /** + * Get the ApplicationId of the application. + * @return ApplicationId of the application + */ + public abstract ApplicationId getApplicationId(); + + /** + * Set the ApplicationId of the application. + * @param applicationId ApplicationId of the application + */ + public abstract void setApplicationId(ApplicationId applicationId); + + /** + * Get ApplicationTimeouts of the application. Timeout value is + * in ISO8601 standard with format yyyy-MM-dd'T'HH:mm:ss.SSSZ. + * @return all ApplicationTimeouts of the application. + */ + public abstract Map getApplicationTimeouts(); + + /** + * Set the ApplicationTimeouts for the application. Timeout value + * is absolute. Timeout value should meet ISO8601 format. Support ISO8601 + * format is yyyy-MM-dd'T'HH:mm:ss.SSSZ. All pre-existing Map entries are + * cleared before adding the new Map. + * @param applicationTimeouts ApplicationTimeoutss for the + * application + */ + public abstract void setApplicationTimeouts( + Map applicationTimeouts); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsResponse.java new file mode 100644 index 0000000..91dccbd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsResponse.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.util.Records; + +/** + *

+ * The response sent by the ResourceManager to the client on update + * application timeout. + *

+ *

+ * A response without exception means that the update has completed + * successfully. + *

+ * @see ApplicationClientProtocol#updateApplicationTimeouts(UpdateApplicationTimeoutsRequest) + */ +@Public +@Unstable +public abstract class UpdateApplicationTimeoutsResponse { + + public static UpdateApplicationTimeoutsResponse newInstance( + Map applicationTimeouts) { + UpdateApplicationTimeoutsResponse response = + Records.newRecord(UpdateApplicationTimeoutsResponse.class); + response.setApplicationTimeouts(applicationTimeouts); + return response; + } + + /** + * Get ApplicationTimeouts of the application. Timeout value is + * in ISO8601 standard with format yyyy-MM-dd'T'HH:mm:ss.SSSZ. + * @return all ApplicationTimeouts of the application. + */ + public abstract Map getApplicationTimeouts(); + + /** + * Set the ApplicationTimeouts for the application. Timeout value + * is absolute. Timeout value should meet ISO8601 format. Support ISO8601 + * format is yyyy-MM-dd'T'HH:mm:ss.SSSZ. All pre-existing Map entries + * are cleared before adding the new Map. + * @param applicationTimeouts ApplicationTimeoutss for the + * application + */ + public abstract void setApplicationTimeouts( + Map applicationTimeouts); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java index 83f601a..e562aaa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java @@ -549,6 +549,10 @@ public abstract void setLogAggregationContext( /** * Set the ApplicationTimeouts for the application in seconds. * All pre-existing Map entries are cleared before adding the new Map. + *

+ * Note: If application timeout value is less than or equal to zero + * then application submission will throw an exception. + *

* @param applicationTimeouts ApplicationTimeoutss for the * application */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 3f84a23..b216f42 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1530,8 +1530,8 @@ public static boolean isAclEnabled(Configuration conf) { // Configurations for applicaiton life time monitor feature - public static final String RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS = - RM_PREFIX + "application-timeouts.lifetime-monitor.interval-ms"; + public static final String RM_APPLICATION_MONITOR_INTERVAL_MS = + RM_PREFIX + "application-timeouts.monitor.interval-ms"; public static final long DEFAULT_RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS = 60000; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/StateStoreFailedException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/StateStoreFailedException.java new file mode 100644 index 0000000..9bf900d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/StateStoreFailedException.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.exceptions; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +@Public +@Unstable +public class StateStoreFailedException extends YarnException { + + private static final long serialVersionUID = 8694508L; + + public StateStoreFailedException(Throwable cause) { + super(cause); + } + + public StateStoreFailedException(String message) { + super(message); + } + + public StateStoreFailedException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto index f1c3839..ba79db0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto @@ -60,4 +60,5 @@ service ApplicationClientProtocolService { rpc getClusterNodeLabels (GetClusterNodeLabelsRequestProto) returns (GetClusterNodeLabelsResponseProto); rpc updateApplicationPriority (UpdateApplicationPriorityRequestProto) returns (UpdateApplicationPriorityResponseProto); rpc signalToContainer(SignalContainerRequestProto) returns (SignalContainerResponseProto); + rpc updateApplicationTimeouts (UpdateApplicationTimeoutsRequestProto) returns (UpdateApplicationTimeoutsResponseProto); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 9c746fd..b59d02b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -377,6 +377,11 @@ message ApplicationTimeoutMapProto { optional int64 timeout = 2; } +message ApplicationUpdateTimeoutMapProto { + optional ApplicationTimeoutTypeProto application_timeout_type = 1; + optional string expire_time = 2; +} + message LogAggregationContextProto { optional string include_pattern = 1 [default = ".*"]; optional string exclude_pattern = 2 [default = ""]; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 6526bf9..d9230d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -267,6 +267,15 @@ message SignalContainerRequestProto { message SignalContainerResponseProto { } +message UpdateApplicationTimeoutsRequestProto { + required ApplicationIdProto applicationId = 1; + repeated ApplicationUpdateTimeoutMapProto application_timeouts = 2; +} + +message UpdateApplicationTimeoutsResponseProto { + repeated ApplicationUpdateTimeoutMapProto application_timeouts = 1; +} + ////////////////////////////////////////////////////// /////// client_NM_Protocol /////////////////////////// ////////////////////////////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java index 2d755a2..ad7cb29 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java @@ -83,6 +83,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; @@ -139,6 +141,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationTimeoutsRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationTimeoutsResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl; @@ -165,7 +169,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityRequestProto; -import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto; import com.google.protobuf.ServiceException; @@ -600,4 +604,19 @@ public SignalContainerResponse signalToContainer( return null; } } + + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + UpdateApplicationTimeoutsRequestProto requestProto = + ((UpdateApplicationTimeoutsRequestPBImpl) request).getProto(); + try { + return new UpdateApplicationTimeoutsResponsePBImpl( + proxy.updateApplicationTimeouts(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java index 300ef57..93ce6a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl; @@ -111,6 +112,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationTimeoutsRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationTimeoutsResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -162,6 +165,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto; @@ -609,4 +614,21 @@ public SignalContainerResponseProto signalToContainer( throw new ServiceException(e); } } + + @Override + public UpdateApplicationTimeoutsResponseProto updateApplicationTimeouts( + RpcController controller, UpdateApplicationTimeoutsRequestProto proto) + throws ServiceException { + UpdateApplicationTimeoutsRequestPBImpl request = + new UpdateApplicationTimeoutsRequestPBImpl(proto); + try { + UpdateApplicationTimeoutsResponse response = + real.updateApplicationTimeouts(request); + return ((UpdateApplicationTimeoutsResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsRequestPBImpl.java new file mode 100644 index 0000000..1f86c55 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsRequestPBImpl.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationUpdateTimeoutMapProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsRequestProtoOrBuilder; + +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class UpdateApplicationTimeoutsRequestPBImpl + extends UpdateApplicationTimeoutsRequest { + + UpdateApplicationTimeoutsRequestProto proto = + UpdateApplicationTimeoutsRequestProto.getDefaultInstance(); + UpdateApplicationTimeoutsRequestProto.Builder builder = null; + boolean viaProto = false; + + private ApplicationId applicationId = null; + private Map applicationTimeouts = null; + + public UpdateApplicationTimeoutsRequestPBImpl() { + builder = UpdateApplicationTimeoutsRequestProto.newBuilder(); + } + + public UpdateApplicationTimeoutsRequestPBImpl( + UpdateApplicationTimeoutsRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public UpdateApplicationTimeoutsRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = UpdateApplicationTimeoutsRequestProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToBuilder() { + if (this.applicationId != null) { + builder.setApplicationId(convertToProtoFormat(this.applicationId)); + } + if (this.applicationTimeouts != null) { + addApplicationTimeouts(); + } + } + + @Override + public ApplicationId getApplicationId() { + UpdateApplicationTimeoutsRequestProtoOrBuilder p = + viaProto ? proto : builder; + if (this.applicationId != null) { + return applicationId; + } // Else via proto + if (!p.hasApplicationId()) { + return null; + } + applicationId = convertFromProtoFormat(p.getApplicationId()); + return applicationId; + } + + @Override + public void setApplicationId(ApplicationId applicationId) { + maybeInitBuilder(); + if (applicationId == null) { + builder.clearApplicationId(); + } + this.applicationId = applicationId; + } + + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { + return new ApplicationIdPBImpl(p); + } + + private ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl) t).getProto(); + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + @Override + public Map getApplicationTimeouts() { + initApplicationTimeout(); + return this.applicationTimeouts; + } + + private void initApplicationTimeout() { + if (this.applicationTimeouts != null) { + return; + } + UpdateApplicationTimeoutsRequestProtoOrBuilder p = + viaProto ? proto : builder; + List lists = + p.getApplicationTimeoutsList(); + this.applicationTimeouts = + new HashMap(lists.size()); + for (ApplicationUpdateTimeoutMapProto timeoutProto : lists) { + this.applicationTimeouts.put( + ProtoUtils + .convertFromProtoFormat(timeoutProto.getApplicationTimeoutType()), + timeoutProto.getExpireTime()); + } + } + + @Override + public void setApplicationTimeouts( + Map appTimeouts) { + if (appTimeouts == null) { + return; + } + initApplicationTimeout(); + this.applicationTimeouts.clear(); + this.applicationTimeouts.putAll(appTimeouts); + } + + private void addApplicationTimeouts() { + maybeInitBuilder(); + builder.clearApplicationTimeouts(); + if (applicationTimeouts == null) { + return; + } + Iterable values = + new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + private Iterator iterator = + applicationTimeouts.keySet().iterator(); + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public ApplicationUpdateTimeoutMapProto next() { + ApplicationTimeoutType key = iterator.next(); + return ApplicationUpdateTimeoutMapProto.newBuilder() + .setExpireTime(applicationTimeouts.get(key)) + .setApplicationTimeoutType( + ProtoUtils.convertToProtoFormat(key)) + .build(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + this.builder.addAllApplicationTimeouts(values); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsResponsePBImpl.java new file mode 100644 index 0000000..0c94f97 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsResponsePBImpl.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationUpdateTimeoutMapProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsResponseProtoOrBuilder; + +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class UpdateApplicationTimeoutsResponsePBImpl + extends UpdateApplicationTimeoutsResponse { + UpdateApplicationTimeoutsResponseProto proto = + UpdateApplicationTimeoutsResponseProto.getDefaultInstance(); + UpdateApplicationTimeoutsResponseProto.Builder builder = null; + boolean viaProto = false; + private Map applicationTimeouts = null; + + public UpdateApplicationTimeoutsResponsePBImpl() { + builder = UpdateApplicationTimeoutsResponseProto.newBuilder(); + } + + public UpdateApplicationTimeoutsResponsePBImpl( + UpdateApplicationTimeoutsResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public UpdateApplicationTimeoutsResponseProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = UpdateApplicationTimeoutsResponseProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToBuilder() { + if (this.applicationTimeouts != null) { + addApplicationTimeouts(); + } + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + @Override + public Map getApplicationTimeouts() { + initApplicationTimeout(); + return this.applicationTimeouts; + } + + private void initApplicationTimeout() { + if (this.applicationTimeouts != null) { + return; + } + UpdateApplicationTimeoutsResponseProtoOrBuilder p = + viaProto ? proto : builder; + List lists = + p.getApplicationTimeoutsList(); + this.applicationTimeouts = + new HashMap(lists.size()); + for (ApplicationUpdateTimeoutMapProto timeoutProto : lists) { + this.applicationTimeouts.put( + ProtoUtils + .convertFromProtoFormat(timeoutProto.getApplicationTimeoutType()), + timeoutProto.getExpireTime()); + } + } + + @Override + public void setApplicationTimeouts( + Map appTimeouts) { + if (appTimeouts == null) { + return; + } + initApplicationTimeout(); + this.applicationTimeouts.clear(); + this.applicationTimeouts.putAll(appTimeouts); + } + + private void addApplicationTimeouts() { + maybeInitBuilder(); + builder.clearApplicationTimeouts(); + if (applicationTimeouts == null) { + return; + } + Iterable values = + new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + private Iterator iterator = + applicationTimeouts.keySet().iterator(); + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public ApplicationUpdateTimeoutMapProto next() { + ApplicationTimeoutType key = iterator.next(); + return ApplicationUpdateTimeoutMapProto.newBuilder() + .setExpireTime(applicationTimeouts.get(key)) + .setApplicationTimeoutType( + ProtoUtils.convertToProtoFormat(key)) + .build(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + this.builder.addAllApplicationTimeouts(values); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java index b605026..638128e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java @@ -46,6 +46,7 @@ public static final int DEFAULT_EXPIRE = 5*60*1000;//5 mins private long expireInterval = DEFAULT_EXPIRE; private long monitorInterval = expireInterval / 3; + private volatile boolean resetTimerOnStart = true; private final Clock clock; @@ -105,8 +106,8 @@ public synchronized void register(O ob) { register(ob, clock.getTime()); } - public synchronized void register(O ob, long monitorStartTime) { - running.put(ob, monitorStartTime); + public synchronized void register(O ob, long expireTime) { + running.put(ob, expireTime); } public synchronized void unregister(O ob) { @@ -114,12 +115,18 @@ public synchronized void unregister(O ob) { } public synchronized void resetTimer() { - long time = clock.getTime(); - for (O ob : running.keySet()) { - running.put(ob, time); + if (resetTimerOnStart) { + long time = clock.getTime(); + for (O ob : running.keySet()) { + running.put(ob, time); + } } } + protected void setResetTimeOnStart(boolean resetTimeOnStart) { + this.resetTimerOnStart = resetTimeOnStart; + } + private class PingChecker implements Runnable { @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/Times.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/Times.java index 8ae3842..5e194c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/Times.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/Times.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.util; +import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; @@ -29,6 +30,8 @@ public class Times { private static final Log LOG = LogFactory.getLog(Times.class); + static final String ISO8601DATEFORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ"; + // This format should match the one used in yarn.dt.plugins.js static final ThreadLocal dateFormat = new ThreadLocal() { @@ -37,6 +40,14 @@ } }; + static final ThreadLocal isoFormat = + new ThreadLocal() { + @Override + protected SimpleDateFormat initialValue() { + return new SimpleDateFormat(ISO8601DATEFORMAT); + } + }; + public static long elapsed(long started, long finished) { return Times.elapsed(started, finished, true); } @@ -74,4 +85,28 @@ public static String format(long ts) { return ts > 0 ? String.valueOf(dateFormat.get().format(new Date(ts))) : "N/A"; } + + /** + * Given a time stamp returns ISO-8601 formated string in format + * "yyyy-MM-dd'T'HH:mm:ss.SSSZ". + * + * @param ts to be formatted in ISO format. + * @return ISO 8601 formatted string. + */ + public static String formatISO8601(long ts) { + return isoFormat.get().format(new Date(ts)); + } + + /** + * Given ISO formatted string with format "yyyy-MM-dd'T'HH:mm:ss.SSSZ", return + * epoch time for local Time zone. + * + * @param isoString in format of "yyyy-MM-dd'T'HH:mm:ss.SSSZ". + * @return epoch time for local time zone. + * @throws ParseException if given ISO formatted string can not be parsed. + */ + public static long parseISO8601ToLocalTimeInMillis(String isoString) + throws ParseException { + return isoFormat.get().parse(isoString).getTime(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index c265e86..04baba9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3034,9 +3034,9 @@ - The RMAppLifetimeMonitor Service uses this value as lifetime monitor interval + The RMAppLifetimeMonitor Service uses this value as monitor interval - yarn.resourcemanager.application-timeouts.lifetime-monitor.interval-ms + yarn.resourcemanager.application-timeouts.monitor.interval-ms 60000 diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java index f02e306..c69313f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java @@ -91,6 +91,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.records.AMCommand; @@ -497,4 +499,11 @@ public FailApplicationAttemptResponse failApplicationAttempt( FailApplicationAttemptRequest request) throws YarnException, IOException { throw new NotImplementedException(); } + + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + throw new NotImplementedException(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index e9bd230..4f9de91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -108,12 +108,15 @@ import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.NodeReport; @@ -1589,37 +1592,11 @@ public UpdateApplicationPriorityResponse updateApplicationPriority( ApplicationId applicationId = request.getApplicationId(); Priority newAppPriority = request.getApplicationPriority(); - UserGroupInformation callerUGI; - try { - callerUGI = UserGroupInformation.getCurrentUser(); - } catch (IOException ie) { - LOG.info("Error getting UGI ", ie); - RMAuditLogger.logFailure("UNKNOWN", AuditConstants.UPDATE_APP_PRIORITY, - "UNKNOWN", "ClientRMService", "Error getting UGI", applicationId); - throw RPCUtil.getRemoteException(ie); - } - RMApp application = this.rmContext.getRMApps().get(applicationId); - if (application == null) { - RMAuditLogger.logFailure(callerUGI.getUserName(), - AuditConstants.UPDATE_APP_PRIORITY, "UNKNOWN", "ClientRMService", - "Trying to update priority of an absent application", applicationId); - throw new ApplicationNotFoundException( - "Trying to update priority of an absent application " - + applicationId); - } - - if (!checkAccess(callerUGI, application.getUser(), - ApplicationAccessType.MODIFY_APP, application)) { - RMAuditLogger.logFailure(callerUGI.getShortUserName(), - AuditConstants.UPDATE_APP_PRIORITY, - "User doesn't have permissions to " - + ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService", - AuditConstants.UNAUTHORIZED_USER, applicationId); - throw RPCUtil.getRemoteException(new AccessControlException("User " - + callerUGI.getShortUserName() + " cannot perform operation " - + ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId)); - } + UserGroupInformation callerUGI = + getCallerUgi(applicationId, AuditConstants.UPDATE_APP_PRIORITY); + RMApp application = verifyUserAccessForRMApp(applicationId, callerUGI, + AuditConstants.UPDATE_APP_PRIORITY); UpdateApplicationPriorityResponse response = recordFactory .newRecordInstance(UpdateApplicationPriorityResponse.class); @@ -1724,4 +1701,98 @@ public SignalContainerResponse signalToContainer( .newRecordInstance(SignalContainerResponse.class); } + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + + ApplicationId applicationId = request.getApplicationId(); + Map applicationTimeouts = + request.getApplicationTimeouts(); + + UserGroupInformation callerUGI = + getCallerUgi(applicationId, AuditConstants.UPDATE_APP_TIMEOUTS); + RMApp application = verifyUserAccessForRMApp(applicationId, callerUGI, + AuditConstants.UPDATE_APP_TIMEOUTS); + + if (applicationTimeouts.isEmpty()) { + String message = + "At least one ApplicationTimeoutType should be configured" + + " for updating timeouts."; + RMAuditLogger.logFailure(callerUGI.getShortUserName(), + AuditConstants.UPDATE_APP_TIMEOUTS, "UNKNOWN", "ClientRMService", + message, applicationId); + throw RPCUtil.getRemoteException(message); + } + + UpdateApplicationTimeoutsResponse response = recordFactory + .newRecordInstance(UpdateApplicationTimeoutsResponse.class); + + // do not allow to update before saving to state store and in completed + // states. + if (RMAppState.NEW.equals(application.getState()) + || COMPLETED_APP_STATES.contains(application.getState())) { + String msg = "Application is in " + + RMServerUtils.createApplicationState(application.getState()) + + " state can not update timeout."; + RMAuditLogger.logFailure(callerUGI.getShortUserName(), + AuditConstants.UPDATE_APP_TIMEOUTS, "UNKNOWN", "ClientRMService", + msg); + throw RPCUtil.getRemoteException(msg); + } + + try { + rmAppManager.updateApplicationTimeout(application, applicationTimeouts); + response.setApplicationTimeouts(applicationTimeouts); + } catch (YarnException ex) { + RMAuditLogger.logFailure(callerUGI.getShortUserName(), + AuditConstants.UPDATE_APP_TIMEOUTS, "UNKNOWN", "ClientRMService", + ex.getMessage()); + throw RPCUtil.getRemoteException(ex); + } + + RMAuditLogger.logSuccess(callerUGI.getShortUserName(), + AuditConstants.UPDATE_APP_TIMEOUTS, "ClientRMService", applicationId); + return response; + } + + private UserGroupInformation getCallerUgi(ApplicationId applicationId, + String operation) throws YarnException { + UserGroupInformation callerUGI; + try { + callerUGI = UserGroupInformation.getCurrentUser(); + } catch (IOException ie) { + LOG.info("Error getting UGI ", ie); + RMAuditLogger.logFailure("UNKNOWN", operation, "UNKNOWN", + "ClientRMService", "Error getting UGI", applicationId); + throw RPCUtil.getRemoteException(ie); + } + return callerUGI; + } + + private RMApp verifyUserAccessForRMApp(ApplicationId applicationId, + UserGroupInformation callerUGI, String operation) throws YarnException { + RMApp application = this.rmContext.getRMApps().get(applicationId); + if (application == null) { + RMAuditLogger.logFailure(callerUGI.getUserName(), operation, "UNKNOWN", + "ClientRMService", + "Trying to " + operation + " of an absent application", + applicationId); + throw new ApplicationNotFoundException("Trying to " + operation + + " of an absent application " + applicationId); + } + + if (!checkAccess(callerUGI, application.getUser(), + ApplicationAccessType.MODIFY_APP, application)) { + RMAuditLogger.logFailure(callerUGI.getShortUserName(), operation, + "User doesn't have permissions to " + + ApplicationAccessType.MODIFY_APP.toString(), + "ClientRMService", AuditConstants.UNAUTHORIZED_USER, applicationId); + throw RPCUtil.getRemoteException(new AccessControlException("User " + + callerUGI.getShortUserName() + " cannot perform operation " + + ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId)); + } + return application; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index c065b60..c3a3fb5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -34,12 +34,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; +import org.apache.hadoop.yarn.exceptions.StateStoreFailedException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.security.AccessRequest; @@ -56,6 +58,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRecoverEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; @@ -66,6 +69,8 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.SettableFuture; /** * This class manages the list of applications for the resource manager. @@ -509,4 +514,33 @@ public void handle(RMAppManagerEvent event) { LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); } } + + // transaction method. + public void updateApplicationTimeout(RMApp app, + Map newTimeoutInISO8601Format) + throws YarnException { + ApplicationId applicationId = app.getApplicationId(); + synchronized (applicationId) { + Map newExpireTime = RMServerUtils + .validateISO8601AndConvertToLocalTimeEpoch(newTimeoutInISO8601Format); + + SettableFuture future = SettableFuture.create(); + + Map currentApplicationTimeouts = + app.getApplicationTimeouts(); + + try { + app.updateApplicationTimeout(RMAppUpdateType.UPDATE, newExpireTime, + currentApplicationTimeouts, future); + Futures.get(future, YarnException.class); + } catch (YarnException ex) { + if (ex instanceof StateStoreFailedException) { + // do roll back only from memory since state-store has failed. + app.updateApplicationTimeout(RMAppUpdateType.ROLLBACK, newExpireTime, + currentApplicationTimeouts, null); + } + throw ex; + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java index 0361059..d52e002 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java @@ -63,7 +63,9 @@ public static final String ALLOC_CONTAINER = "AM Allocated Container"; public static final String RELEASE_CONTAINER = "AM Released Container"; public static final String UPDATE_APP_PRIORITY = - "Update Application Priority Request"; + "Update Application Priority"; + public static final String UPDATE_APP_TIMEOUTS = + "Update Application Timeouts"; public static final String CHANGE_CONTAINER_RESOURCE = "AM Changed Container Resource"; public static final String SIGNAL_CONTAINER = "Signal Container Request"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index b2a085a..6990935 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.io.IOException; +import java.text.ParseException; import java.util.ArrayList; import java.util.EnumSet; import java.util.HashMap; @@ -69,6 +70,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -89,6 +93,8 @@ protected static final RecordFactory RECORD_FACTORY = RecordFactoryProvider.getRecordFactory(null); + private static Clock clock = SystemClock.getInstance(); + public static List queryRMNodes(RMContext context, EnumSet acceptedStates) { // nodes contains nodes that are NEW, RUNNING, UNHEALTHY or DECOMMISSIONING. @@ -398,6 +404,7 @@ public static YarnApplicationState createApplicationState( case FINISHING: case FINISHED: return YarnApplicationState.FINISHED; + case KILLING: case KILLED: return YarnApplicationState.KILLED; case FAILED: @@ -475,7 +482,7 @@ public static void validateApplicationTimeouts( if (timeouts != null) { for (Map.Entry timeout : timeouts .entrySet()) { - if (timeout.getValue() < 0) { + if (timeout.getValue() <= 0) { String message = "Invalid application timeout, value=" + timeout.getValue() + " for type=" + timeout.getKey(); throw new YarnException(message); @@ -483,4 +490,42 @@ public static void validateApplicationTimeouts( } } } + + /** + * Validate ISO8601 format with epoch time. + * @param timeoutsInISO8601 format + * @return Map expire time in local epoch + * @throws YarnException + */ + public static Map validateISO8601AndConvertToLocalTimeEpoch( + Map timeoutsInISO8601) + throws YarnException { + long currentTimeMillis = clock.getTime(); + Map newApplicationTimeout = + new HashMap(); + if (timeoutsInISO8601 != null) { + for (Map.Entry timeout : timeoutsInISO8601 + .entrySet()) { + long expireTime = 0L; + try { + expireTime = + Times.parseISO8601ToLocalTimeInMillis(timeout.getValue()); + } catch (ParseException ex) { + String message = + "Expire time is not in ISO8601 format. ISO8601 supported " + + "format is yyyy-MM-dd'T'HH:mm:ss.SSSZ"; + throw new YarnException(message); + } + if (expireTime < currentTimeMillis) { + String message = + "Expire time is less than current time, current-time=" + + Times.formatISO8601(currentTimeMillis) + " expire-time=" + + Times.formatISO8601(expireTime); + throw new YarnException(message); + } + newApplicationTimeout.put(timeout.getKey(), expireTime); + } + } + return newApplicationTimeout; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index de273c4..779bdd3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -31,13 +31,14 @@ import javax.crypto.SecretKey; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.SettableFuture; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; -import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.service.AbstractService; @@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.StateStoreFailedException; import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; @@ -237,6 +239,8 @@ public RMStateStoreState transition(RMStateStore store, boolean isFenced = false; ApplicationStateData appState = ((RMStateUpdateAppEvent) event).getAppState(); + SettableFuture result = + ((RMStateUpdateAppEvent) event).getResult(); ApplicationId appId = appState.getApplicationSubmissionContext().getApplicationId(); LOG.info("Updating info for app: " + appId); @@ -246,9 +250,17 @@ public RMStateStoreState transition(RMStateStore store, store.notifyApplication(new RMAppEvent(appId, RMAppEventType.APP_UPDATE_SAVED)); } + + if (result != null) { + result.set(null); + } + } catch (Exception e) { LOG.error("Error updating app: " + appId, e); isFenced = store.notifyStoreOperationFailedInternal(e); + if (result != null) { + result.setException(new StateStoreFailedException(e)); + } } return finalState(isFenced); }; @@ -774,18 +786,26 @@ public void storeNewApplication(RMApp app) { ApplicationStateData appState = ApplicationStateData.newInstance(app.getSubmitTime(), app.getStartTime(), context, app.getUser(), app.getCallerContext()); + appState.setApplicationTimeouts(app.getApplicationTimeouts()); dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState)); } @SuppressWarnings("unchecked") - public void updateApplicationState( - ApplicationStateData appState) { + public void updateApplicationState(ApplicationStateData appState) { dispatcher.getEventHandler().handle(new RMStateUpdateAppEvent(appState)); } + @SuppressWarnings("unchecked") + public void updateApplicationState( + ApplicationStateData appState, boolean notifyApp, + SettableFuture future) { + dispatcher.getEventHandler() + .handle(new RMStateUpdateAppEvent(appState, notifyApp, future)); + } + public void updateApplicationStateSynchronously( ApplicationStateData appState, boolean notifyApp) { - handleStoreEvent(new RMStateUpdateAppEvent(appState, notifyApp)); + handleStoreEvent(new RMStateUpdateAppEvent(appState, notifyApp, null)); } public void updateFencedState() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java index 69169dd..0a6220b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateUpdateAppEvent.java @@ -20,21 +20,28 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; +import com.google.common.util.concurrent.SettableFuture; + public class RMStateUpdateAppEvent extends RMStateStoreEvent { private final ApplicationStateData appState; // After application state is updated in state store, // should notify back to application or not private boolean notifyApplication; + private SettableFuture future; public RMStateUpdateAppEvent(ApplicationStateData appState) { super(RMStateStoreEventType.UPDATE_APP); this.appState = appState; this.notifyApplication = true; + this.future = null; } - public RMStateUpdateAppEvent(ApplicationStateData appState, boolean notifyApp) { - this(appState); + public RMStateUpdateAppEvent(ApplicationStateData appState, boolean notifyApp, + SettableFuture future) { + super(RMStateStoreEventType.UPDATE_APP); + this.appState = appState; this.notifyApplication = notifyApp; + this.future = future; } public ApplicationStateData getAppState() { @@ -44,4 +51,8 @@ public ApplicationStateData getAppState() { public boolean isNotifyApplication() { return notifyApplication; } + + public SettableFuture getResult() { + return future; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java index 2348380..79a5de2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.util.Records; @@ -60,6 +61,25 @@ public static ApplicationStateData newInstance(long submitTime, } public static ApplicationStateData newInstance(long submitTime, + long startTime, String user, + ApplicationSubmissionContext submissionContext, RMAppState state, + String diagnostics, long finishTime, CallerContext callerContext, + Map applicationTimeouts) { + ApplicationStateData appState = + Records.newRecord(ApplicationStateData.class); + appState.setSubmitTime(submitTime); + appState.setStartTime(startTime); + appState.setUser(user); + appState.setApplicationSubmissionContext(submissionContext); + appState.setState(state); + appState.setDiagnostics(diagnostics); + appState.setFinishTime(finishTime); + appState.setCallerContext(callerContext); + appState.setApplicationTimeouts(applicationTimeouts); + return appState; + } + + public static ApplicationStateData newInstance(long submitTime, long startTime, ApplicationSubmissionContext context, String user, CallerContext callerContext) { return newInstance(submitTime, startTime, user, context, null, "", 0, @@ -168,4 +188,11 @@ public abstract void setApplicationSubmissionContext( public abstract CallerContext getCallerContext(); public abstract void setCallerContext(CallerContext callerContext); + + @Public + public abstract Map getApplicationTimeouts(); + + @Public + public abstract void setApplicationTimeouts( + Map applicationTimeouts); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java index 15ed770..d037e68 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/impl/pb/ApplicationStateDataPBImpl.java @@ -18,10 +18,18 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + import org.apache.hadoop.ipc.CallerContext; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationTimeoutMapProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.RMAppStateProto; @@ -38,6 +46,7 @@ boolean viaProto = false; private ApplicationSubmissionContext applicationSubmissionContext = null; + private Map applicationTimeouts = null; public ApplicationStateDataPBImpl() { builder = ApplicationStateDataProto.newBuilder(); @@ -63,6 +72,10 @@ private void mergeLocalToBuilder() { ((ApplicationSubmissionContextPBImpl)applicationSubmissionContext) .getProto()); } + + if (this.applicationTimeouts != null) { + addApplicationTimeouts(); + } } private void mergeLocalToProto() { @@ -256,4 +269,77 @@ public static RMAppStateProto convertToProtoFormat(RMAppState e) { public static RMAppState convertFromProtoFormat(RMAppStateProto e) { return RMAppState.valueOf(e.name().replace(RM_APP_PREFIX, "")); } + + @Override + public Map getApplicationTimeouts() { + initApplicationTimeout(); + return this.applicationTimeouts; + } + + private void initApplicationTimeout() { + if (this.applicationTimeouts != null) { + return; + } + ApplicationStateDataProtoOrBuilder p = viaProto ? proto : builder; + List lists = p.getApplicationTimeoutsList(); + this.applicationTimeouts = + new HashMap(lists.size()); + for (ApplicationTimeoutMapProto timeoutProto : lists) { + this.applicationTimeouts.put( + ProtoUtils + .convertFromProtoFormat(timeoutProto.getApplicationTimeoutType()), + timeoutProto.getTimeout()); + } + } + + @Override + public void setApplicationTimeouts( + Map appTimeouts) { + if (appTimeouts == null) { + return; + } + initApplicationTimeout(); + this.applicationTimeouts.clear(); + this.applicationTimeouts.putAll(appTimeouts); + } + + private void addApplicationTimeouts() { + maybeInitBuilder(); + builder.clearApplicationTimeouts(); + if (applicationTimeouts == null) { + return; + } + Iterable values = + new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + private Iterator iterator = + applicationTimeouts.keySet().iterator(); + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public ApplicationTimeoutMapProto next() { + ApplicationTimeoutType key = iterator.next(); + return ApplicationTimeoutMapProto.newBuilder() + .setTimeout(applicationTimeouts.get(key)) + .setApplicationTimeoutType( + ProtoUtils.convertToProtoFormat(key)) + .build(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + this.builder.addAllApplicationTimeouts(values); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index 98cbd92..4a2f542 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -35,10 +36,13 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import com.google.common.util.concurrent.SettableFuture; + /** * The interface to an Application in the ResourceManager. Take a * look at {@link RMAppImpl} for its implementation. This interface @@ -280,4 +284,11 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, String getAppNodeLabelExpression(); CallerContext getCallerContext(); + + Map getApplicationTimeouts(); + + void updateApplicationTimeout(RMAppUpdateType updateType, + Map updateTimeout, + Map currentTimeout, + SettableFuture resultFuture) throws YarnException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 0fdc311..0e7bd09 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -115,12 +115,16 @@ import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.SettableFuture; @SuppressWarnings({ "rawtypes", "unchecked" }) public class RMAppImpl implements RMApp, Recoverable { private static final Log LOG = LogFactory.getLog(RMAppImpl.class); private static final String UNAVAILABLE = "N/A"; + private static final EnumSet COMPLETED_APP_STATES = + EnumSet.of(RMAppState.FINISHED, RMAppState.FINISHING, RMAppState.FAILED, + RMAppState.KILLED, RMAppState.FINAL_SAVING, RMAppState.KILLING); // Immutable fields private final ApplicationId applicationId; @@ -179,6 +183,8 @@ private Map> logAggregationFailureMessagesForNMs = new HashMap>(); private final int maxLogAggregationDiagnosticsInMemory; + private Map applicationTimeouts = + new HashMap(); // These states stored are only valid when app is at killing or final_saving. private RMAppState stateBeforeKilling; @@ -897,6 +903,7 @@ public void recover(RMState state) { this.storedFinishTime = appState.getFinishTime(); this.startTime = appState.getStartTime(); this.callerContext = appState.getCallerContext(); + this.applicationTimeouts = appState.getApplicationTimeouts(); // If interval > 0, some attempts might have been deleted. if (this.attemptFailuresValidityInterval > 0) { this.firstAttemptIdInStateStore = appState.getFirstAttemptId(); @@ -1109,17 +1116,16 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { } } - long applicationLifetime = - app.getApplicationLifetime(ApplicationTimeoutType.LIFETIME); - if (applicationLifetime > 0) { + for (Map.Entry timeout : + app.applicationTimeouts.entrySet()) { app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId, - ApplicationTimeoutType.LIFETIME, app.submitTime, - applicationLifetime * 1000); + timeout.getKey(), timeout.getValue()); if (LOG.isDebugEnabled()) { + long remainingTime = timeout.getValue() - app.systemClock.getTime(); LOG.debug("Application " + app.applicationId - + " is registered for timeout monitor, type=" - + ApplicationTimeoutType.LIFETIME + " value=" - + applicationLifetime + " seconds"); + + " is registered for timeout monitor, type=" + timeout.getKey() + + " remaining timeout=" + + (remainingTime > 0 ? remainingTime / 1000 : 0) + " seconds"); } } @@ -1235,10 +1241,17 @@ public void transition(RMAppImpl app, RMAppEvent event) { long applicationLifetime = app.getApplicationLifetime(ApplicationTimeoutType.LIFETIME); if (applicationLifetime > 0) { + // calculate next timeout value + Long newTimeout = + Long.valueOf(app.submitTime + (applicationLifetime * 1000)); app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId, - ApplicationTimeoutType.LIFETIME, app.submitTime, - applicationLifetime * 1000); - LOG.debug("Application " + app.applicationId + ApplicationTimeoutType.LIFETIME, newTimeout); + + // update applicationTimeouts with new absolute value. + app.applicationTimeouts.put(ApplicationTimeoutType.LIFETIME, + newTimeout); + + LOG.info("Application " + app.applicationId + " is registered for timeout monitor, type=" + ApplicationTimeoutType.LIFETIME + " value=" + applicationLifetime + " seconds"); @@ -1292,6 +1305,7 @@ private void rememberTargetTransitionsAndStoreState(RMAppEvent event, ApplicationStateData.newInstance(this.submitTime, this.startTime, this.user, this.submissionContext, stateToBeStored, diags, this.storedFinishTime, this.callerContext); + appState.setApplicationTimeouts(this.applicationTimeouts); this.rmContext.getStateStore().updateApplicationState(appState); } @@ -1967,4 +1981,69 @@ private long getApplicationLifetime(ApplicationTimeoutType type) { } return applicationLifetime; } -} + + @Override + public Map getApplicationTimeouts() { + this.readLock.lock(); + try { + return new HashMap(this.applicationTimeouts); + } finally { + this.readLock.unlock(); + } + } + + public void updateApplicationTimeout(RMAppUpdateType updateType, + Map updateTimeout, + Map currentTimeout, + SettableFuture resultFuture) throws YarnException { + this.writeLock.lock(); + try { + if (RMAppUpdateType.ROLLBACK.equals(updateType)) { + if (!COMPLETED_APP_STATES.contains(getState())) { + // roll back to old value only if application is running + this.rmContext.getRMAppLifetimeMonitor() + .updateApplicationTimeouts(getApplicationId(), currentTimeout); + } + + this.applicationTimeouts.putAll(currentTimeout); + + // identify newly registered timeout types and unregister it as part of + // roll back. + for (ApplicationTimeoutType timeoutType : updateTimeout.keySet()) { + Long timeout = currentTimeout.get(timeoutType); + if (timeout == null) { + // newly added timeout. Unregister it from monitoring service + this.rmContext.getRMAppLifetimeMonitor() + .unregisterApp(getApplicationId(), timeoutType); + this.applicationTimeouts.remove(timeoutType); + } + } + return; + } + + if (COMPLETED_APP_STATES.contains(getState())) { + String msg = "Application is in " + + RMServerUtils.createApplicationState(getState()) + + " state can not update timeout."; + throw new YarnException(msg); + } + + // update monitoring service + this.rmContext.getRMAppLifetimeMonitor() + .updateApplicationTimeouts(getApplicationId(), updateTimeout); + this.applicationTimeouts.putAll(updateTimeout); + + // update state store with future. + ApplicationStateData appState = + ApplicationStateData.newInstance(getSubmitTime(), getStartTime(), + getApplicationSubmissionContext(), getUser(), getCallerContext()); + appState.setApplicationTimeouts(getApplicationTimeouts()); + + // update to state store + this.rmContext.getStateStore().updateApplicationState(appState, false, + resultFuture); + } finally { + this.writeLock.unlock(); + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppUpdateType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppUpdateType.java new file mode 100644 index 0000000..35b0131 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppUpdateType.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp; + +import org.apache.hadoop.classification.InterfaceAudience; + +@InterfaceAudience.Private +public enum RMAppUpdateType { + UPDATE, ROLLBACK +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/RMAppLifetimeMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/RMAppLifetimeMonitor.java index e550c97..7b8f17a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/RMAppLifetimeMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/RMAppLifetimeMonitor.java @@ -18,9 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.monitor; -import java.util.EnumSet; -import java.util.HashMap; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import org.apache.commons.logging.Log; @@ -33,7 +32,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor; import org.apache.hadoop.yarn.util.SystemClock; @@ -47,12 +45,6 @@ private static final Log LOG = LogFactory.getLog(RMAppLifetimeMonitor.class); private RMContext rmContext; - private Map monitoredApps = - new HashMap(); - - private static final EnumSet COMPLETED_APP_STATES = - EnumSet.of(RMAppState.FINISHED, RMAppState.FINISHING, RMAppState.FAILED, - RMAppState.KILLED, RMAppState.FINAL_SAVING, RMAppState.KILLING); public RMAppLifetimeMonitor(RMContext rmContext) { super(RMAppLifetimeMonitor.class.getName(), SystemClock.getInstance()); @@ -62,13 +54,15 @@ public RMAppLifetimeMonitor(RMContext rmContext) { @Override protected void serviceInit(Configuration conf) throws Exception { long monitorInterval = conf.getLong( - YarnConfiguration.RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS, + YarnConfiguration.RM_APPLICATION_MONITOR_INTERVAL_MS, YarnConfiguration.DEFAULT_RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS); if (monitorInterval <= 0) { monitorInterval = YarnConfiguration.DEFAULT_RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS; } setMonitorInterval(monitorInterval); + setExpireInterval(0); // No need of expire interval for App. + setResetTimeOnStart(false); // do not reset expire time on restart LOG.info("Application lifelime monitor interval set to " + monitorInterval + " ms."); super.serviceInit(conf); @@ -77,54 +71,42 @@ protected void serviceInit(Configuration conf) throws Exception { @SuppressWarnings("unchecked") @Override protected synchronized void expire(RMAppToMonitor monitoredAppKey) { - Long remove = monitoredApps.remove(monitoredAppKey); ApplicationId appId = monitoredAppKey.getApplicationId(); RMApp app = rmContext.getRMApps().get(appId); if (app == null) { return; } - // Don't trigger a KILL event if application is in completed states - if (!COMPLETED_APP_STATES.contains(app.getState())) { - String diagnostics = - "Application killed due to exceeding its lifetime period " + remove - + " milliseconds"; - rmContext.getDispatcher().getEventHandler() - .handle(new RMAppEvent(appId, RMAppEventType.KILL, diagnostics)); - } else { - LOG.info("Application " + appId - + " is about to complete. So not killing the application."); - } + String diagnostics = + "Application killed due to exceeding its lifetime period"; + rmContext.getDispatcher().getEventHandler() + .handle(new RMAppEvent(appId, RMAppEventType.KILL, diagnostics)); } - public synchronized void registerApp(ApplicationId appId, - ApplicationTimeoutType timeoutType, long monitorStartTime, long timeout) { + public void registerApp(ApplicationId appId, + ApplicationTimeoutType timeoutType, long expireTime) { RMAppToMonitor appToMonitor = new RMAppToMonitor(appId, timeoutType); - register(appToMonitor, monitorStartTime); - monitoredApps.putIfAbsent(appToMonitor, timeout); - } - - @Override - protected synchronized long getExpireInterval( - RMAppToMonitor monitoredAppKey) { - return monitoredApps.get(monitoredAppKey); + register(appToMonitor, expireTime); } - public synchronized void unregisterApp(ApplicationId appId, + public void unregisterApp(ApplicationId appId, ApplicationTimeoutType timeoutType) { - RMAppToMonitor appToRemove = new RMAppToMonitor(appId, timeoutType); - unregister(appToRemove); - monitoredApps.remove(appToRemove); + RMAppToMonitor remove = new RMAppToMonitor(appId, timeoutType); + unregister(remove); } - public synchronized void unregisterApp(ApplicationId appId, - Set types) { - for (ApplicationTimeoutType type : types) { - unregisterApp(appId, type); + public void unregisterApp(ApplicationId appId, + Set timeoutTypes) { + for (ApplicationTimeoutType timeoutType : timeoutTypes) { + unregisterApp(appId, timeoutType); } } - public synchronized void updateApplicationTimeouts(ApplicationId appId, + public void updateApplicationTimeouts(ApplicationId appId, Map timeouts) { - // TODO in YARN-5611 + for (Entry entry : timeouts.entrySet()) { + ApplicationTimeoutType timeoutType = entry.getKey(); + RMAppToMonitor update = new RMAppToMonitor(appId, timeoutType); + register(update, entry.getValue()); + } } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index d759d47..349d185 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -2196,6 +2196,7 @@ public void updateApplicationPriority(Priority newPriority, ApplicationStateData.newInstance(rmApp.getSubmitTime(), rmApp.getStartTime(), rmApp.getApplicationSubmissionContext(), rmApp.getUser(), rmApp.getCallerContext()); + appState.setApplicationTimeouts(rmApp.getApplicationTimeouts()); rmContext.getStateStore().updateApplicationStateSynchronously(appState, false); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto index 6e2398a..4693818 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto @@ -69,6 +69,7 @@ message ApplicationStateDataProto { optional string diagnostics = 6 [default = "N/A"]; optional int64 finish_time = 7; optional hadoop.common.RPCCallerContextProto caller_context = 8; + repeated ApplicationTimeoutMapProto application_timeouts = 9; } message ApplicationAttemptStateDataProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index 19ee0b1..012131d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -45,11 +46,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppUpdateType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.util.Records; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.SettableFuture; @InterfaceAudience.Private public abstract class MockAsm extends MockApps { @@ -229,6 +232,19 @@ public String getAppNodeLabelExpression() { public CallerContext getCallerContext() { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public Map getApplicationTimeouts() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public void updateApplicationTimeout(RMAppUpdateType updateType, + Map updateTimeout, + Map currentTimeout, + SettableFuture resultFuture) { + throw new UnsupportedOperationException("Not supported yet."); + } } public static RMApp newApplication(int i) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index 62a5c52..07dd257 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -43,6 +44,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import com.google.common.util.concurrent.SettableFuture; + public class MockRMApp implements RMApp { static final int DT = 1000000; // ms @@ -315,4 +318,17 @@ public void removeCollectorAddr() { public void setCollectorAddr(String collectorAddr) { throw new UnsupportedOperationException("Not supported yet."); } + + @Override + public Map getApplicationTimeouts() { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public void updateApplicationTimeout(RMAppUpdateType updateType, + Map updateTimeout, + Map currentTimeout, + SettableFuture resultFuture) { + throw new UnsupportedOperationException("Not supported yet."); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java index 3f2db1d..390f94c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java @@ -27,6 +27,7 @@ import java.util.Set; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; @@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.util.Times; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -64,7 +66,7 @@ public void setup() throws IOException { conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true); conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); - conf.setLong(YarnConfiguration.RM_APPLICATION_LIFETIME_MONITOR_INTERVAL_MS, + conf.setLong(YarnConfiguration.RM_APPLICATION_MONITOR_INTERVAL_MS, 3000L); } @@ -81,20 +83,57 @@ public void testApplicationLifetimeMonitor() throws Exception { new HashMap(); timeouts.put(ApplicationTimeoutType.LIFETIME, 10L); RMApp app1 = rm.submitApp(1024, appPriority, timeouts); + + // 20L seconds + timeouts.put(ApplicationTimeoutType.LIFETIME, 20L); + RMApp app2 = rm.submitApp(1024, appPriority, timeouts); + nm1.nodeHeartbeat(true); // Send launch Event MockAM am1 = rm.sendAMLaunched(app1.getCurrentAppAttempt().getAppAttemptId()); am1.registerAppAttempt(); rm.waitForState(app1.getApplicationId(), RMAppState.KILLED); - Assert.assertTrue("Applicaiton killed before lifetime value", + Assert.assertTrue("Application killed before lifetime value", (System.currentTimeMillis() - app1.getSubmitTime()) > 10000); + + Map updateTimeout = + new HashMap(); + long newLifetime = 10L; + // update 10L seconds more to timeout i.e 30L seconds overall + updateTimeout.put(ApplicationTimeoutType.LIFETIME, + Times.formatISO8601(System.currentTimeMillis() + newLifetime * 1000)); + UpdateApplicationTimeoutsRequest request = + UpdateApplicationTimeoutsRequest.newInstance(app2.getApplicationId(), + updateTimeout); + + Map applicationTimeouts = + app2.getApplicationTimeouts(); + // has old timeout time + long beforeUpdate = + applicationTimeouts.get(ApplicationTimeoutType.LIFETIME); + + // update app2 lifetime to new time i.e now + timeout + rm.getRMContext().getClientRMService().updateApplicationTimeouts(request); + + applicationTimeouts = + app2.getApplicationTimeouts(); + long afterUpdate = + applicationTimeouts.get(ApplicationTimeoutType.LIFETIME); + + Assert.assertTrue("Application lifetime value not updated", + afterUpdate > beforeUpdate); + + rm.waitForState(app2.getApplicationId(), RMAppState.KILLED); + // verify for app killed with updated lifetime + Assert.assertTrue("Application killed before lifetime value", + app2.getFinishTime() > afterUpdate); + } finally { stopRM(rm); } } - @SuppressWarnings("rawtypes") @Test(timeout = 180000) public void testApplicationLifetimeOnRMRestart() throws Exception { MemoryRMStateStore memStore = new MemoryRMStateStore(); @@ -115,6 +154,12 @@ public void testApplicationLifetimeOnRMRestart() throws Exception { // Re-start RM MockRM rm2 = new MockRM(conf, memStore); + + // make sure app has been unregistered with old RM else both will trigger + // Expire event + rm1.getRMContext().getRMAppLifetimeMonitor().unregisterApp( + app1.getApplicationId(), ApplicationTimeoutType.LIFETIME); + rm2.start(); nm1.setResourceTrackerService(rm2.getResourceTrackerService()); @@ -152,9 +197,9 @@ public void testApplicationLifetimeOnRMRestart() throws Exception { // wait for app life time and application to be in killed state. rm2.waitForState(recoveredApp1.getApplicationId(), RMAppState.KILLED); - Assert.assertTrue("Applicaiton killed before lifetime value", - (System.currentTimeMillis() - - recoveredApp1.getSubmitTime()) > appLifetime); + Assert.assertTrue("Application killed before lifetime value", + recoveredApp1.getFinishTime() > (recoveredApp1.getSubmitTime() + + appLifetime * 1000)); } private void stopRM(MockRM rm) { -- 2.7.4 (Apple Git-66)