diff --git a/BUILDING.txt b/BUILDING.txt index cb3d68edfff..9d40d24524e 100644 --- a/BUILDING.txt +++ b/BUILDING.txt @@ -422,47 +422,6 @@ Building command example: Note that the command above manually specified the openssl library and include path. This is necessary at least for Homebrewed OpenSSL. - ----------------------------------------------------------------------------------- - -Building on CentOS 8 - ----------------------------------------------------------------------------------- - - -* Install development tools such as GCC, autotools, OpenJDK and Maven. - $ sudo dnf group install 'Development Tools' - $ sudo dnf install java-1.8.0-openjdk-devel maven - -* Install Protocol Buffers v2.5.0. - $ git clone https://github.com/protocolbuffers/protobuf - $ cd protobuf - $ git checkout v2.5.0 - $ autoreconf -i - $ ./configure --prefix=/usr/local - $ make - $ sudo make install - $ cd .. - -* Install libraries provided by CentOS 8. - $ sudo dnf install libtirpc-devel zlib-devel lz4-devel bzip2-devel openssl-devel cyrus-sasl-devel libpmem-devel - -* Install optional dependencies (snappy-devel). - $ sudo dnf --enablerepo=PowerTools snappy-devel - -* Install optional dependencies (libzstd-devel). - $ sudo dnf install https://dl.fedoraproject.org/pub/epel/epel-release-latest-8.noarch.rpm - $ sudo dnf --enablerepo=epel install libzstd-devel - -* Install optional dependencies (isa-l). - $ sudo dnf --enablerepo=PowerTools install nasm - $ git clone https://github.com/intel/isa-l - $ cd isa-l/ - $ ./autogen.sh - $ ./configure - $ make - $ sudo make install - ---------------------------------------------------------------------------------- Building on Windows diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/crypto/random/OpensslSecureRandom.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/crypto/random/OpensslSecureRandom.c index 3f141be05b5..26e1fa623e8 100644 --- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/crypto/random/OpensslSecureRandom.c +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/crypto/random/OpensslSecureRandom.c @@ -42,18 +42,16 @@ #ifdef UNIX static void * (*dlsym_CRYPTO_malloc) (int, const char *, int); static void (*dlsym_CRYPTO_free) (void *); -#if OPENSSL_VERSION_NUMBER < 0x10100000L static int (*dlsym_CRYPTO_num_locks) (void); static void (*dlsym_CRYPTO_set_locking_callback) (void (*)()); static void (*dlsym_CRYPTO_set_id_callback) (unsigned long (*)()); static void (*dlsym_ENGINE_load_rdrand) (void); -static void (*dlsym_ENGINE_cleanup) (void); -#endif static ENGINE * (*dlsym_ENGINE_by_id) (const char *); static int (*dlsym_ENGINE_init) (ENGINE *); static int (*dlsym_ENGINE_set_default) (ENGINE *, unsigned int); static int (*dlsym_ENGINE_finish) (ENGINE *); static int (*dlsym_ENGINE_free) (ENGINE *); +static void (*dlsym_ENGINE_cleanup) (void); static int (*dlsym_RAND_bytes) (unsigned char *, int); static unsigned long (*dlsym_ERR_get_error) (void); #endif @@ -115,8 +113,6 @@ JNIEXPORT void JNICALL Java_org_apache_hadoop_crypto_random_OpensslSecureRandom_ dlerror(); // Clear any existing error LOAD_DYNAMIC_SYMBOL(dlsym_CRYPTO_malloc, env, openssl, "CRYPTO_malloc"); LOAD_DYNAMIC_SYMBOL(dlsym_CRYPTO_free, env, openssl, "CRYPTO_free"); -#if OPENSSL_VERSION_NUMBER < 0x10100000L - // pre-1.1.0 LOAD_DYNAMIC_SYMBOL(dlsym_CRYPTO_num_locks, env, openssl, "CRYPTO_num_locks"); LOAD_DYNAMIC_SYMBOL(dlsym_CRYPTO_set_locking_callback, \ env, openssl, "CRYPTO_set_locking_callback"); @@ -124,14 +120,13 @@ JNIEXPORT void JNICALL Java_org_apache_hadoop_crypto_random_OpensslSecureRandom_ openssl, "CRYPTO_set_id_callback"); LOAD_DYNAMIC_SYMBOL(dlsym_ENGINE_load_rdrand, env, \ openssl, "ENGINE_load_rdrand"); - LOAD_DYNAMIC_SYMBOL(dlsym_ENGINE_cleanup, env, openssl, "ENGINE_cleanup"); -#endif LOAD_DYNAMIC_SYMBOL(dlsym_ENGINE_by_id, env, openssl, "ENGINE_by_id"); LOAD_DYNAMIC_SYMBOL(dlsym_ENGINE_init, env, openssl, "ENGINE_init"); LOAD_DYNAMIC_SYMBOL(dlsym_ENGINE_set_default, env, \ openssl, "ENGINE_set_default"); LOAD_DYNAMIC_SYMBOL(dlsym_ENGINE_finish, env, openssl, "ENGINE_finish"); LOAD_DYNAMIC_SYMBOL(dlsym_ENGINE_free, env, openssl, "ENGINE_free"); + LOAD_DYNAMIC_SYMBOL(dlsym_ENGINE_cleanup, env, openssl, "ENGINE_cleanup"); LOAD_DYNAMIC_SYMBOL(dlsym_RAND_bytes, env, openssl, "RAND_bytes"); LOAD_DYNAMIC_SYMBOL(dlsym_ERR_get_error, env, openssl, "ERR_get_error"); #endif @@ -308,11 +303,9 @@ static unsigned long pthreads_thread_id(void) */ static ENGINE * openssl_rand_init(void) { -#if OPENSSL_VERSION_NUMBER < 0x10100000L locks_setup(); dlsym_ENGINE_load_rdrand(); -#endif ENGINE *eng = dlsym_ENGINE_by_id("rdrand"); int ret = -1; @@ -347,12 +340,11 @@ static void openssl_rand_clean(ENGINE *eng, int clean_locks) dlsym_ENGINE_finish(eng); dlsym_ENGINE_free(eng); } -#if OPENSSL_VERSION_NUMBER < 0x10100000L + dlsym_ENGINE_cleanup(); if (clean_locks) { locks_cleanup(); } -#endif } static int openssl_rand_bytes(unsigned char *buf, int num) diff --git a/hadoop-tools/hadoop-pipes/src/CMakeLists.txt b/hadoop-tools/hadoop-pipes/src/CMakeLists.txt index ce6ee317936..ff660bfafce 100644 --- a/hadoop-tools/hadoop-pipes/src/CMakeLists.txt +++ b/hadoop-tools/hadoop-pipes/src/CMakeLists.txt @@ -22,25 +22,6 @@ list(APPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/../../../hadoop-common-project include(HadoopCommon) find_package(OpenSSL REQUIRED) -find_package(PkgConfig QUIET) -pkg_check_modules(LIBTIRPC libtirpc) - -find_path(RPC_INCLUDE_DIRS NAMES rpc/rpc.h) - -if (NOT RPC_INCLUDE_DIRS) - find_path(TIRPC_INCLUDE_DIRS - NAMES netconfig.h - PATH_SUFFIXES tirpc - HINTS ${LIBTIRPC_INCLUDE_DIRS} - ) - - find_library(TIRPC_LIBRARIES - NAMES tirpc - HINTS ${LIBTIRPC_LIBRARY_DIRS} - ) - - include_directories(${TIRPC_INCLUDE_DIRS}) -endif() include_directories( main/native/utils/api @@ -70,9 +51,6 @@ add_library(hadooputils STATIC main/native/utils/impl/StringUtils.cc main/native/utils/impl/SerialUtils.cc ) -if (NOT RPC_INCLUDE_DIRS AND LIBTIRPC_FOUND) - target_link_libraries(hadooputils tirpc) -endif() add_library(hadooppipes STATIC main/native/pipes/impl/HadoopPipes.cc diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml index 39ec605096d..af0240f7713 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml @@ -140,6 +140,16 @@ curator-test test + + com.sun.jersey.jersey-test-framework + jersey-test-framework-core + + + com.sun.jersey.jersey-test-framework + jersey-test-framework-core + 1.19 + test + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/main.c b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/main.c index a3057e63998..c6187d9a222 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/main.c +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/main.c @@ -72,7 +72,7 @@ static void display_usage(FILE *stream) { " initialize container: %2d appid tokens nm-local-dirs " "nm-log-dirs cmd app...\n" " launch container: %2d appid containerid workdir " - "container-script tokens pidfile nm-local-dirs nm-log-dirs resources ", + "container-script tokens http-option pidfile nm-local-dirs nm-log-dirs resources ", INITIALIZE_CONTAINER, LAUNCH_CONTAINER); if(is_tc_support_enabled()) { @@ -81,17 +81,16 @@ static void display_usage(FILE *stream) { fprintf(stream, "\n"); } - if(is_docker_support_enabled()) { - fprintf(stream, - " launch docker container: %2d appid containerid workdir " - "container-script tokens pidfile nm-local-dirs nm-log-dirs " - "docker-command-file resources ", LAUNCH_DOCKER_CONTAINER); - } else { - fprintf(stream, - "[DISABLED] launch docker container: %2d appid containerid workdir " - "container-script tokens pidfile nm-local-dirs nm-log-dirs " - "docker-command-file resources ", LAUNCH_DOCKER_CONTAINER); - } + fputs( + " where http-option is one of:\n" + " --http\n" + " --https keystorepath truststorepath\n", stream); + + de = is_docker_support_enabled() ? enabled : disabled; + fprintf(stream, + "%11s launch docker container:%2d appid containerid workdir " + "container-script tokens http-option pidfile nm-local-dirs nm-log-dirs " + "docker-command-file resources ", de, LAUNCH_DOCKER_CONTAINER); if(is_tc_support_enabled()) { fprintf(stream, "optional-tc-command-file\n"); @@ -99,7 +98,12 @@ static void display_usage(FILE *stream) { fprintf(stream, "\n"); } - fprintf(stream, + fputs( + " where http-option is one of:\n" + " --http\n" + " --https keystorepath truststorepath\n", stream); + + fprintf(stream, " signal container: %2d container-pid signal\n" " delete as user: %2d relative-path\n" " list as user: %2d relative-path\n", diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DBManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DBManager.java new file mode 100644 index 00000000000..13529baebe7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DBManager.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; +import org.fusesource.leveldbjni.JniDBFactory; +import org.fusesource.leveldbjni.internal.NativeDB; +import org.iq80.leveldb.DB; +import org.iq80.leveldb.DBException; +import org.iq80.leveldb.Options; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.util.Timer; +import java.util.TimerTask; +import java.util.function.Consumer; + +import static org.fusesource.leveldbjni.JniDBFactory.bytes; + +public class DBManager implements Closeable { + public static final Logger LOG = + LoggerFactory.getLogger(DBManager.class); + private DB db; + private Timer compactionTimer; + + public DB initDatabase(File configurationFile, Options options, + Consumer initMethod) throws Exception { + try { + db = JniDBFactory.factory.open(configurationFile, options); + } catch (NativeDB.DBException e) { + if (e.isNotFound() || e.getMessage().contains(" does not exist ")) { + LOG.info("Creating configuration version/database at {}", + configurationFile); + options.createIfMissing(true); + try { + db = JniDBFactory.factory.open(configurationFile, options); + initMethod.accept(db); + } catch (DBException dbErr) { + throw new IOException(dbErr.getMessage(), dbErr); + } + } else { + throw e; + } + } + + return db; + } + + public void close() throws IOException { + if (compactionTimer != null) { + compactionTimer.cancel(); + compactionTimer = null; + } + if (db != null) { + db.close(); + db = null; + } + } + + public void storeVersion(String versionKey, Version versionValue) { + byte[] data = ((VersionPBImpl) versionValue).getProto().toByteArray(); + db.put(bytes(versionKey), data); + } + + public Version loadVersion(String versionKey) throws Exception { + Version version = null; + try { + byte[] data = db.get(bytes(versionKey)); + if (data != null) { + version = new VersionPBImpl(YarnServerCommonProtos.VersionProto + .parseFrom(data)); + } + } catch (DBException e) { + throw new IOException(e); + } + return version; + } + + @VisibleForTesting + public void setDb(DB db) { + this.db = db; + } + + public void startCompactionTimer(long compactionIntervalMsec, + String className) { + if (compactionIntervalMsec > 0) { + compactionTimer = new Timer( + className + " compaction timer", true); + compactionTimer.schedule(new CompactionTimerTask(), + compactionIntervalMsec, compactionIntervalMsec); + } + } + + private class CompactionTimerTask extends TimerTask { + @Override + public void run() { + long start = Time.monotonicNow(); + LOG.info("Starting full compaction cycle"); + try { + db.compactRange(null, null); + } catch (DBException e) { + LOG.error("Error compacting database", e); + } + long duration = Time.monotonicNow() - start; + LOG.info("Full compaction cycle completed in " + duration + " msec"); + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java index a2c8dd2cfa8..6076112600a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java @@ -29,9 +29,8 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map.Entry; -import java.util.Timer; -import java.util.TimerTask; +import org.apache.hadoop.yarn.server.resourcemanager.DBManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -40,13 +39,11 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.token.delegation.DelegationKey; -import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto; @@ -54,7 +51,6 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; -import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; @@ -64,8 +60,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl; import org.apache.hadoop.yarn.server.utils.LeveldbIterator; -import org.fusesource.leveldbjni.JniDBFactory; -import org.fusesource.leveldbjni.internal.NativeDB; import org.iq80.leveldb.DB; import org.iq80.leveldb.DBException; import org.iq80.leveldb.Options; @@ -98,7 +92,7 @@ .newInstance(1, 1); private DB db; - private Timer compactionTimer; + private DBManager dbManager = new DBManager(); private long compactionIntervalMsec; private String getApplicationNodeKey(ApplicationId appId) { @@ -130,7 +124,7 @@ private String getReservationNodeKey(String planName, } @Override - protected void initInternal(Configuration conf) throws Exception { + protected void initInternal(Configuration conf) { compactionIntervalMsec = conf.getLong( YarnConfiguration.RM_LEVELDB_COMPACTION_INTERVAL_SECS, YarnConfiguration.DEFAULT_RM_LEVELDB_COMPACTION_INTERVAL_SECS) * 1000; @@ -155,55 +149,20 @@ private Path createStorageDir() throws IOException { @Override protected void startInternal() throws Exception { - db = openDatabase(); - startCompactionTimer(); - } - - protected DB openDatabase() throws Exception { Path storeRoot = createStorageDir(); Options options = new Options(); options.createIfMissing(false); LOG.info("Using state database at " + storeRoot + " for recovery"); File dbfile = new File(storeRoot.toString()); - try { - db = JniDBFactory.factory.open(dbfile, options); - } catch (NativeDB.DBException e) { - if (e.isNotFound() || e.getMessage().contains(" does not exist ")) { - LOG.info("Creating state database at " + dbfile); - options.createIfMissing(true); - try { - db = JniDBFactory.factory.open(dbfile, options); - // store version - storeVersion(); - } catch (DBException dbErr) { - throw new IOException(dbErr.getMessage(), dbErr); - } - } else { - throw e; - } - } - return db; - } - - private void startCompactionTimer() { - if (compactionIntervalMsec > 0) { - compactionTimer = new Timer( - this.getClass().getSimpleName() + " compaction timer", true); - compactionTimer.schedule(new CompactionTimerTask(), - compactionIntervalMsec, compactionIntervalMsec); - } + db = dbManager.initDatabase(dbfile, options, (database) -> + storeVersion(CURRENT_VERSION_INFO)); + dbManager.startCompactionTimer(compactionIntervalMsec, + this.getClass().getSimpleName()); } @Override protected void closeInternal() throws Exception { - if (compactionTimer != null) { - compactionTimer.cancel(); - compactionTimer = null; - } - if (db != null) { - db.close(); - db = null; - } + dbManager.close(); } @VisibleForTesting @@ -218,33 +177,22 @@ DB getDatabase() { @Override protected Version loadVersion() throws Exception { - Version version = null; - try { - byte[] data = db.get(bytes(VERSION_NODE)); - if (data != null) { - version = new VersionPBImpl(VersionProto.parseFrom(data)); - } - } catch (DBException e) { - throw new IOException(e); - } - return version; + return dbManager.loadVersion(VERSION_NODE); } @Override protected void storeVersion() throws Exception { - dbStoreVersion(CURRENT_VERSION_INFO); - } - - void dbStoreVersion(Version state) throws IOException { - String key = VERSION_NODE; - byte[] data = ((VersionPBImpl) state).getProto().toByteArray(); try { - db.put(bytes(key), data); + storeVersion(CURRENT_VERSION_INFO); } catch (DBException e) { throw new IOException(e); } } + protected void storeVersion(Version version) { + dbManager.storeVersion(VERSION_NODE, version); + } + @Override protected Version getCurrentVersion() { return CURRENT_VERSION_INFO; @@ -279,9 +227,7 @@ public RMState loadState() throws Exception { private void loadReservationState(RMState rmState) throws IOException { int numReservations = 0; - LeveldbIterator iter = null; - try { - iter = new LeveldbIterator(db); + try (LeveldbIterator iter = new LeveldbIterator(db)) { iter.seek(bytes(RM_RESERVATION_KEY_PREFIX)); while (iter.hasNext()) { Entry entry = iter.next(); @@ -313,10 +259,6 @@ private void loadReservationState(RMState rmState) throws IOException { } } catch (DBException e) { throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } } LOG.info("Recovered " + numReservations + " reservations"); } @@ -331,9 +273,7 @@ private void loadRMDTSecretManagerState(RMState state) throws IOException { private int loadRMDTSecretManagerKeys(RMState state) throws IOException { int numKeys = 0; - LeveldbIterator iter = null; - try { - iter = new LeveldbIterator(db); + try (LeveldbIterator iter = new LeveldbIterator(db)) { iter.seek(bytes(RM_DT_MASTER_KEY_KEY_PREFIX)); while (iter.hasNext()) { Entry entry = iter.next(); @@ -352,10 +292,6 @@ private int loadRMDTSecretManagerKeys(RMState state) throws IOException { } } catch (DBException e) { throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } } return numKeys; } @@ -373,9 +309,7 @@ private DelegationKey loadDelegationKey(byte[] data) throws IOException { private int loadRMDTSecretManagerTokens(RMState state) throws IOException { int numTokens = 0; - LeveldbIterator iter = null; - try { - iter = new LeveldbIterator(db); + try (LeveldbIterator iter = new LeveldbIterator(db)) { iter.seek(bytes(RM_DT_TOKEN_KEY_PREFIX)); while (iter.hasNext()) { Entry entry = iter.next(); @@ -397,17 +331,13 @@ private int loadRMDTSecretManagerTokens(RMState state) throws IOException { } } catch (DBException e) { throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } } return numTokens; } private RMDelegationTokenIdentifierData loadDelegationToken(byte[] data) throws IOException { - RMDelegationTokenIdentifierData tokenData = null; + RMDelegationTokenIdentifierData tokenData; DataInputStream in = new DataInputStream(new ByteArrayInputStream(data)); try { tokenData = RMStateStoreUtils.readRMDelegationTokenIdentifierData(in); @@ -419,7 +349,7 @@ private RMDelegationTokenIdentifierData loadDelegationToken(byte[] data) private void loadRMDTSecretManagerTokenSequenceNumber(RMState state) throws IOException { - byte[] data = null; + byte[] data; try { data = db.get(bytes(RM_DT_SEQUENCE_NUMBER_KEY)); } catch (DBException e) { @@ -438,9 +368,7 @@ private void loadRMDTSecretManagerTokenSequenceNumber(RMState state) private void loadRMApps(RMState state) throws IOException { int numApps = 0; int numAppAttempts = 0; - LeveldbIterator iter = null; - try { - iter = new LeveldbIterator(db); + try (LeveldbIterator iter = new LeveldbIterator(db)) { iter.seek(bytes(RM_APP_KEY_PREFIX)); while (iter.hasNext()) { Entry entry = iter.next(); @@ -460,10 +388,6 @@ private void loadRMApps(RMState state) throws IOException { } } catch (DBException e) { throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } } LOG.info("Recovered " + numApps + " applications and " + numAppAttempts + " application attempts"); @@ -519,7 +443,7 @@ private ApplicationStateData createApplicationState(String appIdStr, @VisibleForTesting ApplicationStateData loadRMAppState(ApplicationId appId) throws IOException { String appKey = getApplicationNodeKey(appId); - byte[] data = null; + byte[] data; try { data = db.get(bytes(appKey)); } catch (DBException e) { @@ -535,7 +459,7 @@ ApplicationStateData loadRMAppState(ApplicationId appId) throws IOException { ApplicationAttemptStateData loadRMAppAttemptState( ApplicationAttemptId attemptId) throws IOException { String attemptKey = getApplicationAttemptNodeKey(attemptId); - byte[] data = null; + byte[] data; try { data = db.get(bytes(attemptKey)); } catch (DBException e) { @@ -643,8 +567,7 @@ protected void removeApplicationStateInternal(ApplicationStateData appState) appState.getApplicationSubmissionContext().getApplicationId(); String appKey = getApplicationNodeKey(appId); try { - WriteBatch batch = db.createWriteBatch(); - try { + try (WriteBatch batch = db.createWriteBatch()) { batch.delete(bytes(appKey)); for (ApplicationAttemptId attemptId : appState.attempts.keySet()) { String attemptKey = getApplicationAttemptNodeKey(appKey, attemptId); @@ -655,8 +578,6 @@ protected void removeApplicationStateInternal(ApplicationStateData appState) + appState.attempts.size() + " attempts" + " at " + appKey); } db.write(batch); - } finally { - batch.close(); } } catch (DBException e) { throw new IOException(e); @@ -668,8 +589,7 @@ protected void storeReservationState( ReservationAllocationStateProto reservationAllocation, String planName, String reservationIdName) throws Exception { try { - WriteBatch batch = db.createWriteBatch(); - try { + try (WriteBatch batch = db.createWriteBatch()) { String key = getReservationNodeKey(planName, reservationIdName); if (LOG.isDebugEnabled()) { LOG.debug("Storing state for reservation " + reservationIdName @@ -677,8 +597,6 @@ protected void storeReservationState( } batch.put(bytes(key), reservationAllocation.toByteArray()); db.write(batch); - } finally { - batch.close(); } } catch (DBException e) { throw new IOException(e); @@ -689,8 +607,7 @@ protected void storeReservationState( protected void removeReservationState(String planName, String reservationIdName) throws Exception { try { - WriteBatch batch = db.createWriteBatch(); - try { + try (WriteBatch batch = db.createWriteBatch()) { String reservationKey = getReservationNodeKey(planName, reservationIdName); batch.delete(bytes(reservationKey)); @@ -699,8 +616,6 @@ protected void removeReservationState(String planName, + " plan " + planName + " at " + reservationKey); } db.write(batch); - } finally { - batch.close(); } } catch (DBException e) { throw new IOException(e); @@ -716,23 +631,20 @@ private void storeOrUpdateRMDT(RMDelegationTokenIdentifier tokenId, LOG.debug("Storing token to " + tokenKey); } try { - WriteBatch batch = db.createWriteBatch(); - try { + try (WriteBatch batch = db.createWriteBatch()) { batch.put(bytes(tokenKey), tokenData.toByteArray()); - if(!isUpdate) { + if (!isUpdate) { ByteArrayOutputStream bs = new ByteArrayOutputStream(); try (DataOutputStream ds = new DataOutputStream(bs)) { ds.writeInt(tokenId.getSequenceNumber()); } if (LOG.isDebugEnabled()) { LOG.debug("Storing " + tokenId.getSequenceNumber() + " to " - + RM_DT_SEQUENCE_NUMBER_KEY); + + RM_DT_SEQUENCE_NUMBER_KEY); } batch.put(bytes(RM_DT_SEQUENCE_NUMBER_KEY), bs.toByteArray()); } db.write(batch); - } finally { - batch.close(); } } catch (DBException e) { throw new IOException(e); @@ -775,11 +687,8 @@ protected void storeRMDTMasterKeyState(DelegationKey masterKey) LOG.debug("Storing token master key to " + dbKey); } ByteArrayOutputStream os = new ByteArrayOutputStream(); - DataOutputStream out = new DataOutputStream(os); - try { + try (DataOutputStream out = new DataOutputStream(os)) { masterKey.write(out); - } finally { - out.close(); } try { db.put(bytes(dbKey), os.toByteArray()); @@ -836,9 +745,7 @@ public synchronized void removeApplication(ApplicationId removeAppId) @VisibleForTesting int getNumEntriesInDatabase() throws IOException { int numEntries = 0; - LeveldbIterator iter = null; - try { - iter = new LeveldbIterator(db); + try (LeveldbIterator iter = new LeveldbIterator(db)) { iter.seekToFirst(); while (iter.hasNext()) { Entry entry = iter.next(); @@ -847,26 +754,12 @@ int getNumEntriesInDatabase() throws IOException { } } catch (DBException e) { throw new IOException(e); - } finally { - if (iter != null) { - iter.close(); - } } return numEntries; } - private class CompactionTimerTask extends TimerTask { - @Override - public void run() { - long start = Time.monotonicNow(); - LOG.info("Starting full compaction cycle"); - try { - db.compactRange(null, null); - } catch (DBException e) { - LOG.error("Error compacting database", e); - } - long duration = Time.monotonicNow() - start; - LOG.info("Full compaction cycle completed in " + duration + " msec"); - } + @VisibleForTesting + protected void setDbManager(DBManager dbManager) { + this.dbManager = dbManager; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java index 2605e0e1462..b347aba9b4a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java @@ -19,20 +19,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.server.resourcemanager.DBManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.proto.YarnServerCommonProtos; import org.apache.hadoop.yarn.server.records.Version; -import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.fusesource.leveldbjni.JniDBFactory; -import org.fusesource.leveldbjni.internal.NativeDB; import org.iq80.leveldb.DB; import org.iq80.leveldb.DBComparator; import org.iq80.leveldb.DBException; @@ -52,9 +48,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; -import java.util.function.Consumer; import static org.fusesource.leveldbjni.JniDBFactory.bytes; @@ -73,6 +66,8 @@ private static final String CONF_VERSION_KEY = "conf-version"; private DB db; + private DBManager dbManager; + private DBManager versionDbManager; private DB versionDb; private long maxLogs; private Configuration conf; @@ -81,23 +76,25 @@ @VisibleForTesting protected static final Version CURRENT_VERSION_INFO = Version .newInstance(0, 1); - private long compactionIntervalMsec; @Override public void initialize(Configuration config, Configuration schedConf, RMContext rmContext) throws IOException { this.conf = config; this.initSchedConf = schedConf; + this.dbManager = new DBManager(); + this.versionDbManager = new DBManager(); try { initDatabase(); this.maxLogs = config.getLong( YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS); - this.compactionIntervalMsec = config.getLong( + long compactionIntervalMsec = config.getLong( YarnConfiguration.RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS, YarnConfiguration .DEFAULT_RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS) * 1000; - startCompactionTimer(); + dbManager.startCompactionTimer(compactionIntervalMsec, + this.getClass().getSimpleName()); } catch (Exception e) { throw new IOException(e); } @@ -116,7 +113,7 @@ private void initDatabase() throws Exception { confOptions.createIfMissing(false); File confVersionFile = new File(confVersion.toString()); - versionDb = initDatabaseHelper(confVersionFile, confOptions, + versionDb = versionDbManager.initDatabase(confVersionFile, confOptions, this::initVersionDb); Path storeRoot = createStorageDir(DB_NAME); @@ -156,7 +153,7 @@ public String name() { }); LOG.info("Using conf database at {}", storeRoot); File dbFile = new File(storeRoot.toString()); - db = initDatabaseHelper(dbFile, options, this::initDb); + db = dbManager.initDatabase(dbFile, options, this::initDb); } private void initVersionDb(DB database) { @@ -172,30 +169,6 @@ private void initDb(DB database) { increaseConfigVersion(); } - private DB initDatabaseHelper(File configurationFile, Options options, - Consumer initMethod) throws Exception { - DB database; - try { - database = JniDBFactory.factory.open(configurationFile, options); - } catch (NativeDB.DBException e) { - if (e.isNotFound() || e.getMessage().contains(" does not exist ")) { - LOG.info("Creating configuration version/database at {}", - configurationFile); - options.createIfMissing(true); - try { - database = JniDBFactory.factory.open(configurationFile, options); - initMethod.accept(database); - } catch (DBException dbErr) { - throw new IOException(dbErr.getMessage(), dbErr); - } - } else { - throw e; - } - } - - return database; - } - private Path createStorageDir(String storageName) throws IOException { Path root = getStorageDir(storageName); FileSystem fs = FileSystem.getLocal(conf); @@ -214,12 +187,8 @@ private Path getStorageDir(String storageName) throws IOException { @Override public void close() throws IOException { - if (db != null) { - db.close(); - } - if (versionDb != null) { - versionDb.close(); - } + dbManager.close(); + versionDbManager.close(); } @Override @@ -313,28 +282,9 @@ public long getConfigVersion() { return null; // unimplemented } - private void startCompactionTimer() { - if (compactionIntervalMsec > 0) { - Timer compactionTimer = new Timer( - this.getClass().getSimpleName() + " compaction timer", true); - compactionTimer.schedule(new CompactionTimerTask(), - compactionIntervalMsec, compactionIntervalMsec); - } - } - @Override public Version getConfStoreVersion() throws Exception { - Version version = null; - try { - byte[] data = db.get(bytes(VERSION_KEY)); - if (data != null) { - version = new VersionPBImpl(YarnServerCommonProtos.VersionProto - .parseFrom(data)); - } - } catch (DBException e) { - throw new IOException(e); - } - return version; + return dbManager.loadVersion(VERSION_KEY); } @VisibleForTesting @@ -345,37 +295,20 @@ public Version getConfStoreVersion() throws Exception { @Override public void storeVersion() throws Exception { - storeVersion(CURRENT_VERSION_INFO); - } - - @VisibleForTesting - protected void storeVersion(Version version) throws Exception { - byte[] data = ((VersionPBImpl) version).getProto() - .toByteArray(); try { - db.put(bytes(VERSION_KEY), data); + storeVersion(CURRENT_VERSION_INFO); } catch (DBException e) { throw new IOException(e); } } + @VisibleForTesting + protected void storeVersion(Version version) { + dbManager.storeVersion(VERSION_KEY, version); + } + @Override public Version getCurrentVersion() { return CURRENT_VERSION_INFO; } - - private class CompactionTimerTask extends TimerTask { - @Override - public void run() { - long start = Time.monotonicNow(); - LOG.info("Starting full compaction cycle"); - try { - db.compactRange(null, null); - } catch (DBException e) { - LOG.error("Error compacting database", e); - } - long duration = Time.monotonicNow() - start; - LOG.info("Full compaction cycle completed in {} msec", duration); - } - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java index 576ee7f38d6..68409534aa3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java @@ -25,14 +25,17 @@ import java.io.File; import java.io.IOException; +import java.util.function.Consumer; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.resourcemanager.DBManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.fusesource.leveldbjni.JniDBFactory; import org.iq80.leveldb.DB; +import org.iq80.leveldb.Options; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -125,15 +128,19 @@ public void testReservation() throws Exception { } @Test(timeout = 60000) - public void testCompactionCycle() throws Exception { + public void testCompactionCycle() { final DB mockdb = mock(DB.class); conf.setLong(YarnConfiguration.RM_LEVELDB_COMPACTION_INTERVAL_SECS, 1); - stateStore = new LeveldbRMStateStore() { + stateStore = new LeveldbRMStateStore(); + DBManager dbManager = new DBManager() { @Override - protected DB openDatabase() throws Exception { + public DB initDatabase(File configurationFile, Options options, + Consumer initMethod) { return mockdb; } }; + dbManager.setDb(mockdb); + stateStore.setDbManager(dbManager); stateStore.init(conf); stateStore.start(); verify(mockdb, timeout(10000)).compactRange( @@ -175,12 +182,12 @@ public boolean isFinalStateValid() throws Exception { } @Override - public void writeVersion(Version version) throws Exception { - stateStore.dbStoreVersion(version); + public void writeVersion(Version version) { + stateStore.storeVersion(version); } @Override - public Version getCurrentVersion() throws Exception { + public Version getCurrentVersion() { return stateStore.getCurrentVersion(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java index b8a6bdafbe1..ed1159c3d30 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java @@ -79,24 +79,12 @@ public void testNullConfigurationUpdate() throws Exception { confStore.close(); } - void prepareLogMutation(String... values) + void prepareLogMutation(String key, String value) throws Exception { - Map updates = new HashMap<>(); - String key; - String value; - - if (values.length % 2 != 0) { - throw new IllegalArgumentException("The number of parameters should be " + - "even."); - } - - for (int i = 1; i <= values.length; i += 2) { - key = values[i - 1]; - value = values[i]; - updates.put(key, value); - } + Map update = new HashMap<>(); + update.put(key, value); YarnConfigurationStore.LogMutation mutation = - new YarnConfigurationStore.LogMutation(updates, TEST_USER); + new YarnConfigurationStore.LogMutation(update, TEST_USER); confStore.logMutation(mutation); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/PersistentConfigurationStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/PersistentConfigurationStoreBaseTest.java index bd30c30567f..dfb5a29fe70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/PersistentConfigurationStoreBaseTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/PersistentConfigurationStoreBaseTest.java @@ -25,7 +25,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -import static org.junit.Assume.assumeFalse; /** * Base class for the persistent {@link YarnConfigurationStore} @@ -97,9 +96,6 @@ public void testVersion() throws Exception { @Test public void testMaxLogs() throws Exception { - assumeFalse("test should be skipped for TestFSSchedulerConfigurationStore", - this instanceof TestFSSchedulerConfigurationStore); - conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2); confStore.initialize(conf, schedConf, rmContext); LinkedList logs = confStore.getLogs(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java index 87a6a3da5a4..c3ef164a937 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import org.apache.commons.io.FileUtils; @@ -30,34 +31,36 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation; import org.hamcrest.CoreMatchers; import org.junit.After; import org.junit.Before; import org.junit.Test; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; /** * Tests {@link FSSchedulerConfigurationStore}. */ -public class TestFSSchedulerConfigurationStore extends - PersistentConfigurationStoreBaseTest { +public class TestFSSchedulerConfigurationStore { + private static final String TEST_USER = "test"; + private FSSchedulerConfigurationStore configurationStore; + private Configuration conf; private File testSchedulerConfigurationDir; @Before - @Override public void setUp() throws Exception { - super.setUp(); + configurationStore = new FSSchedulerConfigurationStore(); testSchedulerConfigurationDir = new File( TestFSSchedulerConfigurationStore.class.getResource("").getPath() + FSSchedulerConfigurationStore.class.getSimpleName()); testSchedulerConfigurationDir.mkdirs(); + conf = new Configuration(); conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_FS_PATH, testSchedulerConfigurationDir.getAbsolutePath()); } @@ -78,15 +81,6 @@ public void tearDown() throws Exception { FileUtils.deleteDirectory(testSchedulerConfigurationDir); } - @Test - public void checkVersion() { - try { - confStore.checkVersion(); - } catch (Exception e) { - fail("checkVersion throw exception"); - } - } - @Test public void confirmMutationWithValid() throws Exception { conf.setInt( @@ -95,26 +89,26 @@ public void confirmMutationWithValid() throws Exception { conf.set("b", "b"); conf.set("c", "c"); writeConf(conf); - confStore.initialize(conf, conf, null); - Configuration storeConf = confStore.retrieve(); + configurationStore.initialize(conf, conf, null); + Configuration storeConf = configurationStore.retrieve(); compareConfig(conf, storeConf); Configuration expectConfig = new Configuration(conf); expectConfig.unset("a"); expectConfig.set("b", "bb"); - prepareLogMutation("a", null, "b", "bb"); - confStore.confirmMutation(true); - storeConf = confStore.retrieve(); + prepareParameterizedLogMutation(configurationStore, true, + "a", null, "b", "bb"); + storeConf = configurationStore.retrieve(); assertNull(storeConf.get("a")); assertEquals("bb", storeConf.get("b")); assertEquals("c", storeConf.get("c")); compareConfig(expectConfig, storeConf); - prepareLogMutation("a", null, "b", "bbb"); - confStore.confirmMutation(true); - storeConf = confStore.retrieve(); + prepareParameterizedLogMutation(configurationStore, true, + "a", null, "b", "bbb"); + storeConf = configurationStore.retrieve(); assertNull(storeConf.get("a")); assertEquals("bbb", storeConf.get("b")); assertEquals("c", storeConf.get("c")); @@ -126,53 +120,17 @@ public void confirmMutationWithInvalid() throws Exception { conf.set("b", "b"); conf.set("c", "c"); writeConf(conf); - confStore.initialize(conf, conf, null); - Configuration storeConf = confStore.retrieve(); + configurationStore.initialize(conf, conf, null); + Configuration storeConf = configurationStore.retrieve(); compareConfig(conf, storeConf); - prepareLogMutation("a", null, "b", "bb"); - confStore.confirmMutation(false); - - storeConf = confStore.retrieve(); + prepareParameterizedLogMutation(configurationStore, false, + "a", null, "b", "bb"); + storeConf = configurationStore.retrieve(); compareConfig(conf, storeConf); } - @Test - public void testConfigRetrieval() throws Exception { - Configuration schedulerConf = new Configuration(); - schedulerConf.set("a", "a"); - schedulerConf.setLong("long", 1L); - schedulerConf.setBoolean("boolean", true); - writeConf(schedulerConf); - - confStore.initialize(conf, conf, null); - Configuration storedConfig = confStore.retrieve(); - - compareConfig(schedulerConf, storedConfig); - } - - @Test - public void testFormatConfiguration() throws Exception { - Configuration persistedSchedConf = new Configuration(); - persistedSchedConf.set("a", "a"); - writeConf(persistedSchedConf); - confStore.initialize(conf, conf, null); - Configuration storedConfig = confStore.retrieve(); - assertEquals("Retrieved config should match the stored one", "a", - storedConfig.get("a")); - confStore.format(); - try { - confStore.retrieve(); - fail("Expected an IOException with message containing \"no capacity " + - "scheduler file in\" to be thrown"); - } catch (IOException e) { - assertThat("Exception message should contain the predefined string.", - e.getMessage(), - CoreMatchers.containsString("no capacity scheduler file in")); - } - } - @Test public void testFileSystemClose() throws Exception { MiniDFSCluster hdfsCluster = null; @@ -188,16 +146,18 @@ public void testFileSystemClose() throws Exception { fs.mkdirs(path); } + FSSchedulerConfigurationStore configStore = + new FSSchedulerConfigurationStore(); hdfsConfig.set(YarnConfiguration.SCHEDULER_CONFIGURATION_FS_PATH, path.toString()); - confStore.initialize(hdfsConfig, hdfsConfig, null); + configStore.initialize(hdfsConfig, hdfsConfig, null); // Close the FileSystem object and validate fs.close(); try { - prepareLogMutation("key", "val"); - confStore.confirmMutation(true); + prepareParameterizedLogMutation(configStore, true, + "testkey", "testvalue"); } catch (IOException e) { if (e.getMessage().contains("Filesystem closed")) { fail("FSSchedulerConfigurationStore failed to handle " + @@ -216,6 +176,48 @@ public void testFileSystemClose() throws Exception { } } + @Test + public void testFormatConfiguration() throws Exception { + Configuration schedulerConf = new Configuration(); + schedulerConf.set("a", "a"); + writeConf(schedulerConf); + configurationStore.initialize(conf, conf, null); + Configuration storedConfig = configurationStore.retrieve(); + assertEquals("a", storedConfig.get("a")); + configurationStore.format(); + try { + configurationStore.retrieve(); + fail("Expected an IOException with message containing \"no capacity " + + "scheduler file in\" to be thrown"); + } catch (IOException e) { + assertThat(e.getMessage(), + CoreMatchers.containsString("no capacity scheduler file in")); + } + } + + @Test + public void retrieve() throws Exception { + Configuration schedulerConf = new Configuration(); + schedulerConf.set("a", "a"); + schedulerConf.setLong("long", 1L); + schedulerConf.setBoolean("boolean", true); + writeConf(schedulerConf); + + configurationStore.initialize(conf, conf, null); + Configuration storedConfig = configurationStore.retrieve(); + + compareConfig(schedulerConf, storedConfig); + } + + @Test + public void checkVersion() { + try { + configurationStore.checkVersion(); + } catch (Exception e) { + fail("checkVersion throw exception"); + } + } + private void compareConfig(Configuration schedulerConf, Configuration storedConfig) { for (Map.Entry entry : schedulerConf) { @@ -229,13 +231,26 @@ private void compareConfig(Configuration schedulerConf, } } - @Override - public YarnConfigurationStore createConfStore() { - return new FSSchedulerConfigurationStore(); - } + private void prepareParameterizedLogMutation( + FSSchedulerConfigurationStore configStore, + boolean validityFlag, String... values) throws Exception { + Map updates = new HashMap<>(); + String key; + String value; + + if (values.length % 2 != 0) { + throw new IllegalArgumentException("The number of parameters should be " + + "even."); + } + + for (int i = 1; i <= values.length; i += 2) { + key = values[i - 1]; + value = values[i]; + updates.put(key, value); + } - @Override - Version getVersion() { - return null; + LogMutation logMutation = new LogMutation(updates, TEST_USER); + configStore.logMutation(logMutation); + configStore.confirmMutation(validityFlag); } } \ No newline at end of file