diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreErrorCode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreErrorCode.java
new file mode 100644
index 0000000..2106765
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreErrorCode.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.exception;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ *
+ * Logical Error codes from FederationStateStore.
+ *
+ */
+@Public
+@Unstable
+public enum FederationStateStoreErrorCode {
+
+ MEMBERSHIP_INSERT_FAIL(1101, "Insert failed in Table Membership."),
+
+ MEMBERSHIP_DELETE_FAIL(1102, "Delete failed in Table Membership."),
+
+ MEMBERSHIP_SELECT_FAIL(1103, "Select failed in Table Membership."),
+
+ MEMBERSHIP_UPDATE_FAIL(1104, "Update failed in Table Membership."),
+
+ APPLICATIONSHOMESUBCLUSTER_INSERT_FAIL(1201,
+ "Insert failed in Table ApplicationsHomeSubCluster."),
+
+ APPLICATIONSHOMESUBCLUSTER_DELETE_FAIL(1202,
+ "Delete failed in Table ApplicationsHomeSubCluster."),
+
+ APPLICATIONSHOMESUBCLUSTER_SELECT_FAIL(1203,
+ "Select failed in Table ApplicationsHomeSubCluster."),
+
+ APPLICATIONSHOMESUBCLUSTER_UPDATE_FAIL(1204,
+ "Update failed in Table ApplicationsHomeSubCluster."),
+
+ POLICY_INSERT_FAIL(1301, "Insert failed in Table Policies."),
+
+ POLICY_DELETE_FAIL(1302, "Delete failed in Table Policies."),
+
+ POLICY_SELECT_FAIL(1303, "Select failed in Table Policies."),
+
+ POLICY_UPDATE_FAIL(1304, "Update failed in Table Policies.");
+
+ private final int id;
+ private final String msg;
+
+ FederationStateStoreErrorCode(int id, String msg) {
+ this.id = id;
+ this.msg = msg;
+ }
+
+ public int getId() {
+ return this.id;
+ }
+
+ public String getMsg() {
+ return this.msg;
+ }
+
+ @Override
+ public String toString() {
+ return "\nError Code: " + this.id + "\nError Message: " + this.msg;
+ }
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreException.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreException.java
new file mode 100644
index 0000000..81a9e99
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreException.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.exception;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * Exception thrown by the FederationStateStore.
+ *
+ */
+public class FederationStateStoreException extends YarnException {
+
+ /**
+ * IDE auto-generated.
+ */
+ private static final long serialVersionUID = -6453353714832159296L;
+
+ private FederationStateStoreErrorCode code;
+
+ public FederationStateStoreException(FederationStateStoreErrorCode code) {
+ super();
+ this.code = code;
+ }
+
+ public FederationStateStoreErrorCode getCode() {
+ return code;
+ }
+
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreInvalidInputException.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreInvalidInputException.java
new file mode 100644
index 0000000..37b3137
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreInvalidInputException.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.exception;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
+
+/**
+ * Exception thrown by the {@link FederationMembershipStateStoreInputValidator},
+ * {@link FederationApplicationHomeSubClusterStoreInputValidator},
+ * {@link FederationPolicyStoreInputValidator} if the input is invalid.
+ *
+ */
+public class FederationStateStoreInvalidInputException extends YarnException {
+
+ /**
+ * IDE auto-generated.
+ */
+ private static final long serialVersionUID = -7352144682711430801L;
+
+ public FederationStateStoreInvalidInputException(Throwable cause) {
+ super(cause);
+ }
+
+ public FederationStateStoreInvalidInputException(String message) {
+ super(message);
+ }
+
+ public FederationStateStoreInvalidInputException(String message,
+ Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/package-info.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/package-info.java
new file mode 100644
index 0000000..727606f
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/package-info.java
@@ -0,0 +1,17 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.store.exception;
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
index 6e564dc..b0e6960 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
@@ -60,8 +61,11 @@
import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.util.MonotonicClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* In-memory implementation of {@link FederationStateStore}.
@@ -74,6 +78,9 @@
private final MonotonicClock clock = new MonotonicClock();
+ public static final Logger LOG =
+ LoggerFactory.getLogger(MemoryFederationStateStore.class);
+
@Override
public void init(Configuration conf) {
membership = new ConcurrentHashMap();
@@ -94,7 +101,17 @@ public SubClusterRegisterResponse registerSubCluster(
FederationMembershipStateStoreInputValidator
.validateSubClusterRegisterRequest(request);
SubClusterInfo subClusterInfo = request.getSubClusterInfo();
- membership.put(subClusterInfo.getSubClusterId(), subClusterInfo);
+
+ SubClusterInfo subClusterInfoToSave =
+ SubClusterInfo.newInstance(subClusterInfo.getSubClusterId(),
+ subClusterInfo.getAMRMServiceAddress(),
+ subClusterInfo.getClientRMServiceAddress(),
+ subClusterInfo.getRMAdminServiceAddress(),
+ subClusterInfo.getRMWebServiceAddress(), clock.getTime(),
+ subClusterInfo.getState(), subClusterInfo.getLastStartTime(),
+ subClusterInfo.getCapability());
+
+ membership.put(subClusterInfo.getSubClusterId(), subClusterInfoToSave);
return SubClusterRegisterResponse.newInstance();
}
@@ -105,8 +122,10 @@ public SubClusterDeregisterResponse deregisterSubCluster(
.validateSubClusterDeregisterRequest(request);
SubClusterInfo subClusterInfo = membership.get(request.getSubClusterId());
if (subClusterInfo == null) {
- throw new YarnException(
- "SubCluster " + request.getSubClusterId().toString() + " not found");
+ String errMsg =
+ "SubCluster " + request.getSubClusterId().toString() + " not found";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_FAIL, errMsg);
} else {
subClusterInfo.setState(request.getState());
}
@@ -124,8 +143,10 @@ public SubClusterHeartbeatResponse subClusterHeartbeat(
SubClusterInfo subClusterInfo = membership.get(subClusterId);
if (subClusterInfo == null) {
- throw new YarnException("Subcluster " + subClusterId.toString()
- + " does not exist; cannot heartbeat");
+ String errMsg = "Subcluster " + subClusterId.toString()
+ + " does not exist; cannot heartbeat";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_FAIL, errMsg);
}
subClusterInfo.setLastHeartBeat(clock.getTime());
@@ -143,8 +164,10 @@ public GetSubClusterInfoResponse getSubCluster(
.validateGetSubClusterInfoRequest(request);
SubClusterId subClusterId = request.getSubClusterId();
if (!membership.containsKey(subClusterId)) {
- throw new YarnException(
- "Subcluster " + subClusterId.toString() + " does not exist");
+ String errMsg =
+ "Subcluster " + subClusterId.toString() + " does not exist";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ FederationStateStoreErrorCode.MEMBERSHIP_SELECT_FAIL, errMsg);
}
return GetSubClusterInfoResponse.newInstance(membership.get(subClusterId));
@@ -193,7 +216,10 @@ public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster(
ApplicationId appId =
request.getApplicationHomeSubCluster().getApplicationId();
if (!applications.containsKey(appId)) {
- throw new YarnException("Application " + appId + " does not exist");
+ String errMsg = "Application " + appId + " does not exist";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ FederationStateStoreErrorCode.APPLICATIONSHOMESUBCLUSTER_UPDATE_FAIL,
+ errMsg);
}
applications.put(appId,
@@ -209,7 +235,10 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
.validateGetApplicationHomeSubClusterRequest(request);
ApplicationId appId = request.getApplicationId();
if (!applications.containsKey(appId)) {
- throw new YarnException("Application " + appId + " does not exist");
+ String errMsg = "Application " + appId + " does not exist";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ FederationStateStoreErrorCode.APPLICATIONSHOMESUBCLUSTER_SELECT_FAIL,
+ errMsg);
}
return GetApplicationHomeSubClusterResponse.newInstance(
@@ -238,7 +267,10 @@ public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster(
.validateDeleteApplicationHomeSubClusterRequest(request);
ApplicationId appId = request.getApplicationId();
if (!applications.containsKey(appId)) {
- throw new YarnException("Application " + appId + " does not exist");
+ String errMsg = "Application " + appId + " does not exist";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ FederationStateStoreErrorCode.APPLICATIONSHOMESUBCLUSTER_DELETE_FAIL,
+ errMsg);
}
applications.remove(appId);
@@ -253,7 +285,9 @@ public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
.validateGetSubClusterPolicyConfigurationRequest(request);
String queue = request.getQueue();
if (!policies.containsKey(queue)) {
- throw new YarnException("Policy for queue " + queue + " does not exist");
+ String errMsg = "Policy for queue " + queue + " does not exist";
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ FederationStateStoreErrorCode.POLICY_SELECT_FAIL, errMsg);
}
return GetSubClusterPolicyConfigurationResponse
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterInfo.java
index f13c8f1..6193351 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterInfo.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterInfo.java
@@ -260,4 +260,62 @@ public String toString() {
+ ", getState() = " + getState() + ", getLastStartTime() = "
+ getLastStartTime() + ", getCapability() = " + getCapability() + "]";
}
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ SubClusterInfo other = (SubClusterInfo) obj;
+ if (!this.getSubClusterId().equals(other.getSubClusterId())) {
+ return false;
+ }
+ if (!this.getAMRMServiceAddress().equals(other.getAMRMServiceAddress())) {
+ return false;
+ }
+ if (!this.getClientRMServiceAddress()
+ .equals(other.getClientRMServiceAddress())) {
+ return false;
+ }
+ if (!this.getRMAdminServiceAddress()
+ .equals(other.getRMAdminServiceAddress())) {
+ return false;
+ }
+ if (!this.getRMWebServiceAddress().equals(other.getRMWebServiceAddress())) {
+ return false;
+ }
+ if (!this.getState().equals(other.getState())) {
+ return false;
+ }
+ return this.getLastStartTime() == other.getLastStartTime();
+ // Capability and HeartBeat are not necessary to compare 2 SubClusters
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result
+ + ((getSubClusterId() == null) ? 0 : getSubClusterId().hashCode());
+ result = prime * result + ((getAMRMServiceAddress() == null) ? 0
+ : getAMRMServiceAddress().hashCode());
+ result = prime * result + ((getClientRMServiceAddress() == null) ? 0
+ : getClientRMServiceAddress().hashCode());
+ result = prime * result + ((getRMAdminServiceAddress() == null) ? 0
+ : getRMAdminServiceAddress().hashCode());
+ result = prime * result + ((getRMWebServiceAddress() == null) ? 0
+ : getRMWebServiceAddress().hashCode());
+ result =
+ prime * result + ((getState() == null) ? 0 : getState().hashCode());
+ result = prime * result
+ + (int) (getLastStartTime() ^ (getLastStartTime() >>> 32));
+ return result;
+ // Capability and HeartBeat are not necessary to create the hash value
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SubClusterInfoPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SubClusterInfoPBImpl.java
index b650b5f..cfdd038 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SubClusterInfoPBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SubClusterInfoPBImpl.java
@@ -82,22 +82,6 @@ private void mergeLocalToBuilder() {
}
@Override
- public int hashCode() {
- return getProto().hashCode();
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == null) {
- return false;
- }
- if (other.getClass().isAssignableFrom(this.getClass())) {
- return this.getProto().equals(this.getClass().cast(other).getProto());
- }
- return false;
- }
-
- @Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java
index c14a452..d920144 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.federation.store.utils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java
index ff9d8e9..ebe622b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java
@@ -20,6 +20,7 @@
import java.net.URI;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java
index 273a8ac..0df2d85 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.federation.store.utils;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreInvalidInputException.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreInvalidInputException.java
deleted file mode 100644
index ea1428d..0000000
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreInvalidInputException.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.federation.store.utils;
-
-import org.apache.hadoop.yarn.exceptions.YarnException;
-
-/**
- * Exception thrown by the {@link FederationMembershipStateStoreInputValidator},
- * {@link FederationApplicationHomeSubClusterStoreInputValidator},
- * {@link FederationPolicyStoreInputValidator} if the input is invalid.
- *
- */
-public class FederationStateStoreInvalidInputException extends YarnException {
-
- /**
- * IDE auto-generated.
- */
- private static final long serialVersionUID = -7352144682711430801L;
-
- public FederationStateStoreInvalidInputException(Throwable cause) {
- super(cause);
- }
-
- public FederationStateStoreInvalidInputException(String message) {
- super(message);
- }
-
- public FederationStateStoreInvalidInputException(String message,
- Throwable cause) {
- super(message, cause);
- }
-}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java
new file mode 100644
index 0000000..ad3af18
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.utils;
+
+import java.sql.CallableStatement;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.zaxxer.hikari.HikariDataSource;
+
+/**
+ * Common utility methods used by the store implementations.
+ *
+ */
+public final class FederationStateStoreUtils {
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(FederationStateStoreUtils.class);
+
+ private FederationStateStoreUtils() {
+ }
+
+ /**
+ * Returns the SQL FederationStateStore connection to the pool.
+ *
+ * @param log the logger interface
+ * @param cstmt the interface used to execute SQL stored procedures
+ * @param conn the SQL connection
+ * @throws YarnException
+ */
+ public static void returnToPool(Logger log, CallableStatement cstmt,
+ Connection conn) throws YarnException {
+ if (cstmt != null) {
+ try {
+ cstmt.close();
+ } catch (SQLException e) {
+ logAndThrowException(log, "Exception while trying to close Statement",
+ e);
+ }
+ }
+
+ if (conn != null) {
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ logAndThrowException(log, "Exception while trying to close Connection",
+ e);
+ }
+ }
+ }
+
+ /**
+ * Throws an exception due to an error in FederationStateStore.
+ *
+ * @param log the logger interface
+ * @param errMsg the error message
+ * @param t the throwable raised in the called class.
+ * @throws YarnException
+ */
+ public static void logAndThrowException(Logger log, String errMsg,
+ Throwable t) throws YarnException {
+ if (t != null) {
+ log.error(errMsg, t);
+ throw new YarnException(errMsg, t);
+ } else {
+ log.error(errMsg);
+ throw new YarnException(errMsg);
+ }
+ }
+
+ /**
+ * Throws an FederationStateStoreException due to an error in
+ * FederationStateStore.
+ *
+ * @param log the logger interface
+ * @param code FederationStateStoreErrorCode of the error
+ * @param errMsg the error message
+ * @throws YarnException
+ */
+ public static void logAndThrowStoreException(Logger log,
+ FederationStateStoreErrorCode code, String errMsg) throws YarnException {
+ log.error(errMsg + " " + code.toString());
+ throw new FederationStateStoreException(code);
+ }
+
+ /**
+ * Throws an FederationStateStoreException due to an error in
+ * FederationStateStore.
+ *
+ * @param log the logger interface
+ * @param code FederationStateStoreErrorCode of the error
+ * @throws YarnException
+ */
+ public static void logAndThrowStoreException(Logger log,
+ FederationStateStoreErrorCode code) throws YarnException {
+ log.error(code.toString());
+ throw new FederationStateStoreException(code);
+ }
+
+ /**
+ * Sets a specific value for a specific property of
+ * HikariDataSource SQL connections.
+ *
+ * @param dataSource the HikariDataSource connections
+ * @param property the property to set
+ * @param value the value to set
+ */
+ public static void setProperty(HikariDataSource dataSource, String property,
+ String value) {
+ LOG.info("Setting property {} with value {}", property, value);
+ if (property != null && !property.isEmpty() && value != null) {
+ dataSource.addDataSourceProperty(property, value);
+ }
+ }
+
+ /**
+ * Sets a specific username for HikariDataSource SQL connections.
+ *
+ * @param dataSource the HikariDataSource connections
+ * @param userNameDB the value to set
+ */
+ public static void setUsername(HikariDataSource dataSource,
+ String userNameDB) {
+ if (userNameDB != null) {
+ dataSource.setUsername(userNameDB);
+ LOG.debug("Setting non NULL Username for Store connection");
+ } else {
+ LOG.debug("NULL Username specified for Store connection, so ignoring");
+ }
+ }
+
+ /**
+ * Sets a specific password for HikariDataSource SQL connections.
+ *
+ * @param dataSource the HikariDataSource connections
+ * @param password the value to set
+ */
+ public static void setPassword(HikariDataSource dataSource, String password) {
+ if (password != null) {
+ dataSource.setPassword(password);
+ LOG.debug("Setting non NULL Credentials for Store connection");
+ } else {
+ LOG.debug("NULL Credentials specified for Store connection, so ignoring");
+ }
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
index 63a5b65..94a8cfc 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
@@ -19,11 +19,14 @@
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
@@ -67,9 +70,11 @@
protected abstract FederationStateStore createStateStore();
+ private Configuration conf;
+
@Before
public void before() throws IOException, YarnException {
- stateStore.init(new Configuration());
+ stateStore.init(conf);
}
@After
@@ -114,8 +119,9 @@ public void testDeregisterSubClusterUnknownSubCluster() throws Exception {
try {
stateStore.deregisterSubCluster(deregisterRequest);
Assert.fail();
- } catch (YarnException e) {
- Assert.assertTrue(e.getMessage().startsWith("SubCluster SC not found"));
+ } catch (FederationStateStoreException e) {
+ Assert.assertEquals(FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_FAIL,
+ e.getCode());
}
}
@@ -141,9 +147,9 @@ public void testGetSubClusterInfoUnknownSubCluster() throws Exception {
try {
stateStore.getSubCluster(request).getSubClusterInfo();
Assert.fail();
- } catch (YarnException e) {
- Assert.assertTrue(
- e.getMessage().startsWith("Subcluster SC does not exist"));
+ } catch (FederationStateStoreException e) {
+ Assert.assertEquals(FederationStateStoreErrorCode.MEMBERSHIP_SELECT_FAIL,
+ e.getCode());
}
}
@@ -166,19 +172,25 @@ public void testGetAllSubClustersInfo() throws Exception {
stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest.newInstance(
subClusterId2, SubClusterState.SC_UNHEALTHY, "capability"));
- Assert.assertTrue(
- stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(true))
- .getSubClusters().contains(subClusterInfo1));
- Assert.assertFalse(
+ List subClustersActive =
stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(true))
- .getSubClusters().contains(subClusterInfo2));
-
- Assert.assertTrue(
+ .getSubClusters();
+ List subClustersAll =
stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(false))
- .getSubClusters().contains(subClusterInfo1));
- Assert.assertTrue(
- stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(false))
- .getSubClusters().contains(subClusterInfo2));
+ .getSubClusters();
+
+ // SC1 is the only active
+ Assert.assertEquals(1, subClustersActive.size());
+ SubClusterInfo sc1 = subClustersActive.get(0);
+ Assert.assertEquals(subClusterId1, sc1.getSubClusterId());
+
+ // SC1 and SC2 are the SubCluster present into the StateStore
+
+ Assert.assertEquals(2, subClustersAll.size());
+ Assert.assertTrue(subClustersAll.contains(sc1));
+ subClustersAll.remove(sc1);
+ SubClusterInfo sc2 = subClustersAll.get(0);
+ Assert.assertEquals(subClusterId2, sc2.getSubClusterId());
}
@Test
@@ -204,9 +216,9 @@ public void testSubClusterHeartbeatUnknownSubCluster() throws Exception {
try {
stateStore.subClusterHeartbeat(heartbeatRequest);
Assert.fail();
- } catch (YarnException e) {
- Assert.assertTrue(e.getMessage()
- .startsWith("Subcluster SC does not exist; cannot heartbeat"));
+ } catch (FederationStateStoreException e) {
+ Assert.assertEquals(FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_FAIL,
+ e.getCode());
}
}
@@ -265,9 +277,10 @@ public void testDeleteApplicationHomeSubCluster() throws Exception {
try {
queryApplicationHomeSC(appId);
Assert.fail();
- } catch (YarnException e) {
- Assert.assertTrue(e.getMessage()
- .startsWith("Application " + appId + " does not exist"));
+ } catch (FederationStateStoreException e) {
+ Assert.assertEquals(
+ FederationStateStoreErrorCode.APPLICATIONSHOMESUBCLUSTER_SELECT_FAIL,
+ e.getCode());
}
}
@@ -281,9 +294,10 @@ public void testDeleteApplicationHomeSubClusterUnknownApp() throws Exception {
try {
stateStore.deleteApplicationHomeSubCluster(delRequest);
Assert.fail();
- } catch (YarnException e) {
- Assert.assertTrue(e.getMessage()
- .startsWith("Application " + appId.toString() + " does not exist"));
+ } catch (FederationStateStoreException e) {
+ Assert.assertEquals(
+ FederationStateStoreErrorCode.APPLICATIONSHOMESUBCLUSTER_DELETE_FAIL,
+ e.getCode());
}
}
@@ -314,9 +328,10 @@ public void testGetApplicationHomeSubClusterUnknownApp() throws Exception {
try {
stateStore.getApplicationHomeSubCluster(request);
Assert.fail();
- } catch (YarnException e) {
- Assert.assertTrue(e.getMessage()
- .startsWith("Application " + appId.toString() + " does not exist"));
+ } catch (FederationStateStoreException e) {
+ Assert.assertEquals(
+ FederationStateStoreErrorCode.APPLICATIONSHOMESUBCLUSTER_SELECT_FAIL,
+ e.getCode());
}
}
@@ -379,9 +394,10 @@ public void testUpdateApplicationHomeSubClusterUnknownApp() throws Exception {
try {
stateStore.updateApplicationHomeSubCluster((updateRequest));
Assert.fail();
- } catch (YarnException e) {
- Assert.assertTrue(e.getMessage()
- .startsWith("Application " + appId.toString() + " does not exist"));
+ } catch (FederationStateStoreException e) {
+ Assert.assertEquals(
+ FederationStateStoreErrorCode.APPLICATIONSHOMESUBCLUSTER_UPDATE_FAIL,
+ e.getCode());
}
}
@@ -440,9 +456,9 @@ public void testGetPolicyConfigurationUnknownQueue() throws Exception {
try {
stateStore.getPolicyConfiguration(request);
Assert.fail();
- } catch (YarnException e) {
- Assert.assertTrue(
- e.getMessage().startsWith("Policy for queue Queue does not exist"));
+ } catch (FederationStateStoreException e) {
+ Assert.assertEquals(FederationStateStoreErrorCode.POLICY_SELECT_FAIL,
+ e.getCode());
}
}
@@ -537,4 +553,8 @@ private SubClusterPolicyConfiguration queryPolicy(String queue)
return result.getPolicyConfiguration();
}
+ protected void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java
index 74404c7..64adab8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.yarn.server.federation.store.impl;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
/**
@@ -27,6 +28,7 @@
@Override
protected FederationStateStore createStateStore() {
+ super.setConf(new Configuration());
return new MemoryFederationStateStore();
}
-}
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java
index b95f17a..8ac5e81 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java
@@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;