diff --git hadoop-project/pom.xml hadoop-project/pom.xml index 01bf7be..8125908 100644 --- hadoop-project/pom.xml +++ hadoop-project/pom.xml @@ -92,6 +92,7 @@ 1.0.0 3.0.3 + 2.3.13 1.8 @@ -1170,6 +1171,11 @@ ehcache ${ehcache.version} + + com.zaxxer + HikariCP-java6 + ${hikari.version} + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml index 1faf754..d81425b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml @@ -117,6 +117,10 @@ org.ehcache ehcache + + com.zaxxer + HikariCP-java6 + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java index 6e564dc..ea8cb50 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java @@ -60,8 +60,12 @@ import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator; import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator; import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreErrorCode; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.util.MonotonicClock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * In-memory implementation of {@link FederationStateStore}. @@ -74,6 +78,9 @@ private final MonotonicClock clock = new MonotonicClock(); + public static final Logger LOG = + LoggerFactory.getLogger(MemoryFederationStateStore.class); + @Override public void init(Configuration conf) { membership = new ConcurrentHashMap(); @@ -94,7 +101,17 @@ public SubClusterRegisterResponse registerSubCluster( FederationMembershipStateStoreInputValidator .validateSubClusterRegisterRequest(request); SubClusterInfo subClusterInfo = request.getSubClusterInfo(); - membership.put(subClusterInfo.getSubClusterId(), subClusterInfo); + + SubClusterInfo subClusterInfoToSave = + SubClusterInfo.newInstance(subClusterInfo.getSubClusterId(), + subClusterInfo.getAMRMServiceAddress(), + subClusterInfo.getClientRMServiceAddress(), + subClusterInfo.getRMAdminServiceAddress(), + subClusterInfo.getRMWebServiceAddress(), clock.getTime(), + subClusterInfo.getState(), subClusterInfo.getLastStartTime(), + subClusterInfo.getCapability()); + + membership.put(subClusterInfo.getSubClusterId(), subClusterInfoToSave); return SubClusterRegisterResponse.newInstance(); } @@ -105,8 +122,10 @@ public SubClusterDeregisterResponse deregisterSubCluster( .validateSubClusterDeregisterRequest(request); SubClusterInfo subClusterInfo = membership.get(request.getSubClusterId()); if (subClusterInfo == null) { - throw new YarnException( - "SubCluster " + request.getSubClusterId().toString() + " not found"); + String errMsg = + "SubCluster " + request.getSubClusterId().toString() + " not found"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, + FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_FAIL, errMsg); } else { subClusterInfo.setState(request.getState()); } @@ -124,8 +143,10 @@ public SubClusterHeartbeatResponse subClusterHeartbeat( SubClusterInfo subClusterInfo = membership.get(subClusterId); if (subClusterInfo == null) { - throw new YarnException("Subcluster " + subClusterId.toString() - + " does not exist; cannot heartbeat"); + String errMsg = "Subcluster " + subClusterId.toString() + + " does not exist; cannot heartbeat"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, + FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_FAIL, errMsg); } subClusterInfo.setLastHeartBeat(clock.getTime()); @@ -143,8 +164,10 @@ public GetSubClusterInfoResponse getSubCluster( .validateGetSubClusterInfoRequest(request); SubClusterId subClusterId = request.getSubClusterId(); if (!membership.containsKey(subClusterId)) { - throw new YarnException( - "Subcluster " + subClusterId.toString() + " does not exist"); + String errMsg = + "Subcluster " + subClusterId.toString() + " does not exist"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, + FederationStateStoreErrorCode.MEMBERSHIP_SELECT_FAIL, errMsg); } return GetSubClusterInfoResponse.newInstance(membership.get(subClusterId)); @@ -193,7 +216,10 @@ public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster( ApplicationId appId = request.getApplicationHomeSubCluster().getApplicationId(); if (!applications.containsKey(appId)) { - throw new YarnException("Application " + appId + " does not exist"); + String errMsg = "Application " + appId + " does not exist"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, + FederationStateStoreErrorCode.APPLICATIONSHOMESUBCLUSTER_UPDATE_FAIL, + errMsg); } applications.put(appId, @@ -209,7 +235,10 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( .validateGetApplicationHomeSubClusterRequest(request); ApplicationId appId = request.getApplicationId(); if (!applications.containsKey(appId)) { - throw new YarnException("Application " + appId + " does not exist"); + String errMsg = "Application " + appId + " does not exist"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, + FederationStateStoreErrorCode.APPLICATIONSHOMESUBCLUSTER_SELECT_FAIL, + errMsg); } return GetApplicationHomeSubClusterResponse.newInstance( @@ -238,7 +267,10 @@ public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster( .validateDeleteApplicationHomeSubClusterRequest(request); ApplicationId appId = request.getApplicationId(); if (!applications.containsKey(appId)) { - throw new YarnException("Application " + appId + " does not exist"); + String errMsg = "Application " + appId + " does not exist"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, + FederationStateStoreErrorCode.APPLICATIONSHOMESUBCLUSTER_DELETE_FAIL, + errMsg); } applications.remove(appId); @@ -253,7 +285,9 @@ public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration( .validateGetSubClusterPolicyConfigurationRequest(request); String queue = request.getQueue(); if (!policies.containsKey(queue)) { - throw new YarnException("Policy for queue " + queue + " does not exist"); + String errMsg = "Policy for queue " + queue + " does not exist"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, + FederationStateStoreErrorCode.POLICY_SELECT_FAIL, errMsg); } return GetSubClusterPolicyConfigurationResponse diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterInfo.java index f13c8f1..241e37b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterInfo.java @@ -260,4 +260,62 @@ public String toString() { + ", getState() = " + getState() + ", getLastStartTime() = " + getLastStartTime() + ", getCapability() = " + getCapability() + "]"; } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + SubClusterInfo other = (SubClusterInfo) obj; + if (!this.getSubClusterId().equals(other.getSubClusterId())) { + return false; + } + if (!this.getAMRMServiceAddress().equals(other.getAMRMServiceAddress())) { + return false; + } + if (!this.getClientRMServiceAddress() + .equals(other.getClientRMServiceAddress())) { + return false; + } + if (!this.getRMAdminServiceAddress() + .equals(other.getRMAdminServiceAddress())) { + return false; + } + if (!this.getRMWebServiceAddress().equals(other.getRMWebServiceAddress())) { + return false; + } + if (!this.getState().equals(other.getState())) { + return false; + } + return this.getLastStartTime() == other.getLastStartTime(); + // Capability and HeartBeat are not necessary to compare the information + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + + ((getSubClusterId() == null) ? 0 : getSubClusterId().hashCode()); + result = prime * result + ((getAMRMServiceAddress() == null) ? 0 + : getAMRMServiceAddress().hashCode()); + result = prime * result + ((getClientRMServiceAddress() == null) ? 0 + : getClientRMServiceAddress().hashCode()); + result = prime * result + ((getRMAdminServiceAddress() == null) ? 0 + : getRMAdminServiceAddress().hashCode()); + result = prime * result + ((getRMWebServiceAddress() == null) ? 0 + : getRMWebServiceAddress().hashCode()); + result = + prime * result + ((getState() == null) ? 0 : getState().hashCode()); + result = prime * result + + (int) (getLastStartTime() ^ (getLastStartTime() >>> 32)); + return result; + // Capability and HeartBeat are not necessary to create the information + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SubClusterInfoPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SubClusterInfoPBImpl.java index b650b5f..cfdd038 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SubClusterInfoPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SubClusterInfoPBImpl.java @@ -82,22 +82,6 @@ private void mergeLocalToBuilder() { } @Override - public int hashCode() { - return getProto().hashCode(); - } - - @Override - public boolean equals(Object other) { - if (other == null) { - return false; - } - if (other.getClass().isAssignableFrom(this.getClass())) { - return this.getProto().equals(this.getClass().cast(other).getProto()); - } - return false; - } - - @Override public String toString() { return TextFormat.shortDebugString(getProto()); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreErrorCode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreErrorCode.java new file mode 100644 index 0000000..f954f71 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreErrorCode.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.utils; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + *

+ * Logical Error codes from FederationStateStore. + *

+ */ +@Public +@Unstable +public enum FederationStateStoreErrorCode { + + MEMBERSHIP_INSERT_FAIL(1101, "Insert failed in Table Membership."), + + MEMBERSHIP_DELETE_FAIL(1102, "Delete failed in Table Membership."), + + MEMBERSHIP_SELECT_FAIL(1103, "Select failed in Table Membership."), + + MEMBERSHIP_UPDATE_FAIL(1104, "Update failed in Table Membership."), + + APPLICATIONSHOMESUBCLUSTER_INSERT_FAIL(1201, + "Insert failed in Table ApplicationsHomeSubCluster."), + + APPLICATIONSHOMESUBCLUSTER_DELETE_FAIL(1202, + "Delete failed in Table ApplicationsHomeSubCluster."), + + APPLICATIONSHOMESUBCLUSTER_SELECT_FAIL(1203, + "Select failed in Table ApplicationsHomeSubCluster."), + + APPLICATIONSHOMESUBCLUSTER_UPDATE_FAIL(1204, + "Update failed in Table ApplicationsHomeSubCluster."), + + POLICY_INSERT_FAIL(1301, "Insert failed in Table Policies."), + + POLICY_DELETE_FAIL(1302, "Delete failed in Table Policies."), + + POLICY_SELECT_FAIL(1303, "Select failed in Table Policies."), + + POLICY_UPDATE_FAIL(1304, "Update failed in Table Policies."); + + private final int id; + private final String msg; + + FederationStateStoreErrorCode(int id, String msg) { + this.id = id; + this.msg = msg; + } + + public int getId() { + return this.id; + } + + public String getMsg() { + return this.msg; + } + + @Override + public String toString() { + return "\nError Code: " + this.id + "\nError Message: " + this.msg; + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreException.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreException.java new file mode 100644 index 0000000..ecbf847 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreException.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.utils; + +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * Exception thrown by the FederationStateStore. + * + */ +public class FederationStateStoreException extends YarnException { + + /** + * IDE auto-generated + */ + private static final long serialVersionUID = -6453353714832159296L; + + private FederationStateStoreErrorCode code; + + public FederationStateStoreException(FederationStateStoreErrorCode code) { + super(); + this.code = code; + } + + public int getErrorCode() { + return code.getId(); + } + + public String getErrorMsg() { + return code.getMsg(); + } + + public FederationStateStoreErrorCode getCode() { + return code; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java new file mode 100644 index 0000000..9eb2490 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.utils; + +import java.sql.CallableStatement; +import java.sql.Connection; +import java.sql.SQLException; + +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.zaxxer.hikari.HikariDataSource; + +/** + * Common utility methods used by the store implementations. + * + */ +public final class FederationStateStoreUtils { + + public static final Logger LOG = + LoggerFactory.getLogger(FederationStateStoreUtils.class); + + /** + * Returns the SQL FederationStateStore connection to the pool. + * + * @param LOG the logger interface + * @param cstmt the interface used to execute SQL stored procedures + * @param conn the SQL connection + * @throws YarnException + */ + public static void returnToPool(Logger LOG, CallableStatement cstmt, + Connection conn) throws YarnException { + if (cstmt != null) { + try { + cstmt.close(); + } catch (SQLException e) { + logAndThrowException(LOG, "Exception while trying to close Statement", + e); + } + } + + if (conn != null) { + try { + conn.close(); + } catch (SQLException e) { + logAndThrowException(LOG, "Exception while trying to close Connection", + e); + } + } + } + + /** + * Throws an exception due to an error in FederationStateStore. + * + * @param LOG the logger interface + * @param errMsg the error message + * @param t the throwable raised in the called class. + * @throws YarnException + */ + public static void logAndThrowException(Logger LOG, String errMsg, + Throwable t) throws YarnException { + if (t != null) { + LOG.error(errMsg, t); + throw new YarnException(errMsg, t); + } else { + LOG.error(errMsg); + throw new YarnException(errMsg); + } + } + + /** + * Throws an FederationStateStoreException due to an error in + * <@Code>FederationStateStore<@Code>. + * + * @param LOG the logger interface + * @param errMsg the error message + * @param code FederationStateStoreErrorCode of the error + * @throws YarnException + */ + public static void logAndThrowStoreException(Logger LOG, + FederationStateStoreErrorCode code, String errMsg) throws YarnException { + LOG.error(errMsg + " " + code.toString()); + throw new FederationStateStoreException(code); + } + + /** + * Throws an FederationStateStoreException due to an error in + * <@Code>FederationStateStore<@Code>. + * + * @param LOG the logger interface + * @param errMsg the error message + * @param code FederationStateStoreErrorCode of the error + * @throws YarnException + */ + public static void logAndThrowStoreException(Logger LOG, + FederationStateStoreErrorCode code) throws YarnException { + LOG.error(code.toString()); + throw new FederationStateStoreException(code); + } + + /** + * Sets a specific value for a specific property of + * HikariDataSource SQL connections. + * + * @param dataSource the HikariDataSource connections + * @param property the property to set + * @param value the value to set + */ + public static void setProperty(HikariDataSource dataSource, String property, + String value) { + LOG.info("Setting property {} with value {}", property, value); + if (property != null && !property.isEmpty() && value != null) { + dataSource.addDataSourceProperty(property, value); + } + } + + /** + * Sets a specific username for HikariDataSource SQL connections. + * + * @param dataSource the HikariDataSource connections + * @param userNameDB the value to set + */ + public static void setUsername(HikariDataSource dataSource, + String userNameDB) { + if (userNameDB != null) { + dataSource.setUsername(userNameDB); + LOG.debug("Setting non NULL Username for Store connection"); + } else { + LOG.debug("NULL Username specified for Store connection, so ignoring"); + } + } + + /** + * Sets a specific password for HikariDataSource SQL connections. + * + * @param dataSource the HikariDataSource connections + * @param password the value to set + */ + public static void setPassword(HikariDataSource dataSource, String password) { + if (password != null) { + dataSource.setPassword(password); + LOG.debug("Setting non NULL Credentials for Store connection"); + } else { + LOG.debug("NULL Credentials specified for Store connection, so ignoring"); + } + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java index 63a5b65..0812e56 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -51,6 +52,8 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest; import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreErrorCode; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreException; import org.apache.hadoop.yarn.util.MonotonicClock; import org.junit.After; import org.junit.Assert; @@ -67,9 +70,11 @@ protected abstract FederationStateStore createStateStore(); + protected Configuration conf; + @Before public void before() throws IOException, YarnException { - stateStore.init(new Configuration()); + stateStore.init(conf); } @After @@ -114,8 +119,9 @@ public void testDeregisterSubClusterUnknownSubCluster() throws Exception { try { stateStore.deregisterSubCluster(deregisterRequest); Assert.fail(); - } catch (YarnException e) { - Assert.assertTrue(e.getMessage().startsWith("SubCluster SC not found")); + } catch (FederationStateStoreException e) { + Assert.assertEquals(FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_FAIL, + e.getCode()); } } @@ -141,9 +147,9 @@ public void testGetSubClusterInfoUnknownSubCluster() throws Exception { try { stateStore.getSubCluster(request).getSubClusterInfo(); Assert.fail(); - } catch (YarnException e) { - Assert.assertTrue( - e.getMessage().startsWith("Subcluster SC does not exist")); + } catch (FederationStateStoreException e) { + Assert.assertEquals(FederationStateStoreErrorCode.MEMBERSHIP_SELECT_FAIL, + e.getCode()); } } @@ -166,19 +172,25 @@ public void testGetAllSubClustersInfo() throws Exception { stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest.newInstance( subClusterId2, SubClusterState.SC_UNHEALTHY, "capability")); - Assert.assertTrue( - stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(true)) - .getSubClusters().contains(subClusterInfo1)); - Assert.assertFalse( + List subClustersActive = stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(true)) - .getSubClusters().contains(subClusterInfo2)); - - Assert.assertTrue( - stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(false)) - .getSubClusters().contains(subClusterInfo1)); - Assert.assertTrue( + .getSubClusters(); + List subClustersAll = stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(false)) - .getSubClusters().contains(subClusterInfo2)); + .getSubClusters(); + + // SC1 is the only active + Assert.assertEquals(1, subClustersActive.size()); + SubClusterInfo sc1 = subClustersActive.get(0); + Assert.assertEquals(subClusterId1, sc1.getSubClusterId()); + + // SC1 and SC2 are the SubCluster present into the StateStore + + Assert.assertEquals(2, subClustersAll.size()); + Assert.assertTrue(subClustersAll.contains(sc1)); + subClustersAll.remove(sc1); + SubClusterInfo sc2 = subClustersAll.get(0); + Assert.assertEquals(subClusterId2, sc2.getSubClusterId()); } @Test @@ -204,9 +216,9 @@ public void testSubClusterHeartbeatUnknownSubCluster() throws Exception { try { stateStore.subClusterHeartbeat(heartbeatRequest); Assert.fail(); - } catch (YarnException e) { - Assert.assertTrue(e.getMessage() - .startsWith("Subcluster SC does not exist; cannot heartbeat")); + } catch (FederationStateStoreException e) { + Assert.assertEquals(FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_FAIL, + e.getCode()); } } @@ -265,9 +277,10 @@ public void testDeleteApplicationHomeSubCluster() throws Exception { try { queryApplicationHomeSC(appId); Assert.fail(); - } catch (YarnException e) { - Assert.assertTrue(e.getMessage() - .startsWith("Application " + appId + " does not exist")); + } catch (FederationStateStoreException e) { + Assert.assertEquals( + FederationStateStoreErrorCode.APPLICATIONSHOMESUBCLUSTER_SELECT_FAIL, + e.getCode()); } } @@ -281,9 +294,10 @@ public void testDeleteApplicationHomeSubClusterUnknownApp() throws Exception { try { stateStore.deleteApplicationHomeSubCluster(delRequest); Assert.fail(); - } catch (YarnException e) { - Assert.assertTrue(e.getMessage() - .startsWith("Application " + appId.toString() + " does not exist")); + } catch (FederationStateStoreException e) { + Assert.assertEquals( + FederationStateStoreErrorCode.APPLICATIONSHOMESUBCLUSTER_DELETE_FAIL, + e.getCode()); } } @@ -314,9 +328,10 @@ public void testGetApplicationHomeSubClusterUnknownApp() throws Exception { try { stateStore.getApplicationHomeSubCluster(request); Assert.fail(); - } catch (YarnException e) { - Assert.assertTrue(e.getMessage() - .startsWith("Application " + appId.toString() + " does not exist")); + } catch (FederationStateStoreException e) { + Assert.assertEquals( + FederationStateStoreErrorCode.APPLICATIONSHOMESUBCLUSTER_SELECT_FAIL, + e.getCode()); } } @@ -379,9 +394,10 @@ public void testUpdateApplicationHomeSubClusterUnknownApp() throws Exception { try { stateStore.updateApplicationHomeSubCluster((updateRequest)); Assert.fail(); - } catch (YarnException e) { - Assert.assertTrue(e.getMessage() - .startsWith("Application " + appId.toString() + " does not exist")); + } catch (FederationStateStoreException e) { + Assert.assertEquals( + FederationStateStoreErrorCode.APPLICATIONSHOMESUBCLUSTER_UPDATE_FAIL, + e.getCode()); } } @@ -440,9 +456,9 @@ public void testGetPolicyConfigurationUnknownQueue() throws Exception { try { stateStore.getPolicyConfiguration(request); Assert.fail(); - } catch (YarnException e) { - Assert.assertTrue( - e.getMessage().startsWith("Policy for queue Queue does not exist")); + } catch (FederationStateStoreException e) { + Assert.assertEquals(FederationStateStoreErrorCode.POLICY_SELECT_FAIL, + e.getCode()); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java index 74404c7..55aa381 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java @@ -17,6 +17,7 @@ package org.apache.hadoop.yarn.server.federation.store.impl; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; /** @@ -27,6 +28,7 @@ @Override protected FederationStateStore createStateStore() { + conf = new Configuration(); return new MemoryFederationStateStore(); } -} +} \ No newline at end of file