diff --git a/BUILDING.txt b/BUILDING.txt
index cb3d68edfff..9d40d24524e 100644
--- a/BUILDING.txt
+++ b/BUILDING.txt
@@ -422,47 +422,6 @@ Building command example:
Note that the command above manually specified the openssl library and include
path. This is necessary at least for Homebrewed OpenSSL.
-
-----------------------------------------------------------------------------------
-
-Building on CentOS 8
-
-----------------------------------------------------------------------------------
-
-
-* Install development tools such as GCC, autotools, OpenJDK and Maven.
- $ sudo dnf group install 'Development Tools'
- $ sudo dnf install java-1.8.0-openjdk-devel maven
-
-* Install Protocol Buffers v2.5.0.
- $ git clone https://github.com/protocolbuffers/protobuf
- $ cd protobuf
- $ git checkout v2.5.0
- $ autoreconf -i
- $ ./configure --prefix=/usr/local
- $ make
- $ sudo make install
- $ cd ..
-
-* Install libraries provided by CentOS 8.
- $ sudo dnf install libtirpc-devel zlib-devel lz4-devel bzip2-devel openssl-devel cyrus-sasl-devel libpmem-devel
-
-* Install optional dependencies (snappy-devel).
-  $ sudo dnf --enablerepo=PowerTools install snappy-devel
-
-* Install optional dependencies (libzstd-devel).
- $ sudo dnf install https://dl.fedoraproject.org/pub/epel/epel-release-latest-8.noarch.rpm
- $ sudo dnf --enablerepo=epel install libzstd-devel
-
-* Install optional dependencies (isa-l).
- $ sudo dnf --enablerepo=PowerTools install nasm
- $ git clone https://github.com/intel/isa-l
- $ cd isa-l/
- $ ./autogen.sh
- $ ./configure
- $ make
- $ sudo make install
-
----------------------------------------------------------------------------------
Building on Windows
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/crypto/random/OpensslSecureRandom.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/crypto/random/OpensslSecureRandom.c
index 3f141be05b5..26e1fa623e8 100644
--- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/crypto/random/OpensslSecureRandom.c
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/crypto/random/OpensslSecureRandom.c
@@ -42,18 +42,16 @@
#ifdef UNIX
static void * (*dlsym_CRYPTO_malloc) (int, const char *, int);
static void (*dlsym_CRYPTO_free) (void *);
-#if OPENSSL_VERSION_NUMBER < 0x10100000L
static int (*dlsym_CRYPTO_num_locks) (void);
static void (*dlsym_CRYPTO_set_locking_callback) (void (*)());
static void (*dlsym_CRYPTO_set_id_callback) (unsigned long (*)());
static void (*dlsym_ENGINE_load_rdrand) (void);
-static void (*dlsym_ENGINE_cleanup) (void);
-#endif
static ENGINE * (*dlsym_ENGINE_by_id) (const char *);
static int (*dlsym_ENGINE_init) (ENGINE *);
static int (*dlsym_ENGINE_set_default) (ENGINE *, unsigned int);
static int (*dlsym_ENGINE_finish) (ENGINE *);
static int (*dlsym_ENGINE_free) (ENGINE *);
+static void (*dlsym_ENGINE_cleanup) (void);
static int (*dlsym_RAND_bytes) (unsigned char *, int);
static unsigned long (*dlsym_ERR_get_error) (void);
#endif
@@ -115,8 +113,6 @@ JNIEXPORT void JNICALL Java_org_apache_hadoop_crypto_random_OpensslSecureRandom_
dlerror(); // Clear any existing error
LOAD_DYNAMIC_SYMBOL(dlsym_CRYPTO_malloc, env, openssl, "CRYPTO_malloc");
LOAD_DYNAMIC_SYMBOL(dlsym_CRYPTO_free, env, openssl, "CRYPTO_free");
-#if OPENSSL_VERSION_NUMBER < 0x10100000L
- // pre-1.1.0
LOAD_DYNAMIC_SYMBOL(dlsym_CRYPTO_num_locks, env, openssl, "CRYPTO_num_locks");
LOAD_DYNAMIC_SYMBOL(dlsym_CRYPTO_set_locking_callback, \
env, openssl, "CRYPTO_set_locking_callback");
@@ -124,14 +120,13 @@ JNIEXPORT void JNICALL Java_org_apache_hadoop_crypto_random_OpensslSecureRandom_
openssl, "CRYPTO_set_id_callback");
LOAD_DYNAMIC_SYMBOL(dlsym_ENGINE_load_rdrand, env, \
openssl, "ENGINE_load_rdrand");
- LOAD_DYNAMIC_SYMBOL(dlsym_ENGINE_cleanup, env, openssl, "ENGINE_cleanup");
-#endif
LOAD_DYNAMIC_SYMBOL(dlsym_ENGINE_by_id, env, openssl, "ENGINE_by_id");
LOAD_DYNAMIC_SYMBOL(dlsym_ENGINE_init, env, openssl, "ENGINE_init");
LOAD_DYNAMIC_SYMBOL(dlsym_ENGINE_set_default, env, \
openssl, "ENGINE_set_default");
LOAD_DYNAMIC_SYMBOL(dlsym_ENGINE_finish, env, openssl, "ENGINE_finish");
LOAD_DYNAMIC_SYMBOL(dlsym_ENGINE_free, env, openssl, "ENGINE_free");
+ LOAD_DYNAMIC_SYMBOL(dlsym_ENGINE_cleanup, env, openssl, "ENGINE_cleanup");
LOAD_DYNAMIC_SYMBOL(dlsym_RAND_bytes, env, openssl, "RAND_bytes");
LOAD_DYNAMIC_SYMBOL(dlsym_ERR_get_error, env, openssl, "ERR_get_error");
#endif
@@ -308,11 +303,9 @@ static unsigned long pthreads_thread_id(void)
*/
static ENGINE * openssl_rand_init(void)
{
-#if OPENSSL_VERSION_NUMBER < 0x10100000L
locks_setup();
dlsym_ENGINE_load_rdrand();
-#endif
ENGINE *eng = dlsym_ENGINE_by_id("rdrand");
int ret = -1;
@@ -347,12 +340,11 @@ static void openssl_rand_clean(ENGINE *eng, int clean_locks)
dlsym_ENGINE_finish(eng);
dlsym_ENGINE_free(eng);
}
-#if OPENSSL_VERSION_NUMBER < 0x10100000L
+
dlsym_ENGINE_cleanup();
if (clean_locks) {
locks_cleanup();
}
-#endif
}
static int openssl_rand_bytes(unsigned char *buf, int num)
diff --git a/hadoop-tools/hadoop-pipes/src/CMakeLists.txt b/hadoop-tools/hadoop-pipes/src/CMakeLists.txt
index ce6ee317936..ff660bfafce 100644
--- a/hadoop-tools/hadoop-pipes/src/CMakeLists.txt
+++ b/hadoop-tools/hadoop-pipes/src/CMakeLists.txt
@@ -22,25 +22,6 @@ list(APPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/../../../hadoop-common-project
include(HadoopCommon)
find_package(OpenSSL REQUIRED)
-find_package(PkgConfig QUIET)
-pkg_check_modules(LIBTIRPC libtirpc)
-
-find_path(RPC_INCLUDE_DIRS NAMES rpc/rpc.h)
-
-if (NOT RPC_INCLUDE_DIRS)
- find_path(TIRPC_INCLUDE_DIRS
- NAMES netconfig.h
- PATH_SUFFIXES tirpc
- HINTS ${LIBTIRPC_INCLUDE_DIRS}
- )
-
- find_library(TIRPC_LIBRARIES
- NAMES tirpc
- HINTS ${LIBTIRPC_LIBRARY_DIRS}
- )
-
- include_directories(${TIRPC_INCLUDE_DIRS})
-endif()
include_directories(
main/native/utils/api
@@ -70,9 +51,6 @@ add_library(hadooputils STATIC
main/native/utils/impl/StringUtils.cc
main/native/utils/impl/SerialUtils.cc
)
-if (NOT RPC_INCLUDE_DIRS AND LIBTIRPC_FOUND)
- target_link_libraries(hadooputils tirpc)
-endif()
add_library(hadooppipes STATIC
main/native/pipes/impl/HadoopPipes.cc
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
index 39ec605096d..af0240f7713 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
@@ -140,6 +140,16 @@
       <artifactId>curator-test</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.sun.jersey.jersey-test-framework</groupId>
+      <artifactId>jersey-test-framework-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.jersey.jersey-test-framework</groupId>
+      <artifactId>jersey-test-framework-core</artifactId>
+      <version>1.19</version>
+      <scope>test</scope>
+    </dependency>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/main.c b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/main.c
index a3057e63998..c6187d9a222 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/main.c
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/main.c
@@ -72,7 +72,7 @@ static void display_usage(FILE *stream) {
" initialize container: %2d appid tokens nm-local-dirs "
"nm-log-dirs cmd app...\n"
" launch container: %2d appid containerid workdir "
- "container-script tokens pidfile nm-local-dirs nm-log-dirs resources ",
+ "container-script tokens http-option pidfile nm-local-dirs nm-log-dirs resources ",
INITIALIZE_CONTAINER, LAUNCH_CONTAINER);
if(is_tc_support_enabled()) {
@@ -81,17 +81,16 @@ static void display_usage(FILE *stream) {
fprintf(stream, "\n");
}
- if(is_docker_support_enabled()) {
- fprintf(stream,
- " launch docker container: %2d appid containerid workdir "
- "container-script tokens pidfile nm-local-dirs nm-log-dirs "
- "docker-command-file resources ", LAUNCH_DOCKER_CONTAINER);
- } else {
- fprintf(stream,
- "[DISABLED] launch docker container: %2d appid containerid workdir "
- "container-script tokens pidfile nm-local-dirs nm-log-dirs "
- "docker-command-file resources ", LAUNCH_DOCKER_CONTAINER);
- }
+ fputs(
+ " where http-option is one of:\n"
+ " --http\n"
+ " --https keystorepath truststorepath\n", stream);
+
+ de = is_docker_support_enabled() ? enabled : disabled;
+ fprintf(stream,
+ "%11s launch docker container:%2d appid containerid workdir "
+ "container-script tokens http-option pidfile nm-local-dirs nm-log-dirs "
+ "docker-command-file resources ", de, LAUNCH_DOCKER_CONTAINER);
if(is_tc_support_enabled()) {
fprintf(stream, "optional-tc-command-file\n");
@@ -99,7 +98,12 @@ static void display_usage(FILE *stream) {
fprintf(stream, "\n");
}
- fprintf(stream,
+ fputs(
+ " where http-option is one of:\n"
+ " --http\n"
+ " --https keystorepath truststorepath\n", stream);
+
+ fprintf(stream,
" signal container: %2d container-pid signal\n"
" delete as user: %2d relative-path\n"
" list as user: %2d relative-path\n",
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DBManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DBManager.java
new file mode 100644
index 00000000000..13529baebe7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DBManager.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.fusesource.leveldbjni.internal.NativeDB;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBException;
+import org.iq80.leveldb.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.function.Consumer;
+
+import static org.fusesource.leveldbjni.JniDBFactory.bytes;
+
+public class DBManager implements Closeable {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(DBManager.class);
+ private DB db;
+ private Timer compactionTimer;
+
+ public DB initDatabase(File configurationFile, Options options,
+      Consumer<DB> initMethod) throws Exception {
+ try {
+ db = JniDBFactory.factory.open(configurationFile, options);
+ } catch (NativeDB.DBException e) {
+ if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
+ LOG.info("Creating configuration version/database at {}",
+ configurationFile);
+ options.createIfMissing(true);
+ try {
+ db = JniDBFactory.factory.open(configurationFile, options);
+ initMethod.accept(db);
+ } catch (DBException dbErr) {
+ throw new IOException(dbErr.getMessage(), dbErr);
+ }
+ } else {
+ throw e;
+ }
+ }
+
+ return db;
+ }
+
+ public void close() throws IOException {
+ if (compactionTimer != null) {
+ compactionTimer.cancel();
+ compactionTimer = null;
+ }
+ if (db != null) {
+ db.close();
+ db = null;
+ }
+ }
+
+ public void storeVersion(String versionKey, Version versionValue) {
+ byte[] data = ((VersionPBImpl) versionValue).getProto().toByteArray();
+ db.put(bytes(versionKey), data);
+ }
+
+ public Version loadVersion(String versionKey) throws Exception {
+ Version version = null;
+ try {
+ byte[] data = db.get(bytes(versionKey));
+ if (data != null) {
+ version = new VersionPBImpl(YarnServerCommonProtos.VersionProto
+ .parseFrom(data));
+ }
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ return version;
+ }
+
+ @VisibleForTesting
+ public void setDb(DB db) {
+ this.db = db;
+ }
+
+ public void startCompactionTimer(long compactionIntervalMsec,
+ String className) {
+ if (compactionIntervalMsec > 0) {
+ compactionTimer = new Timer(
+ className + " compaction timer", true);
+ compactionTimer.schedule(new CompactionTimerTask(),
+ compactionIntervalMsec, compactionIntervalMsec);
+ }
+ }
+
+ private class CompactionTimerTask extends TimerTask {
+ @Override
+ public void run() {
+ long start = Time.monotonicNow();
+ LOG.info("Starting full compaction cycle");
+ try {
+ db.compactRange(null, null);
+ } catch (DBException e) {
+ LOG.error("Error compacting database", e);
+ }
+ long duration = Time.monotonicNow() - start;
+ LOG.info("Full compaction cycle completed in " + duration + " msec");
+ }
+ }
+}
\ No newline at end of file
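
For reference, a minimal sketch of how a store drives the extracted DBManager
(the file path, key, and value below are illustrative, not part of this patch):

import static org.fusesource.leveldbjni.JniDBFactory.bytes;

import java.io.File;
import org.apache.hadoop.yarn.server.resourcemanager.DBManager;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.Options;

public class DBManagerUsageSketch {
  public static void main(String[] args) throws Exception {
    DBManager dbManager = new DBManager();
    Options options = new Options();
    options.createIfMissing(false);
    // initDatabase opens the database; if it does not exist yet, it retries
    // with createIfMissing(true) and runs the init callback exactly once.
    DB db = dbManager.initDatabase(new File("/tmp/example-db"), options,
        database -> database.put(bytes("init-key"), bytes("init-value")));
    // A positive interval schedules CompactionTimerTask on a daemon timer.
    dbManager.startCompactionTimer(60000L, "DBManagerUsageSketch");
    dbManager.close(); // cancels the timer and closes the database
  }
}
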
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
index a2c8dd2cfa8..6076112600a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
@@ -29,9 +29,8 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map.Entry;
-import java.util.Timer;
-import java.util.TimerTask;
+import org.apache.hadoop.yarn.server.resourcemanager.DBManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -40,13 +39,11 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.token.delegation.DelegationKey;
-import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto;
@@ -54,7 +51,6 @@
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
-import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
@@ -64,8 +60,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
-import org.fusesource.leveldbjni.JniDBFactory;
-import org.fusesource.leveldbjni.internal.NativeDB;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBException;
import org.iq80.leveldb.Options;
@@ -98,7 +92,7 @@
.newInstance(1, 1);
private DB db;
- private Timer compactionTimer;
+ private DBManager dbManager = new DBManager();
private long compactionIntervalMsec;
private String getApplicationNodeKey(ApplicationId appId) {
@@ -130,7 +124,7 @@ private String getReservationNodeKey(String planName,
}
@Override
- protected void initInternal(Configuration conf) throws Exception {
+ protected void initInternal(Configuration conf) {
compactionIntervalMsec = conf.getLong(
YarnConfiguration.RM_LEVELDB_COMPACTION_INTERVAL_SECS,
YarnConfiguration.DEFAULT_RM_LEVELDB_COMPACTION_INTERVAL_SECS) * 1000;
@@ -155,55 +149,20 @@ private Path createStorageDir() throws IOException {
@Override
protected void startInternal() throws Exception {
- db = openDatabase();
- startCompactionTimer();
- }
-
- protected DB openDatabase() throws Exception {
Path storeRoot = createStorageDir();
Options options = new Options();
options.createIfMissing(false);
LOG.info("Using state database at " + storeRoot + " for recovery");
File dbfile = new File(storeRoot.toString());
- try {
- db = JniDBFactory.factory.open(dbfile, options);
- } catch (NativeDB.DBException e) {
- if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
- LOG.info("Creating state database at " + dbfile);
- options.createIfMissing(true);
- try {
- db = JniDBFactory.factory.open(dbfile, options);
- // store version
- storeVersion();
- } catch (DBException dbErr) {
- throw new IOException(dbErr.getMessage(), dbErr);
- }
- } else {
- throw e;
- }
- }
- return db;
- }
-
- private void startCompactionTimer() {
- if (compactionIntervalMsec > 0) {
- compactionTimer = new Timer(
- this.getClass().getSimpleName() + " compaction timer", true);
- compactionTimer.schedule(new CompactionTimerTask(),
- compactionIntervalMsec, compactionIntervalMsec);
- }
+ db = dbManager.initDatabase(dbfile, options, (database) ->
+ storeVersion(CURRENT_VERSION_INFO));
+ dbManager.startCompactionTimer(compactionIntervalMsec,
+ this.getClass().getSimpleName());
}
@Override
protected void closeInternal() throws Exception {
- if (compactionTimer != null) {
- compactionTimer.cancel();
- compactionTimer = null;
- }
- if (db != null) {
- db.close();
- db = null;
- }
+ dbManager.close();
}
@VisibleForTesting
@@ -218,33 +177,22 @@ DB getDatabase() {
@Override
protected Version loadVersion() throws Exception {
- Version version = null;
- try {
- byte[] data = db.get(bytes(VERSION_NODE));
- if (data != null) {
- version = new VersionPBImpl(VersionProto.parseFrom(data));
- }
- } catch (DBException e) {
- throw new IOException(e);
- }
- return version;
+ return dbManager.loadVersion(VERSION_NODE);
}
@Override
protected void storeVersion() throws Exception {
- dbStoreVersion(CURRENT_VERSION_INFO);
- }
-
- void dbStoreVersion(Version state) throws IOException {
- String key = VERSION_NODE;
- byte[] data = ((VersionPBImpl) state).getProto().toByteArray();
try {
- db.put(bytes(key), data);
+ storeVersion(CURRENT_VERSION_INFO);
} catch (DBException e) {
throw new IOException(e);
}
}
+ protected void storeVersion(Version version) {
+ dbManager.storeVersion(VERSION_NODE, version);
+ }
+
@Override
protected Version getCurrentVersion() {
return CURRENT_VERSION_INFO;
@@ -279,9 +227,7 @@ public RMState loadState() throws Exception {
private void loadReservationState(RMState rmState) throws IOException {
int numReservations = 0;
- LeveldbIterator iter = null;
- try {
- iter = new LeveldbIterator(db);
+ try (LeveldbIterator iter = new LeveldbIterator(db)) {
iter.seek(bytes(RM_RESERVATION_KEY_PREFIX));
while (iter.hasNext()) {
        Entry<byte[], byte[]> entry = iter.next();
@@ -313,10 +259,6 @@ private void loadReservationState(RMState rmState) throws IOException {
}
} catch (DBException e) {
throw new IOException(e);
- } finally {
- if (iter != null) {
- iter.close();
- }
}
LOG.info("Recovered " + numReservations + " reservations");
}
@@ -331,9 +273,7 @@ private void loadRMDTSecretManagerState(RMState state) throws IOException {
private int loadRMDTSecretManagerKeys(RMState state) throws IOException {
int numKeys = 0;
- LeveldbIterator iter = null;
- try {
- iter = new LeveldbIterator(db);
+ try (LeveldbIterator iter = new LeveldbIterator(db)) {
iter.seek(bytes(RM_DT_MASTER_KEY_KEY_PREFIX));
while (iter.hasNext()) {
        Entry<byte[], byte[]> entry = iter.next();
@@ -352,10 +292,6 @@ private int loadRMDTSecretManagerKeys(RMState state) throws IOException {
}
} catch (DBException e) {
throw new IOException(e);
- } finally {
- if (iter != null) {
- iter.close();
- }
}
return numKeys;
}
@@ -373,9 +309,7 @@ private DelegationKey loadDelegationKey(byte[] data) throws IOException {
private int loadRMDTSecretManagerTokens(RMState state) throws IOException {
int numTokens = 0;
- LeveldbIterator iter = null;
- try {
- iter = new LeveldbIterator(db);
+ try (LeveldbIterator iter = new LeveldbIterator(db)) {
iter.seek(bytes(RM_DT_TOKEN_KEY_PREFIX));
while (iter.hasNext()) {
        Entry<byte[], byte[]> entry = iter.next();
@@ -397,17 +331,13 @@ private int loadRMDTSecretManagerTokens(RMState state) throws IOException {
}
} catch (DBException e) {
throw new IOException(e);
- } finally {
- if (iter != null) {
- iter.close();
- }
}
return numTokens;
}
private RMDelegationTokenIdentifierData loadDelegationToken(byte[] data)
throws IOException {
- RMDelegationTokenIdentifierData tokenData = null;
+ RMDelegationTokenIdentifierData tokenData;
DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
try {
tokenData = RMStateStoreUtils.readRMDelegationTokenIdentifierData(in);
@@ -419,7 +349,7 @@ private RMDelegationTokenIdentifierData loadDelegationToken(byte[] data)
private void loadRMDTSecretManagerTokenSequenceNumber(RMState state)
throws IOException {
- byte[] data = null;
+ byte[] data;
try {
data = db.get(bytes(RM_DT_SEQUENCE_NUMBER_KEY));
} catch (DBException e) {
@@ -438,9 +368,7 @@ private void loadRMDTSecretManagerTokenSequenceNumber(RMState state)
private void loadRMApps(RMState state) throws IOException {
int numApps = 0;
int numAppAttempts = 0;
- LeveldbIterator iter = null;
- try {
- iter = new LeveldbIterator(db);
+ try (LeveldbIterator iter = new LeveldbIterator(db)) {
iter.seek(bytes(RM_APP_KEY_PREFIX));
while (iter.hasNext()) {
        Entry<byte[], byte[]> entry = iter.next();
@@ -460,10 +388,6 @@ private void loadRMApps(RMState state) throws IOException {
}
} catch (DBException e) {
throw new IOException(e);
- } finally {
- if (iter != null) {
- iter.close();
- }
}
LOG.info("Recovered " + numApps + " applications and " + numAppAttempts
+ " application attempts");
@@ -519,7 +443,7 @@ private ApplicationStateData createApplicationState(String appIdStr,
@VisibleForTesting
ApplicationStateData loadRMAppState(ApplicationId appId) throws IOException {
String appKey = getApplicationNodeKey(appId);
- byte[] data = null;
+ byte[] data;
try {
data = db.get(bytes(appKey));
} catch (DBException e) {
@@ -535,7 +459,7 @@ ApplicationStateData loadRMAppState(ApplicationId appId) throws IOException {
ApplicationAttemptStateData loadRMAppAttemptState(
ApplicationAttemptId attemptId) throws IOException {
String attemptKey = getApplicationAttemptNodeKey(attemptId);
- byte[] data = null;
+ byte[] data;
try {
data = db.get(bytes(attemptKey));
} catch (DBException e) {
@@ -643,8 +567,7 @@ protected void removeApplicationStateInternal(ApplicationStateData appState)
appState.getApplicationSubmissionContext().getApplicationId();
String appKey = getApplicationNodeKey(appId);
try {
- WriteBatch batch = db.createWriteBatch();
- try {
+ try (WriteBatch batch = db.createWriteBatch()) {
batch.delete(bytes(appKey));
for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
String attemptKey = getApplicationAttemptNodeKey(appKey, attemptId);
@@ -655,8 +578,6 @@ protected void removeApplicationStateInternal(ApplicationStateData appState)
+ appState.attempts.size() + " attempts" + " at " + appKey);
}
db.write(batch);
- } finally {
- batch.close();
}
} catch (DBException e) {
throw new IOException(e);
@@ -668,8 +589,7 @@ protected void storeReservationState(
ReservationAllocationStateProto reservationAllocation, String planName,
String reservationIdName) throws Exception {
try {
- WriteBatch batch = db.createWriteBatch();
- try {
+ try (WriteBatch batch = db.createWriteBatch()) {
String key = getReservationNodeKey(planName, reservationIdName);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing state for reservation " + reservationIdName
@@ -677,8 +597,6 @@ protected void storeReservationState(
}
batch.put(bytes(key), reservationAllocation.toByteArray());
db.write(batch);
- } finally {
- batch.close();
}
} catch (DBException e) {
throw new IOException(e);
@@ -689,8 +607,7 @@ protected void storeReservationState(
protected void removeReservationState(String planName,
String reservationIdName) throws Exception {
try {
- WriteBatch batch = db.createWriteBatch();
- try {
+ try (WriteBatch batch = db.createWriteBatch()) {
String reservationKey =
getReservationNodeKey(planName, reservationIdName);
batch.delete(bytes(reservationKey));
@@ -699,8 +616,6 @@ protected void removeReservationState(String planName,
+ " plan " + planName + " at " + reservationKey);
}
db.write(batch);
- } finally {
- batch.close();
}
} catch (DBException e) {
throw new IOException(e);
@@ -716,23 +631,20 @@ private void storeOrUpdateRMDT(RMDelegationTokenIdentifier tokenId,
LOG.debug("Storing token to " + tokenKey);
}
try {
- WriteBatch batch = db.createWriteBatch();
- try {
+ try (WriteBatch batch = db.createWriteBatch()) {
batch.put(bytes(tokenKey), tokenData.toByteArray());
- if(!isUpdate) {
+ if (!isUpdate) {
ByteArrayOutputStream bs = new ByteArrayOutputStream();
try (DataOutputStream ds = new DataOutputStream(bs)) {
ds.writeInt(tokenId.getSequenceNumber());
}
if (LOG.isDebugEnabled()) {
LOG.debug("Storing " + tokenId.getSequenceNumber() + " to "
- + RM_DT_SEQUENCE_NUMBER_KEY);
+ + RM_DT_SEQUENCE_NUMBER_KEY);
}
batch.put(bytes(RM_DT_SEQUENCE_NUMBER_KEY), bs.toByteArray());
}
db.write(batch);
- } finally {
- batch.close();
}
} catch (DBException e) {
throw new IOException(e);
@@ -775,11 +687,8 @@ protected void storeRMDTMasterKeyState(DelegationKey masterKey)
LOG.debug("Storing token master key to " + dbKey);
}
ByteArrayOutputStream os = new ByteArrayOutputStream();
- DataOutputStream out = new DataOutputStream(os);
- try {
+ try (DataOutputStream out = new DataOutputStream(os)) {
masterKey.write(out);
- } finally {
- out.close();
}
try {
db.put(bytes(dbKey), os.toByteArray());
@@ -836,9 +745,7 @@ public synchronized void removeApplication(ApplicationId removeAppId)
@VisibleForTesting
int getNumEntriesInDatabase() throws IOException {
int numEntries = 0;
- LeveldbIterator iter = null;
- try {
- iter = new LeveldbIterator(db);
+ try (LeveldbIterator iter = new LeveldbIterator(db)) {
iter.seekToFirst();
while (iter.hasNext()) {
        Entry<byte[], byte[]> entry = iter.next();
@@ -847,26 +754,12 @@ int getNumEntriesInDatabase() throws IOException {
}
} catch (DBException e) {
throw new IOException(e);
- } finally {
- if (iter != null) {
- iter.close();
- }
}
return numEntries;
}
- private class CompactionTimerTask extends TimerTask {
- @Override
- public void run() {
- long start = Time.monotonicNow();
- LOG.info("Starting full compaction cycle");
- try {
- db.compactRange(null, null);
- } catch (DBException e) {
- LOG.error("Error compacting database", e);
- }
- long duration = Time.monotonicNow() - start;
- LOG.info("Full compaction cycle completed in " + duration + " msec");
- }
+ @VisibleForTesting
+ protected void setDbManager(DBManager dbManager) {
+ this.dbManager = dbManager;
}
}
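
The conversions above all follow one pattern: LeveldbIterator and WriteBatch
are Closeable, so the manual finally { close(); } blocks collapse into
try-with-resources while the DBException-to-IOException translation stays.
A condensed sketch of the pattern (method names are illustrative):

import java.io.IOException;
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBException;
import org.iq80.leveldb.WriteBatch;

public class CleanupSketch {
  // The iterator is closed automatically, even when a DBException
  // is caught and rethrown as an IOException.
  static int countFrom(DB db, byte[] prefix) throws IOException {
    int entries = 0;
    try (LeveldbIterator iter = new LeveldbIterator(db)) {
      iter.seek(prefix);
      while (iter.hasNext()) {
        iter.next();
        entries++;
      }
    } catch (DBException e) {
      throw new IOException(e);
    }
    return entries;
  }

  // Same shape for batched writes: the batch is closed on every path.
  static void deleteKey(DB db, byte[] key) throws IOException {
    try (WriteBatch batch = db.createWriteBatch()) {
      batch.delete(key);
      db.write(batch);
    } catch (DBException e) {
      throw new IOException(e);
    }
  }
}
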
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
index 2605e0e1462..b347aba9b4a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java
@@ -19,20 +19,16 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.server.resourcemanager.DBManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
import org.apache.hadoop.yarn.server.records.Version;
-import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.fusesource.leveldbjni.JniDBFactory;
-import org.fusesource.leveldbjni.internal.NativeDB;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBComparator;
import org.iq80.leveldb.DBException;
@@ -52,9 +48,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.function.Consumer;
import static org.fusesource.leveldbjni.JniDBFactory.bytes;
@@ -73,6 +66,8 @@
private static final String CONF_VERSION_KEY = "conf-version";
private DB db;
+ private DBManager dbManager;
+ private DBManager versionDbManager;
private DB versionDb;
private long maxLogs;
private Configuration conf;
@@ -81,23 +76,25 @@
@VisibleForTesting
protected static final Version CURRENT_VERSION_INFO = Version
.newInstance(0, 1);
- private long compactionIntervalMsec;
@Override
public void initialize(Configuration config, Configuration schedConf,
RMContext rmContext) throws IOException {
this.conf = config;
this.initSchedConf = schedConf;
+ this.dbManager = new DBManager();
+ this.versionDbManager = new DBManager();
try {
initDatabase();
this.maxLogs = config.getLong(
YarnConfiguration.RM_SCHEDCONF_MAX_LOGS,
YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS);
- this.compactionIntervalMsec = config.getLong(
+ long compactionIntervalMsec = config.getLong(
YarnConfiguration.RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS,
YarnConfiguration
.DEFAULT_RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS) * 1000;
- startCompactionTimer();
+ dbManager.startCompactionTimer(compactionIntervalMsec,
+ this.getClass().getSimpleName());
} catch (Exception e) {
throw new IOException(e);
}
@@ -116,7 +113,7 @@ private void initDatabase() throws Exception {
confOptions.createIfMissing(false);
File confVersionFile = new File(confVersion.toString());
- versionDb = initDatabaseHelper(confVersionFile, confOptions,
+ versionDb = versionDbManager.initDatabase(confVersionFile, confOptions,
this::initVersionDb);
Path storeRoot = createStorageDir(DB_NAME);
@@ -156,7 +153,7 @@ public String name() {
});
LOG.info("Using conf database at {}", storeRoot);
File dbFile = new File(storeRoot.toString());
- db = initDatabaseHelper(dbFile, options, this::initDb);
+ db = dbManager.initDatabase(dbFile, options, this::initDb);
}
private void initVersionDb(DB database) {
@@ -172,30 +169,6 @@ private void initDb(DB database) {
increaseConfigVersion();
}
- private DB initDatabaseHelper(File configurationFile, Options options,
-      Consumer<DB> initMethod) throws Exception {
- DB database;
- try {
- database = JniDBFactory.factory.open(configurationFile, options);
- } catch (NativeDB.DBException e) {
- if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
- LOG.info("Creating configuration version/database at {}",
- configurationFile);
- options.createIfMissing(true);
- try {
- database = JniDBFactory.factory.open(configurationFile, options);
- initMethod.accept(database);
- } catch (DBException dbErr) {
- throw new IOException(dbErr.getMessage(), dbErr);
- }
- } else {
- throw e;
- }
- }
-
- return database;
- }
-
private Path createStorageDir(String storageName) throws IOException {
Path root = getStorageDir(storageName);
FileSystem fs = FileSystem.getLocal(conf);
@@ -214,12 +187,8 @@ private Path getStorageDir(String storageName) throws IOException {
@Override
public void close() throws IOException {
- if (db != null) {
- db.close();
- }
- if (versionDb != null) {
- versionDb.close();
- }
+ dbManager.close();
+ versionDbManager.close();
}
@Override
@@ -313,28 +282,9 @@ public long getConfigVersion() {
return null; // unimplemented
}
- private void startCompactionTimer() {
- if (compactionIntervalMsec > 0) {
- Timer compactionTimer = new Timer(
- this.getClass().getSimpleName() + " compaction timer", true);
- compactionTimer.schedule(new CompactionTimerTask(),
- compactionIntervalMsec, compactionIntervalMsec);
- }
- }
-
@Override
public Version getConfStoreVersion() throws Exception {
- Version version = null;
- try {
- byte[] data = db.get(bytes(VERSION_KEY));
- if (data != null) {
- version = new VersionPBImpl(YarnServerCommonProtos.VersionProto
- .parseFrom(data));
- }
- } catch (DBException e) {
- throw new IOException(e);
- }
- return version;
+ return dbManager.loadVersion(VERSION_KEY);
}
@VisibleForTesting
@@ -345,37 +295,20 @@ public Version getConfStoreVersion() throws Exception {
@Override
public void storeVersion() throws Exception {
- storeVersion(CURRENT_VERSION_INFO);
- }
-
- @VisibleForTesting
- protected void storeVersion(Version version) throws Exception {
- byte[] data = ((VersionPBImpl) version).getProto()
- .toByteArray();
try {
- db.put(bytes(VERSION_KEY), data);
+ storeVersion(CURRENT_VERSION_INFO);
} catch (DBException e) {
throw new IOException(e);
}
}
+ @VisibleForTesting
+ protected void storeVersion(Version version) {
+ dbManager.storeVersion(VERSION_KEY, version);
+ }
+
@Override
public Version getCurrentVersion() {
return CURRENT_VERSION_INFO;
}
-
- private class CompactionTimerTask extends TimerTask {
- @Override
- public void run() {
- long start = Time.monotonicNow();
- LOG.info("Starting full compaction cycle");
- try {
- db.compactRange(null, null);
- } catch (DBException e) {
- LOG.error("Error compacting database", e);
- }
- long duration = Time.monotonicNow() - start;
- LOG.info("Full compaction cycle completed in {} msec", duration);
- }
- }
}
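
LeveldbConfigurationStore now holds two DBManager instances because each
manager tracks exactly one DB handle internally, for close() and the
compaction timer. A sketch of the resulting lifecycle, with illustrative
paths and a no-op init callback:

import java.io.File;
import java.util.function.Consumer;
import org.apache.hadoop.yarn.server.resourcemanager.DBManager;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.Options;

public class TwoDatabaseSketch {
  public static void main(String[] args) throws Exception {
    DBManager dbManager = new DBManager();        // conf database
    DBManager versionDbManager = new DBManager(); // conf-version database
    Consumer<DB> noInit = database -> { };
    DB db = dbManager.initDatabase(
        new File("/tmp/yarn-conf-store"), new Options(), noInit);
    DB versionDb = versionDbManager.initDatabase(
        new File("/tmp/yarn-conf-version-store"), new Options(), noInit);
    // Closing the store means closing both managers, mirroring close() above.
    dbManager.close();
    versionDbManager.close();
  }
}
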
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
index 576ee7f38d6..68409534aa3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
@@ -25,14 +25,17 @@
import java.io.File;
import java.io.IOException;
+import java.util.function.Consumer;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.resourcemanager.DBManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB;
+import org.iq80.leveldb.Options;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -125,15 +128,19 @@ public void testReservation() throws Exception {
}
@Test(timeout = 60000)
- public void testCompactionCycle() throws Exception {
+ public void testCompactionCycle() {
final DB mockdb = mock(DB.class);
conf.setLong(YarnConfiguration.RM_LEVELDB_COMPACTION_INTERVAL_SECS, 1);
- stateStore = new LeveldbRMStateStore() {
+ stateStore = new LeveldbRMStateStore();
+ DBManager dbManager = new DBManager() {
@Override
- protected DB openDatabase() throws Exception {
+ public DB initDatabase(File configurationFile, Options options,
+          Consumer<DB> initMethod) {
return mockdb;
}
};
+ dbManager.setDb(mockdb);
+ stateStore.setDbManager(dbManager);
stateStore.init(conf);
stateStore.start();
verify(mockdb, timeout(10000)).compactRange(
@@ -175,12 +182,12 @@ public boolean isFinalStateValid() throws Exception {
}
@Override
- public void writeVersion(Version version) throws Exception {
- stateStore.dbStoreVersion(version);
+ public void writeVersion(Version version) {
+ stateStore.storeVersion(version);
}
@Override
- public Version getCurrentVersion() throws Exception {
+ public Version getCurrentVersion() {
return stateStore.getCurrentVersion();
}
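
The reworked testCompactionCycle swaps the old openDatabase() override for a
DBManager stub, keeping LevelDB itself out of the test. The essential shape,
using only the APIs introduced in this patch plus standard Mockito:

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;

import java.io.File;
import java.util.function.Consumer;
import org.apache.hadoop.yarn.server.resourcemanager.DBManager;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.Options;

public class CompactionStubSketch {
  public void sketch() {
    final DB mockdb = mock(DB.class);
    // Stub initDatabase so no on-disk LevelDB is opened, then hand the
    // mock to the manager through the new setDb() test hook.
    DBManager dbManager = new DBManager() {
      @Override
      public DB initDatabase(File file, Options options,
          Consumer<DB> initMethod) {
        return mockdb;
      }
    };
    dbManager.setDb(mockdb);
    dbManager.startCompactionTimer(1000L, "CompactionStubSketch");
    // The daemon timer should issue a full-range compaction shortly.
    verify(mockdb, timeout(10000)).compactRange(null, null);
  }
}
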
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
index b8a6bdafbe1..ed1159c3d30 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationStoreBaseTest.java
@@ -79,24 +79,12 @@ public void testNullConfigurationUpdate() throws Exception {
confStore.close();
}
- void prepareLogMutation(String... values)
+ void prepareLogMutation(String key, String value)
throws Exception {
-    Map<String, String> updates = new HashMap<>();
- String key;
- String value;
-
- if (values.length % 2 != 0) {
- throw new IllegalArgumentException("The number of parameters should be " +
- "even.");
- }
-
- for (int i = 1; i <= values.length; i += 2) {
- key = values[i - 1];
- value = values[i];
- updates.put(key, value);
- }
+    Map<String, String> update = new HashMap<>();
+ update.put(key, value);
YarnConfigurationStore.LogMutation mutation =
- new YarnConfigurationStore.LogMutation(updates, TEST_USER);
+ new YarnConfigurationStore.LogMutation(update, TEST_USER);
confStore.logMutation(mutation);
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/PersistentConfigurationStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/PersistentConfigurationStoreBaseTest.java
index bd30c30567f..dfb5a29fe70 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/PersistentConfigurationStoreBaseTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/PersistentConfigurationStoreBaseTest.java
@@ -25,7 +25,6 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
-import static org.junit.Assume.assumeFalse;
/**
* Base class for the persistent {@link YarnConfigurationStore}
@@ -97,9 +96,6 @@ public void testVersion() throws Exception {
@Test
public void testMaxLogs() throws Exception {
- assumeFalse("test should be skipped for TestFSSchedulerConfigurationStore",
- this instanceof TestFSSchedulerConfigurationStore);
-
conf.setLong(YarnConfiguration.RM_SCHEDCONF_MAX_LOGS, 2);
confStore.initialize(conf, schedConf, rmContext);
    LinkedList<YarnConfigurationStore.LogMutation> logs = confStore.getLogs();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java
index 87a6a3da5a4..c3ef164a937 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java
@@ -20,6 +20,7 @@
import java.io.File;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Map;
import org.apache.commons.io.FileUtils;
@@ -30,34 +31,36 @@
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
/**
* Tests {@link FSSchedulerConfigurationStore}.
*/
-public class TestFSSchedulerConfigurationStore extends
- PersistentConfigurationStoreBaseTest {
+public class TestFSSchedulerConfigurationStore {
+ private static final String TEST_USER = "test";
+ private FSSchedulerConfigurationStore configurationStore;
+ private Configuration conf;
private File testSchedulerConfigurationDir;
@Before
- @Override
public void setUp() throws Exception {
- super.setUp();
+ configurationStore = new FSSchedulerConfigurationStore();
testSchedulerConfigurationDir = new File(
TestFSSchedulerConfigurationStore.class.getResource("").getPath()
+ FSSchedulerConfigurationStore.class.getSimpleName());
testSchedulerConfigurationDir.mkdirs();
+ conf = new Configuration();
conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_FS_PATH,
testSchedulerConfigurationDir.getAbsolutePath());
}
@@ -78,15 +81,6 @@ public void tearDown() throws Exception {
FileUtils.deleteDirectory(testSchedulerConfigurationDir);
}
- @Test
- public void checkVersion() {
- try {
- confStore.checkVersion();
- } catch (Exception e) {
- fail("checkVersion throw exception");
- }
- }
-
@Test
public void confirmMutationWithValid() throws Exception {
conf.setInt(
@@ -95,26 +89,26 @@ public void confirmMutationWithValid() throws Exception {
conf.set("b", "b");
conf.set("c", "c");
writeConf(conf);
- confStore.initialize(conf, conf, null);
- Configuration storeConf = confStore.retrieve();
+ configurationStore.initialize(conf, conf, null);
+ Configuration storeConf = configurationStore.retrieve();
compareConfig(conf, storeConf);
Configuration expectConfig = new Configuration(conf);
expectConfig.unset("a");
expectConfig.set("b", "bb");
- prepareLogMutation("a", null, "b", "bb");
- confStore.confirmMutation(true);
- storeConf = confStore.retrieve();
+ prepareParameterizedLogMutation(configurationStore, true,
+ "a", null, "b", "bb");
+ storeConf = configurationStore.retrieve();
assertNull(storeConf.get("a"));
assertEquals("bb", storeConf.get("b"));
assertEquals("c", storeConf.get("c"));
compareConfig(expectConfig, storeConf);
- prepareLogMutation("a", null, "b", "bbb");
- confStore.confirmMutation(true);
- storeConf = confStore.retrieve();
+ prepareParameterizedLogMutation(configurationStore, true,
+ "a", null, "b", "bbb");
+ storeConf = configurationStore.retrieve();
assertNull(storeConf.get("a"));
assertEquals("bbb", storeConf.get("b"));
assertEquals("c", storeConf.get("c"));
@@ -126,53 +120,17 @@ public void confirmMutationWithInvalid() throws Exception {
conf.set("b", "b");
conf.set("c", "c");
writeConf(conf);
- confStore.initialize(conf, conf, null);
- Configuration storeConf = confStore.retrieve();
+ configurationStore.initialize(conf, conf, null);
+ Configuration storeConf = configurationStore.retrieve();
compareConfig(conf, storeConf);
- prepareLogMutation("a", null, "b", "bb");
- confStore.confirmMutation(false);
-
- storeConf = confStore.retrieve();
+ prepareParameterizedLogMutation(configurationStore, false,
+ "a", null, "b", "bb");
+ storeConf = configurationStore.retrieve();
compareConfig(conf, storeConf);
}
- @Test
- public void testConfigRetrieval() throws Exception {
- Configuration schedulerConf = new Configuration();
- schedulerConf.set("a", "a");
- schedulerConf.setLong("long", 1L);
- schedulerConf.setBoolean("boolean", true);
- writeConf(schedulerConf);
-
- confStore.initialize(conf, conf, null);
- Configuration storedConfig = confStore.retrieve();
-
- compareConfig(schedulerConf, storedConfig);
- }
-
- @Test
- public void testFormatConfiguration() throws Exception {
- Configuration persistedSchedConf = new Configuration();
- persistedSchedConf.set("a", "a");
- writeConf(persistedSchedConf);
- confStore.initialize(conf, conf, null);
- Configuration storedConfig = confStore.retrieve();
- assertEquals("Retrieved config should match the stored one", "a",
- storedConfig.get("a"));
- confStore.format();
- try {
- confStore.retrieve();
- fail("Expected an IOException with message containing \"no capacity " +
- "scheduler file in\" to be thrown");
- } catch (IOException e) {
- assertThat("Exception message should contain the predefined string.",
- e.getMessage(),
- CoreMatchers.containsString("no capacity scheduler file in"));
- }
- }
-
@Test
public void testFileSystemClose() throws Exception {
MiniDFSCluster hdfsCluster = null;
@@ -188,16 +146,18 @@ public void testFileSystemClose() throws Exception {
fs.mkdirs(path);
}
+ FSSchedulerConfigurationStore configStore =
+ new FSSchedulerConfigurationStore();
hdfsConfig.set(YarnConfiguration.SCHEDULER_CONFIGURATION_FS_PATH,
path.toString());
- confStore.initialize(hdfsConfig, hdfsConfig, null);
+ configStore.initialize(hdfsConfig, hdfsConfig, null);
// Close the FileSystem object and validate
fs.close();
try {
- prepareLogMutation("key", "val");
- confStore.confirmMutation(true);
+ prepareParameterizedLogMutation(configStore, true,
+ "testkey", "testvalue");
} catch (IOException e) {
if (e.getMessage().contains("Filesystem closed")) {
fail("FSSchedulerConfigurationStore failed to handle " +
@@ -216,6 +176,48 @@ public void testFileSystemClose() throws Exception {
}
}
+ @Test
+ public void testFormatConfiguration() throws Exception {
+ Configuration schedulerConf = new Configuration();
+ schedulerConf.set("a", "a");
+ writeConf(schedulerConf);
+ configurationStore.initialize(conf, conf, null);
+ Configuration storedConfig = configurationStore.retrieve();
+ assertEquals("a", storedConfig.get("a"));
+ configurationStore.format();
+ try {
+ configurationStore.retrieve();
+ fail("Expected an IOException with message containing \"no capacity " +
+ "scheduler file in\" to be thrown");
+ } catch (IOException e) {
+ assertThat(e.getMessage(),
+ CoreMatchers.containsString("no capacity scheduler file in"));
+ }
+ }
+
+ @Test
+ public void retrieve() throws Exception {
+ Configuration schedulerConf = new Configuration();
+ schedulerConf.set("a", "a");
+ schedulerConf.setLong("long", 1L);
+ schedulerConf.setBoolean("boolean", true);
+ writeConf(schedulerConf);
+
+ configurationStore.initialize(conf, conf, null);
+ Configuration storedConfig = configurationStore.retrieve();
+
+ compareConfig(schedulerConf, storedConfig);
+ }
+
+ @Test
+ public void checkVersion() {
+ try {
+ configurationStore.checkVersion();
+ } catch (Exception e) {
+ fail("checkVersion throw exception");
+ }
+ }
+
private void compareConfig(Configuration schedulerConf,
Configuration storedConfig) {
    for (Map.Entry<String, String> entry : schedulerConf) {
@@ -229,13 +231,26 @@ private void compareConfig(Configuration schedulerConf,
}
}
- @Override
- public YarnConfigurationStore createConfStore() {
- return new FSSchedulerConfigurationStore();
- }
+ private void prepareParameterizedLogMutation(
+ FSSchedulerConfigurationStore configStore,
+ boolean validityFlag, String... values) throws Exception {
+    Map<String, String> updates = new HashMap<>();
+ String key;
+ String value;
+
+ if (values.length % 2 != 0) {
+ throw new IllegalArgumentException("The number of parameters should be " +
+ "even.");
+ }
+
+ for (int i = 1; i <= values.length; i += 2) {
+ key = values[i - 1];
+ value = values[i];
+ updates.put(key, value);
+ }
- @Override
- Version getVersion() {
- return null;
+ LogMutation logMutation = new LogMutation(updates, TEST_USER);
+ configStore.logMutation(logMutation);
+ configStore.confirmMutation(validityFlag);
}
}
\ No newline at end of file