diff --git hadoop-yarn-project/hadoop-yarn/bin/yarn.cmd hadoop-yarn-project/hadoop-yarn/bin/yarn.cmd index 4c36307..baa4011 100644 --- hadoop-yarn-project/hadoop-yarn/bin/yarn.cmd +++ hadoop-yarn-project/hadoop-yarn/bin/yarn.cmd @@ -149,7 +149,7 @@ if "%1" == "--loglevel" ( ) ) - set yarncommands=resourcemanager nodemanager proxyserver rmadmin version jar ^ + set yarncommands=resourcemanager nodemanager proxyserver rmadmin landlord version jar ^ application applicationattempt container node queue logs daemonlog historyserver ^ timelineserver timelinereader classpath for %%i in ( %yarncommands% ) do ( @@ -180,6 +180,11 @@ goto :eof set YARN_OPTS=%YARN_OPTS% %YARN_CLIENT_OPTS% goto :eof +:landlord + set CLASS=org.apache.hadoop.yarn.client.leasereclaim.LeaseReclaimCLI + set YARN_OPTS=%YARN_OPTS% %YARN_CLIENT_OPTS% + goto :eof + :application set CLASS=org.apache.hadoop.yarn.client.cli.ApplicationCLI set YARN_OPTS=%YARN_OPTS% %YARN_CLIENT_OPTS% @@ -319,6 +324,7 @@ goto :eof @echo timelineserver run the timeline server @echo timelinereader run the timeline reader server @echo rmadmin admin tools + @echo landlord landlord tools @echo version print the version @echo jar ^ run a jar file @echo application prints application(s) report/kill application diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml index 52e276b..bdc360e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/pom.xml @@ -120,6 +120,7 @@ applicationmaster_protocol.proto applicationclient_protocol.proto containermanagement_protocol.proto + leasereclaim.proto server/yarn_server_resourcemanager_service_protos.proto server/resourcemanager_administration_protocol.proto application_history_client.proto diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LeaseNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LeaseNode.java new file mode 100644 index 0000000..758fd31 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LeaseNode.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.proto.YarnLeaseReclaimProtos.LeaseNodeProto; +import org.apache.hadoop.yarn.util.Records; + +/** + * lease status for a machine + * Note: endtime is UTC time, and unit is milliseconds + */ +@Private +@Unstable +public abstract class LeaseNode { + /** + * Default values for lease node properties + */ + private static final long DELTA_LEASE_DURATION = 30000; // in millisecond + public static final Resource NONE_LEASED = Resource.newInstance(0, 0); + public static final long RESOURCE_NEVER_TIMEOUT = Long.MAX_VALUE; + + public static LeaseNode newInstance(String landlord) { + return newInstance(landlord, NONE_LEASED); + } + + public static LeaseNode newInstance(String landlord, Resource resource) { + return newInstance(landlord, resource, RESOURCE_NEVER_TIMEOUT); + } + + public static LeaseNode newInstance(String landlord, Resource resource, long endTime) { + LeaseNode info = Records.newRecord(LeaseNode.class); + info.setLandlord(landlord); + info.setResource(resource); + info.setEndTime(endTime); + return info; + } + + public abstract LeaseNodeProto getProto(); + + public abstract String getLandlord(); + + public abstract void setLandlord(String landlord); + + public abstract Resource getResource(); + + public abstract void setResource(Resource resource); + + public abstract long getEndTime(); + + public abstract void setEndTime(long endTime); + + /* not reclaimed && not timeout */ + public boolean isLeased() { + return !(isReclaimed() || isTimeout()); + } + + public boolean isReclaimed() { + Resource resource = getResource(); + if (resource != null && resource.equals(NONE_LEASED)) { + return true; + } + return false; + } + + public boolean isTimeout() { + return isTimeout(getEndTime()); + } + + public static boolean isTimeout(long endTime) { + return endTime < System.currentTimeMillis() + DELTA_LEASE_DURATION; + } + + public Resource getValidResource() { + if (isTimeout()) { + return NONE_LEASED; + } + return getResource(); + } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("Landlord=").append(getLandlord()); + sb.append(", Resource=").append(getResource() == null ? null : getResource().toString()); + sb.append(", EndTime=").append(getEndTime()); + return sb.toString(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LeaseRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LeaseRequest.java new file mode 100644 index 0000000..07bc394 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LeaseRequest.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.proto.YarnLeaseReclaimProtos.LeaseRequestProto;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * A lease/reclaim request (including client debug info and the different
+ * request types), sent from the Landlord (client) and handled by the
+ * LeaseManager (server).
+ */
+@Private
+@Unstable
+public abstract class LeaseRequest {
+  public static LeaseRequest newInstance(UpdateResourceRequest updateResource, UpdateIDRequest updateID, String note) {
+    OperationClientInfo clientDebug = OperationClientInfo.newInstance(note);
+    return newInstance(clientDebug, updateResource, updateID);
+  }
+
+  public static LeaseRequest newInstance(OperationClientInfo clientDebug, UpdateResourceRequest updateResource, UpdateIDRequest updateID) {
+    LeaseRequest info = Records.newRecord(LeaseRequest.class);
+    info.setClientDebugInfo(clientDebug);
+    info.setUpdateResource(updateResource);
+    info.setUpdateID(updateID);
+    return info;
+  }
+
+  public abstract LeaseRequestProto getProto();
+
+  public abstract OperationClientInfo getClientDebugInfo();
+
+  public abstract void setClientDebugInfo(OperationClientInfo clientDebugInfo);
+
+  public abstract UpdateResourceRequest getUpdateResource();
+
+  public abstract void setUpdateResource(UpdateResourceRequest request);
+
+  public abstract UpdateIDRequest getUpdateID();
+
+  public abstract void setUpdateID(UpdateIDRequest request);
+
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("ClientDebugInfo=").append(
+        getClientDebugInfo() == null ? null : getClientDebugInfo().toString());
+    sb.append(", UpdateResourceRequest=").append(
+        getUpdateResource() == null ? null : getUpdateResource().toString());
+    sb.append(", UpdateIDRequest=").append(
+        getUpdateID() == null ? null : getUpdateID().toString());
+    return sb.toString();
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/MachineLeaseID.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/MachineLeaseID.java
new file mode 100644
index 0000000..71e3167
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/MachineLeaseID.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.proto.YarnLeaseReclaimProtos.MachineLeaseIDProto; +import org.apache.hadoop.yarn.util.Records; + +/** + * not used now + */ +@Private +@Unstable +public abstract class MachineLeaseID { + public static MachineLeaseID newInstance(String machine, String leaseID) { + MachineLeaseID info = Records.newRecord(MachineLeaseID.class); + info.setMachine(machine); + info.setLeaseID(leaseID); + return info; + } + + public abstract MachineLeaseIDProto getProto(); + + public abstract String getMachine(); + + public abstract void setMachine(String machine); + + public abstract String getLeaseID(); + + public abstract void setLeaseID(String leaseID); + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("Machine=").append(getMachine()); + sb.append(", LeaseID=").append(getLeaseID()); + return sb.toString(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/MachineLeaseInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/MachineLeaseInfo.java new file mode 100644 index 0000000..bb5c050 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/MachineLeaseInfo.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.proto.YarnLeaseReclaimProtos.MachineLeaseInfoProto; +import org.apache.hadoop.yarn.util.Records; + +/** + * a machine and its lease status + */ +@Private +@Unstable +public abstract class MachineLeaseInfo { + public static MachineLeaseInfo newInstance(String machine, LeaseNode status) { + MachineLeaseInfo info = Records.newRecord(MachineLeaseInfo.class); + info.setMachine(machine); + info.setStatus(status); + return info; + } + + public abstract MachineLeaseInfoProto getProto(); + + public abstract String getMachine(); + + public abstract void setMachine(String machine); + + public abstract LeaseNode getStatus(); + + public abstract void setStatus(LeaseNode status); + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("Machine=").append(getMachine()); + sb.append(", Status=").append(getStatus()==null ? null : getStatus().toString()); + return sb.toString(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/MachineLeaseInfoMap.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/MachineLeaseInfoMap.java new file mode 100644 index 0000000..e181e08 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/MachineLeaseInfoMap.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.proto.YarnLeaseReclaimProtos.MachineLeaseInfoMapProto; +import org.apache.hadoop.yarn.util.Records; + +import java.util.Map; + +/** + * a map of machine and lease status, persist in lease reclaim store + */ +@Private +@Unstable +public abstract class MachineLeaseInfoMap { + public static MachineLeaseInfoMap newInstance(Map status) { + MachineLeaseInfoMap info = Records.newRecord(MachineLeaseInfoMap.class); + info.setNodeToStatus(status); + return info; + } + + public abstract MachineLeaseInfoMapProto getProto(); + + public abstract Map getNodeToStatus(); + + public abstract void setNodeToStatus(Map status); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/OperationClientInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/OperationClientInfo.java new file mode 100644 index 0000000..467861b --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/OperationClientInfo.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.proto.YarnLeaseReclaimProtos.OperationClientInfoProto;
+import org.apache.hadoop.yarn.util.Records;
+import org.mortbay.log.Log;
+
+import java.net.InetAddress;
+
+/**
+ * Debug info for a YARN client (e.g., a landlord client).
+ */
+@Private
+@Unstable
+public abstract class OperationClientInfo {
+
+  public static OperationClientInfo newInstance(String note) {
+    String username = System.getProperty("user.name");
+    return newInstance(username, note);
+  }
+
+  public static OperationClientInfo newInstance(String username, String note) {
+    String hostname = "unknown";
+    String address = "unknown";
+    try {
+      hostname = InetAddress.getLocalHost().getHostName();
+      address = InetAddress.getLocalHost().getHostAddress();
+    } catch (Exception e) {
+      Log.warn("Failed to get client hostname and address: " + e);
+    }
+    return newInstance(username, hostname, address, note);
+  }
+
+  public static OperationClientInfo newInstance(String username, String hostname, String address, String note) {
+    long timestamp = System.currentTimeMillis();
+    OperationClientInfo info = Records.newRecord(OperationClientInfo.class);
+    info.setUsername(username);
+    info.setHostname(hostname);
+    info.setAddress(address);
+    info.setTimestamp(timestamp);
+    info.setNote(note);
+    return info;
+  }
+
+  public abstract OperationClientInfoProto getProto();
+
+  public abstract String getUsername();
+
+  public abstract void setUsername(String username);
+
+  public abstract String getHostname();
+
+  public abstract void setHostname(String hostname);
+
+  public abstract String getAddress();
+
+  public abstract void setAddress(String address);
+
+  public abstract long getTimestamp();
+
+  public abstract void setTimestamp(long timestamp);
+
+  public abstract String getNote();
+
+  public abstract void setNote(String note);
+
+  public String toString() {
+    return "user: " + getUsername()
+        + ", hostname: " + getHostname()
+        + ", address: " + getAddress()
+        + ", timestamp: " + getTimestamp()
+        + ", note: " + getNote();
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateIDRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateIDRequest.java
new file mode 100644
index 0000000..4173164
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateIDRequest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.proto.YarnLeaseReclaimProtos.UpdateIDRequestProto; +import org.apache.hadoop.yarn.util.Records; + +/** + * not used now + */ +@Private +@Unstable +public abstract class UpdateIDRequest { + public static UpdateIDRequest newInstance(String machine, String leaseID) { + UpdateIDRequest info = Records.newRecord(UpdateIDRequest.class); + info.setMachine(machine); + info.setLeaseID(leaseID); + return info; + } + + public abstract UpdateIDRequestProto getProto(); + + public abstract String getMachine(); + + public abstract void setMachine(String machine); + + public abstract String getLeaseID(); + + public abstract void setLeaseID(String leaseID); + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("Machine=").append(getMachine()); + sb.append(", leaseID=").append(getLeaseID()); + return sb.toString(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateResourceRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateResourceRequest.java new file mode 100644 index 0000000..784d3ff --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/UpdateResourceRequest.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.proto.YarnLeaseReclaimProtos.UpdateResourceRequestProto; +import org.apache.hadoop.yarn.util.Records; +import java.util.Set; + + +/** + * lease/recliam request (which will update a list of machines resource) + * Note: endtime is UTC time, and unit is milliseconds + */ +@Private +@Unstable +public abstract class UpdateResourceRequest { + public static UpdateResourceRequest newInstance(String landlord, Resource resource, long endTime, Set machines) { + UpdateResourceRequest info = Records.newRecord(UpdateResourceRequest.class); + info.setLandlord(landlord); + info.setResource(resource); + info.setEndTime(endTime); + info.setMachines(machines); + return info; + } + + public abstract UpdateResourceRequestProto getProto(); + + public abstract String getLandlord(); + + public abstract void setLandlord(String landlord); + + public abstract Resource getResource(); + + public abstract void setResource(Resource resource); + + public abstract long getEndTime(); + + public abstract void setEndTime(long endTime); + + public abstract Set getMachines(); + + public abstract void setMachines(Set machines); + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("Landlord=").append(getLandlord()); + sb.append(", Resource=").append(getResource() == null ? null : getResource().toString()); + sb.append(", EndTime=").append(getEndTime()); + sb.append(", Machines=").append(getMachines() == null ? null : getMachines().toString()); + return sb.toString(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 3bb73f5..f6a93ae 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2435,6 +2435,30 @@ public static boolean isAclEnabled(Configuration conf) { SHARED_CACHE_PREFIX + "nm.uploader.thread-count"; public static final int DEFAULT_SHARED_CACHE_NM_UPLOADER_THREAD_COUNT = 20; + // ////////////////////////////// + // normalize hostname configs + // ////////////////////////////// + public static final String NORMALIZE_HOSTNAME_CLASS = + "yarn.normalize.hostname.class"; + + //////////////////////////////// + // lease reclaim configs + //////////////////////////////// + /** The class to use as the lease/reclaim persistent store.*/ + public static final String LEASE_PREFIX = "yarn.lease."; + public static final String LEASE_STORE_PREFIX = LEASE_PREFIX + "store."; + public static final String LEASE_STORE_CLASS = LEASE_STORE_PREFIX + "class"; + /** ZK related configuration: + * note: only configure zk path here. + * For other zk related configuration, use RM_ZK_PREFIX related configurations. 
+ * If there's requirement on using different zk from rm zk, + * we can add more items here about zk for lease/reclaim */ + public static final String ZK_LEASE_STORE_ZK_PREFIX = LEASE_STORE_PREFIX + "zk."; + public static final String ZK_LEASE_STORE_PATH = ZK_LEASE_STORE_ZK_PREFIX + "root"; + public static final String DEFAULT_ZK_LEASE_STORE_PATH = "/Lease"; + + + //////////////////////////////// // Other Configs //////////////////////////////// diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/leasereclaim.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/leasereclaim.proto new file mode 100644 index 0000000..9fa4d0c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/leasereclaim.proto @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +option java_package = "org.apache.hadoop.yarn.proto"; +option java_outer_classname = "YarnLeaseReclaimProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +package hadoop.yarn; + +import "Security.proto"; +import "yarn_protos.proto"; + +/* debug info for a yarn client (e.g., a landlord client) */ +message OperationClientInfoProto { + optional string username = 1; + optional string hostname = 2; + optional string address = 3; + optional int64 timestamp = 4; + optional string note = 5; +} + +/* lease/recliam request (which will update a list of machines resource) + * Note: endtime is UTC time, and unit is milliseconds + */ +message UpdateResourceRequestProto { + optional string landlord = 1; + optional ResourceProto resource = 2; + optional int64 endTime = 3; + repeated string machines = 4; +} + +/* not used now */ +message UpdateIDRequestProto { + optional string machine = 1; + optional string leaseID = 2; +} + +/* a request (including debug info and different types of requests) written to lease reclaim store */ +message LeaseRequestProto { + optional OperationClientInfoProto clientDebugInfo = 1; + optional UpdateResourceRequestProto updateResource = 2; + optional UpdateIDRequestProto updateID = 3; +} + +/* lease status for a machine + * Note: endtime is UTC time, and unit is milliseconds + */ +message LeaseNodeProto { + optional ResourceProto resource = 1; + optional int64 endTime = 2; + optional string landlord = 3; +} + +/* a machine and its lease status */ +message MachineLeaseInfoProto { + optional string machine = 1; + optional LeaseNodeProto status = 2; +} + +/* not used now */ +message MachineLeaseIDProto { + optional string machine = 1; + optional string leaseID = 2; +} + +/* a map of machine and lease status, persist in lease reclaim store */ +message MachineLeaseInfoMapProto { + repeated MachineLeaseInfoProto nodeToStatus = 1; +} + + + + + + diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/leasereclaim/LeaseReclaimCLI.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/leasereclaim/LeaseReclaimCLI.java new file mode 100644 index 0000000..8a1991d --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/leasereclaim/LeaseReclaimCLI.java @@ -0,0 +1,477 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.yarn.client.leasereclaim; + +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.HAAdmin; +import org.apache.hadoop.ha.HAServiceTarget; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.yarn.api.records.LeaseNode; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.RMHAServiceTarget; +import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; + +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TimeZone; +import java.util.concurrent.TimeUnit; + +/** + * lease/reclaim client + */ +@Private +@Unstable +public class LeaseReclaimCLI extends HAAdmin { + + private final RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + + private LeaseReclaimProtocol leaseReclaimProtocol = null; + + protected final static Map ADMIN_USAGE = + ImmutableMap.builder() + .put("-leaseNodes", + new UsageInfo("[node1,node2,... resource duration] \n\t\t" + + " resource should be ,, All means all available resources, e.g.: 1024,8) \n\t\t" + + " duration should be , 0 means forever, e.g.: 30)", + "Lease a set of nodes to a YARN managed cluster")) + .put("-reclaimNodes", + new UsageInfo("[node1,node2,...]", + "Reclaim a set of nodes previously leased to a YARN managed cluster")) + .put("-getLeasedNodes", + new UsageInfo("[-all|-landlord ] \n\t\t" + + " -all means get all leasedNodes; \n\t\t" + + " -landlord means get leasednodes leased by . 
If is not specified, get the leasednodes leased by the client user", + "get a set of leased nodes information")) + .build(); + + public LeaseReclaimCLI() { + super(); + } + + public LeaseReclaimCLI(Configuration conf) { + super(conf); + } + + private static void appendHAUsage(final StringBuilder usageBuilder) { + for (String cmdKey : USAGE.keySet()) { + if (cmdKey.equals("-help")) { + continue; + } + UsageInfo usageInfo = USAGE.get(cmdKey); + usageBuilder.append(" [" + cmdKey + " " + usageInfo.args + "]"); + } + } + + private static void buildHelpMsg(String cmd, StringBuilder builder) { + UsageInfo usageInfo = ADMIN_USAGE.get(cmd); + if (usageInfo == null) { + usageInfo = USAGE.get(cmd); + if (usageInfo == null) { + return; + } + } + String space = (usageInfo.args == "") ? "" : " "; + builder.append(" " + cmd + space + usageInfo.args + ": " + + usageInfo.help); + } + + private static void buildIndividualUsageMsg(String cmd, + StringBuilder builder) { + boolean isHACommand = false; + UsageInfo usageInfo = ADMIN_USAGE.get(cmd); + if (usageInfo == null) { + usageInfo = USAGE.get(cmd); + if (usageInfo == null) { + return; + } + isHACommand = true; + } + String space = (usageInfo.args == "") ? "" : " "; + builder.append("Usage: yarn landlord [" + + cmd + space + usageInfo.args + + "]\n"); + if (isHACommand) { + builder.append(cmd + " can only be used when RM HA is enabled"); + } + } + + private static void buildUsageMsg(StringBuilder builder, + boolean isHAEnabled) { + builder.append("Usage: yarn landlord\n"); + for (String cmdKey : ADMIN_USAGE.keySet()) { + UsageInfo usageInfo = ADMIN_USAGE.get(cmdKey); + builder.append(" " + cmdKey + " " + usageInfo.args + "\n"); + } + if (isHAEnabled) { + for (String cmdKey : USAGE.keySet()) { + if (!cmdKey.equals("-help")) { + UsageInfo usageInfo = USAGE.get(cmdKey); + builder.append(" " + cmdKey + " " + usageInfo.args + "\n"); + } + } + } + } + + private static void printHelp(String cmd, boolean isHAEnabled) { + StringBuilder summary = new StringBuilder(); + summary.append("landlord is the command to execute YARN landlord commands.\n"); + summary.append("The full syntax is: \n\n" + + "yarn landlord " + + " [-leaseNodes [node1 node2 ...]]" + + " [-reclaimNodes [node1 node2 ...]]" + + " [-getLeasedNodes [-all|-landlord ]]" + + " [-help [cmd]]"); + if (isHAEnabled) { + appendHAUsage(summary); + } + summary.append("\n"); + + StringBuilder helpBuilder = new StringBuilder(); + System.out.println(summary); + for (String cmdKey : ADMIN_USAGE.keySet()) { + buildHelpMsg(cmdKey, helpBuilder); + helpBuilder.append("\n"); + } + if (isHAEnabled) { + for (String cmdKey : USAGE.keySet()) { + if (!cmdKey.equals("-help")) { + buildHelpMsg(cmdKey, helpBuilder); + helpBuilder.append("\n"); + } + } + } + System.out.println(helpBuilder); + System.out.println(); + ToolRunner.printGenericCommandUsage(System.out); + } + + /** + * Displays format of commands. + * + * @param cmd The command that is being executed. 
+ */ + private static void printUsage(String cmd, boolean isHAEnabled) { + StringBuilder usageBuilder = new StringBuilder(); + if (ADMIN_USAGE.containsKey(cmd) || USAGE.containsKey(cmd)) { + buildIndividualUsageMsg(cmd, usageBuilder); + } else { + buildUsageMsg(usageBuilder, isHAEnabled); + } + System.err.println(usageBuilder); + ToolRunner.printGenericCommandUsage(System.err); + + } + + /** get hostname list from args */ + private Set getNodes(String args) { + + System.out.println("DEBUG:: getNodes(\"" + args + "\")"); + + String[] nodes = args.split(","); + Set set = new HashSet(); + for (String host : nodes) { + //TODO: remove these lines. Later, will check whether add default port here is needed. + /** + if (host.indexOf(':') == -1) { + host += ":" + YarnConfiguration.DEFAULT_NM_PORT; + } + */ + set.add(host); + } + + System.out.println("DEBUG:: getNodes().size()=" + set.size()); + + return set; + } + + /** get resource from args */ + private Resource getResource(String args) throws IllegalArgumentException { + System.out + .println("DEBUG:: getResource(\"" + args + "\")"); + + if (args.equalsIgnoreCase("ALL")) { + return null; + } + + String[] resource = args.split(","); + int memory, vCores; + try { + memory = Integer.parseInt(resource[0]); + vCores = Integer.parseInt(resource[1]); + } catch (Exception e) { + throw new IllegalArgumentException("Invalid resource format: " + e.getMessage()); + } + + System.out.println("DEBUG:: getResource().memory=" + + memory + " , vCores=" + vCores); + + return Resource.newInstance(memory, vCores); + } + + /** get endTime from args */ + private long getTime(String args) { + System.out.println("DEBUG:: getTime(\"" + args + "\")"); + + long minutes = Long.parseLong(args); + if (minutes <= 0) { + return LeaseReclaimProtocol.DEFAULT_RESOURCE_TIMEOUT; + } + return System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(minutes); + } + + private int leaseNodes(Set nodes, Resource resource, long endTime, String landlordId) throws Exception { + if (landlordId != null) { + leaseReclaimProtocol.setLandlordId(landlordId); + } + leaseReclaimProtocol.lease(nodes, resource, endTime); + return 0; + } + + private int leaseNodes(Set nodes, Resource resource, long endTime) throws Exception { + return leaseNodes(nodes, resource, endTime, System.getenv("USERNAME")); + } + + /** called by cmd */ + private int leaseNodes(String nodes, String resource, String duration) + throws Exception{ + Set set = getNodes(nodes); + Resource r = getResource(resource); + long endTime = getTime(duration); + + SimpleDateFormat df = new SimpleDateFormat("hh:mm:ss MM/dd/yyyy"); + df.setTimeZone(TimeZone.getTimeZone("UTC")); + String now = df.format(System.currentTimeMillis()); + String dfStr = df.format(endTime); + + StringBuilder sb = new StringBuilder(); + sb.append("Lease nodes " + set.toString()); + sb.append(", with resource per node: " + (r == null ? 
"registered capacity" : r)); + sb.append(", for " + duration + " minutes, from UTC time " + now + " to UTC time " + dfStr + " (" + + endTime + ")"); + System.out.println(sb); + + return leaseNodes(set, r, endTime); + } + + private int reclaimNodes(Set nodes, String landlordId) throws Exception { + if (landlordId != null) { + leaseReclaimProtocol.setLandlordId(landlordId); + } + leaseReclaimProtocol.reclaim(nodes); + return 0; + } + + private int reclaimNodes(Set nodes) throws Exception { + return reclaimNodes(nodes, System.getenv("USERNAME")); + } + + /** called by cmd */ + private int reclaimNodes(String args) throws Exception { + Set set = getNodes(args); + + StringBuilder sb = new StringBuilder(); + sb.append("Reclaim nodes " + set.toString()); + System.out.println(sb); + + return reclaimNodes(set); + } + + /** called by cmd */ + private int getLeasedNodes(String opt, String landlord) throws Exception { + Map result = null; + boolean all = opt.equalsIgnoreCase("-all"); + + if (!opt.equalsIgnoreCase("-all") && !opt.equalsIgnoreCase("-landlord")) { + throw new IllegalArgumentException("wrong argument"); + } + result = leaseReclaimProtocol.getLeasedNodes(all, landlord); + SimpleDateFormat df = new SimpleDateFormat("hh:mm:ss MM/dd/yyyy"); + df.setTimeZone(TimeZone.getTimeZone("UTC")); + String now = df.format(System.currentTimeMillis()); + + StringBuilder sb0 = new StringBuilder(); + sb0.append("Get " + opt + "(" + landlord + ") Leased Nodes at UTC time " + + now); + sb0.append(" (Total " + result.size() + " nodes) :"); + System.out.println(sb0); + + for (Map.Entry entry : result.entrySet()) { + StringBuilder sb = new StringBuilder(); + sb.append("hostname:" + entry.getKey()); + sb.append(", landlord:" + entry.getValue().getLandlord()); + sb.append(", resource:" + entry.getValue().getResource()); + + long endTime = entry.getValue().getEndTime(); + String dfStr = df.format(endTime); + sb.append(", endtime:" + endTime + "(UTC time " + dfStr + ", timeout=" + + entry.getValue().isTimeout() + ")"); + System.out.println(sb); + } + return 0; + } + + @Override + public int run(String[] args) throws Exception { + List argsList = new ArrayList(); + for (int i = 0; i < args.length; i++) { + argsList.add(args[i]); + } + args = argsList.toArray(new String[0]); + + YarnConfiguration yarnConf = + getConf() == null ? 
new YarnConfiguration() : new YarnConfiguration(
+            getConf());
+    leaseReclaimProtocol = new LeaseReclaimProtocol(yarnConf);
+
+    boolean isHAEnabled =
+        yarnConf.getBoolean(YarnConfiguration.RM_HA_ENABLED,
+            YarnConfiguration.DEFAULT_RM_HA_ENABLED);
+
+    if (args.length < 1) {
+      printUsage("", isHAEnabled);
+      return -1;
+    }
+
+    int exitCode = -1;
+    int i = 0;
+    String cmd = args[i++];
+
+    exitCode = 0;
+    if ("-help".equals(cmd)) {
+      if (i < args.length) {
+        printUsage(args[i], isHAEnabled);
+      } else {
+        printHelp("", isHAEnabled);
+      }
+      return exitCode;
+    }
+
+    if (USAGE.containsKey(cmd)) {
+      if (isHAEnabled) {
+        return super.run(args);
+      }
+      System.out.println("Cannot run " + cmd
+          + " when ResourceManager HA is not enabled");
+      return -1;
+    }
+
+    try {
+      if ("-leaseNodes".equals(cmd)) {
+        if (i + 2 >= args.length) {
+          System.err.println("No lease nodes are specified");
+          exitCode = -1;
+        } else {
+          exitCode = leaseNodes(args[i], args[i + 1], args[i + 2]);
+        }
+        i += 2;
+      } else if ("-reclaimNodes".equals(cmd)) {
+        if (i >= args.length) {
+          System.err.println("No reclaim nodes are specified");
+          exitCode = -1;
+        } else {
+          exitCode = reclaimNodes(args[i]);
+        }
+      } else if ("-getLeasedNodes".equals(cmd)) {
+        if (i >= args.length) {
+          System.err.println("Missing argument");
+          exitCode = -1;
+        } else {
+          String landlord = null;
+          if (i + 1 < args.length) {
+            landlord = args[i + 1];
+          }
+          exitCode = getLeasedNodes(args[i], landlord);
+        }
+      } else {
+        exitCode = -1;
+        System.err.println(cmd.substring(1) + ": Unknown command");
+        printUsage("", isHAEnabled);
+      }
+
+    } catch (IllegalArgumentException arge) {
+      exitCode = -1;
+      System.err.println(cmd.substring(1) + ": " + arge.getLocalizedMessage());
+      printUsage(cmd, isHAEnabled);
+    } catch (RemoteException e) {
+      //
+      // This is an error returned by the hadoop server. Print out the
+      // first line of the error message, ignore the stack trace.
+      exitCode = -1;
+      try {
+        String[] content;
+        content = e.getLocalizedMessage().split("\n");
+        System.err.println(cmd.substring(1) + ": " + content[0]);
+      } catch (Exception ex) {
+        System.err.println(cmd.substring(1) + ": " + ex.getLocalizedMessage());
+      }
+    } catch (Exception e) {
+      exitCode = -1;
+      System.err.println(cmd.substring(1) + ": " + e.getLocalizedMessage());
+    }
+    return exitCode;
+  }
+
+  @Override
+  protected HAServiceTarget resolveTarget(String rmId) {
+    Collection<String> rmIds = HAUtil.getRMHAIds(getConf());
+    if (!rmIds.contains(rmId)) {
+      StringBuilder msg = new StringBuilder();
+      msg.append(rmId + " is not a valid serviceId. 
It should be one of "); + for (String id : rmIds) { + msg.append(id + " "); + } + throw new IllegalArgumentException(msg.toString()); + } + try { + YarnConfiguration conf = new YarnConfiguration(getConf()); + conf.set(YarnConfiguration.RM_HA_ID, rmId); + return new RMHAServiceTarget(conf); + } catch (IllegalArgumentException iae) { + throw new YarnRuntimeException("Could not connect to " + rmId + + "; the configuration for it might be missing"); + } catch (IOException ioe) { + throw new YarnRuntimeException( + "Could not connect to RM HA Admin for node " + rmId); + } + } + + public static void main(String[] args) throws Exception { + int result = ToolRunner.run(new LeaseReclaimCLI(), args); + System.exit(result); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/leasereclaim/LeaseReclaimProtocol.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/leasereclaim/LeaseReclaimProtocol.java new file mode 100644 index 0000000..325a4a2 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/leasereclaim/LeaseReclaimProtocol.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + + +package org.apache.hadoop.yarn.client.leasereclaim; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.LeaseNode; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.leasereclaim.LeaseReclaimStore; + +import java.text.SimpleDateFormat; +import java.util.Map; +import java.util.Set; +import java.util.TimeZone; + + +/** + * APIs used by lease/reclaim client (landlord) + */ +public class LeaseReclaimProtocol { + + private static final Log LOG = LogFactory.getLog(LeaseReclaimProtocol.class); + + public static final Resource ZERO_CAPACITY = Resource.newInstance(0, 0); + public static final long DEFAULT_RESOURCE_TIMEOUT = LeaseNode.RESOURCE_NEVER_TIMEOUT; + + private String landlordId; + private LeaseReclaimStore leaseReclaimStore = null; + + public LeaseReclaimProtocol(YarnConfiguration conf) throws Exception { + leaseReclaimStore = LeaseReclaimStore.newInstance(conf); + if (leaseReclaimStore == null) { + throw new YarnException("LeaseReclaimStore class is not configured," + + " can't execute commands!"); + } + } + + public void lease(Set nodes) throws Exception { + lease(nodes, null, DEFAULT_RESOURCE_TIMEOUT); + } + + public void lease(Set nodes, long endTime) throws Exception { + lease(nodes, null, endTime); + } + + public void lease(Set nodes, Resource resource) throws Exception { + lease(nodes, resource, DEFAULT_RESOURCE_TIMEOUT); + } + + public void lease(Set nodes, Resource resource, long endTime) throws Exception { + String time = "never timeout"; + if (endTime != DEFAULT_RESOURCE_TIMEOUT) { + SimpleDateFormat df = new SimpleDateFormat("hh:mm:ss MM/dd/yyyy"); + df.setTimeZone(TimeZone.getTimeZone("UTC")); + time = df.format(endTime); + } + String resourceStr = "registered capacity"; + if (resource != null) { + resourceStr = resource.toString(); + } + LOG.info("Lease " + nodes.size() + " node(s) each with resource " + resourceStr + ", endTime is " + time + "(" + endTime + ")."); + updateNodeResource(nodes, resource, endTime); + } + + public void reclaim(Set nodes) throws Exception { + updateNodeResource(nodes, ZERO_CAPACITY, DEFAULT_RESOURCE_TIMEOUT); + LOG.info("Reclaim " + nodes.size() + " node(s)"); + } + + public Map getLeasedNodes(boolean all, String landlord) + throws Exception { + if (all) { + landlord = null; + } + return leaseReclaimStore.getMachineLeaseInfoMapFromStore(landlord); + } + + /** + * This method implements both lease and reclaim APIs by + * updating the amount of usable resources on the specified nodes. 
+ * + * @param nodes + * @param resource + * @param endTime + * @throws Exception + */ + private void updateNodeResource(Set nodes, Resource resource, long endTime) + throws Exception { + leaseReclaimStore.putNewLeaseRequestToStore(this.landlordId, resource, endTime, nodes); + } + + public void setLandlordId(String landlordId) { + this.landlordId = landlordId; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml index 8f31874..aca8b0c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml @@ -184,6 +184,12 @@ log4j log4j + + org.apache.zookeeper + zookeeper + test-jar + test + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LeaseNodePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LeaseNodePBImpl.java new file mode 100644 index 0000000..a25abed --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LeaseNodePBImpl.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.yarn.api.records.LeaseNode; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.proto.YarnLeaseReclaimProtos.LeaseNodeProto; +import org.apache.hadoop.yarn.proto.YarnLeaseReclaimProtos.LeaseNodeProtoOrBuilder; + +/** + * lease status for a machine + * Note: endtime is UTC time, and unit is milliseconds + */ +public class LeaseNodePBImpl extends LeaseNode { + + LeaseNodeProto proto = LeaseNodeProto.getDefaultInstance(); + LeaseNodeProto.Builder builder = null; + boolean viaProto = false; + + public LeaseNodePBImpl() { + builder = LeaseNodeProto.newBuilder(); + } + + public LeaseNodePBImpl(LeaseNodeProto proto) { + this.proto = proto; + viaProto = true; + } + + @Override + public LeaseNodeProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = LeaseNodeProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public String getLandlord() { + LeaseNodeProtoOrBuilder p = viaProto ? proto : builder; + return p.getLandlord(); + } + + @Override + public void setLandlord(String landlord) { + maybeInitBuilder(); + if (landlord == null) { + builder.clearLandlord(); + return; + } + builder.setLandlord(landlord); + } + + @Override + public Resource getResource() { + LeaseNodeProtoOrBuilder p = viaProto ? 
proto : builder; + if (!p.hasResource()) { + return null; + } + return new ResourcePBImpl(p.getResource()); + } + + @Override + public void setResource(Resource resource) { + maybeInitBuilder(); + if (resource == null) { + builder.clearResource(); + return; + } + builder.setResource(((ResourcePBImpl) resource).getProto()); + } + + @Override + public long getEndTime() { + LeaseNodeProtoOrBuilder p = viaProto ? proto : builder; + return p.getEndTime(); + } + + @Override + public void setEndTime(long endTime) { + maybeInitBuilder(); + builder.setEndTime(endTime); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LeaseRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LeaseRequestPBImpl.java new file mode 100644 index 0000000..a1a9781 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LeaseRequestPBImpl.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.yarn.api.records.LeaseRequest; +import org.apache.hadoop.yarn.api.records.OperationClientInfo; +import org.apache.hadoop.yarn.api.records.UpdateResourceRequest; +import org.apache.hadoop.yarn.api.records.UpdateIDRequest; +import org.apache.hadoop.yarn.proto.YarnLeaseReclaimProtos.LeaseRequestProto; +import org.apache.hadoop.yarn.proto.YarnLeaseReclaimProtos.LeaseRequestProtoOrBuilder; + +/** + * a lease/reclaim request (including debug info and different types of requests), + * sent from Landlord (client), + * will be handled by LeaseManager (server) + */ +public class LeaseRequestPBImpl extends LeaseRequest { + + LeaseRequestProto proto = LeaseRequestProto.getDefaultInstance(); + LeaseRequestProto.Builder builder = null; + boolean viaProto = false; + + public LeaseRequestPBImpl() { + builder = LeaseRequestProto.newBuilder(); + } + + public LeaseRequestPBImpl(LeaseRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + @Override + public LeaseRequestProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = LeaseRequestProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public OperationClientInfo getClientDebugInfo() { + LeaseRequestProtoOrBuilder p = viaProto ? 
proto : builder; + if (!p.hasClientDebugInfo()) { + return null; + } + return new OperationClientInfoPBImpl(p.getClientDebugInfo()); + } + + @Override + public void setClientDebugInfo(OperationClientInfo clientDebugInfo) { + maybeInitBuilder(); + if (clientDebugInfo == null) { + builder.clearClientDebugInfo(); + return; + } + builder.setClientDebugInfo(((OperationClientInfoPBImpl) clientDebugInfo).getProto()); + } + + @Override + public UpdateResourceRequest getUpdateResource() { + LeaseRequestProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasUpdateResource()) { + return null; + } + return new UpdateResourceRequestPBImpl(p.getUpdateResource()); + + } + + @Override + public void setUpdateResource(UpdateResourceRequest request) { + maybeInitBuilder(); + if (request == null) { + builder.clearUpdateResource(); + return; + } + builder.setUpdateResource(((UpdateResourceRequestPBImpl) request).getProto()); + } + + @Override + public UpdateIDRequest getUpdateID() { + LeaseRequestProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasUpdateID()) { + return null; + } + return new UpdateIDRequestPBImpl(p.getUpdateID()); + } + + @Override + public void setUpdateID(UpdateIDRequest request) { + maybeInitBuilder(); + if (request == null) { + builder.clearUpdateID(); + return; + } + builder.setUpdateID(((UpdateIDRequestPBImpl) request).getProto()); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/MachineLeaseIDPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/MachineLeaseIDPBImpl.java new file mode 100644 index 0000000..567ac22 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/MachineLeaseIDPBImpl.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.yarn.api.records.MachineLeaseID; +import org.apache.hadoop.yarn.proto.YarnLeaseReclaimProtos.MachineLeaseIDProto; +import org.apache.hadoop.yarn.proto.YarnLeaseReclaimProtos.MachineLeaseIDProtoOrBuilder; + +/** + * not used now + */ +public class MachineLeaseIDPBImpl extends MachineLeaseID { + + MachineLeaseIDProto proto = MachineLeaseIDProto.getDefaultInstance(); + MachineLeaseIDProto.Builder builder = null; + boolean viaProto = false; + + public MachineLeaseIDPBImpl() { + builder = MachineLeaseIDProto.newBuilder(); + } + + public MachineLeaseIDPBImpl(MachineLeaseIDProto proto) { + this.proto = proto; + viaProto = true; + } + + @Override + public MachineLeaseIDProto getProto() { + proto = viaProto ? 
proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = MachineLeaseIDProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public String getMachine() { + MachineLeaseIDProtoOrBuilder p = viaProto ? proto : builder; + return p.getMachine(); + } + + @Override + public void setMachine(String machine) { + maybeInitBuilder(); + if (machine == null) { + builder.clearMachine(); + return; + } + builder.setMachine(machine); + } + + @Override + public String getLeaseID() { + MachineLeaseIDProtoOrBuilder p = viaProto ? proto : builder; + return p.getLeaseID(); + } + + @Override + public void setLeaseID(String leaseID) { + maybeInitBuilder(); + if (leaseID == null) { + builder.clearLeaseID(); + return; + } + builder.setLeaseID(leaseID); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/MachineLeaseInfoMapPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/MachineLeaseInfoMapPBImpl.java new file mode 100644 index 0000000..ad717ba --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/MachineLeaseInfoMapPBImpl.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.yarn.api.records.LeaseNode; +import org.apache.hadoop.yarn.api.records.MachineLeaseInfo; +import org.apache.hadoop.yarn.api.records.MachineLeaseInfoMap; +import org.apache.hadoop.yarn.proto.YarnLeaseReclaimProtos.MachineLeaseInfoProto; +import org.apache.hadoop.yarn.proto.YarnLeaseReclaimProtos.MachineLeaseInfoMapProto; +import org.apache.hadoop.yarn.proto.YarnLeaseReclaimProtos.MachineLeaseInfoMapProtoOrBuilder; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * a map of machine and lease status, persist in lease reclaim store + */ +public class MachineLeaseInfoMapPBImpl extends MachineLeaseInfoMap { + + MachineLeaseInfoMapProto proto = MachineLeaseInfoMapProto.getDefaultInstance(); + MachineLeaseInfoMapProto.Builder builder = null; + boolean viaProto = false; + private Map nodeToStatus = null; + + public MachineLeaseInfoMapPBImpl() { + builder = MachineLeaseInfoMapProto.newBuilder(); + } + + public MachineLeaseInfoMapPBImpl(MachineLeaseInfoMapProto proto) { + this.proto = proto; + viaProto = true; + } + + @Override + public MachineLeaseInfoMapProto getProto() { + mergeLocalToProto(); + proto = viaProto ? 
proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = MachineLeaseInfoMapProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToBuilder() { + if (this.nodeToStatus != null && !this.nodeToStatus.isEmpty()) { + builder.clearNodeToStatus(); + for (Map.Entry entry : this.nodeToStatus.entrySet()) { + MachineLeaseInfoProto p = + MachineLeaseInfo.newInstance(entry.getKey(), entry.getValue()).getProto(); + builder.addNodeToStatus(p); + } + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + + + @Override + public Map getNodeToStatus() { + initNodeToStatus(); + return this.nodeToStatus; + } + + @Override + public void setNodeToStatus(Map status) { + maybeInitBuilder(); + this.nodeToStatus = new HashMap(); + if (status == null || status.size() <= 0) { + builder.clearNodeToStatus(); + return; + } + this.nodeToStatus.putAll(status); + } + + private void initNodeToStatus() { + if (this.nodeToStatus != null) { + return; + } + MachineLeaseInfoMapProtoOrBuilder p = viaProto ? proto : builder; + List statusList = p.getNodeToStatusList(); + + this.nodeToStatus = new HashMap(); + for (MachineLeaseInfoProto s : statusList) { + this.nodeToStatus.put(s.getMachine(), new LeaseNodePBImpl(s.getStatus())); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/MachineLeaseInfoPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/MachineLeaseInfoPBImpl.java new file mode 100644 index 0000000..9ae4591 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/MachineLeaseInfoPBImpl.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.yarn.api.records.LeaseNode; +import org.apache.hadoop.yarn.api.records.MachineLeaseInfo; +import org.apache.hadoop.yarn.proto.YarnLeaseReclaimProtos.MachineLeaseInfoProto; +import org.apache.hadoop.yarn.proto.YarnLeaseReclaimProtos.MachineLeaseInfoProtoOrBuilder; + +/** + * a machine and its lease status + */ +public class MachineLeaseInfoPBImpl extends MachineLeaseInfo { + + MachineLeaseInfoProto proto = MachineLeaseInfoProto.getDefaultInstance(); + MachineLeaseInfoProto.Builder builder = null; + boolean viaProto = false; + + public MachineLeaseInfoPBImpl() { + builder = MachineLeaseInfoProto.newBuilder(); + } + + public MachineLeaseInfoPBImpl(MachineLeaseInfoProto proto) { + this.proto = proto; + viaProto = true; + } + + @Override + public MachineLeaseInfoProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = MachineLeaseInfoProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public String getMachine() { + MachineLeaseInfoProtoOrBuilder p = viaProto ? proto : builder; + return p.getMachine(); + } + + @Override + public void setMachine(String machine) { + maybeInitBuilder(); + if (machine == null) { + builder.clearMachine(); + return; + } + builder.setMachine(machine); + } + + @Override + public LeaseNode getStatus() { + MachineLeaseInfoProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasStatus()) { + return null; + } + return new LeaseNodePBImpl(p.getStatus()); + } + + @Override + public void setStatus(LeaseNode status) { + maybeInitBuilder(); + if (status == null) { + builder.clearStatus(); + return; + } + builder.setStatus(((LeaseNodePBImpl) status).getProto()); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/OperationClientInfoPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/OperationClientInfoPBImpl.java new file mode 100644 index 0000000..bb9cd58 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/OperationClientInfoPBImpl.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.yarn.api.records.OperationClientInfo; +import org.apache.hadoop.yarn.proto.YarnLeaseReclaimProtos.OperationClientInfoProto; +import org.apache.hadoop.yarn.proto.YarnLeaseReclaimProtos.OperationClientInfoProtoOrBuilder; + +/** + * debug info for a yarn client (e.g., a landlord client) + */ +public class OperationClientInfoPBImpl extends OperationClientInfo { + + OperationClientInfoProto proto = OperationClientInfoProto.getDefaultInstance(); + OperationClientInfoProto.Builder builder = null; + boolean viaProto = false; + + public OperationClientInfoPBImpl() { + builder = OperationClientInfoProto.newBuilder(); + } + + public OperationClientInfoPBImpl(OperationClientInfoProto proto) { + this.proto = proto; + viaProto = true; + } + + @Override + public OperationClientInfoProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = OperationClientInfoProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public String getUsername() { + OperationClientInfoProtoOrBuilder p = viaProto ? proto : builder; + return p.getUsername(); + } + + @Override + public void setUsername(String username) { + maybeInitBuilder(); + if (username == null) { + builder.clearUsername(); + return; + } + builder.setUsername(username); + } + + @Override + public String getHostname() { + OperationClientInfoProtoOrBuilder p = viaProto ? proto : builder; + return p.getHostname(); + } + + @Override + public void setHostname(String hostname) { + maybeInitBuilder(); + if (hostname == null) { + builder.clearHostname(); + return; + } + builder.setHostname(hostname); + } + + @Override + public String getAddress() { + OperationClientInfoProtoOrBuilder p = viaProto ? proto : builder; + return p.getAddress(); + } + + @Override + public void setAddress(String address) { + maybeInitBuilder(); + if (address == null) { + builder.clearAddress(); + return; + } + builder.setAddress(address); + } + + @Override + public long getTimestamp() { + OperationClientInfoProtoOrBuilder p = viaProto ? proto : builder; + return p.getTimestamp(); + } + + @Override + public void setTimestamp(long timestamp) { + maybeInitBuilder(); + builder.setTimestamp(timestamp); + } + + @Override + public String getNote() { + OperationClientInfoProtoOrBuilder p = viaProto ? proto : builder; + return p.getNote(); + } + + @Override + public void setNote(String note) { + maybeInitBuilder(); + if (note == null) { + builder.clearNote(); + return; + } + builder.setNote(note); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateIDRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateIDRequestPBImpl.java new file mode 100644 index 0000000..f83e85c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateIDRequestPBImpl.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.yarn.api.records.UpdateIDRequest; +import org.apache.hadoop.yarn.proto.YarnLeaseReclaimProtos.UpdateIDRequestProto; +import org.apache.hadoop.yarn.proto.YarnLeaseReclaimProtos.UpdateIDRequestProtoOrBuilder; + +/** + * not used now + */ +public class UpdateIDRequestPBImpl extends UpdateIDRequest { + + UpdateIDRequestProto proto = UpdateIDRequestProto.getDefaultInstance(); + UpdateIDRequestProto.Builder builder = null; + boolean viaProto = false; + + public UpdateIDRequestPBImpl() { + builder = UpdateIDRequestProto.newBuilder(); + } + + public UpdateIDRequestPBImpl(UpdateIDRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + @Override + public UpdateIDRequestProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = UpdateIDRequestProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public String getMachine() { + UpdateIDRequestProtoOrBuilder p = viaProto ? proto : builder; + return p.getMachine(); + } + + @Override + public void setMachine(String machine) { + maybeInitBuilder(); + if (machine == null) { + builder.clearMachine(); + return; + } + builder.setMachine(machine); + } + + @Override + public String getLeaseID() { + UpdateIDRequestProtoOrBuilder p = viaProto ? proto : builder; + return p.getLeaseID(); + } + + @Override + public void setLeaseID(String leaseID) { + maybeInitBuilder(); + if (leaseID == null) { + builder.clearLeaseID(); + return; + } + builder.setLeaseID(leaseID); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateResourceRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateResourceRequestPBImpl.java new file mode 100644 index 0000000..bc22eee --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/UpdateResourceRequestPBImpl.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.hadoop.yarn.api.records.impl.pb; + + import org.apache.hadoop.yarn.api.records.UpdateResourceRequest; + import org.apache.hadoop.yarn.api.records.Resource; + import org.apache.hadoop.yarn.proto.YarnLeaseReclaimProtos.UpdateResourceRequestProto; + import org.apache.hadoop.yarn.proto.YarnLeaseReclaimProtos.UpdateResourceRequestProtoOrBuilder; + + import java.util.HashSet; + import java.util.Set; + + /** + * lease/reclaim request (which updates the resources of a list of machines) + * Note: endTime is a UTC timestamp in milliseconds + */ + public class UpdateResourceRequestPBImpl extends UpdateResourceRequest { + + UpdateResourceRequestProto proto = UpdateResourceRequestProto.getDefaultInstance(); + UpdateResourceRequestProto.Builder builder = null; + boolean viaProto = false; + + Set machines = null; + + public UpdateResourceRequestPBImpl() { + builder = UpdateResourceRequestProto.newBuilder(); + } + + public UpdateResourceRequestPBImpl(UpdateResourceRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + @Override + public UpdateResourceRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = UpdateResourceRequestProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToBuilder() { + if (this.machines != null && !this.machines.isEmpty()) { + builder.clearMachines(); + builder.addAllMachines(this.machines); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + + @Override + public String getLandlord() { + UpdateResourceRequestProtoOrBuilder p = viaProto ? proto : builder; + return p.getLandlord(); + } + + @Override + public void setLandlord(String landlord) { + maybeInitBuilder(); + if (landlord == null) { + builder.clearLandlord(); + return; + } + builder.setLandlord(landlord); + } + + @Override + public Resource getResource() { + UpdateResourceRequestProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasResource()) { + return null; + } + return new ResourcePBImpl(p.getResource()); + } + + @Override + public void setResource(Resource resource) { + maybeInitBuilder(); + if (resource == null) { + builder.clearResource(); + return; + } + builder.setResource(((ResourcePBImpl) resource).getProto()); + } + + @Override + public long getEndTime() { + UpdateResourceRequestProtoOrBuilder p = viaProto ? 
proto : builder; + return p.getEndTime(); + } + + @Override + public void setEndTime(long endTime) { + maybeInitBuilder(); + builder.setEndTime(endTime); + } + + @Override + public Set getMachines() { + initMachines(); + return this.machines; + } + + @Override + public void setMachines(Set machines) { + maybeInitBuilder(); + this.machines = new HashSet(); + if (machines == null || machines.size() <= 0) { + builder.clearMachines(); + return; + } + this.machines.addAll(machines); + } + + + private void initMachines() { + if (this.machines != null) { + return; + } + UpdateResourceRequestProtoOrBuilder p = viaProto ? proto : builder; + this.machines = new HashSet(); + this.machines.addAll(p.getMachinesList()); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/leasereclaim/LeaseReclaimStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/leasereclaim/LeaseReclaimStore.java new file mode 100644 index 0000000..0802f89 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/leasereclaim/LeaseReclaimStore.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.leasereclaim; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.api.records.LeaseNode; +import org.apache.hadoop.yarn.api.records.LeaseRequest; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.UpdateResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.normalize.NormalizeHostname; + +import java.io.Closeable; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * lease reclaim store persists leased machines' lease status + */ +public abstract class LeaseReclaimStore implements Closeable { + private static final Log LOG = LogFactory.getLog(LeaseReclaimStore.class); + protected Configuration conf; + protected NormalizeHostname normalizer; + + public static LeaseReclaimStore newInstance(Configuration conf) throws Exception { + Class storeClass = + conf.getClass(YarnConfiguration.LEASE_STORE_CLASS, + null, LeaseReclaimStore.class); + if (storeClass == null) { + LOG.warn(YarnConfiguration.LEASE_STORE_CLASS + " is not configured!"); + return null; + } + LeaseReclaimStore store = + ReflectionUtils.newInstance(storeClass, conf); + store.init(conf); + return store; + } + + public String normalizeHostname(String hostname) { + return normalizer.normalizeHostname(hostname); + } + + public Map getMachineLeaseInfoMap(String landlord) throws Exception { + Map nodeToStatusMap = getMachineLeaseInfoMap(); + if (nodeToStatusMap == null) { + return null; + } + return filterMachineLeaseInfoMap(nodeToStatusMap, landlord); + } + + public Map getMachineLeaseInfoMapFromStore(String landlord) throws Exception { + Map nodeToStatusMap = getMachineLeaseInfoMapFromStore(); + if (nodeToStatusMap == null) { + return null; + } + return filterMachineLeaseInfoMap(nodeToStatusMap, landlord); + } + + // call by lease Manager + public LeaseNode getHostLeaseStatus(String host) { + String node = normalizeHostname(host); + return getMachineLeaseInfo(node); + } + + // call by lease Manager + public void startManager() throws Exception { + recover(); + startHandlingRequests(); + } + + // call by lease reclaim client + public void putNewLeaseRequestToStore(String landlord, Resource resource, long endTime, Set machines) + throws Exception { + UpdateResourceRequest updateRequest = UpdateResourceRequest.newInstance(landlord, resource, endTime, machines); + LeaseRequest request = LeaseRequest.newInstance(updateRequest, null, null); + putNewLeaseRequestToStore(request); + } + + protected void init(Configuration conf) throws Exception { + this.conf = conf; + this.normalizer = NormalizeHostname.newInstance(conf); + } + + private Map filterMachineLeaseInfoMap(Map nodeToStatusMap, String landlord) throws Exception { + if (landlord == null) return nodeToStatusMap; + + Map result = new HashMap(); + for (Map.Entry entry : nodeToStatusMap.entrySet()) { + if (entry.getValue().getLandlord().equals(landlord)) { + result.put(entry.getKey(), entry.getValue()); + } + } + return result; + } + + /* should be called before handling UpdateResourceRequest in each store class */ + protected LeaseNode beforeHandleUpdateResourceRequest(UpdateResourceRequest request, Set nodes) throws Exception { + if (nodes == null || request == null || request.getMachines() == null || request.getMachines().isEmpty()) { + LOG.warn("No valid machines 
in this request"); + return null; + } + nodes.clear(); + Set machines = request.getMachines(); + for (String machine : machines) { + nodes.add(normalizeHostname(machine)); + } + LeaseNode status = LeaseNode.newInstance(request.getLandlord(), request.getResource(), request.getEndTime()); + return status; + } + + /* should be called after handling UpdateResourceRequest in each store class */ + protected void afterHandleUpdateResourceRequest(Set nodes, LeaseNode status) throws Exception { + // Note: calling updateNodeResource in NM HB handler instead of calling Lease Manager + // updateNodeResource(Set nodes, LeaseNode status) here + } + + // Implement according to store class + // Note: all nodes handled in the store class should be normalized: + // 1) for Lease Manager, hosts are normalized before getxxx and updatexxx + // 2) any internal store functions are handling normalized nodes + + // call by lease Manager + protected abstract void recover() throws Exception; + + // call by lease Manager + protected abstract void startHandlingRequests() throws Exception; + + // call by lease Manager + protected abstract LeaseNode getMachineLeaseInfo(String node); + + // call by lease Manager + protected abstract Map getMachineLeaseInfoMap(); + + // call by client + protected abstract Map getMachineLeaseInfoMapFromStore() throws Exception; + + // call by client + protected abstract void putNewLeaseRequestToStore(LeaseRequest request) throws Exception; + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/leasereclaim/ZookeeperLeaseReclaimStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/leasereclaim/ZookeeperLeaseReclaimStore.java new file mode 100644 index 0000000..5f2a70f --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/leasereclaim/ZookeeperLeaseReclaimStore.java @@ -0,0 +1,872 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.leasereclaim; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.ZKUtil; +import org.apache.hadoop.yarn.api.records.LeaseNode; +import org.apache.hadoop.yarn.api.records.LeaseRequest; +import org.apache.hadoop.yarn.api.records.MachineLeaseInfoMap; +import org.apache.hadoop.yarn.api.records.UpdateResourceRequest; +import org.apache.hadoop.yarn.api.records.impl.pb.LeaseRequestPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.MachineLeaseInfoMapPBImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.proto.YarnLeaseReclaimProtos; +import org.apache.hadoop.yarn.proto.YarnLeaseReclaimProtos.MachineLeaseInfoMapProto; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Op; +import org.apache.zookeeper.OpResult; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + +/** + * lease reclaim store implemented on zookeeper + * Note: all nodes handled in this class should be normalized + */ +public class ZookeeperLeaseReclaimStore extends LeaseReclaimStore { + protected static final Log LOG = LogFactory + .getLog(ZookeeperLeaseReclaimStore.class); + + private static final String REQUEST_QUEUE_SUBFOLDER = "Queue"; + private static final String HANDLED_REQUESTS_SUBFOLDER = "Handled"; + private static final String STATUS_SUBFOLDER = "Status"; + private static final String REQUEST_ZNODE_PREFIX = "Request"; + + private String zkWorkingPath; + private String requestQueuePath; + private String handledRequestsPath; + private String statusPath; + + private QueueWatcher queueWatcher = null; + private Thread cleanerThread = null; + /** timeout for cleaner thread */ + private static final long CLEANER_THREAD_TIMEOUT = TimeUnit.DAYS.toMillis(7); + /** check interval for cleaner thread */ + private static final long CLEANER_THREAD_MAX_INTERVAL = TimeUnit.DAYS.toMillis(1); + + // estimate the max size of one MachineLeaseInfoMap is less than 256B, + // so, one 1MB ZNode can record about 4K MachineLeaseInfoMap info; + // If the info are distributed to the buckets uniformly, the max records + // num is less than STATUS_ZK_BUCKETS_NUM * 4K. + // TODO in the future: add features to support: + // if the STATUS_ZK_BUCKETS_NUM should be updated, how to upgrade. 
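+ // Illustrative arithmetic based on the figures above: 1MB / 256B ≈ 4,096 entries per bucket, + // so 53 buckets can hold roughly 53 * 4,096 ≈ 217,000 machine lease records in total.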
+ private static final int STATUS_ZK_BUCKETS_NUM = 53; + private ConcurrentMap machineLeaseInfoMapBuckets[] = + new ConcurrentMap[STATUS_ZK_BUCKETS_NUM]; + + ///////////////////// + // pure ZK part // + ///////////////////// + /** + * copy from zookeeperNodeLabelsStore.java, and change a little + */ + + private static final String ZK_DIR_SEPARATOR = "/"; + private static final int ZNODE_MATCH_ANY_VERSION = -1; + + private ZooKeeper zkClient; + private int numRetries; + private long zkResyncWaitTime; + private long zkRetryInterval; + private ZooKeeper activeZkClient; + private List zkAcl; + private List zkAuths; + private String zkHost; + private int zkSessionTimeout; + + + String getNodePath(String root, String nodeName) { + return (root + ZK_DIR_SEPARATOR + nodeName); + } + + private List getZKAcls(Configuration conf) throws Exception { + // Parse authentication from configuration. + String zkAclConf = + conf.get(YarnConfiguration.RM_ZK_ACL, + YarnConfiguration.DEFAULT_RM_ZK_ACL); + try { + zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf); + return ZKUtil.parseACLs(zkAclConf); + } catch (Exception e) { + LOG.error("Couldn't read ACLs based on " + YarnConfiguration.RM_ZK_ACL); + throw e; + } + } + + private List getZKAuths(Configuration conf) + throws Exception { + String zkAuthConf = conf.get(YarnConfiguration.RM_ZK_AUTH); + try { + zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf); + if (zkAuthConf != null) { + return ZKUtil.parseAuth(zkAuthConf); + } else { + return Collections.emptyList(); + } + } catch (Exception e) { + LOG.error("Couldn't read Auth based on " + YarnConfiguration.RM_ZK_AUTH); + throw e; + } + } + + private synchronized void createConnection() throws IOException, + InterruptedException { + closeZkClients(); + for (int retries = 0; retries < numRetries && zkClient == null; retries++) { + try { + activeZkClient = getNewZooKeeper(); + zkClient = activeZkClient; + for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) { + zkClient.addAuthInfo(zkAuth.getScheme(), zkAuth.getAuth()); + } + } catch (IOException ioe) { + // Retry in case of network failures + LOG.info("Failed to connect to the ZooKeeper on attempt - " + + (retries + 1)); + ioe.printStackTrace(); + } + } + if (zkClient == null) { + LOG.error("Unable to connect to Zookeeper"); + throw new YarnRuntimeException("Unable to connect to Zookeeper"); + } + notifyAll(); + LOG.info("Created new ZK connection " + zkClient); + } + + private final class ForwardingWatcher implements Watcher { + private ZooKeeper watchedZkClient; + + public ForwardingWatcher(ZooKeeper client) { + this.watchedZkClient = client; + } + + @Override + public void process(WatchedEvent event) { + try { + processWatchEvent(watchedZkClient, event); + } catch (Throwable t) { + LOG.error("Failed to process forwarding watcher event " + event + ": " + + StringUtils.stringifyException(t)); + } + } + } + + // use watch to handle the SyncConnected, Disconnected, Expired event. + private synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) + throws Exception { + // only process watcher event from current ZooKeeper Client session. 
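+ // (a ForwardingWatcher registered on an earlier session can still deliver queued events + // after createConnection() installs a new activeZkClient; such stale events are dropped)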
+ if (zk != activeZkClient) { + LOG.info("Ignore forwarding watcher event type: " + event.getType() + " with state:" + + event.getState() + " for path:" + event.getPath() + + " from old session"); + return; + } + + Watcher.Event.EventType eventType = event.getType(); + LOG.info("Forwarding Watcher event type: " + eventType + " with state:" + + event.getState() + " for path:" + event.getPath() + " for " + this); + + if (eventType == Watcher.Event.EventType.None) { + + // the connection state has changed + switch (event.getState()) { + case SyncConnected: + LOG.info("ZK Session connected"); + if (zkClient == null) { + // the SyncConnected must be from the client that sent Disconnected + zkClient = activeZkClient; + notifyAll(); + LOG.info("ZK Session restored"); + } + break; + case Disconnected: + LOG.info("ZK Session disconnected"); + zkClient = null; + break; + case Expired: + // the connection got terminated because of session timeout + // call listener to reconnect + LOG.info("ZK Session expired"); + createConnection(); + break; + default: + LOG.error("Unexpected Zookeeper" + " watch event state: " + + event.getState()); + break; + } + } + } + + protected synchronized ZooKeeper getNewZooKeeper() throws IOException, + InterruptedException { + ZooKeeper zk = new ZooKeeper(zkHost, zkSessionTimeout, null); + zk.register(new ForwardingWatcher(zk)); + return zk; + } + + private synchronized void closeZkClients() throws IOException { + zkClient = null; + if (activeZkClient != null) { + try { + LOG.info("close zkclient " + activeZkClient); + activeZkClient.close(); + } catch (InterruptedException e) { + throw new IOException("Interrupted while closing ZK", e); + } + activeZkClient = null; + } + } + + private Stat existsPath(final String path, final Watcher watch) throws Exception { + return new ZKAction() { + @Override + Stat run() throws KeeperException, InterruptedException { + return zkClient.exists(path, watch); + } + }.runWithRetries(); + } + + private String createPath(final String path, final byte[] data) throws Exception { + return new ZKAction() { + @Override + String run() throws KeeperException, InterruptedException { + return zkClient.create(path, data, zkAcl, CreateMode.PERSISTENT); + } + }.runWithRetries(); + } + + private String createSequentialPath(final String path, final byte[] data) throws Exception { + return new ZKAction() { + @Override + String run() throws KeeperException, InterruptedException { + return zkClient.create(path, data, zkAcl, CreateMode.PERSISTENT_SEQUENTIAL); + } + }.runWithRetries(); + } + + private String createSequentialPathWithParents(String path, byte[] data) throws Exception { + if (path.length() > 0 && path.lastIndexOf(ZK_DIR_SEPARATOR) > 0) { + String temp = path.substring(0, path.lastIndexOf(ZK_DIR_SEPARATOR)); + createPathWithParents(temp, null); + return createSequentialPath(path, data); + } else { + return ""; + } + } + + private void createPathWithParents(String path, byte[] data) throws Exception { + if (path.length() > 0 && null == existsPath(path, null)) { + String temp = path.substring(0, path.lastIndexOf(ZK_DIR_SEPARATOR)); + createPathWithParents(temp, null); + createPath(path, data); + } else { + return; + } + } + + private List getChildren(final String path, final Watcher watch) throws Exception { + return new ZKAction>() { + @Override + List run() throws KeeperException, InterruptedException { + return zkClient.getChildren(path, watch); + } + }.runWithRetries(); + } + + private byte[] getData(final String path, final Watcher watch) 
throws Exception { + return new ZKAction() { + @Override + public byte[] run() throws KeeperException, InterruptedException { + return zkClient.getData(path, watch, null); + } + }.runWithRetries(); + } + + private Stat setData(final String path, final byte[] data, final int version) + throws Exception { + return new ZKAction() { + @Override + public Stat run() throws KeeperException, InterruptedException { + return zkClient.setData(path, data, version); + } + }.runWithRetries(); + } + + private synchronized List doStoreMulti(final List opList) throws Exception { + return new ZKAction>() { + @Override + public List run() throws KeeperException, InterruptedException { + return zkClient.multi(opList); + } + }.runWithRetries(); + } + + private synchronized void delPath(final String path, final int version) + throws Exception { + new ZKAction() { + @Override + public Void run() throws KeeperException, InterruptedException { + zkClient.delete(path, version); + return null; + } + }.runWithRetries(); + } + + private synchronized void delPathWithChildren(String path, int version) + throws Exception { + List children = this.getChildren(path, null); + for (String child : children) { + delPathWithChildren(getNodePath(path, child), version); + } + delPath(path, version); + } + + private abstract class ZKAction { + private boolean hasDeleteNodeOp = false; + void setHasDeleteNodeOp(boolean hasDeleteOp) { + this.hasDeleteNodeOp = hasDeleteOp; + } + // run() expects synchronization on ZookeeperLeaseReclaimStore.this + abstract T run() throws KeeperException, InterruptedException; + + T runWithCheck() throws Exception { + long startTime = System.currentTimeMillis(); + synchronized (ZookeeperLeaseReclaimStore.this) { + while (zkClient == null) { + ZookeeperLeaseReclaimStore.this.wait(zkResyncWaitTime); + if (zkClient != null) { + break; + } + if (System.currentTimeMillis() - startTime > zkResyncWaitTime) { + throw new IOException("Wait for ZKClient creation timed out"); + } + } + return run(); + } + } + + private boolean shouldRetry(KeeperException.Code code) { + switch (code) { + case CONNECTIONLOSS: + case OPERATIONTIMEOUT: + return true; + default: + break; + } + return false; + } + + private boolean shouldRetryWithNewConnection(KeeperException.Code code) { + switch (code) { + case SESSIONEXPIRED: + case SESSIONMOVED: + return true; + default: + break; + } + return false; + } + + T runWithRetries() throws Exception { + int retry = 0; + while (true) { + try { + return runWithCheck(); + } catch (KeeperException ke) { + if (ke.code() == KeeperException.Code.NODEEXISTS) { + LOG.info("znode already exists!"); + return null; + } + if (hasDeleteNodeOp && ke.code() == KeeperException.Code.NONODE) { + LOG.info("znode has already been deleted!"); + return null; + } + + LOG.info("Exception while executing a ZK operation.", ke); + retry++; + if (shouldRetry(ke.code()) && retry < numRetries) { + LOG.info("Retrying operation on ZK. Retry no. " + retry); + Thread.sleep(zkRetryInterval); + continue; + } + if (shouldRetryWithNewConnection(ke.code()) && retry < numRetries) { + LOG.info("Retrying operation on ZK with new Connection. " + + "Retry no. " + retry); + Thread.sleep(zkRetryInterval); + createConnection(); + syncInternal(ke.getPath()); + continue; + } + LOG.info("Maxed out ZK retries. 
Giving up!"); + throw ke; + } + } + } + + + class ZKSyncOperationCallback implements AsyncCallback.VoidCallback { + @Override + public void processResult(int rc, String path, Object ctx) { + if (rc == KeeperException.Code.OK.intValue()) { + LOG.info("ZooKeeper sync operation succeeded. path: " + path); + } else { + LOG.fatal("ZooKeeper sync operation failed. Waiting for session " + + "timeout. path: " + path); + } + } + } + + private void syncInternal(final String path) throws InterruptedException { + if (path == null) { + LOG.error("the sync path is null."); + return; + } + final ZKSyncOperationCallback cb = new ZKSyncOperationCallback(); + final String pathForSync = path; + try { + new ZKAction() { + @Override + Void run() throws KeeperException, InterruptedException { + zkClient.sync(pathForSync, cb, null); + return null; + } + }.runWithRetries(); + } catch (Exception e) { + LOG.fatal("sync failed."); + } + } + } + + + + + ///////////////////// + // logic part // + ///////////////////// + @Override + protected void init(Configuration conf) throws Exception { + super.init(conf); + + zkHost = conf.get(YarnConfiguration.RM_ZK_ADDRESS); + if (zkHost == null) { + throw new YarnRuntimeException("No server address specified for " + + "zookeeper state store for node labels recovery. " + + YarnConfiguration.RM_ZK_ADDRESS + " is not configured."); + } + + zkSessionTimeout = + conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, + YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS); + zkAcl = getZKAcls(conf); + zkAuths = getZKAuths(conf); + + numRetries = + conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES, + YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES); + + zkRetryInterval = + conf.getLong(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS); + + zkResyncWaitTime = zkRetryInterval * numRetries; + + createConnection(); + + /** set paths */ + zkWorkingPath = conf.get(YarnConfiguration.ZK_LEASE_STORE_PATH, + YarnConfiguration.DEFAULT_ZK_LEASE_STORE_PATH); + requestQueuePath = zkWorkingPath + ZK_DIR_SEPARATOR + REQUEST_QUEUE_SUBFOLDER; + handledRequestsPath = zkWorkingPath + ZK_DIR_SEPARATOR + HANDLED_REQUESTS_SUBFOLDER; + statusPath = zkWorkingPath + ZK_DIR_SEPARATOR + STATUS_SUBFOLDER; + } + + /** init paths */ + private void initPaths() throws Exception { + /** create lease path if not exist */ + if (null == existsPath(zkWorkingPath, null)) { + createPathWithParents(zkWorkingPath, null); + } + + /** create queue path if not exist */ + if (null == existsPath(requestQueuePath, null)) { + createPathWithParents(requestQueuePath, null); + } + + /** create handled requests path if not exist */ + if (null == existsPath(handledRequestsPath, null)) { + createPathWithParents(handledRequestsPath, null); + } + /** create status path if not exist */ + if (null == existsPath(statusPath, null)) { + createPathWithParents(statusPath, null); + } + } + + @Override + public void close() throws IOException { + closeZkClients(); + cleanerThread.interrupt(); + try { + cleanerThread.join(); + } catch (InterruptedException e) { + } + } + + private boolean isValidBucketId(String bucketId) + { + try { + int id = Integer.parseInt(bucketId); + return (id >= 0 && id < STATUS_ZK_BUCKETS_NUM); + } catch (NumberFormatException e) { + LOG.error("fail to get bucket Id from Znode name"); + } + return false; + } + + private int getMachineLeaseInfoMapBucketSize() { + return STATUS_ZK_BUCKETS_NUM; + } + + private int nodeToBucketId(String node) { + return Math.abs(node.hashCode() % STATUS_ZK_BUCKETS_NUM); + 
} + + protected void recover() throws Exception { + initPaths(); + List statusBuckets = getChildren(statusPath, null); + for (String bucketId : statusBuckets) { + Map info = getMachineLeaseInfoMapBucketFromStore(bucketId); + if (info != null) { + initMachineLeaseInfoMapBucket(Integer.parseInt(bucketId), info); + LOG.info("Recovering: Get " + info.size() + " lease node info for bucket " + bucketId); + } + } + /* init empty buckets too */ + for (int bucketId = 0; bucketId < getMachineLeaseInfoMapBucketSize(); bucketId++) { + if (null == getMachineLeaseInfoMapBucket(bucketId)) { + initMachineLeaseInfoMapBucket(bucketId); + } + } + } + + private void updateMachineLeaseInfoMapBucketToStore( + Map machineLeaseInfoMap, int bucketId) throws Exception { + String id = Integer.toString(bucketId); + String bucketPath = getNodePath(statusPath, id); + MachineLeaseInfoMap info = MachineLeaseInfoMap.newInstance(machineLeaseInfoMap); + byte[] data = + ((MachineLeaseInfoMapPBImpl) info).getProto().toByteArray(); + + if (null == existsPath(bucketPath, null)) { + createPath(bucketPath, data); + } else { + setData(bucketPath, data, ZNODE_MATCH_ANY_VERSION); + } + } + + private void updateMachineLeaseInfoMapBucket( + String node, LeaseNode status, int bucketId) throws Exception { + Map info = getMachineLeaseInfoMapBucket(bucketId); + if (info == null) { + throw new Exception("Get null MachineLeaseInfoMapBucket[" + bucketId + "]. Init Error!"); + } + info.put(node, status); + //TODO in the future: record the node status explicitly here, don't remove it + /* + if (status.isLeased()) { + info.put(node, status); + } else { + info.remove(node); + } + */ + } + + private void updateMachineLeaseInfoMapToAll( + Set nodes, LeaseNode status) throws Exception { + HashSet updatedBuckets = new HashSet(); + // update machineLeaseInfoMapBuckets[] first + for (String node : nodes) { + int bucketId = nodeToBucketId(node); + updatedBuckets.add(bucketId); + updateMachineLeaseInfoMapBucket(node, status, bucketId); + } + // update machineLeaseInfoMap to store + for (int id : updatedBuckets) { + updateMachineLeaseInfoMapBucketToStore(getMachineLeaseInfoMapBucket(id), id); + } + } + + private Map getMachineLeaseInfoMapBucketFromStore(String bucketId) throws Exception { + if (!isValidBucketId(bucketId)) return null; + + String bucketPath = getNodePath(statusPath, bucketId); + byte[] bucketData = getData(bucketPath, null); + MachineLeaseInfoMap info = bucketData != null ? 
+ new MachineLeaseInfoMapPBImpl(MachineLeaseInfoMapProto.parseFrom(bucketData)) + : null; + if (info != null) { + return info.getNodeToStatus(); + } + return null; + } + + private Map getMachineLeaseInfoMapBucket(int bucketId) { + return machineLeaseInfoMapBuckets[bucketId]; + } + + private void initMachineLeaseInfoMapBucket(int bucketId) { + machineLeaseInfoMapBuckets[bucketId] = new ConcurrentHashMap(); + } + + private void initMachineLeaseInfoMapBucket(int bucketId, Map info) { + machineLeaseInfoMapBuckets[bucketId] = new ConcurrentHashMap(info); + } + + private final class QueueWatcher implements Watcher { + Object locker = new Object(); + + @Override + public void process(WatchedEvent event) { + try { + Watcher.Event.EventType eventType = event.getType(); + Watcher.Event.KeeperState eventState = event.getState(); + LOG.info("Queue Watcher event type: " + eventType + " with state:" + + eventState + " for path:" + event.getPath() + " for " + this); + if (eventType == Watcher.Event.EventType.NodeChildrenChanged + || eventState == Watcher.Event.KeeperState.SyncConnected) { + handleRequests(); + } + + } catch (Throwable t) { + LOG.error("Failed to process queue watcher event " + event + ": " + + StringUtils.stringifyException(t)); + } + } + + public void start() throws Exception { + handleRequests(); + } + + private void handleRequests() throws Exception { + synchronized (locker) { + // get children, and watch the path again + List children = getChildren(requestQueuePath, this); + //handling new requests + for (String child : children) { + // handle request and persist to status folder + handleRequest(child); + // after persist to status folder, move the request to old requests path + moveHandledRequest(child); + } + } + } + + private void handleRequest(String requestZNode) { + try { + String requestPath = getNodePath(requestQueuePath, requestZNode); + byte[] requestData = getData(requestPath, null); + + LeaseRequest request = requestData != null ? + new LeaseRequestPBImpl(YarnLeaseReclaimProtos.LeaseRequestProto.parseFrom(requestData)) + : null; + if (request == null) { + LOG.warn("Get null request data from " + requestPath); + return; + } + LOG.info("Get lease request: " + request.toString()); + // handle UpdateResourceRequest + handleUpdateResourceRequest(request.getUpdateResource()); + // TODO in the future: + // for the coding of other types of request, will work on it by requirement + } catch (Exception e) { + LOG.error("handle Request " + requestZNode + " failed. 
"); + e.printStackTrace(); + } + } + + private void handleUpdateResourceRequest(UpdateResourceRequest request) throws Exception { + Set nodes = new HashSet(); + LeaseNode status = beforeHandleUpdateResourceRequest(request, nodes); + if (status == null) { + return; + } + updateMachineLeaseInfoMapToAll(nodes, status); + + afterHandleUpdateResourceRequest(nodes, status); + } + + private void moveHandledRequest(String requestZNode) { + try { + String oldPath = getNodePath(requestQueuePath, requestZNode); + byte[] requestData = getData(oldPath, null); + + String newPath = getNodePath(handledRequestsPath, requestZNode); + + // corner case: the request index is increased back to 0 before they are cleaned + if (null != existsPath(newPath, null)) { + newPath += "_ts"; + newPath += System.currentTimeMillis(); + LOG.warn("The handledRequest path has been exist, change new path to " + newPath + "!"); + } + + List execOpList = new ArrayList(); + Op createOp = null; + createOp = Op.create(newPath, requestData, zkAcl, CreateMode.PERSISTENT); + execOpList.add(createOp); + Op deleteOp = null; + deleteOp = Op.delete(oldPath, ZNODE_MATCH_ANY_VERSION); + execOpList.add(deleteOp); + doStoreMulti(execOpList); + } catch (Exception e) { + LOG.error("move Request " + requestZNode + " failed. "); + e.printStackTrace(); + } + } + } + + private final class CleanerThread extends Thread { + long timeoutMS; + long intervalMS; + + public CleanerThread(long timeoutMSValue) { + timeoutMS = timeoutMSValue; + intervalMS = timeoutMSValue/2; + //one day interval + if (intervalMS > CLEANER_THREAD_MAX_INTERVAL) { + intervalMS = CLEANER_THREAD_MAX_INTERVAL; + } + } + + @Override + public void run() { + while (true) { + try { + cleanOldLeaseRequestInStore(timeoutMS); + } catch (Exception e) { + LOG.error("clean old requests failed!"); + e.printStackTrace(); + } + try { + Thread.sleep(intervalMS); + } catch (InterruptedException e) { + LOG.warn(this.getClass().getName() + + " is interrupted. Exiting."); + e.printStackTrace(); + break; + } + } + + } + + private void cleanOldLeaseRequestInStore(long timeoutMs) throws Exception { + List children = getChildren(handledRequestsPath, null); + for (String child : children) { + try { + String requestPath = getNodePath(handledRequestsPath, child); + byte[] requestData = getData(requestPath, null); + LeaseRequest request = requestData != null ? 
+ new LeaseRequestPBImpl(YarnLeaseReclaimProtos.LeaseRequestProto.parseFrom(requestData)) + : null; + if (request != null + && request.getClientDebugInfo() != null + && request.getClientDebugInfo().getTimestamp() > System.currentTimeMillis() - timeoutMs) { + // request not timeout, keep it + continue; + } + delPath(requestPath, ZNODE_MATCH_ANY_VERSION); + } catch (Exception e) { + LOG.error("clean old request " + child + " failed!"); + e.printStackTrace(); + } + } + } + } + + + protected void startHandlingRequests() throws Exception { + queueWatcher = new QueueWatcher(); + queueWatcher.start(); + cleanerThread = new CleanerThread(CLEANER_THREAD_TIMEOUT); + cleanerThread.start(); + } + + protected LeaseNode getMachineLeaseInfo(String node) { + int bucketId = nodeToBucketId(node); + Map info = getMachineLeaseInfoMapBucket(bucketId); + if (info != null && info.containsKey(node)) { + return info.get(node); + } + return null; + } + + protected Map getMachineLeaseInfoMap() { + Map machineLeaseInfoMap = new HashMap(); + for (int bucketId = 0; bucketId < getMachineLeaseInfoMapBucketSize(); bucketId++) { + Map info = getMachineLeaseInfoMapBucket(bucketId); + if (info != null) { + machineLeaseInfoMap.putAll(info); + } + } + return machineLeaseInfoMap; + } + + protected Map getMachineLeaseInfoMapFromStore() throws Exception { + Map machineLeaseInfoMap = new HashMap(); + List statusBuckets = getChildren(statusPath, null); + for (String bucketId : statusBuckets) { + Map info = getMachineLeaseInfoMapBucketFromStore(bucketId); + if (info != null) { + machineLeaseInfoMap.putAll(info); + } + } + return machineLeaseInfoMap; + } + + protected void putNewLeaseRequestToStore(LeaseRequest request) throws Exception { + byte[] data = + ((LeaseRequestPBImpl)request).getProto().toByteArray(); + String newPath = createSequentialPath(requestQueuePath+ZK_DIR_SEPARATOR+REQUEST_ZNODE_PREFIX, data); + if (newPath == null || newPath.isEmpty()) { + LOG.error("Create New Request Path failed, the new path is empty."); + throw new IOException("Create New Request Path failed, the new path is empty."); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/normalize/LowerCaseNormalizeHostname.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/normalize/LowerCaseNormalizeHostname.java new file mode 100644 index 0000000..1a342f5 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/normalize/LowerCaseNormalizeHostname.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.util.normalize; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * normalize the hostname to lower case + */ +public class LowerCaseNormalizeHostname extends NormalizeHostname { + protected static final Log LOG = LogFactory + .getLog(LowerCaseNormalizeHostname.class); + + public String normalizeHostname(String hostname) { + String host = hostname.toLowerCase(); + LOG.info("Normalize hostname from " + hostname + " to " + host); + return host.trim(); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/normalize/NormalizeHostname.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/normalize/NormalizeHostname.java new file mode 100644 index 0000000..590496e --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/normalize/NormalizeHostname.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.util.normalize; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +/** + * normalize hostnames to the unified format + */ +public abstract class NormalizeHostname { + private static final Log LOG = LogFactory.getLog(NormalizeHostname.class); + protected Configuration conf; + + public static NormalizeHostname newInstance(Configuration conf) throws Exception { + Class normalizeClass = + conf.getClass(YarnConfiguration.NORMALIZE_HOSTNAME_CLASS, + LowerCaseNormalizeHostname.class, NormalizeHostname.class); + NormalizeHostname normalizer = + ReflectionUtils.newInstance(normalizeClass, conf); + return normalizer; + } + + public abstract String normalizeHostname(String hostname); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index ed220c0..b5f0e45 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -2586,6 +2586,27 @@ The arguments to pass to the Node label script. 
yarn.nodemanager.node-labels.provider.script.opts + + <property> + <description>The normalize class</description> + <name>yarn.normalize.hostname.class</name> + <value>org.apache.hadoop.yarn.util.normalize.LowerCaseNormalizeHostname</value> + </property> + + <property> + <description>The class used for lease reclaim store</description> + <name>yarn.lease.store.class</name> + <value></value> + </property> + + <property> + <description>The root path for lease reclaim store on zookeeper</description> + <name>yarn.lease.store.zk.root</name> + <value>/Lease</value> + </property> + + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/leasereclaim/TestZookeeperLeaseReclaimStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/leasereclaim/TestZookeeperLeaseReclaimStore.java new file mode 100644 index 0000000..d941d68 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/leasereclaim/TestZookeeperLeaseReclaimStore.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.leasereclaim; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.ClientBaseWithFixes; +import org.apache.hadoop.yarn.api.records.LeaseNode; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.zookeeper.TestableZooKeeper; +import org.apache.zookeeper.ZooKeeper; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * Test cases for the basic functions of the lease reclaim store. + */ +public class TestZookeeperLeaseReclaimStore { + + static class TestZKClient extends ClientBaseWithFixes { + public TestableZooKeeper createClient() throws IOException, InterruptedException { + return super.createClient(); + } + } + + + /** + * Lease reclaim store backed by the test's in-process ZooKeeper. + */ + public class InternalZookeeperLeaseReclaimStore extends ZookeeperLeaseReclaimStore { + @Override + public ZooKeeper getNewZooKeeper() throws IOException, InterruptedException { + return testZkclient.createClient(); + } + } + + static final TestZKClient testZkclient = new TestZKClient(); + Configuration conf = null; + LeaseReclaimStore store = null; + String landlord = "testor"; + + @Before + public void before() throws Exception { + testZkclient.setUp(); + conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RM_ZK_ADDRESS, "127.0.0.1:2181"); + conf.set(YarnConfiguration.RM_ZK_NUM_RETRIES, "10"); + //conf.set(YarnConfiguration.LEASE_STORE_CLASS, "org.apache.hadoop.yarn.leasereclaim.TestZookeeperLeaseReclaimStore$InternalZookeeperLeaseReclaimStore"); + //store = LeaseReclaimStore.newInstance(conf); + store = new InternalZookeeperLeaseReclaimStore(); + store.init(conf);
+ store.startManager(); + } + + @After + public void after() throws Exception { + testZkclient.tearDown(); + store.close(); + } + + private Resource getResource(int memory, int vCores) { + if (memory < 0 || vCores < 0) return null; + return Resource.newInstance(memory, vCores); + } + + private long getEndTime(long seconds) { + if (seconds < 0) return LeaseNode.RESOURCE_NEVER_TIMEOUT; + return System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(seconds); + } + + private void verifyLeaseStatus(Map<String, LeaseNode> statusMap, Resource resource, long endTime, Set<String> nodes) { + Assert.assertEquals(nodes.size(), statusMap.size()); + for (String node: nodes) { + Assert.assertTrue(statusMap.containsKey(node)); + Assert.assertTrue(statusMap.get(node).getResource().equals(resource)); + Assert.assertEquals(endTime, statusMap.get(node).getEndTime()); + } + } + + private void verifyLeaseTimeout(Set<String> nodes) { + for (String node: nodes) { + LeaseNode status = store.getHostLeaseStatus(node); + Assert.assertTrue(status.isTimeout()); + Assert.assertTrue(status.getValidResource().equals(LeaseNode.NONE_LEASED)); + } + } + + private void verifyReclaimed(Set<String> nodes) { + for (String node: nodes) { + LeaseNode status = store.getHostLeaseStatus(node); + Assert.assertTrue(status.isReclaimed()); + Assert.assertTrue(status.getValidResource().equals(LeaseNode.NONE_LEASED)); + } + } + + + @Test(timeout = 120000) + public void testLease() throws Exception { + // lease + long leaseSeconds = 10; + Resource resource = getResource(1024, 5); + long endTime = getEndTime(leaseSeconds); + Set<String> nodes = new HashSet<String>(); + nodes.add("node1"); + nodes.add("node2"); + store.putNewLeaseRequestToStore(landlord, resource, endTime, nodes); + + // read from store and verify it + Map<String, LeaseNode> nodeToStatusMap = null; + while (nodeToStatusMap == null || nodeToStatusMap.size() < nodes.size()) { + nodeToStatusMap = store.getMachineLeaseInfoMapFromStore(null); + Thread.sleep(1000); + } + verifyLeaseStatus(nodeToStatusMap, resource, endTime, nodes); + + // wait for the lease to time out, then verify it + Thread.sleep(TimeUnit.SECONDS.toMillis(leaseSeconds)); + + verifyLeaseTimeout(nodes); + } + + @Test(timeout = 120000) + public void testReclaim() throws Exception { + // lease + long leaseSeconds = -1; + Resource resource = getResource(1024, 5); + long endTime = getEndTime(leaseSeconds); + Set<String> nodes = new HashSet<String>(); + nodes.add("node1"); + nodes.add("node2"); + store.putNewLeaseRequestToStore(landlord, resource, endTime, nodes); + + // read from store and verify it + Map<String, LeaseNode> nodeToStatusMap = null; + while (nodeToStatusMap == null || nodeToStatusMap.size() < nodes.size()) { + nodeToStatusMap = store.getMachineLeaseInfoMapFromStore(null); + Thread.sleep(1000); + } + verifyLeaseStatus(nodeToStatusMap, resource, endTime, nodes); + + // reclaim the nodes, then verify it + resource = getResource(0, 0); + store.putNewLeaseRequestToStore(landlord, resource, endTime, nodes); + + // read from store and verify it + nodeToStatusMap = null; + while (nodeToStatusMap == null || nodeToStatusMap.size() < nodes.size()) { + nodeToStatusMap = store.getMachineLeaseInfoMapFromStore(null); + Thread.sleep(1000); + } + verifyReclaimed(nodes); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java index
caa0ff13..f9676b6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.leasereclaim.LeaseManager; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; @@ -97,6 +98,7 @@ private RMNodeLabelsManager nodeLabelManager; private RMDelegatedNodeLabelsUpdater rmDelegatedNodeLabelsUpdater; + private LeaseManager leaseManager; private long epoch; private Clock systemClock = SystemClock.getInstance(); private long schedulerRecoveryStartTime = 0; @@ -421,6 +423,18 @@ public void setRMDelegatedNodeLabelsUpdater( @Private @Unstable + public LeaseManager getLeaseManager() { + return leaseManager; + } + + @Private + @Unstable + public void setLeaseManager(LeaseManager mgr) { + leaseManager = mgr; + } + + @Private + @Unstable public void setSchedulerRecoveryStartAndWaitTime(long waitTime) { this.schedulerRecoveryStartTime = systemClock.getTime(); this.schedulerRecoveryWaitTime = waitTime; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 2ba445c..5fdc35b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.conf.ConfigurationProvider; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.leasereclaim.LeaseManager; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; @@ -132,6 +133,10 @@ void setRMTimelineCollectorManager( void setRMDelegatedNodeLabelsUpdater( RMDelegatedNodeLabelsUpdater nodeLabelsUpdater); + LeaseManager getLeaseManager(); + + public void setLeaseManager(LeaseManager mgr); + long getEpoch(); ReservationSystem getReservationSystem(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 1e702de..a1c81e4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.conf.ConfigurationProvider; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.leasereclaim.LeaseManager; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; @@ -441,6 +442,14 @@ public void setRMDelegatedNodeLabelsUpdater( delegatedNodeLabelsUpdater); } + public LeaseManager getLeaseManager() { + return activeServiceContext.getLeaseManager(); + } + + public void setLeaseManager(LeaseManager mgr) { + activeServiceContext.setLeaseManager(mgr); + } + public void setSchedulerRecoveryStartAndWaitTime(long waitTime) { activeServiceContext.setSchedulerRecoveryStartAndWaitTime(waitTime); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 0c1df33..87db20a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.metrics.NoOpSystemMetricPublisher; +import org.apache.hadoop.yarn.server.resourcemanager.leasereclaim.LeaseManager; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV1Publisher; import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV2Publisher; @@ -692,6 +693,11 @@ protected void serviceInit(Configuration configuration) throws Exception { new RMNMInfo(rmContext, scheduler); + // leaseManager will work if there's lease/reclaim configuration + LeaseManager leaseManager = new LeaseManager(rmContext); + addService(leaseManager); + rmContext.setLeaseManager(leaseManager); + super.serviceInit(conf); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 9d480f3..f7449fd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -364,17 +364,22 @@ public RegisterNodeManagerResponse registerNodeManager( response.setResource(capability); } - // Check if this node has minimum allocations - if (capability.getMemorySize() < minAllocMb - || capability.getVirtualCores() < minAllocVcores) { - String message = - "NodeManager from " + host - + " doesn't satisfy minimum allocations, Sending SHUTDOWN" - + " signal to the NodeManager."; - LOG.info(message); - response.setDiagnosticsMessage(message); - response.setNodeAction(NodeAction.SHUTDOWN); - return response; + if (rmContext.getLeaseManager() != null && rmContext.getLeaseManager().isLeaseReclaimEnabled()) { + // if lease/reclaim is enabled, NM can be configured with 0 resource + capability = rmContext.getLeaseManager().updateNodeCapabilityByLeasedResource(host, capability); + } else { + // Check if this node has minimum allocations + if (capability.getMemorySize() < minAllocMb + || capability.getVirtualCores() < minAllocVcores) { + String message = + "NodeManager from " + host + + " doesn't satisfy minimum allocations, Sending SHUTDOWN" + + " signal to the NodeManager."; + LOG.info(message); + response.setDiagnosticsMessage(message); + response.setNodeAction(NodeAction.SHUTDOWN); + return response; + } } response.setContainerTokenMasterKey(containerTokenSecretManager @@ -549,6 +554,11 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) nodeHeartBeatResponse); } + // check if lease endtime is reached || resource needs to be updated + if (rmContext.getLeaseManager() != null && rmContext.getLeaseManager().isLeaseReclaimEnabled()) { + rmContext.getLeaseManager().updateNodeResource(nodeId); + } + // 4. Send status to RMNode, saving the latest response. RMNodeStatusEvent nodeStatusEvent = new RMNodeStatusEvent(nodeId, remoteNodeStatus, nodeHeartBeatResponse); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/leasereclaim/LeaseManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/leasereclaim/LeaseManager.java new file mode 100644 index 0000000..9b96b50 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/leasereclaim/LeaseManager.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.leasereclaim; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.LeaseNode; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceOption; +import org.apache.hadoop.yarn.leasereclaim.LeaseReclaimStore; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; + +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * LeaseManager: + * 1) uses the LeaseReclaimStore to handle landlords' lease/reclaim requests, keep the lease status of nodes in memory, + * and persist the lease status of nodes in the store, etc.; + * 2) provides APIs for the RM: get a NM's leased resource/end time, update its total capability if needed, etc. + */ +public class LeaseManager extends AbstractService { + private static final Log LOG = LogFactory.getLog(LeaseManager.class); + + private Configuration conf; + private LeaseReclaimStore leaseReclaimStore = null; + private boolean leaseReclaimEnabled = false; + // TODO: remove disconnected hosts if needed + private ConcurrentMap<String, Resource> hostRegisteredCapacityMap = new ConcurrentHashMap<String, Resource>(); + + + private static final int DEFAULT_OVERCOMMITTIMEOUT = 0; + + private RMContextImpl rmContext; + + private Resource getRegisteredCapacity(String host) { + Resource r = hostRegisteredCapacityMap.get(host); + if (r == null) { + LOG.warn("No registered capacity found for unrecognized node: " + host); + } + return r; + } + + private void setRegisteredCapacity(String host, Resource resource) { + hostRegisteredCapacityMap.put(host, resource); + } + + public LeaseManager(RMContextImpl rmContextInfo) throws Exception { + super(LeaseManager.class.getName()); + rmContext = rmContextInfo; + + } + + @Override + protected void serviceInit(Configuration yarnConf) throws Exception { + LOG.info("Lease Manager init"); + conf = yarnConf; + leaseReclaimStore = null; + leaseReclaimEnabled = false; + } + + @Override + protected void serviceStart() throws Exception { + LOG.info("Lease Manager start"); + leaseReclaimStore = LeaseReclaimStore.newInstance(conf); + if (leaseReclaimStore == null) { + return; + } + leaseReclaimEnabled = true; + leaseReclaimStore.startManager(); + } + + @Override + public void serviceStop() throws Exception { + LOG.info("Lease Manager stop"); + if (leaseReclaimEnabled) { + leaseReclaimStore.close(); + leaseReclaimStore = null; + leaseReclaimEnabled = false; + } + } + + public boolean isLeaseReclaimEnabled() { + return leaseReclaimEnabled; + } + + private LeaseNode getHostLeaseStatus(String host) { + return leaseReclaimStore.getHostLeaseStatus(host); + } + + // Called when a NM registers or when its configured capability is updated +
public Resource updateNodeCapabilityByLeasedResource(String host, Resource capability) { + // record the configured capability for later lease operations + setRegisteredCapacity(host, capability); + // return the leased resource if the node is leased + LeaseNode status = getHostLeaseStatus(host); + if (status != null) { + Resource resource = status.getValidResource(); + // if the lease status specifies a resource, use it; otherwise fall back to the registered capacity + if (resource != null) { + LOG.info("Update node " + host + " capability from " + capability + " to " + resource); + return resource; + } + } + // TODO in the future: if no lease status is recorded for the NM, use its registered capability. + return capability; + } + + // Update the RMNode resource while handling a lease request; meant to be called from + // LeaseReclaimStore.startHandlingRequests / handleUpdateResourceRequest(). + // Note: + // Since NM heartbeats are frequent and the lease end time has to be checked anyway, updateNodeResource(NodeId) + // in the NM heartbeat handler is used to update NM resources instead of calling this API from the LeaseReclaimStore, + // so this API is currently unused. + public boolean updateNodeResource(Set<String> nodes, LeaseNode status) { + Resource resource = status.isLeased() ? status.getResource() : LeaseNode.NONE_LEASED; + boolean allSuccess = true; + + for (NodeId nodeId : rmContext.getRMNodes().keySet()) { + String nodeName = leaseReclaimStore.normalizeHostname(nodeId.getHost()); + if (nodes.contains(nodeName)) { + RMNode node = rmContext.getRMNodes().get(nodeId); + if (resource == null) { + resource = getRegisteredCapacity(nodeId.getHost()); + } + if (node == null || resource == null) { + LOG.warn("Resource update failed on an unrecognized node: " + nodeId); + allSuccess = false; + } else { + ResourceOption newResourceOption = ResourceOption.newInstance(resource, DEFAULT_OVERCOMMITTIMEOUT); + // push the updated resource to the RMNode + rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeResourceUpdateEvent(nodeId, newResourceOption)); + LOG.info("Update resource on node(" + node.getNodeID() + + ") with resource(" + newResourceOption.toString() + ")"); + } + } + } + if (!allSuccess) { + LOG.warn("Resource update failed on some nodes!"); + } + return allSuccess; + } + + // Check whether the RMNode resource needs to be updated and, if so, update it. + // Note: + // The original purpose of this function was to check in the NM heartbeat handler whether a leased node's + // end time has been reached and, if so, update the resource. + // Since NM heartbeats are frequent and the end-time check is needed anyway, it now updates the NM resource as well, + // so there is no need to call updateNodeResource in the LeaseReclaimStore while handling lease requests. + public void updateNodeResource(NodeId nodeId) { + LeaseNode status = getHostLeaseStatus(nodeId.getHost()); + if (status == null) { + // no lease/reclaim status recorded, don't update the resource + // TODO in the future: this behavior may need to change if a leaseUID is added + return; + } + Resource resource = status.isLeased() ?
status.getResource() : LeaseNode.NONE_LEASED; + if (resource == null) { + resource = getRegisteredCapacity(nodeId.getHost()); + } + RMNode node = rmContext.getRMNodes().get(nodeId); + + if (node == null || resource == null) { + LOG.warn("Resource update failed on an unrecognized node: " + nodeId); + } else { + if (!resource.equals(node.getTotalCapability())) { + // update the resource for the RMNode + ResourceOption newResourceOption = ResourceOption.newInstance(resource, DEFAULT_OVERCOMMITTIMEOUT); + rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeResourceUpdateEvent(nodeId, newResourceOption)); + LOG.info("Update resource on node(" + node.getNodeID() + " " + node.getTotalCapability() + + ") to resource(" + newResourceOption.toString() + ")"); + } + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index ee62a70..2531c34 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1122,7 +1122,7 @@ private synchronized void nodeUpdate(RMNode nm) { /** * Process resource update on a node. */ - private synchronized void updateNodeAndQueueResource(RMNode nm, + protected synchronized void updateNodeAndQueueResource(RMNode nm, ResourceOption resourceOption) { updateNodeResource(nm, resourceOption); Resource clusterResource = getClusterResource(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PreemptCapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PreemptCapacityScheduler.java new file mode 100644 index 0000000..24a3b09 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PreemptCapacityScheduler.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceOption; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.util.resource.Resources; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * A CapacityScheduler that preempts containers when a node's resources are over-used. + */ +@LimitedPrivate("yarn") +@Evolving +@SuppressWarnings("unchecked") +public class PreemptCapacityScheduler extends CapacityScheduler { + + private static final Log LOG = LogFactory.getLog(PreemptCapacityScheduler.class); + + + /** For some corner cases, preemption is also needed when a new node is added. + * For example: + * the RM was running with a non-preemptable scheduler and NM X had n containers, but the node was reclaimed; + * when the RM is then restarted with a preemptable scheduler, those containers should be preempted when the node is added. + */ + @Override + public synchronized void recoverContainersOnNode( + List<NMContainerStatus> containerReports, RMNode nm) { + super.recoverContainersOnNode(containerReports, nm); + preemptOverrunContainersWhenNeeded(nm); + } + + /** + * Process resource update on a node. + */ + @Override + protected synchronized void updateNodeAndQueueResource(RMNode nm, + ResourceOption resourceOption) { + super.updateNodeAndQueueResource(nm, resourceOption); + preemptOverrunContainersWhenNeeded(nm); + } + + public void preemptOverrunContainersWhenNeeded(RMNode nm) { + // Preempt running containers if needed + FiCaSchedulerNode node = this.getNode(nm.getNodeID()); + LOG.debug("--- availableResource on node " + nm.getNodeID().toString() + + ": " + node.getUnallocatedResource().toString()); + + if (node.getUnallocatedResource().compareTo(Resources.none()) < 0) { + preemptOverrunContainers(node); + } + } + + // TODO: temporary preemption code. + // Later, a policy configuration will be used instead of the code here. + /** + * Preempt enough running containers on the node so that available resources + * are no longer negative.
+ * + * @param node the scheduler node to preempt containers on + */ + private void preemptOverrunContainers(FiCaSchedulerNode node) { + LOG.debug("--- preemptOverrunContainers on node " + + node.getNodeID().toString()); + + // select containers to be preempted + Map<ApplicationAttemptId, Set<RMContainer>> toPreempt = getContainersToPreempt(node); + // kill the selected containers + for (Map.Entry<ApplicationAttemptId, Set<RMContainer>> e : toPreempt + .entrySet()) { + for (RMContainer container : e.getValue()) { + markContainerForKillable(container); + LOG.debug("--- preempted : " + container.toString()); + } + } + } + + /** + * Select one or more containers to kill in order to give back over-used resources. + * TODO Refine the container selection logic + * @param node the scheduler node to select containers from + * @return the selected containers, grouped by application attempt + */ + private Map<ApplicationAttemptId, Set<RMContainer>> getContainersToPreempt( + FiCaSchedulerNode node) { + Resource resToObtain = Resource.newInstance(node.getUnallocatedResource().getMemorySize(), + node.getUnallocatedResource().getVirtualCores()); + LOG.debug("--- resToObtain : " + resToObtain.toString()); + + Map<ApplicationAttemptId, Set<RMContainer>> ret = + new HashMap<ApplicationAttemptId, Set<RMContainer>>(); + if (resToObtain.getMemorySize() >= 0 && resToObtain.getVirtualCores() >= 0) { + LOG.debug("--- no need to preempt any container, there are enough resources"); + return ret; + } + + // drop the container reservation, if any + if (node.getReservedContainer() != null) { + killReservedContainer(node.getReservedContainer()); + Resources.addTo(resToObtain, node.getReservedContainer() + .getContainer().getResource()); + LOG.debug("--- dropped a container reservation"); + } + + List<RMContainer> containers = node.getCopiedListOfRunningContainers(); + while ((resToObtain.getMemorySize() < 0 || resToObtain.getVirtualCores() < 0) + && !containers.isEmpty()) { + RMContainer c = containers.remove(containers.size() - 1); + if (ret.containsKey(c.getApplicationAttemptId())) { + ret.get(c.getApplicationAttemptId()).add(c); + } else { + Set<RMContainer> conts = new HashSet<RMContainer>(); + conts.add(c); + ret.put(c.getApplicationAttemptId(), conts); + } + Resources.addTo(resToObtain, c.getContainer().getResource()); + } + + if (resToObtain.getMemorySize() < 0 || resToObtain.getVirtualCores() < 0) { + LOG.error("--- cannot get back enough resources even if ALL containers are killed; this should never happen"); + } else { + LOG.debug("--- resToObtain is changed to : " + resToObtain.toString()); + } + + return ret; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java index 5dfee89..4ebfcde 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java @@ -1225,6 +1225,7 @@ public void testCancelWithMultipleAppSubmissions() throws Exception{ Assert.assertFalse(Renewer.cancelled); MockRM.finishAMAndVerifyAppState(app2, rm, nm1, am2); + Thread.sleep(10); // app2 completes, app1 is still running, check the token is not cancelled Assert.assertTrue(renewer.getAllTokens().containsKey(token1)); Assert.assertTrue(dttr.referringAppIds.contains(app1.getApplicationId())); @@ -1243,6 +1244,7 @@ public void testCancelWithMultipleAppSubmissions() throws Exception{
Assert.assertFalse(Renewer.cancelled); MockRM.finishAMAndVerifyAppState(app1, rm, nm1, am1); + Thread.sleep(10); Assert.assertTrue(renewer.getAllTokens().containsKey(token1)); Assert.assertFalse(dttr.referringAppIds.contains(app1.getApplicationId())); Assert.assertTrue(dttr.referringAppIds.contains(app3.getApplicationId())); @@ -1250,6 +1252,7 @@ public void testCancelWithMultipleAppSubmissions() throws Exception{ Assert.assertFalse(Renewer.cancelled); MockRM.finishAMAndVerifyAppState(app3, rm, nm1, am3); + Thread.sleep(10); Assert.assertFalse(renewer.getAllTokens().containsKey(token1)); Assert.assertTrue(dttr.referringAppIds.isEmpty()); Assert.assertTrue(dttr.isTimerCancelled());