diff --git hadoop-project/pom.xml hadoop-project/pom.xml index 3c3e7cf..38a3974 100755 --- hadoop-project/pom.xml +++ hadoop-project/pom.xml @@ -96,6 +96,7 @@ 1.0.0 3.0.3 + 2.4.11 1.8 @@ -1275,6 +1276,11 @@ ehcache ${ehcache.version} + + com.zaxxer + HikariCP-java7 + ${hikari.version} + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 856beb9..da2d46d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2587,6 +2587,29 @@ public static boolean isAclEnabled(Configuration conf) { public static final String DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS = ""; + private static final String FEDERATION_STATESTORE_SQL_PREFIX = + FEDERATION_PREFIX + "state-store.sql."; + + public static final String FEDERATION_STATESTORE_SQL_USERNAME = + FEDERATION_STATESTORE_SQL_PREFIX + "username"; + + public static final String FEDERATION_STATESTORE_SQL_PASSWORD = + FEDERATION_STATESTORE_SQL_PREFIX + "password"; + + public static final String FEDERATION_STATESTORE_SQL_URL = + FEDERATION_STATESTORE_SQL_PREFIX + "url"; + + public static final String FEDERATION_STATESTORE_SQL_JDBC_CLASS = + FEDERATION_STATESTORE_SQL_PREFIX + "jdbc-class"; + + public static final String DEFAULT_FEDERATION_STATESTORE_SQL_JDBC_CLASS = + "org.hsqldb.jdbc.JDBCDataSource"; + + public static final String FEDERATION_STATESTORE_SQL_MAXCONNECTIONS = + FEDERATION_STATESTORE_SQL_PREFIX + "max-connections"; + + public static final int DEFAULT_FEDERATION_STATESTORE_SQL_MAXCONNECTIONS = 1; + //////////////////////////////// // Other Configs //////////////////////////////// diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml index 5ae8889..467166d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml @@ -116,6 +116,10 @@ org.ehcache ehcache + + com.zaxxer + HikariCP-java7 + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java new file mode 100644 index 0000000..c0a98a4 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java @@ -0,0 +1,952 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.federation.store.impl; + +import java.nio.ByteBuffer; +import java.sql.CallableStatement; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.io.input.SwappedDataInputStream; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode; +import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException; +import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse; +import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest; +import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse; +import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState; +import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest; +import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse; +import 
org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator; +import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils; +import org.apache.hadoop.yarn.server.records.Version; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.zaxxer.hikari.HikariDataSource; + +/** + * SQL implementation of {@link FederationStateStore}. + */ +public class SQLFederationStateStore implements FederationStateStore { + + public static final Logger LOG = + LoggerFactory.getLogger(SQLFederationStateStore.class); + + // Stored procedures patterns + + private static final String CALL_SP_REGISTER_SUBCLUSTER = + "{call sp_registerSubCluster(?, ?, ?, ?, ?, ?, ?, ?, ?)}"; + + private static final String CALL_SP_DEREGISTER_SUBCLUSTER = + "{call sp_deregisterSubCluster(?, ?, ?)}"; + + private static final String CALL_SP_GET_SUBCLUSTER = + "{call sp_getSubCluster(?, ?, ?, ?, ?, ?, ?, ?, ?)}"; + + private static final String CALL_SP_GET_SUBCLUSTERS = + "{call sp_getSubClusters()}"; + + private static final String CALL_SP_SUBCLUSTER_HEARTBEAT = + "{call sp_subClusterHeartbeat(?, ?, ?, ?)}"; + + private static final String CALL_SP_ADD_APPLICATION_HOME_SUBCLUSTER = + "{call sp_addApplicationHomeSubCluster(?, ?, ?, ?)}"; + + private static final String CALL_SP_UPDATE_APPLICATION_HOME_SUBCLUSTER = + "{call sp_updateApplicationHomeSubCluster(?, ?, ?)}"; + + private static final String CALL_SP_DELETE_APPLICATION_HOME_SUBCLUSTER = + "{call sp_deleteApplicationHomeSubCluster(?, ?)}"; + + private static final String CALL_SP_GET_APPLICATION_HOME_SUBCLUSTER = + "{call sp_getApplicationHomeSubCluster(?, ?)}"; + + private static final String CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER = + "{call 
sp_getApplicationsHomeSubCluster()}"; + + private static final String CALL_SP_SET_POLICY_CONFIGURATION = + "{call sp_setPolicyConfiguration(?, ?, ?, ?)}"; + + private static final String CALL_SP_GET_POLICY_CONFIGURATION = + "{call sp_getPolicyConfiguration(?, ?, ?)}"; + + private static final String CALL_SP_GET_ALL_POLICY_CONFIGURATIONS = + "{call sp_getPoliciesConfigurations()}"; + + // SQL database configurations + + private String userNameDB; + private String passwordDB; + private String driverClass; + private String url; + private int maximumPoolSize; + private HikariDataSource dataSource = null; + + @Override + public void init(Configuration conf) throws YarnException { + driverClass = + conf.get(YarnConfiguration.FEDERATION_STATESTORE_SQL_JDBC_CLASS, + YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_SQL_JDBC_CLASS); + maximumPoolSize = + conf.getInt(YarnConfiguration.FEDERATION_STATESTORE_SQL_MAXCONNECTIONS, + YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_SQL_MAXCONNECTIONS); + + // An helper method avoids to assign a null value to these property + userNameDB = conf.get(YarnConfiguration.FEDERATION_STATESTORE_SQL_USERNAME); + passwordDB = conf.get(YarnConfiguration.FEDERATION_STATESTORE_SQL_PASSWORD); + url = conf.get(YarnConfiguration.FEDERATION_STATESTORE_SQL_URL); + + try { + Class.forName(driverClass); + } catch (ClassNotFoundException e) { + FederationStateStoreUtils.logAndThrowException(LOG, + "Driver class not found.", e); + } + + // Create the data source to pool connections in a thread-safe manner + dataSource = new HikariDataSource(); + dataSource.setDataSourceClassName(driverClass); + FederationStateStoreUtils.setUsername(dataSource, userNameDB); + FederationStateStoreUtils.setPassword(dataSource, passwordDB); + FederationStateStoreUtils.setProperty(dataSource, + FederationStateStoreUtils.FEDERATION_STORE_URL, url); + dataSource.setMaximumPoolSize(maximumPoolSize); + LOG.info( + "Initialized connection pool to the Federation StateStore database 
at address: " + + url); + } + + @Override + public SubClusterRegisterResponse registerSubCluster( + SubClusterRegisterRequest registerSubClusterRequest) + throws YarnException { + + // Input validator + try { + FederationMembershipStateStoreInputValidator + .validateSubClusterRegisterRequest(registerSubClusterRequest); + } catch (FederationStateStoreInvalidInputException e) { + FederationStateStoreUtils.logAndThrowInvalidInputException(LOG, + "Unable to register the SubCluster." + e.getMessage()); + } + + CallableStatement cstmt = null; + Connection conn = null; + + SubClusterInfo subClusterInfo = + registerSubClusterRequest.getSubClusterInfo(); + SubClusterId subClusterId = subClusterInfo.getSubClusterId(); + + try { + conn = getConnection(); + cstmt = conn.prepareCall(CALL_SP_REGISTER_SUBCLUSTER); + + // Set the parameters for the stored procedure + cstmt.setString(1, subClusterId.getId()); + cstmt.setString(2, subClusterInfo.getAMRMServiceAddress()); + cstmt.setString(3, subClusterInfo.getClientRMServiceAddress()); + cstmt.setString(4, subClusterInfo.getRMAdminServiceAddress()); + cstmt.setString(5, subClusterInfo.getRMWebServiceAddress()); + cstmt.setString(6, subClusterInfo.getState().toString()); + cstmt.setLong(7, subClusterInfo.getLastStartTime()); + cstmt.setString(8, subClusterInfo.getCapability()); + cstmt.registerOutParameter(9, java.sql.Types.INTEGER); + + // Execute the query + cstmt.executeUpdate(); + + // Check the ROWCOUNT value, if it is different from 1 it means the call + // did not add a new subcluster into FederationStateStore + if (cstmt.getInt(9) != 1) { + FederationStateStoreUtils.logAndThrowStoreException(LOG, + FederationStateStoreErrorCode.MEMBERSHIP_INSERT_FAIL); + } + + LOG.info( + "Registered the SubCluster " + subClusterId + " into the StateStore"); + + } catch (SQLException e) { + FederationStateStoreUtils.logAndThrowRetriableException(LOG, + "Unable to register the SubCluster " + subClusterId + + " into the StateStore", + e); + } 
finally { + // Return to the pool the CallableStatement and the Connection + FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + } + return SubClusterRegisterResponse.newInstance(); + } + + @Override + public SubClusterDeregisterResponse deregisterSubCluster( + SubClusterDeregisterRequest subClusterDeregisterRequest) + throws YarnException { + + // Input validator + try { + FederationMembershipStateStoreInputValidator + .validateSubClusterDeregisterRequest(subClusterDeregisterRequest); + } catch (FederationStateStoreInvalidInputException e) { + FederationStateStoreUtils.logAndThrowInvalidInputException(LOG, + "Unable to deregister the SubCluster. " + e.getMessage()); + } + + CallableStatement cstmt = null; + Connection conn = null; + + SubClusterId subClusterId = subClusterDeregisterRequest.getSubClusterId(); + SubClusterState state = subClusterDeregisterRequest.getState(); + + try { + conn = getConnection(); + cstmt = conn.prepareCall(CALL_SP_DEREGISTER_SUBCLUSTER); + + // Set the parameters for the stored procedure + cstmt.setString(1, subClusterId.getId()); + cstmt.setString(2, state.toString()); + cstmt.registerOutParameter(3, java.sql.Types.INTEGER); + + // Execute the query + cstmt.executeUpdate(); + + // Check the ROWCOUNT value, if it is different from 1 it means the call + // did not deregister the subcluster into FederationStateStore + if (cstmt.getInt(3) != 1) { + String errMsg = "SubCluster " + subClusterId + " not found"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, + FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_DEREGISTER_FAIL, + errMsg); + } + + LOG.info("Deregistered the SubCluster " + subClusterId + " state to " + + state.toString()); + + } catch (SQLException e) { + FederationStateStoreUtils.logAndThrowRetriableException(LOG, + "Unable to deregister the sub-cluster " + subClusterId + " state to " + + state.toString(), + e); + } finally { + // Return to the pool the CallableStatement and the Connection + 
FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + } + return SubClusterDeregisterResponse.newInstance(); + } + + @Override + public SubClusterHeartbeatResponse subClusterHeartbeat( + SubClusterHeartbeatRequest subClusterHeartbeatRequest) + throws YarnException { + + // Input validator + try { + FederationMembershipStateStoreInputValidator + .validateSubClusterHeartbeatRequest(subClusterHeartbeatRequest); + } catch (FederationStateStoreInvalidInputException e) { + FederationStateStoreUtils.logAndThrowInvalidInputException(LOG, + "Unable to heartbeat the SubCluster. " + e.getMessage()); + } + + CallableStatement cstmt = null; + Connection conn = null; + + SubClusterId subClusterId = subClusterHeartbeatRequest.getSubClusterId(); + SubClusterState state = subClusterHeartbeatRequest.getState(); + + try { + conn = getConnection(); + cstmt = conn.prepareCall(CALL_SP_SUBCLUSTER_HEARTBEAT); + + // Set the parameters for the stored procedure + cstmt.setString(1, subClusterId.getId()); + cstmt.setString(2, state.toString()); + cstmt.setString(3, subClusterHeartbeatRequest.getCapability()); + cstmt.registerOutParameter(4, java.sql.Types.INTEGER); + + // Execute the query + cstmt.executeUpdate(); + + // Check the ROWCOUNT value, if it is different from 1 it means the call + // did not update the subcluster into FederationStateStore + if (cstmt.getInt(4) != 1) { + String errMsg = "Subcluster " + subClusterId.toString() + + " does not exist; cannot heartbeat"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, + FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_HEARTBEAT_FAIL, + errMsg); + } + + LOG.info("Heartbeated the StateStore for the specified SubCluster " + + subClusterId); + + } catch (SQLException e) { + FederationStateStoreUtils.logAndThrowRetriableException(LOG, + "Unable to heartbeat the StateStore for the specified SubCluster " + + subClusterId, + e); + } finally { + // Return to the pool the CallableStatement and the Connection + 
FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + } + return SubClusterHeartbeatResponse.newInstance(); + } + + @Override + public GetSubClusterInfoResponse getSubCluster( + GetSubClusterInfoRequest subClusterRequest) throws YarnException { + + // Input validator + try { + FederationMembershipStateStoreInputValidator + .validateGetSubClusterInfoRequest(subClusterRequest); + } catch (FederationStateStoreInvalidInputException e) { + FederationStateStoreUtils.logAndThrowInvalidInputException(LOG, + "Unable to obtain the infomation about the SubCluster. " + + e.getMessage()); + } + + CallableStatement cstmt = null; + Connection conn = null; + + SubClusterInfo subClusterInfo = null; + SubClusterId subClusterId = subClusterRequest.getSubClusterId(); + + try { + conn = getConnection(); + cstmt = conn.prepareCall(CALL_SP_GET_SUBCLUSTER); + cstmt.setString(1, subClusterId.getId()); + + // Set the parameters for the stored procedure + cstmt.registerOutParameter(2, java.sql.Types.VARCHAR); + cstmt.registerOutParameter(3, java.sql.Types.VARCHAR); + cstmt.registerOutParameter(4, java.sql.Types.VARCHAR); + cstmt.registerOutParameter(5, java.sql.Types.VARCHAR); + cstmt.registerOutParameter(6, java.sql.Types.TIMESTAMP); + cstmt.registerOutParameter(7, java.sql.Types.VARCHAR); + cstmt.registerOutParameter(8, java.sql.Types.BIGINT); + cstmt.registerOutParameter(9, java.sql.Types.VARCHAR); + + // Execute the query + cstmt.execute(); + + String amRMAddress = cstmt.getString(2); + String clientRMAddress = cstmt.getString(3); + String rmAdminAddress = cstmt.getString(4); + String webAppAddress = cstmt.getString(5); + + Timestamp heartBeatTimeStamp = cstmt.getTimestamp(6); + long lastHeartBeat = + heartBeatTimeStamp != null ? 
heartBeatTimeStamp.getTime() : 0; + + SubClusterState state = fromStringToSubClusterState(cstmt.getString(7)); + long lastStartTime = cstmt.getLong(8); + String capability = cstmt.getString(9); + + subClusterInfo = SubClusterInfo.newInstance(subClusterId, amRMAddress, + clientRMAddress, rmAdminAddress, webAppAddress, lastHeartBeat, state, + lastStartTime, capability); + + // Check if the output it is a valid subcluster + try { + FederationMembershipStateStoreInputValidator + .checkSubClusterInfo(subClusterInfo); + } catch (FederationStateStoreInvalidInputException e) { + String errMsg = + "SubCluster " + subClusterId.toString() + " does not exist"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, + FederationStateStoreErrorCode.MEMBERSHIP_SINGLE_SELECT_FAIL, + errMsg); + } + LOG.debug("Got the information about the specified SubCluster " + + subClusterId); + + } catch (SQLException e) { + FederationStateStoreUtils.logAndThrowRetriableException(LOG, + "Unable to obtain the SubCluster information for " + subClusterId, e); + } finally { + // Return to the pool the CallableStatement and the Connection + FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + } + return GetSubClusterInfoResponse.newInstance(subClusterInfo); + } + + @Override + public GetSubClustersInfoResponse getSubClusters( + GetSubClustersInfoRequest subClustersRequest) throws YarnException { + CallableStatement cstmt = null; + Connection conn = null; + List subClusters = new ArrayList(); + + try { + conn = getConnection(); + cstmt = conn.prepareCall(CALL_SP_GET_SUBCLUSTERS); + + // Execute the query + ResultSet rs = cstmt.executeQuery(); + + while (rs.next()) { + + // Extract the output for each tuple + String subClusterName = rs.getString(1); + String amRMAddress = rs.getString(2); + String clientRMAddress = rs.getString(3); + String rmAdminAddress = rs.getString(4); + String webAppAddress = rs.getString(5); + long lastHeartBeat = rs.getTimestamp(6).getTime(); + SubClusterState state 
= fromStringToSubClusterState(rs.getString(7)); + long lastStartTime = rs.getLong(8); + String capability = rs.getString(9); + + SubClusterId subClusterId = SubClusterId.newInstance(subClusterName); + SubClusterInfo subClusterInfo = SubClusterInfo.newInstance(subClusterId, + amRMAddress, clientRMAddress, rmAdminAddress, webAppAddress, + lastHeartBeat, state, lastStartTime, capability); + + // Check if the output it is a valid subcluster + try { + FederationMembershipStateStoreInputValidator + .checkSubClusterInfo(subClusterInfo); + } catch (FederationStateStoreInvalidInputException e) { + FederationStateStoreUtils.logAndThrowStoreException(LOG, + FederationStateStoreErrorCode.MEMBERSHIP_MULTIPLE_SELECT_FAIL); + } + + // Filter the inactive + if (!subClustersRequest.getFilterInactiveSubClusters() + || subClusterInfo.getState().isActive()) { + subClusters.add(subClusterInfo); + } + } + + } catch (SQLException e) { + FederationStateStoreUtils.logAndThrowRetriableException(LOG, + "Unable to obtain the information for all the SubClusters ", e); + } finally { + // Return to the pool the CallableStatement and the Connection + FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + } + return GetSubClustersInfoResponse.newInstance(subClusters); + } + + @Override + public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster( + AddApplicationHomeSubClusterRequest request) throws YarnException { + + // Input validator + try { + FederationApplicationHomeSubClusterStoreInputValidator + .validateAddApplicationHomeSubClusterRequest(request); + } catch (FederationStateStoreInvalidInputException e) { + FederationStateStoreUtils.logAndThrowInvalidInputException(LOG, + "Unable to add the application. 
" + e.getMessage()); + } + + CallableStatement cstmt = null; + Connection conn = null; + + String subClusterHome = null; + ApplicationId appId = + request.getApplicationHomeSubCluster().getApplicationId(); + SubClusterId subClusterId = + request.getApplicationHomeSubCluster().getHomeSubCluster(); + + try { + conn = getConnection(); + cstmt = conn.prepareCall(CALL_SP_ADD_APPLICATION_HOME_SUBCLUSTER); + + // Set the parameters for the stored procedure + cstmt.setString(1, appId.toString()); + cstmt.setString(2, subClusterId.getId()); + cstmt.registerOutParameter(3, java.sql.Types.VARCHAR); + cstmt.registerOutParameter(4, java.sql.Types.INTEGER); + + // Execute the query + cstmt.executeUpdate(); + + subClusterHome = cstmt.getString(3); + SubClusterId subClusterIdHome = SubClusterId.newInstance(subClusterHome); + + // For failover reason, we check the returned SubClusterId. + // If it is equal to the subclusterId we sent, the call added the new + // application into FederationStateStore. If the call returns a different + // SubClusterId it means we already tried to insert this application but a + // component (Router/StateStore/RM) failed during the submission. 
+ if (subClusterId.equals(subClusterIdHome)) { + // Check the ROWCOUNT value, if it is different from 1 it means the call + // did not add a new application into FederationStateStore + if (cstmt.getInt(4) != 1) { + FederationStateStoreUtils.logAndThrowStoreException(LOG, + FederationStateStoreErrorCode.APPLICATIONS_INSERT_FAIL); + } + LOG.info("Insert into the StateStore the application: " + appId + + " in SubCluster: " + subClusterHome); + } else { + // Check the ROWCOUNT value, if it is different from 0 it means the call + // did edited the table + if (cstmt.getInt(4) != 0) { + String errMsg = + "The application " + appId + " does exist but was overwritten"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, + FederationStateStoreErrorCode.APPLICATIONS_INSERT_FAIL, errMsg); + } + LOG.info("Application: " + appId + " already present with SubCluster: " + + subClusterHome); + } + + } catch (SQLException e) { + FederationStateStoreUtils + .logAndThrowRetriableException(LOG, + "Unable to insert the newly generated application " + + request.getApplicationHomeSubCluster().getApplicationId(), + e); + } finally { + // Return to the pool the CallableStatement and the Connection + FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + } + return AddApplicationHomeSubClusterResponse + .newInstance(SubClusterId.newInstance(subClusterHome)); + } + + @Override + public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster( + UpdateApplicationHomeSubClusterRequest request) throws YarnException { + + // Input validator + try { + FederationApplicationHomeSubClusterStoreInputValidator + .validateUpdateApplicationHomeSubClusterRequest(request); + } catch (FederationStateStoreInvalidInputException e) { + FederationStateStoreUtils.logAndThrowInvalidInputException(LOG, + "Unable to update the application. 
" + e.getMessage()); + } + + CallableStatement cstmt = null; + Connection conn = null; + + ApplicationId appId = + request.getApplicationHomeSubCluster().getApplicationId(); + SubClusterId subClusterId = + request.getApplicationHomeSubCluster().getHomeSubCluster(); + + try { + conn = getConnection(); + cstmt = conn.prepareCall(CALL_SP_UPDATE_APPLICATION_HOME_SUBCLUSTER); + + // Set the parameters for the stored procedure + cstmt.setString(1, appId.toString()); + cstmt.setString(2, subClusterId.getId()); + cstmt.registerOutParameter(3, java.sql.Types.INTEGER); + + // Execute the query + cstmt.executeUpdate(); + + // Check the ROWCOUNT value, if it is different from 1 it means the call + // did not update the application into FederationStateStore + if (cstmt.getInt(3) != 1) { + String errMsg = "Application " + appId + " does not exist"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, + FederationStateStoreErrorCode.APPLICATIONS_UPDATE_FAIL, errMsg); + } + + LOG.info( + "Update the SubCluster to {} for application {} in the StateStore", + subClusterId, appId); + + } catch (SQLException e) { + FederationStateStoreUtils + .logAndThrowRetriableException(LOG, + "Unable to update the application " + + request.getApplicationHomeSubCluster().getApplicationId(), + e); + } finally { + // Return to the pool the CallableStatement and the Connection + FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + } + return UpdateApplicationHomeSubClusterResponse.newInstance(); + } + + @Override + public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( + GetApplicationHomeSubClusterRequest request) throws YarnException { + // Input validator + try { + FederationApplicationHomeSubClusterStoreInputValidator + .validateGetApplicationHomeSubClusterRequest(request); + } catch (FederationStateStoreInvalidInputException e) { + FederationStateStoreUtils.logAndThrowInvalidInputException(LOG, + "Unable to obtain the application information. 
" + e.getMessage()); + } + + CallableStatement cstmt = null; + Connection conn = null; + + SubClusterId homeRM = null; + + try { + conn = getConnection(); + cstmt = conn.prepareCall(CALL_SP_GET_APPLICATION_HOME_SUBCLUSTER); + + // Set the parameters for the stored procedure + cstmt.setString(1, request.getApplicationId().toString()); + cstmt.registerOutParameter(2, java.sql.Types.VARCHAR); + + // Execute the query + cstmt.execute(); + + if (cstmt.getString(2) != null) { + homeRM = SubClusterId.newInstance(cstmt.getString(2)); + } else { + String errMsg = + "Application " + request.getApplicationId() + " does not exist"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, + FederationStateStoreErrorCode.APPLICATIONS_SINGLE_SELECT_FAIL, + errMsg); + } + + LOG.debug("Got the information about the specified application " + + request.getApplicationId()); + + } catch (SQLException e) { + FederationStateStoreUtils.logAndThrowRetriableException(LOG, + "Unable to obtain the application information for the specified application " + + request.getApplicationId(), + e); + } finally { + + // Return to the pool the CallableStatement and the Connection + FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + } + return GetApplicationHomeSubClusterResponse + .newInstance(ApplicationHomeSubCluster + .newInstance(request.getApplicationId(), homeRM)); + } + + @Override + public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( + GetApplicationsHomeSubClusterRequest request) throws YarnException { + CallableStatement cstmt = null; + Connection conn = null; + List appsHomeSubClusters = + new ArrayList(); + + try { + conn = getConnection(); + cstmt = conn.prepareCall(CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER); + + // Execute the query + ResultSet rs = cstmt.executeQuery(); + + while (rs.next()) { + + // Extract the output for each tuple + String applicationId = rs.getString(1); + String homeSubCluster = rs.getString(2); + + 
appsHomeSubClusters.add(ApplicationHomeSubCluster.newInstance( + ApplicationId.fromString(applicationId), + SubClusterId.newInstance(homeSubCluster))); + } + + } catch (SQLException e) { + FederationStateStoreUtils.logAndThrowRetriableException(LOG, + "Unable to obtain the information for all the applications ", e); + } finally { + // Return to the pool the CallableStatement and the Connection + FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + } + return GetApplicationsHomeSubClusterResponse + .newInstance(appsHomeSubClusters); + } + + @Override + public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster( + DeleteApplicationHomeSubClusterRequest request) throws YarnException { + + // Input validator + try { + FederationApplicationHomeSubClusterStoreInputValidator + .validateDeleteApplicationHomeSubClusterRequest(request); + } catch (FederationStateStoreInvalidInputException e) { + FederationStateStoreUtils.logAndThrowInvalidInputException(LOG, + "Unable to delete the application. 
" + e.getMessage()); + } + + CallableStatement cstmt = null; + Connection conn = null; + + try { + conn = getConnection(); + cstmt = conn.prepareCall(CALL_SP_DELETE_APPLICATION_HOME_SUBCLUSTER); + + // Set the parameters for the stored procedure + cstmt.setString(1, request.getApplicationId().toString()); + cstmt.registerOutParameter(2, java.sql.Types.INTEGER); + + // Execute the query + cstmt.executeUpdate(); + + // Check the ROWCOUNT value, if it is different from 1 it means the call + // did not delete the application from FederationStateStore + if (cstmt.getInt(2) != 1) { + String errMsg = + "Application " + request.getApplicationId() + " does not exist"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, + FederationStateStoreErrorCode.APPLICATIONS_DELETE_FAIL, errMsg); + } + + LOG.info("Delete from the StateStore the application: " + + request.getApplicationId()); + + } catch (SQLException e) { + FederationStateStoreUtils.logAndThrowRetriableException(LOG, + "Unable to delete the application " + request.getApplicationId(), e); + } finally { + // Return to the pool the CallableStatement and the Connection + FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + } + return DeleteApplicationHomeSubClusterResponse.newInstance(); + } + + @Override + public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration( + GetSubClusterPolicyConfigurationRequest request) throws YarnException { + + // Input validator + try { + FederationPolicyStoreInputValidator + .validateGetSubClusterPolicyConfigurationRequest(request); + } catch (FederationStateStoreInvalidInputException e) { + FederationStateStoreUtils.logAndThrowInvalidInputException(LOG, + "Unable to insert the policy. 
" + e.getMessage()); + } + + CallableStatement cstmt = null; + Connection conn = null; + SubClusterPolicyConfiguration subClusterPolicyConfiguration = null; + + try { + conn = getConnection(); + cstmt = conn.prepareCall(CALL_SP_GET_POLICY_CONFIGURATION); + + // Set the parameters for the stored procedure + cstmt.setString(1, request.getQueue()); + cstmt.registerOutParameter(2, java.sql.Types.VARCHAR); + cstmt.registerOutParameter(3, java.sql.Types.VARBINARY); + + // Execute the query + cstmt.executeUpdate(); + + // Check if the output it is a valid policy + if (cstmt.getString(2) != null && cstmt.getBytes(3) != null) { + subClusterPolicyConfiguration = + SubClusterPolicyConfiguration.newInstance(request.getQueue(), + cstmt.getString(2), ByteBuffer.wrap(cstmt.getBytes(3))); + LOG.debug("Selected from StateStore the policy for the queue: " + + request.getQueue()); + } else { + String errMsg = + "Policy for queue " + request.getQueue() + " does not exist"; + FederationStateStoreUtils.logAndThrowStoreException(LOG, + FederationStateStoreErrorCode.POLICY_SINGLE_SELECT_FAIL, errMsg); + } + + } catch (SQLException e) { + FederationStateStoreUtils.logAndThrowRetriableException(LOG, + "Unable to select the policy for the queue :" + request.getQueue(), + e); + } finally { + // Return to the pool the CallableStatement and the Connection + FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + } + return GetSubClusterPolicyConfigurationResponse + .newInstance(subClusterPolicyConfiguration); + } + + @Override + public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration( + SetSubClusterPolicyConfigurationRequest request) throws YarnException { + + // Input validator + try { + FederationPolicyStoreInputValidator + .validateSetSubClusterPolicyConfigurationRequest(request); + } catch (FederationStateStoreInvalidInputException e) { + FederationStateStoreUtils.logAndThrowInvalidInputException(LOG, + "Unable to insert the policy. 
" + e.getMessage()); + } + + CallableStatement cstmt = null; + Connection conn = null; + + SubClusterPolicyConfiguration policyConf = request.getPolicyConfiguration(); + + try { + conn = getConnection(); + cstmt = conn.prepareCall(CALL_SP_SET_POLICY_CONFIGURATION); + + // Set the parameters for the stored procedure + cstmt.setString(1, policyConf.getQueue()); + cstmt.setString(2, policyConf.getType()); + cstmt.setBytes(3, new byte[policyConf.getParams().remaining()]); + cstmt.registerOutParameter(4, java.sql.Types.INTEGER); + + // Execute the query + cstmt.executeUpdate(); + + // Check the ROWCOUNT value, if it is different from 1 it means the call + // did not add a new policy into FederationStateStore + if (cstmt.getInt(4) != 1) { + FederationStateStoreUtils.logAndThrowStoreException(LOG, + FederationStateStoreErrorCode.POLICY_INSERT_FAIL); + } + + LOG.info("Insert into the state store the policy for the queue: " + + policyConf.getQueue()); + + } catch (SQLException e) { + FederationStateStoreUtils.logAndThrowRetriableException(LOG, + "Unable to insert the newly generated policy for the queue :" + + policyConf.getQueue(), + e); + } finally { + // Return to the pool the CallableStatement and the Connection + FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + } + return SetSubClusterPolicyConfigurationResponse.newInstance(); + } + + @Override + public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations( + GetSubClusterPoliciesConfigurationsRequest request) throws YarnException { + + CallableStatement cstmt = null; + Connection conn = null; + List policyConfigurations = + new ArrayList(); + + try { + conn = getConnection(); + cstmt = conn.prepareCall(CALL_SP_GET_ALL_POLICY_CONFIGURATIONS); + + // Execute the query + ResultSet rs = cstmt.executeQuery(); + + while (rs.next()) { + + // Extract the output for each tuple + String queue = rs.getString(1); + String type = rs.getString(2); + byte[] policyInfo = rs.getBytes(3); + + 
SubClusterPolicyConfiguration subClusterPolicyConfiguration = + SubClusterPolicyConfiguration.newInstance(queue, type, + ByteBuffer.wrap(policyInfo)); + policyConfigurations.add(subClusterPolicyConfiguration); + } + } catch (SQLException e) { + FederationStateStoreUtils.logAndThrowRetriableException(LOG, + "Unable to obtain the policy information for all the queues.", e); + } finally { + // Return to the pool the CallableStatement and the Connection + FederationStateStoreUtils.returnToPool(LOG, cstmt, conn); + } + + return GetSubClusterPoliciesConfigurationsResponse + .newInstance(policyConfigurations); + } + + @Override + public Version getCurrentVersion() { + return null; + } + + @Override + public Version loadVersion() { + return null; + } + + @Override + public void close() throws Exception { + if (dataSource != null) { + dataSource.close(); + } + } + + /** + * Convert a string into {@code SubClusterState} + * + * @return the respective {@code SubClusterState} + */ + public SubClusterState fromStringToSubClusterState(String x) { + try { + return SubClusterState.valueOf(x); + } catch (Exception e) { + LOG.error("Invalid SubCluster State value in the StateStore does not" + + " match with the YARN Federation standard."); + return null; + } + } + + /** + * Get a connection from the DataSource pool. + * + * @return a connection from the DataSource pool. 
+ * @throws SQLException + */ + public Connection getConnection() throws SQLException { + return dataSource.getConnection(); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java index ebe622b..ecb15f9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java @@ -169,7 +169,7 @@ public static void validateGetSubClusterInfoRequest( * @throws FederationStateStoreInvalidInputException if the SubCluster Info * are invalid */ - private static void checkSubClusterInfo(SubClusterInfo subClusterInfo) + public static void checkSubClusterInfo(SubClusterInfo subClusterInfo) throws FederationStateStoreInvalidInputException { if (subClusterInfo == null) { String message = "Missing SubCluster Information."
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java index 7dbb20a..948fcbc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java @@ -30,12 +30,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.zaxxer.hikari.HikariDataSource; + /** * Common utility methods used by the store implementations. * */ public final class FederationStateStoreUtils { + public final static String FEDERATION_STORE_URL = "url"; + public static final Logger LOG = LoggerFactory.getLogger(FederationStateStoreUtils.class); @@ -152,4 +156,51 @@ public static void logAndThrowRetriableException(Logger log, String errMsg, throw new FederationStateStoreRetriableException(errMsg); } } + + /** + * Sets a specific value for a specific property of + * HikariDataSource SQL connections. + * + * @param dataSource the HikariDataSource connections + * @param property the property to set + * @param value the value to set + */ + public static void setProperty(HikariDataSource dataSource, String property, + String value) { + LOG.info("Setting property {} with value {}", property, value); + if (property != null && !property.isEmpty() && value != null) { + dataSource.addDataSourceProperty(property, value); + } + } + + /** + * Sets a specific username for HikariDataSource SQL connections. 
+ * + * @param dataSource the HikariDataSource connections + * @param userNameDB the value to set + */ + public static void setUsername(HikariDataSource dataSource, + String userNameDB) { + if (userNameDB != null) { + dataSource.setUsername(userNameDB); + LOG.debug("Setting non NULL Username for Store connection"); + } else { + LOG.debug("NULL Username specified for Store connection, so ignoring"); + } + } + + /** + * Sets a specific password for HikariDataSource SQL connections. + * + * @param dataSource the HikariDataSource connections + * @param password the value to set + */ + public static void setPassword(HikariDataSource dataSource, String password) { + if (password != null) { + dataSource.setPassword(password); + LOG.debug("Setting non NULL Credentials for Store connection"); + } else { + LOG.debug("NULL Credentials specified for Store connection, so ignoring"); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java index 80b00ef..3154947 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java @@ -70,7 +70,7 @@ protected abstract FederationStateStore createStateStore(); - private Configuration conf; + protected Configuration conf; @Before public void before() throws IOException, YarnException { diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java new file mode 100644 index 0000000..b54a7df --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java @@ -0,0 +1,249 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.impl; + +import java.sql.Connection; +import java.sql.SQLException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * HSQLDB implementation of {@link FederationStateStore}. 
+ */ +public class HSQLDBFederationStateStore extends SQLFederationStateStore { + + private static final Logger LOG = + LoggerFactory.getLogger(HSQLDBFederationStateStore.class); + + private Connection conn; + + private static final String TABLE_APPLICATIONSHOMESUBCLUSTER = + " CREATE TABLE applicationsHomeSubCluster (" + + " applicationId varchar(64) NOT NULL," + + " homeSubCluster varchar(256) NOT NULL," + + " CONSTRAINT pk_applicationId PRIMARY KEY (applicationId))"; + + private static final String TABLE_MEMBERSHIP = + "CREATE TABLE membership ( subClusterId varchar(256) NOT NULL," + + " amRMServiceAddress varchar(256) NOT NULL," + + " clientRMServiceAddress varchar(256) NOT NULL," + + " rmAdminServiceAddress varchar(256) NOT NULL," + + " rmWebServiceAddress varchar(256) NOT NULL," + + " lastHeartBeat datetime NOT NULL, state varchar(32) NOT NULL," + + " lastStartTime bigint NULL, capability varchar(6000) NOT NULL," + + " CONSTRAINT pk_subClusterId PRIMARY KEY (subClusterId))"; + + private static final String TABLE_POLICIES = + "CREATE TABLE policies ( queue varchar(256) NOT NULL," + + " policyType varchar(256) NOT NULL, params varbinary(512)," + + " CONSTRAINT pk_queue PRIMARY KEY (queue))"; + + private static final String SP_REGISTERSUBCLUSTER = + "CREATE PROCEDURE sp_registerSubCluster(" + + " IN subClusterId_IN varchar(256)," + + " IN amRMServiceAddress_IN varchar(256)," + + " IN clientRMServiceAddress_IN varchar(256)," + + " IN rmAdminServiceAddress_IN varchar(256)," + + " IN rmWebServiceAddress_IN varchar(256)," + + " IN state_IN varchar(256)," + + " IN lastStartTime_IN bigint, IN capability_IN varchar(6000)," + + " OUT rowCount_OUT int)MODIFIES SQL DATA BEGIN ATOMIC" + + " DELETE FROM membership WHERE (subClusterId = subClusterId_IN);" + + " INSERT INTO membership ( subClusterId," + + " amRMServiceAddress, clientRMServiceAddress," + + " rmAdminServiceAddress, rmWebServiceAddress," + + " lastHeartBeat, state, lastStartTime," + + " capability) VALUES ( 
subClusterId_IN," + + " amRMServiceAddress_IN, clientRMServiceAddress_IN," + + " rmAdminServiceAddress_IN, rmWebServiceAddress_IN," + + " NOW(), state_IN, lastStartTime_IN, capability_IN);" + + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END"; + + private static final String SP_DEREGISTERSUBCLUSTER = + "CREATE PROCEDURE sp_deregisterSubCluster(" + + " IN subClusterId_IN varchar(256)," + + " IN state_IN varchar(64), OUT rowCount_OUT int)" + + " MODIFIES SQL DATA BEGIN ATOMIC" + + " UPDATE membership SET state = state_IN WHERE (" + + " subClusterId = subClusterId_IN AND state != state_IN);" + + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END"; + + private static final String SP_SUBCLUSTERHEARTBEAT = + "CREATE PROCEDURE sp_subClusterHeartbeat(" + + " IN subClusterId_IN varchar(256), IN state_IN varchar(64)," + + " IN capability_IN varchar(6000), OUT rowCount_OUT int)" + + " MODIFIES SQL DATA BEGIN ATOMIC UPDATE membership" + + " SET capability = capability_IN, state = state_IN" + + " WHERE subClusterId = subClusterId_IN;" + + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END"; + + private static final String SP_GETSUBCLUSTER = + "CREATE PROCEDURE sp_getSubCluster( IN subClusterId_IN varchar(256)," + + " OUT amRMServiceAddress_OUT varchar(256)," + + " OUT clientRMServiceAddress_OUT varchar(256)," + + " OUT rmAdminServiceAddress_OUT varchar(256)," + + " OUT rmWebServiceAddress_OUT varchar(256)," + + " OUT lastHeartBeat_OUT datetime, OUT state_OUT varchar(64)," + + " OUT lastStartTime_OUT bigint," + + " OUT capability_OUT varchar(6000))" + + " MODIFIES SQL DATA BEGIN ATOMIC SELECT amRMServiceAddress," + + " clientRMServiceAddress," + + " rmAdminServiceAddress, rmWebServiceAddress," + + " lastHeartBeat, state, lastStartTime, capability" + + " INTO amRMServiceAddress_OUT, clientRMServiceAddress_OUT," + + " rmAdminServiceAddress_OUT," + + " rmWebServiceAddress_OUT, lastHeartBeat_OUT," + + " state_OUT, lastStartTime_OUT, capability_OUT" + + " FROM membership WHERE subClusterId 
= subClusterId_IN; END"; + + private static final String SP_GETSUBCLUSTERS = + "CREATE PROCEDURE sp_getSubClusters()" + + " MODIFIES SQL DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC" + + " DECLARE result CURSOR FOR" + + " SELECT * FROM membership; OPEN result; END"; + + private static final String SP_ADDAPPLICATIONHOMESUBCLUSTER = + "CREATE PROCEDURE sp_addApplicationHomeSubCluster(" + + " IN applicationId_IN varchar(64)," + + " IN homeSubCluster_IN varchar(256)," + + " OUT storedHomeSubCluster_OUT varchar(256), OUT rowCount_OUT int)" + + " MODIFIES SQL DATA BEGIN ATOMIC" + + " INSERT INTO applicationsHomeSubCluster (applicationId,homeSubCluster)" + + " (SELECT applicationId_IN, homeSubCluster_IN" + + " FROM applicationsHomeSubCluster" + + " WHERE applicationId = applicationId_IN" + + " HAVING COUNT(*) = 0 );" + + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT;" + + " SELECT homeSubCluster INTO storedHomeSubCluster_OUT" + + " FROM applicationsHomeSubCluster" + + " WHERE applicationId = applicationID_IN; END"; + + private static final String SP_UPDATEAPPLICATIONHOMESUBCLUSTER = + "CREATE PROCEDURE sp_updateApplicationHomeSubCluster(" + + " IN applicationId_IN varchar(64)," + + " IN homeSubCluster_IN varchar(256), OUT rowCount_OUT int)" + + " MODIFIES SQL DATA BEGIN ATOMIC" + + " UPDATE applicationsHomeSubCluster" + + " SET homeSubCluster = homeSubCluster_IN" + + " WHERE applicationId = applicationId_IN;" + + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END"; + + private static final String SP_GETAPPLICATIONHOMESUBCLUSTER = + "CREATE PROCEDURE sp_getApplicationHomeSubCluster(" + + " IN applicationId_IN varchar(64)," + + " OUT homeSubCluster_OUT varchar(256))" + + " MODIFIES SQL DATA BEGIN ATOMIC" + + " SELECT homeSubCluster INTO homeSubCluster_OUT" + + " FROM applicationsHomeSubCluster" + + " WHERE applicationId = applicationID_IN; END"; + + private static final String SP_GETAPPLICATIONSHOMESUBCLUSTER = + "CREATE PROCEDURE sp_getApplicationsHomeSubCluster()" + + " MODIFIES SQL 
DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC" + + " DECLARE result CURSOR FOR" + + " SELECT applicationId, homeSubCluster" + + " FROM applicationsHomeSubCluster; OPEN result; END"; + + private static final String SP_DELETEAPPLICATIONHOMESUBCLUSTER = + "CREATE PROCEDURE sp_deleteApplicationHomeSubCluster(" + + " IN applicationId_IN varchar(64), OUT rowCount_OUT int)" + + " MODIFIES SQL DATA BEGIN ATOMIC" + + " DELETE FROM applicationsHomeSubCluster" + + " WHERE applicationId = applicationId_IN;" + + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END"; + + private static final String SP_SETPOLICYCONFIGURATION = + "CREATE PROCEDURE sp_setPolicyConfiguration(" + + " IN queue_IN varchar(256), IN policyType_IN varchar(256)," + + " IN params_IN varbinary(512), OUT rowCount_OUT int)" + + " MODIFIES SQL DATA BEGIN ATOMIC" + + " DELETE FROM policies WHERE queue = queue_IN;" + + " INSERT INTO policies (queue, policyType, params)" + + " VALUES (queue_IN, policyType_IN, params_IN);" + + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END"; + + private static final String SP_GETPOLICYCONFIGURATION = + "CREATE PROCEDURE sp_getPolicyConfiguration(" + + " IN queue_IN varchar(256), OUT policyType_OUT varchar(256)," + + " OUT params_OUT varbinary(512)) MODIFIES SQL DATA BEGIN ATOMIC" + + " SELECT policyType, params INTO policyType_OUT, params_OUT" + + " FROM policies WHERE queue = queue_IN; END"; + + private static final String SP_GETPOLICIESCONFIGURATIONS = + "CREATE PROCEDURE sp_getPoliciesConfigurations()" + + " MODIFIES SQL DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC" + + " DECLARE result CURSOR FOR" + + " SELECT * FROM policies; OPEN result; END"; + + public HSQLDBFederationStateStore() { + super(); + } + + @Override + public void init(Configuration conf) { + try { + super.init(conf); + } catch (YarnException e1) { + } + try { + conn = getConnection(); + + LOG.info("Database Init: Start"); + + conn.prepareStatement(TABLE_APPLICATIONSHOMESUBCLUSTER).execute(); + 
conn.prepareStatement(TABLE_MEMBERSHIP).execute(); + conn.prepareStatement(TABLE_POLICIES).execute(); + + conn.prepareStatement(SP_REGISTERSUBCLUSTER).execute(); + conn.prepareStatement(SP_DEREGISTERSUBCLUSTER).execute(); + conn.prepareStatement(SP_SUBCLUSTERHEARTBEAT).execute(); + conn.prepareStatement(SP_GETSUBCLUSTER).execute(); + conn.prepareStatement(SP_GETSUBCLUSTERS).execute(); + + conn.prepareStatement(SP_ADDAPPLICATIONHOMESUBCLUSTER).execute(); + conn.prepareStatement(SP_UPDATEAPPLICATIONHOMESUBCLUSTER).execute(); + conn.prepareStatement(SP_GETAPPLICATIONHOMESUBCLUSTER).execute(); + conn.prepareStatement(SP_GETAPPLICATIONSHOMESUBCLUSTER).execute(); + conn.prepareStatement(SP_DELETEAPPLICATIONHOMESUBCLUSTER).execute(); + + conn.prepareStatement(SP_SETPOLICYCONFIGURATION).execute(); + conn.prepareStatement(SP_GETPOLICYCONFIGURATION).execute(); + conn.prepareStatement(SP_GETPOLICIESCONFIGURATIONS).execute(); + + LOG.info("Database Init: Complete"); + conn.close(); + } catch (SQLException e) { + LOG.error("ERROR: failed to initialize HSQLDB " + e.getMessage()); + } + } + + public void closeConnection() { + try { + conn.close(); + } catch (SQLException e) { + LOG.error( + "ERROR: failed to close connection to HSQLDB " + e.getMessage()); + } + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java index 64adab8..55aa381 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java @@ -28,7 +28,7 @@ @Override protected FederationStateStore createStateStore() { - super.setConf(new Configuration()); + conf = new Configuration(); return new MemoryFederationStateStore(); } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java new file mode 100644 index 0000000..d87a002 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.yarn.server.federation.store.impl; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.federation.store.FederationStateStore; + +/** + * Unit tests for SQLFederationStateStore. + */ +public class TestSQLFederationStateStore extends FederationStateStoreBaseTest { + + private static final String HSQLDB_DRIVER = "org.hsqldb.jdbc.JDBCDataSource"; + private static final String DATABASE_URL = "jdbc:hsqldb:mem:state"; + private static final String DATABASE_USERNAME = "SA"; + private static final String DATABASE_PASSWORD = ""; + + @Override + protected FederationStateStore createStateStore() { + + conf = new YarnConfiguration(); + + conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_JDBC_CLASS, + HSQLDB_DRIVER); + conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_USERNAME, + DATABASE_USERNAME); + conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_PASSWORD, + DATABASE_PASSWORD); + conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_URL, + DATABASE_URL + System.currentTimeMillis()); + + return new HSQLDBFederationStateStore(); + } +} \ No newline at end of file