From 8219a69672238d57906d6ee65892d060638355b5 Mon Sep 17 00:00:00 2001 From: Rohith Sharma K S Date: Fri, 14 Oct 2016 19:22:58 +0530 Subject: [PATCH] YARN-5611 --- .../apache/hadoop/mapred/TestClientRedirect.java | 9 + .../hadoop/yarn/api/ApplicationClientProtocol.java | 20 ++ .../UpdateApplicationTimeoutsRequest.java | 73 +++++++ .../UpdateApplicationTimeoutsResponse.java | 34 ++++ .../main/proto/applicationclient_protocol.proto | 1 + .../src/main/proto/yarn_service_protos.proto | 8 + .../ApplicationClientProtocolPBClientImpl.java | 21 +- .../ApplicationClientProtocolPBServiceImpl.java | 22 +++ .../pb/UpdateApplicationTimeoutsRequestPBImpl.java | 218 +++++++++++++++++++++ .../UpdateApplicationTimeoutsResponsePBImpl.java | 73 +++++++ .../yarn/util/AbstractLivelinessMonitor.java | 4 + .../amrmproxy/MockResourceManagerFacade.java | 9 + .../server/resourcemanager/ClientRMService.java | 139 ++++++++++--- .../yarn/server/resourcemanager/RMAuditLogger.java | 4 +- .../yarn/server/resourcemanager/RMServerUtils.java | 2 +- .../server/resourcemanager/rmapp/RMAppImpl.java | 2 +- .../rmapp/monitor/RMAppLifetimeMonitor.java | 86 +++++++- .../rmapp/TestApplicationLifetimeMonitor.java | 40 +++- 18 files changed, 720 insertions(+), 45 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsRequest.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsResponse.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsRequestPBImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsResponsePBImpl.java diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java index 255f998..65eac65 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java @@ -124,6 +124,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -485,6 +487,13 @@ public SignalContainerResponse signalToContainer( SignalContainerRequest request) throws IOException { return null; } + + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + return null; + } } class HistoryService extends AMService implements HSClientProtocol { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java index 8ee43fb..883aed1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/ApplicationClientProtocol.java @@ -59,6 +59,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; @@ -566,4 +568,22 @@ public UpdateApplicationPriorityResponse updateApplicationPriority( SignalContainerResponse signalToContainer( SignalContainerRequest request) throws YarnException, IOException; + + /** + *

+ * The interface used by client to set ApplicationTimeouts of an application. + *

+ * Note : It is a blocking call + * + * @param request to set ApplicationTimeouts of an application + * @return an empty response + * @throws YarnException + * @throws IOException + */ + @Public + @Unstable + @Idempotent + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsRequest.java new file mode 100644 index 0000000..64f8ef8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsRequest.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.util.Records; + +@Public +@Unstable +public abstract class UpdateApplicationTimeoutsRequest { + public static UpdateApplicationTimeoutsRequest newInstance( + ApplicationId applicationId, + Map applicationTimeouts) { + UpdateApplicationTimeoutsRequest request = + Records.newRecord(UpdateApplicationTimeoutsRequest.class); + request.setApplicationId(applicationId); + request.setApplicationTimeouts(applicationTimeouts); + return request; + } + + /** + * Get the ApplicationId of the application. + * + * @return ApplicationId of the application + */ + public abstract ApplicationId getApplicationId(); + + /** + * Set the ApplicationId of the application. + * + * @param applicationId ApplicationId of the application + */ + public abstract void setApplicationId(ApplicationId applicationId); + + /** + * Get ApplicationTimeouts of the application. Timeout value is + * in seconds. + * + * @return all ApplicationTimeouts of the application. + */ + public abstract Map getApplicationTimeouts(); + + /** + * Set the ApplicationTimeouts for the application in seconds. + * All pre-existing Map entries are cleared before adding the new Map. + * + * @param applicationTimeouts ApplicationTimeoutss for the + * application + */ + public abstract void setApplicationTimeouts( + Map applicationTimeouts); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsResponse.java new file mode 100644 index 0000000..3217569 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/UpdateApplicationTimeoutsResponse.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +@Public +@Unstable +public abstract class UpdateApplicationTimeoutsResponse { + + public static UpdateApplicationTimeoutsResponse newInstance() { + UpdateApplicationTimeoutsResponse response = + Records.newRecord(UpdateApplicationTimeoutsResponse.class); + return response; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto index f1c3839..ba79db0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/applicationclient_protocol.proto @@ -60,4 +60,5 @@ service ApplicationClientProtocolService { rpc getClusterNodeLabels (GetClusterNodeLabelsRequestProto) returns (GetClusterNodeLabelsResponseProto); rpc updateApplicationPriority (UpdateApplicationPriorityRequestProto) returns (UpdateApplicationPriorityResponseProto); rpc signalToContainer(SignalContainerRequestProto) returns (SignalContainerResponseProto); + rpc updateApplicationTimeouts (UpdateApplicationTimeoutsRequestProto) returns (UpdateApplicationTimeoutsResponseProto); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 6526bf9..86b30c9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -267,6 +267,14 @@ message SignalContainerRequestProto { message SignalContainerResponseProto { } +message UpdateApplicationTimeoutsRequestProto { + required ApplicationIdProto applicationId = 1; + repeated ApplicationTimeoutMapProto application_timeouts = 2; +} + +message UpdateApplicationTimeoutsResponseProto { +} + ////////////////////////////////////////////////////// /////// client_NM_Protocol /////////////////////////// ////////////////////////////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java index 2d755a2..ad7cb29 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ApplicationClientProtocolPBClientImpl.java @@ -83,6 +83,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; @@ -139,6 +141,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReservationUpdateResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationTimeoutsRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationTimeoutsResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl; @@ -165,7 +169,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationSubmissionRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.ReservationUpdateRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityRequestProto; -import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto; import com.google.protobuf.ServiceException; @@ -600,4 +604,19 @@ public SignalContainerResponse signalToContainer( return null; } } + + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + UpdateApplicationTimeoutsRequestProto requestProto = + ((UpdateApplicationTimeoutsRequestPBImpl) request).getProto(); + try { + return new UpdateApplicationTimeoutsResponsePBImpl( + proxy.updateApplicationTimeouts(null, requestProto)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java index 300ef57..93ce6a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/ApplicationClientProtocolPBServiceImpl.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl; @@ -111,6 +112,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationPriorityResponsePBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationTimeoutsRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.UpdateApplicationTimeoutsResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -162,6 +165,8 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationPriorityResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationResponseProto; @@ -609,4 +614,21 @@ public SignalContainerResponseProto signalToContainer( throw new ServiceException(e); } } + + @Override + public UpdateApplicationTimeoutsResponseProto updateApplicationTimeouts( + RpcController controller, UpdateApplicationTimeoutsRequestProto proto) + throws ServiceException { + UpdateApplicationTimeoutsRequestPBImpl request = + new UpdateApplicationTimeoutsRequestPBImpl(proto); + try { + UpdateApplicationTimeoutsResponse response = + real.updateApplicationTimeouts(request); + return ((UpdateApplicationTimeoutsResponsePBImpl) response).getProto(); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsRequestPBImpl.java new file mode 100644 index 0000000..ad75aa1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsRequestPBImpl.java @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationTimeoutMapProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsRequestProtoOrBuilder; + +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class UpdateApplicationTimeoutsRequestPBImpl + extends UpdateApplicationTimeoutsRequest { + + UpdateApplicationTimeoutsRequestProto proto = + UpdateApplicationTimeoutsRequestProto.getDefaultInstance(); + UpdateApplicationTimeoutsRequestProto.Builder builder = null; + boolean viaProto = false; + + private ApplicationId applicationId = null; + private Map applicationTimeouts = null; + + public UpdateApplicationTimeoutsRequestPBImpl() { + builder = UpdateApplicationTimeoutsRequestProto.newBuilder(); + } + + public UpdateApplicationTimeoutsRequestPBImpl( + UpdateApplicationTimeoutsRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public UpdateApplicationTimeoutsRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = UpdateApplicationTimeoutsRequestProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToBuilder() { + if (this.applicationId != null) { + builder.setApplicationId(convertToProtoFormat(this.applicationId)); + } + if (this.applicationTimeouts != null) { + addApplicationTimeouts(); + } + } + + @Override + public ApplicationId getApplicationId() { + UpdateApplicationTimeoutsRequestProtoOrBuilder p = + viaProto ? proto : builder; + if (this.applicationId != null) { + return applicationId; + } // Else via proto + if (!p.hasApplicationId()) { + return null; + } + applicationId = convertFromProtoFormat(p.getApplicationId()); + return applicationId; + } + + @Override + public void setApplicationId(ApplicationId applicationId) { + maybeInitBuilder(); + if (applicationId == null) { + builder.clearApplicationId(); + } + this.applicationId = applicationId; + } + + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { + return new ApplicationIdPBImpl(p); + } + + private ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl) t).getProto(); + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + @Override + public Map getApplicationTimeouts() { + initApplicationTimeout(); + return this.applicationTimeouts; + } + + private void initApplicationTimeout() { + if (this.applicationTimeouts != null) { + return; + } + UpdateApplicationTimeoutsRequestProtoOrBuilder p = + viaProto ? proto : builder; + List lists = p.getApplicationTimeoutsList(); + this.applicationTimeouts = + new HashMap(lists.size()); + for (ApplicationTimeoutMapProto timeoutProto : lists) { + this.applicationTimeouts.put( + ProtoUtils + .convertFromProtoFormat(timeoutProto.getApplicationTimeoutType()), + timeoutProto.getTimeout()); + } + } + + @Override + public void setApplicationTimeouts( + Map appTimeouts) { + if (appTimeouts == null) { + return; + } + initApplicationTimeout(); + this.applicationTimeouts.clear(); + this.applicationTimeouts.putAll(appTimeouts); + } + + private void addApplicationTimeouts() { + maybeInitBuilder(); + builder.clearApplicationTimeouts(); + if (applicationTimeouts == null) { + return; + } + Iterable values = + new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + private Iterator iterator = + applicationTimeouts.keySet().iterator(); + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public ApplicationTimeoutMapProto next() { + ApplicationTimeoutType key = iterator.next(); + return ApplicationTimeoutMapProto.newBuilder() + .setTimeout(applicationTimeouts.get(key)) + .setApplicationTimeoutType( + ProtoUtils.convertToProtoFormat(key)) + .build(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + this.builder.addAllApplicationTimeouts(values); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsResponsePBImpl.java new file mode 100644 index 0000000..9c2b157 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/UpdateApplicationTimeoutsResponsePBImpl.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateApplicationTimeoutsResponseProto; + +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class UpdateApplicationTimeoutsResponsePBImpl + extends UpdateApplicationTimeoutsResponse { + UpdateApplicationTimeoutsResponseProto proto = + UpdateApplicationTimeoutsResponseProto.getDefaultInstance(); + UpdateApplicationTimeoutsResponseProto.Builder builder = null; + boolean viaProto = false; + + public UpdateApplicationTimeoutsResponsePBImpl() { + builder = UpdateApplicationTimeoutsResponseProto.newBuilder(); + } + + public UpdateApplicationTimeoutsResponsePBImpl( + UpdateApplicationTimeoutsResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public UpdateApplicationTimeoutsResponseProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java index b605026..e4993fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/AbstractLivelinessMonitor.java @@ -113,6 +113,10 @@ public synchronized void unregister(O ob) { running.remove(ob); } + protected synchronized Long getMonitorStartTime(O ob) { + return running.get(ob); + } + public synchronized void resetTimer() { long time = clock.getTime(); for (O ob : running.keySet()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java index 2ccf827..b530fb4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java @@ -91,6 +91,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.records.AMCommand; @@ -496,4 +498,11 @@ public FailApplicationAttemptResponse failApplicationAttempt( FailApplicationAttemptRequest request) throws YarnException, IOException { throw new NotImplementedException(); } + + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + throw new NotImplementedException(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index e9bd230..cdd2e2c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -108,12 +108,15 @@ import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest; import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.NodeReport; @@ -140,6 +143,7 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInputValidator; @@ -1589,37 +1593,11 @@ public UpdateApplicationPriorityResponse updateApplicationPriority( ApplicationId applicationId = request.getApplicationId(); Priority newAppPriority = request.getApplicationPriority(); - UserGroupInformation callerUGI; - try { - callerUGI = UserGroupInformation.getCurrentUser(); - } catch (IOException ie) { - LOG.info("Error getting UGI ", ie); - RMAuditLogger.logFailure("UNKNOWN", AuditConstants.UPDATE_APP_PRIORITY, - "UNKNOWN", "ClientRMService", "Error getting UGI", applicationId); - throw RPCUtil.getRemoteException(ie); - } - RMApp application = this.rmContext.getRMApps().get(applicationId); - if (application == null) { - RMAuditLogger.logFailure(callerUGI.getUserName(), - AuditConstants.UPDATE_APP_PRIORITY, "UNKNOWN", "ClientRMService", - "Trying to update priority of an absent application", applicationId); - throw new ApplicationNotFoundException( - "Trying to update priority of an absent application " - + applicationId); - } - - if (!checkAccess(callerUGI, application.getUser(), - ApplicationAccessType.MODIFY_APP, application)) { - RMAuditLogger.logFailure(callerUGI.getShortUserName(), - AuditConstants.UPDATE_APP_PRIORITY, - "User doesn't have permissions to " - + ApplicationAccessType.MODIFY_APP.toString(), "ClientRMService", - AuditConstants.UNAUTHORIZED_USER, applicationId); - throw RPCUtil.getRemoteException(new AccessControlException("User " - + callerUGI.getShortUserName() + " cannot perform operation " - + ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId)); - } + UserGroupInformation callerUGI = + getCallerUgi(applicationId, AuditConstants.UPDATE_APP_PRIORITY); + RMApp application = verifyUserAccessForRMApp(applicationId, callerUGI, + AuditConstants.UPDATE_APP_PRIORITY); UpdateApplicationPriorityResponse response = recordFactory .newRecordInstance(UpdateApplicationPriorityResponse.class); @@ -1724,4 +1702,105 @@ public SignalContainerResponse signalToContainer( .newRecordInstance(SignalContainerResponse.class); } + @Override + public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( + UpdateApplicationTimeoutsRequest request) + throws YarnException, IOException { + + ApplicationId applicationId = request.getApplicationId(); + Map applicationTimeouts = + request.getApplicationTimeouts(); + + UserGroupInformation callerUGI = + getCallerUgi(applicationId, AuditConstants.UPDATE_APP_TIMEOUTS); + RMApp application = verifyUserAccessForRMApp(applicationId, callerUGI, + AuditConstants.UPDATE_APP_TIMEOUTS); + + + if (applicationTimeouts.isEmpty()) { + String message = + "At least one ApplicationTimeoutType should be configured for updating timeouts."; + RMAuditLogger.logFailure(callerUGI.getShortUserName(), + AuditConstants.UPDATE_APP_TIMEOUTS, "UNKNOWN", "ClientRMService", + message, applicationId); + throw RPCUtil.getRemoteException(message); + } + + RMServerUtils.validateApplicationTimeouts(applicationTimeouts); + + UpdateApplicationTimeoutsResponse response = recordFactory + .newRecordInstance(UpdateApplicationTimeoutsResponse.class); + + if (EnumSet.of(RMAppState.NEW, RMAppState.NEW_SAVING) + .contains(application.getState())) { + if (COMPLETED_APP_STATES.contains(application.getState())) { + RMAuditLogger.logSuccess(callerUGI.getShortUserName(), + AuditConstants.UPDATE_APP_TIMEOUTS, "ClientRMService", + applicationId); + return response; + } + String msg = "Application in " + application.getState() + + " state cannot update timeout."; + RMAuditLogger.logFailure(callerUGI.getShortUserName(), + AuditConstants.UPDATE_APP_PRIORITY, "UNKNOWN", "ClientRMService", + msg); + throw new YarnException(msg); + } + + // update in monitor first to reflect immediately + rmContext.getRMAppLifetimeMonitor().updateApplicationTimeouts(applicationId, + applicationTimeouts); + + // Update to state store + ApplicationStateData appState = ApplicationStateData.newInstance( + application.getSubmitTime(), application.getStartTime(), + application.getApplicationSubmissionContext(), application.getUser(), + application.getCallerContext()); + rmContext.getStateStore().updateApplicationStateSynchronously(appState, + false); + + RMAuditLogger.logSuccess(callerUGI.getShortUserName(), + AuditConstants.UPDATE_APP_TIMEOUTS, "ClientRMService", applicationId); + return response; + } + + private UserGroupInformation getCallerUgi(ApplicationId applicationId, + String operation) throws YarnException { + UserGroupInformation callerUGI; + try { + callerUGI = UserGroupInformation.getCurrentUser(); + } catch (IOException ie) { + LOG.info("Error getting UGI ", ie); + RMAuditLogger.logFailure("UNKNOWN", operation, "UNKNOWN", + "ClientRMService", "Error getting UGI", applicationId); + throw RPCUtil.getRemoteException(ie); + } + return callerUGI; + } + + private RMApp verifyUserAccessForRMApp(ApplicationId applicationId, + UserGroupInformation callerUGI, String operation) throws YarnException { + RMApp application = this.rmContext.getRMApps().get(applicationId); + if (application == null) { + RMAuditLogger.logFailure(callerUGI.getUserName(), operation, "UNKNOWN", + "ClientRMService", + "Trying to " + operation + " of an absent application", + applicationId); + throw new ApplicationNotFoundException("Trying to " + operation + + " of an absent application " + applicationId); + } + + if (!checkAccess(callerUGI, application.getUser(), + ApplicationAccessType.MODIFY_APP, application)) { + RMAuditLogger.logFailure(callerUGI.getShortUserName(), operation, + "User doesn't have permissions to " + + ApplicationAccessType.MODIFY_APP.toString(), + "ClientRMService", AuditConstants.UNAUTHORIZED_USER, applicationId); + throw RPCUtil.getRemoteException(new AccessControlException("User " + + callerUGI.getShortUserName() + " cannot perform operation " + + ApplicationAccessType.MODIFY_APP.name() + " on " + applicationId)); + } + return application; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java index 0361059..d52e002 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java @@ -63,7 +63,9 @@ public static final String ALLOC_CONTAINER = "AM Allocated Container"; public static final String RELEASE_CONTAINER = "AM Released Container"; public static final String UPDATE_APP_PRIORITY = - "Update Application Priority Request"; + "Update Application Priority"; + public static final String UPDATE_APP_TIMEOUTS = + "Update Application Timeouts"; public static final String CHANGE_CONTAINER_RESOURCE = "AM Changed Container Resource"; public static final String SIGNAL_CONTAINER = "Signal Container Request"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index b2a085a..75c2dc4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -475,7 +475,7 @@ public static void validateApplicationTimeouts( if (timeouts != null) { for (Map.Entry timeout : timeouts .entrySet()) { - if (timeout.getValue() < 0) { + if (timeout.getValue() <= 0) { String message = "Invalid application timeout, value=" + timeout.getValue() + " for type=" + timeout.getKey(); throw new YarnException(message); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 0fdc311..40fe4d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -1173,7 +1173,7 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { app.submissionContext.getApplicationTimeouts(); if (timeouts != null && timeouts.size() > 0) { app.rmContext.getRMAppLifetimeMonitor() - .unregisterApp(app.getApplicationId(), timeouts.keySet()); + .unregisterApp(app.getApplicationId(), timeouts.keySet(), true); } if (app.transitionTodo instanceof SingleArcTransition) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/RMAppLifetimeMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/RMAppLifetimeMonitor.java index e550c97..c68de7f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/RMAppLifetimeMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/monitor/RMAppLifetimeMonitor.java @@ -21,6 +21,7 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import org.apache.commons.logging.Log; @@ -45,11 +46,16 @@ extends AbstractLivelinessMonitor { private static final Log LOG = LogFactory.getLog(RMAppLifetimeMonitor.class); + private static final int MS = 1000; private RMContext rmContext; private Map monitoredApps = new HashMap(); + private Map> + appIdToTimeoutTypeMapping = + new HashMap>(); + private static final EnumSet COMPLETED_APP_STATES = EnumSet.of(RMAppState.FINISHED, RMAppState.FINISHING, RMAppState.FAILED, RMAppState.KILLED, RMAppState.FINAL_SAVING, RMAppState.KILLING); @@ -97,10 +103,19 @@ protected synchronized void expire(RMAppToMonitor monitoredAppKey) { } public synchronized void registerApp(ApplicationId appId, - ApplicationTimeoutType timeoutType, long monitorStartTime, long timeout) { + ApplicationTimeoutType timeoutType, long monitorStartTime, + long timeout) { + Map timeoutTypeMapping = + appIdToTimeoutTypeMapping.get(appId); + if (timeoutTypeMapping == null) { + timeoutTypeMapping = + new HashMap(); + appIdToTimeoutTypeMapping.put(appId, timeoutTypeMapping); + } RMAppToMonitor appToMonitor = new RMAppToMonitor(appId, timeoutType); register(appToMonitor, monitorStartTime); - monitoredApps.putIfAbsent(appToMonitor, timeout); + monitoredApps.put(appToMonitor, timeout); + timeoutTypeMapping.put(timeoutType, appToMonitor); } @Override @@ -111,20 +126,73 @@ protected synchronized long getExpireInterval( public synchronized void unregisterApp(ApplicationId appId, ApplicationTimeoutType timeoutType) { - RMAppToMonitor appToRemove = new RMAppToMonitor(appId, timeoutType); - unregister(appToRemove); - monitoredApps.remove(appToRemove); + Map timeoutTypeMapping = + appIdToTimeoutTypeMapping.get(appId); + RMAppToMonitor remove = timeoutTypeMapping.remove(timeoutType); + unregister(remove); + monitoredApps.remove(remove); } public synchronized void unregisterApp(ApplicationId appId, - Set types) { - for (ApplicationTimeoutType type : types) { - unregisterApp(appId, type); + Set timeoutTypes, boolean isStopMonitoringApp) { + + Map timeoutTypeMapping = + appIdToTimeoutTypeMapping.get(appId); + for (ApplicationTimeoutType timeoutType : timeoutTypes) { + RMAppToMonitor remove = timeoutTypeMapping.remove(timeoutType); + unregister(remove); + monitoredApps.remove(remove); + } + + // isApp flag for removing mapping from appIdToTimeoutTypeMapping + if (isStopMonitoringApp) { + appIdToTimeoutTypeMapping.remove(appId); } } + // If app not registered then register the app from now public synchronized void updateApplicationTimeouts(ApplicationId appId, Map timeouts) { - // TODO in YARN-5611 + + // update application submission context + RMApp app = rmContext.getRMApps().get(appId); + if (app == null || app.isAppFinalStateStored()) { + return; // race can happen when waiting for getting lock. + } + + Map newTimeouts = + new HashMap(); + + Map timeoutTypeMapping = + appIdToTimeoutTypeMapping.get(appId); + if (timeoutTypeMapping == null) { + timeoutTypeMapping = + new HashMap(); + appIdToTimeoutTypeMapping.put(appId, timeoutTypeMapping); + } + + for (Entry entry : timeouts.entrySet()) { + ApplicationTimeoutType timeoutType = entry.getKey(); + RMAppToMonitor rmAppToMonitor = timeoutTypeMapping.get(timeoutType); + long newTimeout = entry.getValue(); + if (rmAppToMonitor == null) { + // If application is not registered earlier, register and start + // monitoring from now + RMAppToMonitor appToMonitor = new RMAppToMonitor(appId, timeoutType); + register(appToMonitor); + monitoredApps.put(appToMonitor, newTimeout * MS); + timeoutTypeMapping.put(timeoutType, appToMonitor); + } else { + // Already registered app, just update time out. + Long appTimeout = monitoredApps.get(rmAppToMonitor); + newTimeout = newTimeout + (appTimeout / MS); + monitoredApps.put(rmAppToMonitor, newTimeout * MS); + } + newTimeouts.put(timeoutType, newTimeout); + } + + // update application submission context here itself + app.getApplicationSubmissionContext().setApplicationTimeouts(newTimeouts); } + } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java index 3f2db1d..6e729ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestApplicationLifetimeMonitor.java @@ -27,6 +27,7 @@ import java.util.Set; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest; import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; @@ -81,14 +82,49 @@ public void testApplicationLifetimeMonitor() throws Exception { new HashMap(); timeouts.put(ApplicationTimeoutType.LIFETIME, 10L); RMApp app1 = rm.submitApp(1024, appPriority, timeouts); + + // 20L seconds + timeouts.put(ApplicationTimeoutType.LIFETIME, 20L); + RMApp app2 = rm.submitApp(1024, appPriority, timeouts); + nm1.nodeHeartbeat(true); // Send launch Event MockAM am1 = rm.sendAMLaunched(app1.getCurrentAppAttempt().getAppAttemptId()); am1.registerAppAttempt(); rm.waitForState(app1.getApplicationId(), RMAppState.KILLED); - Assert.assertTrue("Applicaiton killed before lifetime value", + Assert.assertTrue("Application killed before lifetime value", (System.currentTimeMillis() - app1.getSubmitTime()) > 10000); + + long newLifetime = 10L; + // update 10L seconds more to timeout i.e 30L seconds overall + timeouts.put(ApplicationTimeoutType.LIFETIME, newLifetime); + UpdateApplicationTimeoutsRequest request = + UpdateApplicationTimeoutsRequest.newInstance(app2.getApplicationId(), + timeouts); + + Map applicationTimeouts = + app2.getApplicationSubmissionContext().getApplicationTimeouts(); + Long beforeUpdate = + applicationTimeouts.get(ApplicationTimeoutType.LIFETIME); + + // update app2 lifetime + rm.getRMContext().getClientRMService().updateApplicationTimeouts(request); + + applicationTimeouts = + app2.getApplicationSubmissionContext().getApplicationTimeouts(); + Long afterUpdate = + applicationTimeouts.get(ApplicationTimeoutType.LIFETIME); + + Assert.assertEquals("Application lifetime value not updated", + beforeUpdate.longValue() + newLifetime, afterUpdate.longValue()); + + rm.waitForState(app2.getApplicationId(), RMAppState.KILLED); + // verify for app killed with updated lifetime + Assert.assertTrue("Application killed before lifetime value", + (System.currentTimeMillis() - app2.getSubmitTime()) > afterUpdate + * 1000); + } finally { stopRM(rm); } @@ -152,7 +188,7 @@ public void testApplicationLifetimeOnRMRestart() throws Exception { // wait for app life time and application to be in killed state. rm2.waitForState(recoveredApp1.getApplicationId(), RMAppState.KILLED); - Assert.assertTrue("Applicaiton killed before lifetime value", + Assert.assertTrue("Application killed before lifetime value", (System.currentTimeMillis() - recoveredApp1.getSubmitTime()) > appLifetime); } -- 2.7.4 (Apple Git-66)