diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index c5ae3f0..67fd5f2 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -269,6 +269,15 @@ public final class HConstants {
/** Parameter name for HBase instance root directory */
public static final String HBASE_DIR = "hbase.rootdir";
+ /** Parameter name for HBase instance root directory permission */
+ public static final String HBASE_DIR_PERMS = "hbase.rootdir.perms";
+
+ /** Parameter name for HBase instance log directory */
+ public static final String HBASE_LOG_DIR = "hbase.regionserver.hlog.dir";
+
+ /** Parameter name for HBase instance log directory permission */
+ public static final String HBASE_LOG_DIR_PERMS = "hbase.regionserver.hlog.dir.perms";
+
/** Parameter name for HBase client IPC pool type */
public static final String HBASE_CLIENT_IPC_POOL_TYPE = "hbase.client.ipc.pool.type";
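
Note: a minimal hbase-site.xml sketch wiring these keys together (hosts and paths are
illustrative, not defaults):

    <property>
      <name>hbase.rootdir</name>
      <value>hdfs://namenode:8020/hbase</value>
    </property>
    <property>
      <name>hbase.regionserver.hlog.dir</name>
      <value>hdfs://namenode:8020/hbase-wal</value>
    </property>
    <property>
      <name>hbase.regionserver.hlog.dir.perms</name>
      <value>700</value>
    </property>

With hbase.regionserver.hlog.dir unset, WALs stay under hbase.rootdir as before.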
diff --git hbase-common/src/main/resources/hbase-default.xml hbase-common/src/main/resources/hbase-default.xml
index dbdcc30..598ee8e 100644
--- hbase-common/src/main/resources/hbase-default.xml
+++ hbase-common/src/main/resources/hbase-default.xml
@@ -1145,6 +1145,13 @@ possible configurations would overwhelm and obscure the important.
    if it does not match.</description>
  </property>
+  <property>
+    <name>hbase.regionserver.hlog.dir.perms</name>
+    <value>700</value>
+    <description>FS Permissions for the log directory in a secure (kerberos) setup.
+    When master starts, it creates the log dir with these permissions, or sets the
+    permissions if they do not match.</description>
+  </property>
  <property>
    <name>hbase.data.umask.enable</name>
    <value>false</value>
    <description>Enable, if true, that file permissions should be assigned
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/WALLink.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/WALLink.java
index 344d496..c2ad71a 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/io/WALLink.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/WALLink.java
@@ -45,17 +45,17 @@ public class WALLink extends FileLink {
*/
public WALLink(final Configuration conf,
final String serverName, final String logName) throws IOException {
- this(FSUtils.getRootDir(conf), serverName, logName);
+ this(FSUtils.getLogRootDir(conf), serverName, logName);
}
/**
- * @param rootDir Path to the root directory where hbase files are stored
+ * @param logRootDir Path to the root directory where hbase log files are stored
* @param serverName Region Server owner of the log
* @param logName WAL file name
*/
- public WALLink(final Path rootDir, final String serverName, final String logName) {
- final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
- final Path logDir = new Path(new Path(rootDir, HConstants.HREGION_LOGDIR_NAME), serverName);
+ public WALLink(final Path logRootDir, final String serverName, final String logName) {
+ final Path oldLogDir = new Path(logRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+ final Path logDir = new Path(new Path(logRootDir, HConstants.HREGION_LOGDIR_NAME), serverName);
setLocations(new Path(logDir, logName), new Path(oldLogDir, logName));
}
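
For reference, the two candidate locations the constructor now probes, expressed as a
standalone sketch (logRootDir, serverName and logName are placeholders):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HConstants;

    // Mirrors WALLink: first the live WAL dir, then the archive dir, both under the log root.
    static Path[] walCandidates(Path logRootDir, String serverName, String logName) {
      Path logDir = new Path(new Path(logRootDir, HConstants.HREGION_LOGDIR_NAME), serverName);
      Path oldLogDir = new Path(logRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
      return new Path[] { new Path(logDir, logName), new Path(oldLogDir, logName) };
    }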
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index 29eacf0..ae4ff8b 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -583,15 +583,15 @@ public class AssignmentManager extends ZooKeeperListener {
Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
if (!queuedDeadServers.isEmpty()) {
Configuration conf = server.getConfiguration();
- Path rootdir = FSUtils.getRootDir(conf);
- FileSystem fs = rootdir.getFileSystem(conf);
+ Path logRootdir = FSUtils.getLogRootDir(conf);
+ FileSystem logFs = FSUtils.getLogRootDirFileSystem(conf);
for (ServerName serverName: queuedDeadServers) {
// In the case of a clean exit, the shutdown handler would have presplit any WALs and
// removed empty directories.
- Path logDir = new Path(rootdir,
+ Path logDir = new Path(logRootdir,
DefaultWALProvider.getWALDirectoryName(serverName.toString()));
Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT);
- if (fs.exists(logDir) || fs.exists(splitDir)) {
+ if (logFs.exists(logDir) || logFs.exists(splitDir)) {
LOG.debug("Found queued dead server " + serverName);
failover = true;
break;
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 59a9faa..a777fb1 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1103,7 +1103,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000);
this.logCleaner =
new LogCleaner(cleanerInterval,
- this, conf, getMasterFileSystem().getFileSystem(),
+ this, conf, getMasterFileSystem().getOldLogDir().getFileSystem(conf),
getMasterFileSystem().getOldLogDir());
getChoreService().scheduleChore(logCleaner);
@@ -1169,10 +1169,10 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
private void startProcedureExecutor() throws IOException {
final MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
- final Path logDir = new Path(fileSystemManager.getRootDir(),
+ final Path logDir = new Path(FSUtils.getLogRootDir(this.conf),
MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
- procedureStore = new WALProcedureStore(conf, fileSystemManager.getFileSystem(), logDir,
+ procedureStore = new WALProcedureStore(conf, logDir.getFileSystem(conf), logDir,
new MasterProcedureEnv.WALStoreLeaseRecovery(this));
procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
procedureExecutor = new ProcedureExecutor(conf, procEnv, procedureStore,
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
index dc43d8c..60fde9b 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
@@ -78,12 +78,15 @@ public class MasterFileSystem {
private ClusterId clusterId;
// Keep around for convenience.
private final FileSystem fs;
+ private final FileSystem logFs;
// Is the filesystem ok?
- private volatile boolean fsOk = true;
+ private volatile boolean logFsOk = true;
// The Path to the old logs dir
private final Path oldLogDir;
// root hbase directory on the FS
private final Path rootdir;
+ // root log hbase directory on the FS
+ private final Path logRootDir;
// hbase temp directory used for table construction and deletion
private final Path tempdir;
// create the split log lock
@@ -120,9 +123,12 @@ public class MasterFileSystem {
// Cover both bases, the old way of setting default fs and the new.
// We're supposed to run on 0.20 and 0.21 anyways.
this.fs = this.rootdir.getFileSystem(conf);
+ this.logRootDir = FSUtils.getLogRootDir(conf);
+ this.logFs = FSUtils.getLogRootDirFileSystem(conf);
FSUtils.setFsDefault(conf, new Path(this.fs.getUri()));
// make sure the fs has the same conf
fs.setConf(conf);
+ logFs.setConf(conf);
// setup the filesystem variable
// set up the archived logs path
this.oldLogDir = createInitialFileSystemLayout();
@@ -150,16 +156,19 @@ public class MasterFileSystem {
*/
private Path createInitialFileSystemLayout() throws IOException {
// check if the root directory exists
- checkRootDir(this.rootdir, conf, this.fs);
-
+ checkRootDir(this.rootdir, conf, this.fs, HConstants.HBASE_DIR, HConstants.HBASE_DIR_PERMS);
+ // if the log directory is different from root, check if it exists
+ if (!this.logRootDir.equals(this.rootdir)) {
+ checkRootDir(this.logRootDir, conf, this.logFs,
+ HConstants.HBASE_LOG_DIR, HConstants.HBASE_LOG_DIR_PERMS);
+ }
// check if temp directory exists and clean it
checkTempDir(this.tempdir, conf, this.fs);
- Path oldLogDir = new Path(this.rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
+ Path oldLogDir = new Path(this.logRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
// Make sure the region servers can archive their old logs
- if(!this.fs.exists(oldLogDir)) {
- this.fs.mkdirs(oldLogDir);
+ if(!this.logFs.exists(oldLogDir)) {
+ this.logFs.mkdirs(oldLogDir);
}
return oldLogDir;
@@ -169,6 +178,7 @@ public class MasterFileSystem {
return this.fs;
}
+ public FileSystem getLogFileSystem() {
+ return this.logFs;
+ }
/**
* Get the directory where old logs go
* @return the dir
@@ -183,16 +193,16 @@ public class MasterFileSystem {
* @return false if file system is not available
*/
public boolean checkFileSystem() {
- if (this.fsOk) {
+ if (this.logFsOk) {
try {
- FSUtils.checkFileSystemAvailable(this.fs);
+ FSUtils.checkFileSystemAvailable(this.logFs);
FSUtils.checkDfsSafeMode(this.conf);
} catch (IOException e) {
master.abort("Shutting down HBase cluster: file system not available", e);
- this.fsOk = false;
+ this.logFsOk = false;
}
}
- return this.fsOk;
+ return this.logFsOk;
}
/**
@@ -203,6 +213,13 @@ public class MasterFileSystem {
}
/**
+ * @return HBase root log dir.
+ */
+ public Path getLogRootDir() {
+ return this.logRootDir;
+ }
+
+ /**
* @return HBase temp dir.
*/
public Path getTempDir() {
@@ -225,7 +242,7 @@ public class MasterFileSystem {
WALSplitter.SPLIT_SKIP_ERRORS_DEFAULT);
Set<ServerName> serverNames = new HashSet<ServerName>();
- Path logsDirPath = new Path(this.rootdir, HConstants.HREGION_LOGDIR_NAME);
+ Path logsDirPath = new Path(this.logRootDir, HConstants.HREGION_LOGDIR_NAME);
do {
if (master.isStopped()) {
@@ -233,8 +250,8 @@ public class MasterFileSystem {
break;
}
try {
- if (!this.fs.exists(logsDirPath)) return serverNames;
- FileStatus[] logFolders = FSUtils.listStatus(this.fs, logsDirPath, null);
+ if (!this.logFs.exists(logsDirPath)) return serverNames;
+ FileStatus[] logFolders = FSUtils.listStatus(this.logFs, logsDirPath, null);
// Get online servers after getting log folders to avoid log folder deletion of newly
// checked in region servers . see HBASE-5916
Set<ServerName> onlineServers = ((HMaster) master).getServerManager().getOnlineServers()
@@ -245,7 +262,7 @@ public class MasterFileSystem {
return serverNames;
}
for (FileStatus status : logFolders) {
- FileStatus[] curLogFiles = FSUtils.listStatus(this.fs, status.getPath(), null);
+ FileStatus[] curLogFiles = FSUtils.listStatus(this.logFs, status.getPath(), null);
if (curLogFiles == null || curLogFiles.length == 0) {
// Empty log folder. No recovery needed
continue;
@@ -326,17 +343,17 @@ public class MasterFileSystem {
}
try {
for (ServerName serverName : serverNames) {
- Path logDir = new Path(this.rootdir,
+ Path logDir = new Path(this.logRootDir,
DefaultWALProvider.getWALDirectoryName(serverName.toString()));
Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT);
// Rename the directory so a rogue RS doesn't create more WALs
- if (fs.exists(logDir)) {
- if (!this.fs.rename(logDir, splitDir)) {
+ if (logFs.exists(logDir)) {
+ if (!this.logFs.rename(logDir, splitDir)) {
throw new IOException("Failed fs.rename for log split: " + logDir);
}
logDir = splitDir;
LOG.debug("Renamed region directory: " + splitDir);
- } else if (!fs.exists(splitDir)) {
+ } else if (!logFs.exists(splitDir)) {
LOG.info("Log dir for server " + serverName + " does not exist");
continue;
}
@@ -418,19 +435,19 @@ public class MasterFileSystem {
*/
@SuppressWarnings("deprecation")
private Path checkRootDir(final Path rd, final Configuration c,
- final FileSystem fs)
+ final FileSystem fs, final String dirConfKey, final String dirPermsConfName)
throws IOException {
// If FS is in safe mode wait till out of it.
FSUtils.waitOnSafeMode(c, c.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000));
boolean isSecurityEnabled = "kerberos".equalsIgnoreCase(c.get("hbase.security.authentication"));
- FsPermission rootDirPerms = new FsPermission(c.get("hbase.rootdir.perms", "700"));
+ FsPermission dirPerms = new FsPermission(c.get(dirPermsConfName, "700"));
- // Filesystem is good. Go ahead and check for hbase.rootdir.
+ // Filesystem is good. Go ahead and check for root directory.
try {
if (!fs.exists(rd)) {
if (isSecurityEnabled) {
- fs.mkdirs(rd, rootDirPerms);
+ fs.mkdirs(rd, dirPerms);
} else {
fs.mkdirs(rd);
}
@@ -448,15 +465,15 @@ public class MasterFileSystem {
if (!fs.isDirectory(rd)) {
throw new IllegalArgumentException(rd.toString() + " is not a directory");
}
- if (isSecurityEnabled && !rootDirPerms.equals(fs.getFileStatus(rd).getPermission())) {
+ if (isSecurityEnabled && !dirPerms.equals(fs.getFileStatus(rd).getPermission())) {
// check whether the permission match
LOG.warn("Found rootdir permissions NOT matching expected \"hbase.rootdir.perms\" for "
- + "rootdir=" + rd.toString() + " permissions=" + fs.getFileStatus(rd).getPermission()
- + " and \"hbase.rootdir.perms\" configured as "
- + c.get("hbase.rootdir.perms", "700") + ". Automatically setting the permissions. You"
- + " can change the permissions by setting \"hbase.rootdir.perms\" in hbase-site.xml "
+ + "rd=" + rd.toString() + " permissions=" + fs.getFileStatus(rd).getPermission()
+ + " and \"" + dirPermsConfName + "\" configured as "
+ + c.get(dirPermsConfName, "700") + ". Automatically setting the permissions. You"
+ + " can change the permissions by setting \"" + dirPermsConfName + "\" in hbase-site.xml "
+ "and restarting the master");
- fs.setPermission(rd, rootDirPerms);
+ fs.setPermission(rd, dirPerms);
}
// as above
FSUtils.checkVersion(fs, rd, true, c.getInt(HConstants.THREAD_WAKE_FREQUENCY,
@@ -464,39 +481,40 @@ public class MasterFileSystem {
HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS));
}
} catch (DeserializationException de) {
- LOG.fatal("Please fix invalid configuration for " + HConstants.HBASE_DIR, de);
+ LOG.fatal("Please fix invalid configuration for " + dirConfKey, de);
IOException ioe = new IOException();
ioe.initCause(de);
throw ioe;
} catch (IllegalArgumentException iae) {
LOG.fatal("Please fix invalid configuration for "
- + HConstants.HBASE_DIR + " " + rd.toString(), iae);
+ + dirConfKey + " " + rd.toString(), iae);
throw iae;
}
- // Make sure cluster ID exists
- if (!FSUtils.checkClusterIdExists(fs, rd, c.getInt(
- HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000))) {
- FSUtils.setClusterId(fs, rd, new ClusterId(), c.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000));
- }
- clusterId = FSUtils.getClusterId(fs, rd);
-
- // Make sure the meta region directory exists!
- if (!FSUtils.metaRegionExists(fs, rd)) {
- bootstrap(rd, c);
- } else {
- // Migrate table descriptor files if necessary
- org.apache.hadoop.hbase.util.FSTableDescriptorMigrationToSubdir
- .migrateFSTableDescriptorsIfNecessary(fs, rd);
- }
+ if (dirConfKey.equals(HConstants.HBASE_DIR)) {
+ // Make sure cluster ID exists
+ if (!FSUtils.checkClusterIdExists(fs, rd, c.getInt(
+ HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000))) {
+ FSUtils.setClusterId(fs, rd, new ClusterId(), c.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000));
+ }
+ clusterId = FSUtils.getClusterId(fs, rd);
- // Create tableinfo-s for hbase:meta if not already there.
+ // Make sure the meta region directory exists!
+ if (!FSUtils.metaRegionExists(fs, rd)) {
+ bootstrap(rd, c);
+ } else {
+ // Migrate table descriptor files if necessary
+ org.apache.hadoop.hbase.util.FSTableDescriptorMigrationToSubdir
+ .migrateFSTableDescriptorsIfNecessary(fs, rd);
+ }
- // meta table is a system table, so descriptors are predefined,
- // we should get them from registry.
- FSTableDescriptors fsd = new FSTableDescriptors(c, fs, rd);
- fsd.createTableDescriptor(
- new HTableDescriptor(fsd.get(TableName.META_TABLE_NAME)));
+ // Create tableinfo-s for hbase:meta if not already there.
+ // meta table is a system table, so descriptors are predefined,
+ // we should get them from registry.
+ FSTableDescriptors fsd = new FSTableDescriptors(c, fs, rd);
+ fsd.createTableDescriptor(
+ new HTableDescriptor(fsd.get(TableName.META_TABLE_NAME)));
+ }
return rd;
}
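
The permission handling above boils down to the following sketch (a simplified standalone
illustration of the mkdirs/setPermission branches, not the exact method):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;

    // Creates dir with the configured perms (secure mode), or re-applies them if they drifted.
    static void ensureDirPerms(FileSystem fs, Path dir, Configuration c,
        String permsKey, boolean secure) throws IOException {
      FsPermission perms = new FsPermission(c.get(permsKey, "700"));
      if (!fs.exists(dir)) {
        if (secure) {
          fs.mkdirs(dir, perms);
        } else {
          fs.mkdirs(dir);
        }
      } else if (secure && !perms.equals(fs.getFileStatus(dir).getPermission())) {
        fs.setPermission(dir, perms);
      }
    }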
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
index 852b6c4..47c2b7a 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
@@ -266,7 +266,7 @@ public class SplitLogManager {
// recover-lease is done. totalSize will be under in most cases and the
// metrics that it drives will also be under-reported.
totalSize += lf.getLen();
- String pathToLog = FSUtils.removeRootPath(lf.getPath(), conf);
+ String pathToLog = FSUtils.removeLogRootPath(lf.getPath(), conf);
if (!enqueueSplitTask(pathToLog, batch)) {
throw new IOException("duplicate log split scheduled for " + lf.getPath());
}
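
As a worked example of removeLogRootPath (assuming a log root of hdfs://nn/hbase-wal;
names illustrative):

    // lf.getPath() = hdfs://nn/hbase-wal/WALs/host,16020,1/host%2C16020%2C1.1234
    // log root     = hdfs://nn/hbase-wal
    // pathToLog    = "WALs/host,16020,1/host%2C16020%2C1.1234"  (relative to the log root)
    String pathToLog = FSUtils.removeLogRootPath(lf.getPath(), conf);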
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 9fa0a69..b9d5fe9 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -285,6 +285,7 @@ public class HRegionServer extends HasThread implements
// If false, the file system has become unavailable
protected volatile boolean fsOk;
protected HFileSystem fs;
+ protected HFileSystem logFs;
// Set when a report to the master comes back with a message asking us to
// shutdown. Also set by call to stop when debugging or running unit tests
@@ -306,6 +307,7 @@ public class HRegionServer extends HasThread implements
protected final Configuration conf;
private Path rootDir;
+ private Path logRootDir;
protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -601,13 +603,16 @@ public class HRegionServer extends HasThread implements
}
private void initializeFileSystem() throws IOException {
+ // Get fs instance used by this RS. Do we use checksum verification in the hbase? If hbase
+ // checksum verification enabled, then automatically switch off hdfs checksum verification.
+ boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
+ FSUtils.setFsDefault(this.conf, FSUtils.getLogRootDir(this.conf));
+ this.logFs = new HFileSystem(this.conf, useHBaseChecksum);
+ this.logRootDir = FSUtils.getLogRootDir(this.conf);
// Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else
// underlying hadoop hdfs accessors will be going against wrong filesystem
// (unless all is set to defaults).
FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf));
- // Get fs instance used by this RS. Do we use checksum verification in the hbase? If hbase
- // checksum verification enabled, then automatically switch off hdfs checksum verification.
- boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
this.fs = new HFileSystem(this.conf, useHBaseChecksum);
this.rootDir = FSUtils.getRootDir(this.conf);
this.tableDescriptors = new FSTableDescriptors(
@@ -1625,19 +1630,19 @@ public class HRegionServer extends HasThread implements
*/
private WALFactory setupWALAndReplication() throws IOException {
// TODO Replication make assumptions here based on the default filesystem impl
- final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+ final Path oldLogDir = new Path(logRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
final String logName = DefaultWALProvider.getWALDirectoryName(this.serverName.toString());
- Path logdir = new Path(rootDir, logName);
- if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
- if (this.fs.exists(logdir)) {
+ Path logDir = new Path(logRootDir, logName);
+ if (LOG.isDebugEnabled()) LOG.debug("logDir=" + logDir);
+ if (this.logFs.exists(logDir)) {
throw new RegionServerRunningException("Region server has already " +
"created directory at " + this.serverName.toString());
}
// Instantiate replication manager if replication enabled. Pass it the
// log directories.
- createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir);
+ createNewReplicationInstance(conf, this, this.logFs, logDir, oldLogDir);
// listeners the wal factory will add to wals it creates.
final List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
@@ -2537,6 +2542,20 @@ public class HRegionServer extends HasThread implements
return fs;
}
+ /**
+ * @return Return the logRootDir.
+ */
+ protected Path getLogRootDir() {
+ return logRootDir;
+ }
+
+ /**
+ * @return Return the logFs.
+ */
+ public FileSystem getLogFileSystem() {
+ return logFs;
+ }
+
@Override
public String toString() {
return getServerName().toString();
@@ -2603,7 +2622,7 @@ public class HRegionServer extends HasThread implements
* Load the replication service objects, if any
*/
static private void createNewReplicationInstance(Configuration conf,
- HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException{
+ HRegionServer server, FileSystem logFs, Path logDir, Path oldLogDir) throws IOException{
// If replication is not enabled, then return immediately.
if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
@@ -2624,21 +2643,21 @@ public class HRegionServer extends HasThread implements
if (sourceClassname.equals(sinkClassname)) {
server.replicationSourceHandler = (ReplicationSourceService)
newReplicationInstance(sourceClassname,
- conf, server, fs, logDir, oldLogDir);
+ conf, server, logFs, logDir, oldLogDir);
server.replicationSinkHandler = (ReplicationSinkService)
server.replicationSourceHandler;
} else {
server.replicationSourceHandler = (ReplicationSourceService)
newReplicationInstance(sourceClassname,
- conf, server, fs, logDir, oldLogDir);
+ conf, server, logFs, logDir, oldLogDir);
server.replicationSinkHandler = (ReplicationSinkService)
newReplicationInstance(sinkClassname,
- conf, server, fs, logDir, oldLogDir);
+ conf, server, logFs, logDir, oldLogDir);
}
}
static private ReplicationService newReplicationInstance(String classname,
- Configuration conf, HRegionServer server, FileSystem fs, Path logDir,
+ Configuration conf, HRegionServer server, FileSystem logFs, Path logDir,
Path oldLogDir) throws IOException{
Class<?> clazz = null;
@@ -2652,7 +2671,7 @@ public class HRegionServer extends HasThread implements
// create an instance of the replication object.
ReplicationService service = (ReplicationService)
ReflectionUtils.newInstance(clazz, conf);
- service.initialize(server, fs, logDir, oldLogDir);
+ service.initialize(server, logFs, logDir, oldLogDir);
return service;
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
index eeffa8b..5324eae 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
@@ -88,11 +88,11 @@ public class SplitLogWorker implements Runnable {
this(server, conf, server, new TaskExecutor() {
@Override
public Status exec(String filename, RecoveryMode mode, CancelableProgressable p) {
- Path rootdir;
+ Path logDir;
FileSystem fs;
try {
- rootdir = FSUtils.getRootDir(conf);
- fs = rootdir.getFileSystem(conf);
+ logDir = FSUtils.getLogRootDir(conf);
+ fs = FSUtils.getLogRootDirFileSystem(conf);
} catch (IOException e) {
LOG.warn("could not find root dir or fs", e);
return Status.RESIGNED;
@@ -101,7 +101,7 @@ public class SplitLogWorker implements Runnable {
// interrupted or has encountered a transient error and when it has
// encountered a bad non-retry-able persistent error.
try {
- if (!WALSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)),
+ if (!WALSplitter.splitLogFile(logDir, fs.getFileStatus(new Path(logDir, filename)),
fs, conf, p, sequenceIdChecker, server.getCoordinatedStateManager(), mode, factory)) {
return Status.PREEMPTED;
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 459d598..6a14929 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -1573,7 +1573,7 @@ public class FSHLog implements WAL {
private static void split(final Configuration conf, final Path p)
throws IOException {
- FileSystem fs = FileSystem.get(conf);
+ FileSystem fs = FSUtils.getLogRootDirFileSystem(conf);
if (!fs.exists(p)) {
throw new FileNotFoundException(p.toString());
}
@@ -1581,7 +1581,7 @@ public class FSHLog implements WAL {
throw new IOException(p + " is not a directory");
}
- final Path baseDir = FSUtils.getRootDir(conf);
+ final Path baseDir = FSUtils.getLogRootDir(conf);
final Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index be16d01..85118b2 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -750,9 +750,9 @@ public class ReplicationSource extends Thread
// to look at)
List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
LOG.info("NB dead servers : " + deadRegionServers.size());
- final Path rootDir = FSUtils.getRootDir(conf);
+ final Path logDir = FSUtils.getLogRootDir(conf);
for (String curDeadServerName : deadRegionServers) {
- final Path deadRsDirectory = new Path(rootDir,
+ final Path deadRsDirectory = new Path(logDir,
DefaultWALProvider.getWALDirectoryName(curDeadServerName));
Path[] locs = new Path[] {
new Path(deadRsDirectory, currentPath.getName()),
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
index 8d38b09..64bed53 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
@@ -76,7 +76,7 @@ public class ReplicationSyncUp extends Configured implements Tool {
Replication replication;
ReplicationSourceManager manager;
FileSystem fs;
- Path oldLogDir, logDir, rootDir;
+ Path oldLogDir, logDir, rootLogDir;
ZooKeeperWatcher zkw;
Abortable abortable = new Abortable() {
@@ -94,10 +94,10 @@ public class ReplicationSyncUp extends Configured implements Tool {
new ZooKeeperWatcher(conf, "syncupReplication" + System.currentTimeMillis(), abortable,
true);
- rootDir = FSUtils.getRootDir(conf);
- fs = FileSystem.get(conf);
- oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
- logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
+ rootLogDir = FSUtils.getLogRootDir(conf);
+ fs = FSUtils.getLogRootDirFileSystem(conf);
+ oldLogDir = new Path(rootLogDir, HConstants.HREGION_OLDLOGDIR_NAME);
+ logDir = new Path(rootLogDir, HConstants.HREGION_LOGDIR_NAME);
System.out.println("Start Replication Server start");
replication = new Replication(new DummyServer(zkw), fs, logDir, oldLogDir);
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 8717948..764c8e1 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -83,6 +83,9 @@ import org.apache.hadoop.util.StringUtils;
import com.google.common.primitives.Ints;
+import static org.apache.hadoop.hbase.HConstants.HBASE_DIR;
+import static org.apache.hadoop.hbase.HConstants.HBASE_LOG_DIR;
+
/**
* Utility methods for interacting with the underlying file system.
*/
@@ -932,22 +935,22 @@ public abstract class FSUtils {
return root;
} catch (URISyntaxException e) {
IOException io = new IOException("Root directory path is not a valid " +
- "URI -- check your " + HConstants.HBASE_DIR + " configuration");
+ "URI -- check your " + HBASE_DIR + " configuration");
io.initCause(e);
throw io;
}
}
/**
- * Checks for the presence of the root path (using the provided conf object) in the given path. If
+ * Checks for the presence of the log root path (using the provided conf object) in the given path. If
* it exists, this method removes it and returns the String representation of remaining relative path.
* @param path
* @param conf
* @return String representation of the remaining relative path
* @throws IOException
*/
- public static String removeRootPath(Path path, final Configuration conf) throws IOException {
- Path root = FSUtils.getRootDir(conf);
+ public static String removeLogRootPath(Path path, final Configuration conf) throws IOException {
+ Path root = FSUtils.getLogRootDir(conf);
String pathStr = path.toString();
// check that the path is absolute... it has the root path in it.
if (!pathStr.startsWith(root.toString())) return pathStr;
@@ -994,18 +997,44 @@ public abstract class FSUtils {
/**
* @param c configuration
- * @return Path to hbase root directory: i.e. hbase.rootdir from
+ * @return {@link Path} to hbase root directory: i.e. {@value HConstants#HBASE_DIR} from
* configuration as a qualified Path.
* @throws IOException e
*/
public static Path getRootDir(final Configuration c) throws IOException {
- Path p = new Path(c.get(HConstants.HBASE_DIR));
+ Path p = new Path(c.get(HBASE_DIR));
FileSystem fs = p.getFileSystem(c);
return p.makeQualified(fs);
}
public static void setRootDir(final Configuration c, final Path root) throws IOException {
- c.set(HConstants.HBASE_DIR, root.toString());
+ c.set(HBASE_DIR, root.toString());
+ }
+
+ /**
+ * @param c configuration
+ * @return {@link Path} to hbase log root directory: i.e. {@value HConstants#HBASE_LOG_DIR} from
+ * configuration as a qualified Path. Defaults to {@value HConstants#HBASE_DIR}.
+ * @throws IOException e
+ */
+ public static Path getLogRootDir(final Configuration c) throws IOException {
+ Path p = new Path(c.get(HBASE_LOG_DIR, c.get(HBASE_DIR)));
+ FileSystem fs = p.getFileSystem(c);
+ return p.makeQualified(fs);
+ }
+
+ public static void setLogRootDir(final Configuration c, final Path root) throws IOException {
+ c.set(HBASE_LOG_DIR, root.toString());
+ }
+
+ public static FileSystem getLogRootDirFileSystem(final Configuration c) throws IOException {
+ Path p = getLogRootDir(c);
+ return p.getFileSystem(c);
+ }
+
+ public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException {
+ Path p = getRootDir(c);
+ return p.getFileSystem(c);
}
public static void setFsDefault(final Configuration c, final Path root) throws IOException {
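
A small sketch of the fallback contract of getLogRootDir: with only hbase.rootdir set it
resolves to the root dir, so existing deployments see no behaviour change (paths illustrative):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.util.FSUtils;

    static void demoLogRootFallback() throws IOException {
      Configuration c = new Configuration();
      c.set(HConstants.HBASE_DIR, "file:///hbase");
      // No HBASE_LOG_DIR set: the log root falls back to the root dir.
      assert FSUtils.getLogRootDir(c).equals(FSUtils.getRootDir(c));
      c.set(HConstants.HBASE_LOG_DIR, "file:///hbase-wal");
      // Now the two roots diverge and WALs land under file:///hbase-wal.
      assert !FSUtils.getLogRootDir(c).equals(FSUtils.getRootDir(c));
    }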
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
index c1fa079..c5de395 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java
@@ -47,8 +47,8 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
* A WAL Provider that returns a single thread safe WAL that writes to HDFS.
* By default, this implementation picks a directory in HDFS based on a combination of
 * <ul>
- *   <li>the HBase root directory
- *   <li>HConstants.HREGION_LOGDIR_NAME
+ *   <li>the HBase root log directory (can be changed via {@value HConstants#HBASE_LOG_DIR})
+ *   <li>{@value HConstants#HREGION_LOGDIR_NAME}
 *   <li>the given factory's factoryId (usually identifying the regionserver by host:port)
 * </ul>
 * It also uses the providerId to differentiate among files.
@@ -94,7 +94,7 @@ public class DefaultWALProvider implements WALProvider {
providerId = DEFAULT_PROVIDER_ID;
}
final String logPrefix = factory.factoryId + WAL_FILE_NAME_DELIMITER + providerId;
- log = new FSHLog(FileSystem.get(conf), FSUtils.getRootDir(conf),
+ log = new FSHLog(FSUtils.getLogRootDirFileSystem(conf), FSUtils.getLogRootDir(conf),
getWALDirectoryName(factory.factoryId), HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners,
true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
}
@@ -266,14 +266,14 @@ public class DefaultWALProvider implements WALProvider {
throw new IllegalArgumentException("parameter conf must be set");
}
- final String rootDir = conf.get(HConstants.HBASE_DIR);
- if (rootDir == null || rootDir.isEmpty()) {
- throw new IllegalArgumentException(HConstants.HBASE_DIR
- + " key not found in conf.");
+ final String logDir = conf.get(HConstants.HBASE_LOG_DIR, conf.get(HConstants.HBASE_DIR));
+ if (logDir == null || logDir.isEmpty()) {
+ throw new IllegalArgumentException(String.format("Neither %s nor %s key found in conf.",
+ HConstants.HBASE_LOG_DIR, HConstants.HBASE_DIR));
}
- final StringBuilder startPathSB = new StringBuilder(rootDir);
- if (!rootDir.endsWith("/"))
+ final StringBuilder startPathSB = new StringBuilder(logDir);
+ if (!logDir.endsWith("/"))
startPathSB.append('/');
startPathSB.append(HConstants.HREGION_LOGDIR_NAME);
if (!HConstants.HREGION_LOGDIR_NAME.endsWith("/"))
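
With the change above, the scan prefix used by getServerNameFromWALDirectoryName is built
from the log root; e.g. (paths illustrative):

    // hbase.regionserver.hlog.dir = hdfs://nn/hbase-wal   (falls back to hbase.rootdir)
    // resulting start path        = hdfs://nn/hbase-wal/WALs/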
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 33785a6..f82ff1f 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -62,7 +62,7 @@ class DisabledWALProvider implements WALProvider {
if (null == providerId) {
providerId = "defaultDisabled";
}
- disabled = new DisabledWAL(new Path(FSUtils.getRootDir(conf), providerId), conf, null);
+ disabled = new DisabledWAL(new Path(FSUtils.getLogRootDir(conf), providerId), conf, null);
}
@Override
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 05d00de..6f035be 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -240,17 +240,17 @@ public class WALSplitter {
// log splitting. Used by tools and unit tests. It should be package private.
// It is public only because UpgradeTo96 and TestWALObserver are in different packages,
// which uses this method to do log splitting.
-  public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir,
+  public static List<Path> split(Path logRootDir, Path logDir, Path oldLogDir,
FileSystem fs, Configuration conf, final WALFactory factory) throws IOException {
final FileStatus[] logfiles = SplitLogManager.getFileList(conf,
Collections.singletonList(logDir), null);
List<Path> splits = new ArrayList<Path>();
if (logfiles != null && logfiles.length > 0) {
for (FileStatus logfile: logfiles) {
- WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null,
+ WALSplitter s = new WALSplitter(factory, conf, logRootDir, fs, null, null,
RecoveryMode.LOG_SPLITTING);
if (s.splitLogFile(logfile, null)) {
- finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
+ finishSplitLogFile(logRootDir, oldLogDir, logfile.getPath(), conf);
if (s.outputSink.splits != null) {
splits.addAll(s.outputSink.splits);
}
@@ -438,7 +438,7 @@ public class WALSplitter {
*/
public static void finishSplitLogFile(String logfile,
Configuration conf) throws IOException {
- Path rootdir = FSUtils.getRootDir(conf);
+ Path rootdir = FSUtils.getLogRootDir(conf);
Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
Path logPath;
if (FSUtils.isStartingWithPath(rootdir, logfile)) {
@@ -481,7 +481,7 @@ public class WALSplitter {
final List corruptedLogs,
final List processedLogs, final Path oldLogDir,
final FileSystem fs, final Configuration conf) throws IOException {
- final Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get(
+ final Path corruptDir = new Path(FSUtils.getLogRootDir(conf), conf.get(
"hbase.regionserver.hlog.splitlog.corrupt.dir", HConstants.CORRUPT_DIR_NAME));
if (!fs.mkdirs(corruptDir)) {
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index e15a9f6..94e09b4 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -746,7 +746,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
}
private MiniZooKeeperCluster startMiniZKCluster(final File dir)
- throws Exception {
+ throws Exception {
return startMiniZKCluster(dir, 1, null);
}
@@ -806,6 +806,16 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
return startMiniCluster(1, 1);
}
+ /**
+ * Start up a minicluster of hbase, dfs, and zookeeper where the WAL log dir is created separately.
+ * @throws Exception
+ * @return Mini hbase cluster instance created.
+ * @see {@link #shutdownMiniDFSCluster()}
+ */
+ public MiniHBaseCluster startMiniClusterWithLogDir() throws Exception {
+ return startMiniClusterWithLogDir(1, 1);
+ }
+
/**
* Start up a minicluster of hbase, dfs, and zookeeper.
* Set the create flag to create root or data directory path or not
@@ -815,7 +825,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
* @see {@link #shutdownMiniDFSCluster()}
*/
public MiniHBaseCluster startMiniCluster(final int numSlaves, boolean create)
- throws Exception {
+ throws Exception {
return startMiniCluster(1, numSlaves, create);
}
@@ -833,10 +843,15 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
* @return Mini hbase cluster instance created.
*/
public MiniHBaseCluster startMiniCluster(final int numSlaves)
- throws Exception {
+ throws Exception {
return startMiniCluster(1, numSlaves, false);
}
+ public MiniHBaseCluster startMiniClusterWithLogDir(final int numSlaves)
+ throws Exception {
+ return startMiniClusterWithLogDir(1, numSlaves, false);
+ }
+
/**
* Start minicluster. Whether to create a new root or data dir path even if such a path
* has been created earlier is decided based on flag create
@@ -846,10 +861,17 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
*/
public MiniHBaseCluster startMiniCluster(final int numMasters,
final int numSlaves, boolean create)
- throws Exception {
- return startMiniCluster(numMasters, numSlaves, null, create);
+ throws Exception {
+ return startMiniCluster(numMasters, numSlaves, null, create);
}
+ public MiniHBaseCluster startMiniClusterWithLogDir(final int numMasters,
+ final int numSlaves, boolean create)
+ throws Exception {
+ return startMiniClusterWithLogDir(numMasters, numSlaves, null, create);
+ }
+
+
/**
* start minicluster
* @throws Exception
@@ -857,11 +879,17 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
* @return Mini hbase cluster instance created.
*/
public MiniHBaseCluster startMiniCluster(final int numMasters,
- final int numSlaves)
- throws Exception {
+ final int numSlaves)
+ throws Exception {
return startMiniCluster(numMasters, numSlaves, null, false);
}
+ public MiniHBaseCluster startMiniClusterWithLogDir(final int numMasters,
+ final int numSlaves)
+ throws Exception {
+ return startMiniClusterWithLogDir(numMasters, numSlaves, null, false);
+ }
+
public MiniHBaseCluster startMiniCluster(final int numMasters,
final int numSlaves, final String[] dataNodeHosts, boolean create)
throws Exception {
@@ -869,6 +897,13 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
null, null, create);
}
+ public MiniHBaseCluster startMiniClusterWithLogDir(final int numMasters,
+ final int numSlaves, final String[] dataNodeHosts, boolean create)
+ throws Exception {
+ return startMiniClusterWithLogDir(numMasters, numSlaves, numSlaves, dataNodeHosts,
+ null, null, create);
+ }
+
/**
* Start up a minicluster of hbase, optionally dfs, and zookeeper.
* Modifies Configuration. Homes the cluster data directory under a random
@@ -994,12 +1029,52 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
// Start the MiniHBaseCluster
return startMiniHBaseCluster(numMasters, numSlaves, masterClass,
- regionserverClass, create);
+ regionserverClass, create, false);
+ }
+
+ /**
+ * Same as {@link #startMiniClusterWithLogDir(int, int, String[], boolean)}, but with custom
+ * number of datanodes.
+ * @param numDataNodes Number of data nodes.
+ * @param create Set this flag to create a new
+ * root or data directory path or not (will overwrite if exists already).
+ */
+ public MiniHBaseCluster startMiniClusterWithLogDir(final int numMasters,
+ final int numSlaves, int numDataNodes, final String[] dataNodeHosts,
+ Class<? extends HMaster> masterClass,
+ Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass,
+ boolean create)
+ throws Exception {
+ if (dataNodeHosts != null && dataNodeHosts.length != 0) {
+ numDataNodes = dataNodeHosts.length;
+ }
+
+ LOG.info("Starting up minicluster with " + numMasters + " master(s) and " +
+ numSlaves + " regionserver(s) and " + numDataNodes + " datanode(s)");
+ // If we already started a cluster, fail.
+ if (miniClusterRunning) {
+ throw new IllegalStateException("A mini-cluster is already running");
+ }
+ miniClusterRunning = true;
+ setupClusterTestDir();
+ System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestDir.getPath());
+ // Bring up mini dfs cluster. This spews a bunch of warnings about missing
+ // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
+ if (this.dfsCluster == null) {
+ dfsCluster = startMiniDFSCluster(numDataNodes, dataNodeHosts);
+ }
+ // Start up a zk cluster.
+ if (this.zkCluster == null) {
+ startMiniZKCluster(clusterTestDir);
+ }
+ // Start the MiniHBaseCluster
+ return startMiniHBaseCluster(numMasters, numSlaves, masterClass,
+ regionserverClass, create, true);
}
public MiniHBaseCluster startMiniHBaseCluster(final int numMasters, final int numSlaves)
throws IOException, InterruptedException{
- return startMiniHBaseCluster(numMasters, numSlaves, null, null, false);
+ return startMiniHBaseCluster(numMasters, numSlaves, null, null, false, false);
}
/**
@@ -1018,11 +1093,13 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
public MiniHBaseCluster startMiniHBaseCluster(final int numMasters,
final int numSlaves, Class<? extends HMaster> masterClass,
Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass,
- boolean create)
- throws IOException, InterruptedException {
+ boolean create, boolean withLogDir)
+ throws IOException, InterruptedException {
// Now do the mini hbase cluster. Set the hbase.rootdir in config.
createRootDir(create);
-
+ if (withLogDir) {
+ createLogRootDir(create);
+ }
// These settings will make the server waits until this exact number of
// regions servers are connected.
if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
@@ -1205,7 +1282,39 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
public Path createRootDir() throws IOException {
return createRootDir(false);
}
-
+ /**
+ * Creates an hbase log root dir in the user's home directory. Also creates the hbase
+ * version file. Normally you won't make use of this method. The log root dir
+ * is created for you as part of mini cluster startup. You'd only use this
+ * method if you were doing manual operation.
+ *
+ * @param create This flag decides whether to get a new
+ * log root directory path or not, if it has been fetched already.
+ * Note : Directory will be made irrespective of whether path has been fetched or not.
+ * If directory already exists, it will be overwritten
+ * @return Fully qualified path to hbase log root dir
+ * @throws IOException
+ */
+ public Path createLogRootDir(boolean create) throws IOException {
+ FileSystem fs = FileSystem.get(this.conf);
+ String randomStr = UUID.randomUUID().toString();
+ Path logBase = new Path(getDefaultRootDirPath(create).getParent(), randomStr);
+ Path hbaseLogRootdir = new Path(logBase, "logRootDir");
+ FSUtils.setLogRootDir(this.conf, hbaseLogRootdir);
+ fs.mkdirs(hbaseLogRootdir);
+ FSUtils.setVersion(fs, hbaseLogRootdir);
+ return hbaseLogRootdir;
+ }
+ /**
+ * Same as {@link HBaseTestingUtility#createLogRootDir(boolean)}
+ * except that create flag is false.
+ *
+ * @return Fully qualified path to hbase log root dir
+ * @throws IOException
+ */
+ public Path createLogRootDir() throws IOException {
+ return createLogRootDir(false);
+ }
private void setHBaseFsTmpDir() throws IOException {
String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir");
@@ -1782,12 +1891,14 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
/**
* Create an unmanaged WAL. Be sure to close it when you're through.
*/
- public static WAL createWal(final Configuration conf, final Path rootDir, final HRegionInfo hri)
+ public static WAL createWal(final Configuration conf, final Path rootDir, final Path logRootDir,
+ final HRegionInfo hri)
throws IOException {
// The WAL subsystem will use the default rootDir rather than the passed in rootDir
// unless I pass along via the conf.
Configuration confForWAL = new Configuration(conf);
confForWAL.set(HConstants.HBASE_DIR, rootDir.toString());
+ confForWAL.set(HConstants.HBASE_LOG_DIR, logRootDir.toString());
return (new WALFactory(confForWAL,
Collections.<WALActionsListener>singletonList(new MetricsWAL()),
"hregion-" + RandomStringUtils.randomNumeric(8))).
@@ -1799,8 +1910,17 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
* {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
*/
public static HRegion createRegionAndWAL(final HRegionInfo info, final Path rootDir,
+ final Path logRootDir, final Configuration conf, final HTableDescriptor htd) throws IOException {
+ return createRegionAndWAL(info, rootDir, logRootDir, conf, htd, true);
+ }
+
+ /**
+ * Create a region with a WAL under the rootDir (the WAL is placed under the same rootDir).
+ * Be sure to call
+ * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
+ */
+ public static HRegion createRegionAndWAL(final HRegionInfo info, final Path rootDir,
final Configuration conf, final HTableDescriptor htd) throws IOException {
- return createRegionAndWAL(info, rootDir, conf, htd, true);
+ return createRegionAndWAL(info, rootDir, rootDir, conf, htd, true);
}
/**
@@ -1808,9 +1928,9 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
* {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
*/
public static HRegion createRegionAndWAL(final HRegionInfo info, final Path rootDir,
- final Configuration conf, final HTableDescriptor htd, boolean initialize)
+ final Path logRootDir, final Configuration conf, final HTableDescriptor htd, boolean initialize)
throws IOException {
- WAL wal = createWal(conf, rootDir, info);
+ WAL wal = createWal(conf, rootDir, logRootDir, info);
return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize);
}
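
A hedged sketch of how a test could drive the split roots with the helpers added above
(info and htd stand in for a real HRegionInfo and HTableDescriptor):

    HBaseTestingUtility util = new HBaseTestingUtility();
    // Brings up dfs/zk/hbase with hbase.rootdir and hbase.regionserver.hlog.dir pointing
    // at distinct directories (createLogRootDir is invoked during cluster startup).
    MiniHBaseCluster cluster = util.startMiniClusterWithLogDir();
    try {
      Configuration conf = util.getConfiguration();
      // Region data goes under the root dir, its WAL under the separate log root.
      HRegion region = HBaseTestingUtility.createRegionAndWAL(info,
          FSUtils.getRootDir(conf), FSUtils.getLogRootDir(conf), conf, htd);
      HBaseTestingUtility.closeRegionAndWAL(region);
    } finally {
      util.shutdownMiniCluster();
    }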
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
index a771c21..b9ddc97 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
-import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
+import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoadBase;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
@@ -324,7 +324,7 @@ public class TestReplicaWithCluster {
final List<Pair<byte[], String>> famPaths = new ArrayList<Pair<byte[], String>>();
for (HColumnDescriptor col : hdt.getColumnFamilies()) {
Path hfile = new Path(dir, col.getNameAsString());
- TestHRegionServerBulkLoad.createHFile(HTU.getTestFileSystem(), hfile, col.getName(),
+ TestHRegionServerBulkLoadBase.createHFile(HTU.getTestFileSystem(), hfile, col.getName(),
qual, val, numRows);
famPaths.add(new Pair<byte[], String>(col.getName(), hfile.toString()));
}
@@ -334,7 +334,7 @@ public class TestReplicaWithCluster {
@SuppressWarnings("deprecation")
final HConnection conn = HTU.getHBaseAdmin().getConnection();
RegionServerCallable<Void> callable = new RegionServerCallable<Void>(
- conn, hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0)) {
+ conn, hdt.getTableName(), TestHRegionServerBulkLoadBase.rowkey(0)) {
@Override
public Void call(int timeout) throws Exception {
LOG.debug("Going to connect to server " + getLocation() + " for row "
@@ -353,7 +353,7 @@ public class TestReplicaWithCluster {
// verify we can read them from the primary
LOG.debug("Verifying data load");
for (int i = 0; i < numRows; i++) {
- byte[] row = TestHRegionServerBulkLoad.rowkey(i);
+ byte[] row = TestHRegionServerBulkLoadBase.rowkey(i);
Get g = new Get(row);
Result r = table.get(g);
Assert.assertFalse(r.isStale());
@@ -364,7 +364,7 @@ public class TestReplicaWithCluster {
try {
SlowMeCopro.cdl.set(new CountDownLatch(1));
for (int i = 0; i < numRows; i++) {
- byte[] row = TestHRegionServerBulkLoad.rowkey(i);
+ byte[] row = TestHRegionServerBulkLoadBase.rowkey(i);
Get g = new Get(row);
g.setConsistency(Consistency.TIMELINE);
Result r = table.get(g);
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
index 256c0eb..fcb69f6 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
@@ -97,6 +97,7 @@ public class TestWALObserver {
private FileSystem fs;
private Path dir;
private Path hbaseRootDir;
+ private Path hbaseLogRootDir;
private String logName;
private Path oldLogDir;
private Path logDir;
@@ -115,8 +116,11 @@ public class TestWALObserver {
TEST_UTIL.startMiniCluster(1);
Path hbaseRootDir = TEST_UTIL.getDFSCluster().getFileSystem()
.makeQualified(new Path("/hbase"));
+ Path hbaseLogRootDir = TEST_UTIL.getDFSCluster().getFileSystem()
+ .makeQualified(new Path("/hbaseLog"));
LOG.info("hbase.rootdir=" + hbaseRootDir);
FSUtils.setRootDir(conf, hbaseRootDir);
+ FSUtils.setLogRootDir(conf, hbaseLogRootDir);
}
@AfterClass
@@ -130,16 +134,20 @@ public class TestWALObserver {
// this.cluster = TEST_UTIL.getDFSCluster();
this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
this.hbaseRootDir = FSUtils.getRootDir(conf);
- this.dir = new Path(this.hbaseRootDir, TestWALObserver.class.getName());
- this.oldLogDir = new Path(this.hbaseRootDir,
+ this.hbaseLogRootDir = FSUtils.getLogRootDir(conf);
+ this.dir = new Path(this.hbaseLogRootDir, TestWALObserver.class.getName());
+ this.oldLogDir = new Path(this.hbaseLogRootDir,
HConstants.HREGION_OLDLOGDIR_NAME);
- this.logDir = new Path(this.hbaseRootDir,
+ this.logDir = new Path(this.hbaseLogRootDir,
DefaultWALProvider.getWALDirectoryName(currentTest.getMethodName()));
this.logName = HConstants.HREGION_LOGDIR_NAME;
if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
}
+ if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseLogRootDir)) {
+ TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseLogRootDir, true);
+ }
this.wals = new WALFactory(conf, null, currentTest.getMethodName());
}
@@ -153,6 +161,7 @@ public class TestWALObserver {
LOG.debug("details of failure to close wal factory.", exception);
}
TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
+ TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseLogRootDir, true);
}
/**
@@ -512,7 +521,7 @@ public class TestWALObserver {
private Path runWALSplit(final Configuration c) throws IOException {
List<Path> splits = WALSplitter.split(
-        hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals);
+        hbaseLogRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals);
// Split should generate only 1 file since there's only 1 region
assertEquals(1, splits.size());
// Make sure the file exists
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterFromRegionSide.java hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterFromRegionSide.java
index c574a95..cf39095 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterFromRegionSide.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterFromRegionSide.java
@@ -84,7 +84,7 @@ public class TestFilterFromRegionSide {
}
HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
REGION = HBaseTestingUtility
- .createRegionAndWAL(info, TEST_UTIL.getDataTestDir(), TEST_UTIL.getConfiguration(), htd);
+ .createRegionAndWAL(info, TEST_UTIL.getDataTestDir(), TEST_UTIL.getConfiguration(), htd);
for(Put put:createPuts(ROWS, FAMILIES, QUALIFIERS, VALUE)){
REGION.put(put);
}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorder.java hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorder.java
index 1aa75a1..ac6e80f 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorder.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorder.java
@@ -88,6 +88,8 @@ public class TestBlockReorder {
private static final String host1 = "host1";
private static final String host2 = "host2";
private static final String host3 = "host3";
+ private static Path rootDir;
+ private static Path logRootDir;
@Before
public void setUp() throws Exception {
@@ -101,10 +103,14 @@ public class TestBlockReorder {
conf = htu.getConfiguration();
cluster = htu.getDFSCluster();
dfs = (DistributedFileSystem) FileSystem.get(conf);
+ rootDir = htu.createRootDir();
+ logRootDir = htu.createLogRootDir();
}
@After
public void tearDownAfterClass() throws Exception {
+ dfs.delete(rootDir, true);
+ dfs.delete(logRootDir, true);
htu.shutdownMiniCluster();
}
@@ -277,7 +283,7 @@ public class TestBlockReorder {
// Now we need to find the log file, its locations, and look at it
- String rootDir = new Path(FSUtils.getRootDir(conf) + "/" + HConstants.HREGION_LOGDIR_NAME +
+ String logRootDir = new Path(FSUtils.getLogRootDir(conf) + "/" + HConstants.HREGION_LOGDIR_NAME +
"/" + targetRs.getServerName().toString()).toUri().getPath();
DistributedFileSystem mdfs = (DistributedFileSystem)
@@ -321,7 +327,7 @@ public class TestBlockReorder {
p.add(sb, sb, sb);
h.put(p);
- DirectoryListing dl = dfs.getClient().listPaths(rootDir, HdfsFileStatus.EMPTY_NAME);
+ DirectoryListing dl = dfs.getClient().listPaths(logRootDir, HdfsFileStatus.EMPTY_NAME);
HdfsFileStatus[] hfs = dl.getPartialListing();
// As we wrote a put, we should have at least one log file.
@@ -329,8 +335,8 @@ public class TestBlockReorder {
for (HdfsFileStatus hf : hfs) {
// Because this is a live cluster, log files might get archived while we're processing
try {
- LOG.info("Log file found: " + hf.getLocalName() + " in " + rootDir);
- String logFile = rootDir + "/" + hf.getLocalName();
+ LOG.info("Log file found: " + hf.getLocalName() + " in " + logRootDir);
+ String logFile = logRootDir + "/" + hf.getLocalName();
FileStatus fsLog = rfs.getFileStatus(new Path(logFile));
LOG.info("Checking log file: " + logFile);
@@ -457,7 +463,7 @@ public class TestBlockReorder {
// Should be reordered, as we pretend to be a file name with a compliant stuff
-    Assert.assertNotNull(conf.get(HConstants.HBASE_DIR));
-    Assert.assertFalse(conf.get(HConstants.HBASE_DIR).isEmpty());
-    String pseudoLogFile = conf.get(HConstants.HBASE_DIR) + "/" +
+    Assert.assertNotNull(conf.get(HConstants.HBASE_LOG_DIR));
+    Assert.assertFalse(conf.get(HConstants.HBASE_LOG_DIR).isEmpty());
+    String pseudoLogFile = conf.get(HConstants.HBASE_LOG_DIR) + "/" +
HConstants.HREGION_LOGDIR_NAME + "/" + host1 + ",6977,6576" + "/mylogfile";
// Check that it will be possible to extract a ServerName from our construction
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
index 26583f3..293e3f3 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
@@ -60,7 +60,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
+import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoadBase;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
@@ -80,7 +80,7 @@ import com.google.protobuf.ServiceException;
*/
@Category(LargeTests.class)
public class TestLoadIncrementalHFilesSplitRecovery {
- private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoad.class);
+ private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoadBase.class);
static HBaseTestingUtility util;
//used by secure subclass
@@ -115,7 +115,7 @@ public class TestLoadIncrementalHFilesSplitRecovery {
for (int i = 0; i < NUM_CFS; i++) {
Path testIn = new Path(dir, family(i));
- TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i),
+ TestHRegionServerBulkLoadBase.createHFile(fs, new Path(testIn, "hfile_" + i),
Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT);
}
}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
index 343fc64..62b683f 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
@@ -30,6 +30,7 @@ import java.io.PrintStream;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.WALPlayer.WALKeyValueMapper;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -67,16 +69,28 @@ import org.mockito.stubbing.Answer;
public class TestWALPlayer {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static MiniHBaseCluster cluster;
+ private static Path rootDir;
+ private static Path logRootDir;
+ private static FileSystem fs;
+ private static FileSystem logFs;
+ private static Configuration conf;
@BeforeClass
public static void beforeClass() throws Exception {
TEST_UTIL.setJobWithoutMRCluster();
+ conf = TEST_UTIL.getConfiguration();
+ rootDir = TEST_UTIL.createRootDir();
+ logRootDir = TEST_UTIL.createLogRootDir();
+ fs = FSUtils.getRootDirFileSystem(conf);
+ logFs = FSUtils.getLogRootDirFileSystem(conf);
cluster = TEST_UTIL.startMiniCluster();
}
@AfterClass
public static void afterClass() throws Exception {
+ fs.delete(rootDir, true);
+ logFs.delete(logRootDir, true);
TEST_UTIL.shutdownMiniCluster();
}
/**
@@ -108,7 +122,7 @@ public class TestWALPlayer {
WAL log = cluster.getRegionServer(0).getWAL(null);
log.rollWriter();
String walInputDir = new Path(cluster.getMaster().getMasterFileSystem()
- .getRootDir(), HConstants.HREGION_LOGDIR_NAME).toString();
+ .getLogRootDir(), HConstants.HREGION_LOGDIR_NAME).toString();
Configuration configuration = TEST_UTIL.getConfiguration();
WALPlayer player = new WALPlayer(configuration);
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
index 201ffe6..7bb00e6 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALKeyRecordReader;
import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
@@ -64,7 +65,9 @@ public class TestWALRecordReader {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static Configuration conf;
private static FileSystem fs;
+ private static FileSystem logFs;
private static Path hbaseDir;
+ private static Path logRootDir;
// visible for TestHLogRecordReader
static final TableName tableName = TableName.valueOf(getName());
private static final byte [] rowName = tableName.getName();
@@ -84,11 +87,8 @@ public class TestWALRecordReader {
@Before
public void setUp() throws Exception {
mvcc = new MultiVersionConcurrencyControl();
- FileStatus[] entries = fs.listStatus(hbaseDir);
- for (FileStatus dir : entries) {
- fs.delete(dir.getPath(), true);
- }
-
+ fs.delete(hbaseDir, true);
+ logFs.delete(logRootDir, true);
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
@@ -102,8 +102,9 @@ public class TestWALRecordReader {
fs = TEST_UTIL.getDFSCluster().getFileSystem();
hbaseDir = TEST_UTIL.createRootDir();
-
- logDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
+ logRootDir = TEST_UTIL.createLogRootDir();
+ logFs = FSUtils.getLogRootDirFileSystem(conf);
+ logDir = new Path(logRootDir, HConstants.HREGION_LOGDIR_NAME);
htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor(family));
@@ -111,6 +112,8 @@ public class TestWALRecordReader {
@AfterClass
public static void tearDownAfterClass() throws Exception {
+ fs.delete(hbaseDir, true);
+ logFs.delete(logRootDir, true);
TEST_UTIL.shutdownMiniCluster();
}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFileSystemWithLogDir.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFileSystemWithLogDir.java
new file mode 100644
index 0000000..c0f7e50
--- /dev/null
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFileSystemWithLogDir.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+
+import static org.junit.Assert.*;
+
+/**
+ * Test the master filesystem in a local cluster
+ */
+@Category(MediumTests.class)
+public class TestMasterFileSystemWithLogDir {
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ @BeforeClass
+ public static void setupTest() throws Exception {
+ UTIL.startMiniClusterWithLogDir();
+ }
+
+ @AfterClass
+ public static void teardownTest() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testFsUriSetProperly() throws Exception {
+ HMaster master = UTIL.getMiniHBaseCluster().getMaster();
+ MasterFileSystem fs = master.getMasterFileSystem();
+ Path masterRoot = FSUtils.getRootDir(fs.conf);
+ Path rootDir = FSUtils.getRootDir(fs.getFileSystem().getConf());
+ assertEquals(masterRoot, rootDir);
+ assertEquals(FSUtils.getLogRootDir(UTIL.getConfiguration()), fs.getLogRootDir());
+ }
+
+}
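A hedged usage sketch of the split layout this new test exercises; startMiniClusterWithLogDir, getLogRootDir, and the divergence check are taken from this patch, while the surrounding harness is illustrative only:

    import static org.junit.Assert.assertNotEquals;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseTestingUtility;
    import org.apache.hadoop.hbase.util.FSUtils;

    public class SplitLayoutSketch {
      public static void main(String[] args) throws Exception {
        HBaseTestingUtility util = new HBaseTestingUtility();
        util.startMiniClusterWithLogDir(); // boots with hbase.regionserver.hlog.dir set
        try {
          Path dataRoot = FSUtils.getRootDir(util.getConfiguration());
          Path logRoot = FSUtils.getLogRootDir(util.getConfiguration());
          assertNotEquals(dataRoot, logRoot); // WALs now live under logRoot/WALs
        } finally {
          util.shutdownMiniCluster();
        }
      }
    }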
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
index 118270a..8e35461 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
@@ -1,383 +1,57 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;
-import com.google.common.collect.Lists;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
-import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
-import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.RegionServerCallable;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.RpcRetryingCaller;
-import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
-import org.apache.hadoop.hbase.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
-import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALKey;
import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import com.google.common.collect.Lists;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
-/**
- * Tests bulk loading of HFiles and shows the atomicity or lack of atomicity of
- * the region server's bullkLoad functionality.
- */
-@Category(LargeTests.class)
-public class TestHRegionServerBulkLoad {
- private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoad.class);
- private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
- private final static Configuration conf = UTIL.getConfiguration();
- private final static byte[] QUAL = Bytes.toBytes("qual");
- private final static int NUM_CFS = 10;
- public static int BLOCKSIZE = 64 * 1024;
- public static Algorithm COMPRESSION = Compression.Algorithm.NONE;
-
- private final static byte[][] families = new byte[NUM_CFS][];
- static {
- for (int i = 0; i < NUM_CFS; i++) {
- families[i] = Bytes.toBytes(family(i));
- }
- }
-
- /**
- * Create a rowkey compatible with
- * {@link #createHFile(FileSystem, Path, byte[], byte[], byte[], int)}.
- */
- public static byte[] rowkey(int i) {
- return Bytes.toBytes(String.format("row_%08d", i));
- }
-
- static String family(int i) {
- return String.format("family_%04d", i);
- }
-
- /**
- * Create an HFile with the given number of rows with a specified value.
- */
- public static void createHFile(FileSystem fs, Path path, byte[] family,
- byte[] qualifier, byte[] value, int numRows) throws IOException {
- HFileContext context = new HFileContextBuilder().withBlockSize(BLOCKSIZE)
- .withCompression(COMPRESSION)
- .build();
- HFile.Writer writer = HFile
- .getWriterFactory(conf, new CacheConfig(conf))
- .withPath(fs, path)
- .withFileContext(context)
- .create();
- long now = System.currentTimeMillis();
- try {
- // subtract 2 since iterateOnSplits doesn't include boundary keys
- for (int i = 0; i < numRows; i++) {
- KeyValue kv = new KeyValue(rowkey(i), family, qualifier, now, value);
- writer.append(kv);
- }
- writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(now));
- } finally {
- writer.close();
- }
- }
-
- /**
- * Thread that does full scans of the table looking for any partially
- * completed rows.
- *
- * Each iteration of this loads 10 hdfs files, which occupies 5 file open file
- * handles. So every 10 iterations (500 file handles) it does a region
- * compaction to reduce the number of open file handles.
- */
- public static class AtomicHFileLoader extends RepeatingTestThread {
- final AtomicLong numBulkLoads = new AtomicLong();
- final AtomicLong numCompactions = new AtomicLong();
- private TableName tableName;
-
- public AtomicHFileLoader(TableName tableName, TestContext ctx,
- byte targetFamilies[][]) throws IOException {
- super(ctx);
- this.tableName = tableName;
- }
-
- public void doAnAction() throws Exception {
- long iteration = numBulkLoads.getAndIncrement();
- Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d",
- iteration));
-
- // create HFiles for different column families
- FileSystem fs = UTIL.getTestFileSystem();
- byte[] val = Bytes.toBytes(String.format("%010d", iteration));
- final List<Pair<byte[], String>> famPaths = new ArrayList<Pair<byte[], String>>(
- NUM_CFS);
- for (int i = 0; i < NUM_CFS; i++) {
- Path hfile = new Path(dir, family(i));
- byte[] fam = Bytes.toBytes(family(i));
- createHFile(fs, hfile, fam, QUAL, val, 1000);
- famPaths.add(new Pair<byte[], String>(fam, hfile.toString()));
- }
-
- // bulk load HFiles
- final HConnection conn = UTIL.getHBaseAdmin().getConnection();
- RegionServerCallable<Void> callable =
- new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
- @Override
- public Void call(int callTimeout) throws Exception {
- LOG.debug("Going to connect to server " + getLocation() + " for row "
- + Bytes.toStringBinary(getRow()));
- byte[] regionName = getLocation().getRegionInfo().getRegionName();
- BulkLoadHFileRequest request =
- RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true);
- getStub().bulkLoadHFile(null, request);
- return null;
- }
- };
- RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
- RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
- caller.callWithRetries(callable, Integer.MAX_VALUE);
-
- // Periodically do compaction to reduce the number of open file handles.
- if (numBulkLoads.get() % 10 == 0) {
- // 10 * 50 = 500 open file handles!
- callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
- @Override
- public Void call(int callTimeout) throws Exception {
- LOG.debug("compacting " + getLocation() + " for row "
- + Bytes.toStringBinary(getRow()));
- AdminProtos.AdminService.BlockingInterface server =
- conn.getAdmin(getLocation().getServerName());
- CompactRegionRequest request =
- RequestConverter.buildCompactRegionRequest(
- getLocation().getRegionInfo().getRegionName(), true, null);
- server.compactRegion(null, request);
- numCompactions.incrementAndGet();
- return null;
- }
- };
- caller.callWithRetries(callable, Integer.MAX_VALUE);
- }
- }
- }
-
- /**
- * Thread that does full scans of the table looking for any partially
- * completed rows.
- */
- public static class AtomicScanReader extends RepeatingTestThread {
- byte targetFamilies[][];
- HTable table;
- AtomicLong numScans = new AtomicLong();
- AtomicLong numRowsScanned = new AtomicLong();
- TableName TABLE_NAME;
-
- public AtomicScanReader(TableName TABLE_NAME, TestContext ctx,
- byte targetFamilies[][]) throws IOException {
- super(ctx);
- this.TABLE_NAME = TABLE_NAME;
- this.targetFamilies = targetFamilies;
- table = new HTable(conf, TABLE_NAME);
- }
-
- public void doAnAction() throws Exception {
- Scan s = new Scan();
- for (byte[] family : targetFamilies) {
- s.addFamily(family);
- }
- ResultScanner scanner = table.getScanner(s);
-
- for (Result res : scanner) {
- byte[] lastRow = null, lastFam = null, lastQual = null;
- byte[] gotValue = null;
- for (byte[] family : targetFamilies) {
- byte qualifier[] = QUAL;
- byte thisValue[] = res.getValue(family, qualifier);
- if (gotValue != null && thisValue != null
- && !Bytes.equals(gotValue, thisValue)) {
-
- StringBuilder msg = new StringBuilder();
- msg.append("Failed on scan ").append(numScans)
- .append(" after scanning ").append(numRowsScanned)
- .append(" rows!\n");
- msg.append("Current was " + Bytes.toString(res.getRow()) + "/"
- + Bytes.toString(family) + ":" + Bytes.toString(qualifier)
- + " = " + Bytes.toString(thisValue) + "\n");
- msg.append("Previous was " + Bytes.toString(lastRow) + "/"
- + Bytes.toString(lastFam) + ":" + Bytes.toString(lastQual)
- + " = " + Bytes.toString(gotValue));
- throw new RuntimeException(msg.toString());
- }
-
- lastFam = family;
- lastQual = qualifier;
- lastRow = res.getRow();
- gotValue = thisValue;
- }
- numRowsScanned.getAndIncrement();
- }
- numScans.getAndIncrement();
- }
- }
-
- /**
- * Creates a table with given table name and specified number of column
- * families if the table does not already exist.
- */
- private void setupTable(TableName table, int cfs) throws IOException {
- try {
- LOG.info("Creating table " + table);
- HTableDescriptor htd = new HTableDescriptor(table);
- for (int i = 0; i < 10; i++) {
- htd.addFamily(new HColumnDescriptor(family(i)));
- }
-
- UTIL.getHBaseAdmin().createTable(htd);
- } catch (TableExistsException tee) {
- LOG.info("Table " + table + " already exists");
- }
- }
-
- /**
- * Atomic bulk load.
- */
- @Test
- public void testAtomicBulkLoad() throws Exception {
- TableName TABLE_NAME = TableName.valueOf("atomicBulkLoad");
-
- int millisToRun = 30000;
- int numScanners = 50;
-
- UTIL.startMiniCluster(1);
- try {
- WAL log = UTIL.getHBaseCluster().getRegionServer(0).getWAL(null);
- FindBulkHBaseListener listener = new FindBulkHBaseListener();
- log.registerWALActionsListener(listener);
- runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners);
- assertThat(listener.isFound(), is(true));
- } finally {
- UTIL.shutdownMiniCluster();
- }
- }
-
- void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
- throws Exception {
- setupTable(tableName, 10);
-
- TestContext ctx = new TestContext(UTIL.getConfiguration());
-
- AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
- ctx.addThread(loader);
-
- List<AtomicScanReader> scanners = Lists.newArrayList();
- for (int i = 0; i < numScanners; i++) {
- AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
- scanners.add(scanner);
- ctx.addThread(scanner);
- }
-
- ctx.startThreads();
- ctx.waitFor(millisToRun);
- ctx.stop();
-
- LOG.info("Loaders:");
- LOG.info(" loaded " + loader.numBulkLoads.get());
- LOG.info(" compations " + loader.numCompactions.get());
-
- LOG.info("Scanners:");
- for (AtomicScanReader scanner : scanners) {
- LOG.info(" scanned " + scanner.numScans.get());
- LOG.info(" verified " + scanner.numRowsScanned.get() + " rows");
- }
- }
-
- /**
- * Run test on an HBase instance for 5 minutes. This assumes that the table
- * under test only has a single region.
- */
- public static void main(String args[]) throws Exception {
- try {
- Configuration c = HBaseConfiguration.create();
- TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad();
- test.setConf(c);
- test.runAtomicBulkloadTest(TableName.valueOf("atomicTableTest"), 5 * 60 * 1000, 50);
- } finally {
- System.exit(0); // something hangs (believe it is lru threadpool)
- }
- }
-
- private void setConf(Configuration c) {
- UTIL = new HBaseTestingUtility(c);
- }
-
- static class FindBulkHBaseListener extends TestWALActionsListener.DummyWALActionsListener {
- private boolean found = false;
-
- @Override
- public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {
- for (Cell cell : logEdit.getCells()) {
- KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- for (Map.Entry entry : kv.toStringMap().entrySet()) {
- if (entry.getValue().equals(Bytes.toString(WALEdit.BULK_LOAD))) {
- found = true;
- }
+public class TestHRegionServerBulkLoad extends TestHRegionServerBulkLoadBase {
+ /**
+ * Atomic bulk load.
+ */
+ @Test
+ public void testAtomicBulkLoad() throws Exception {
+ TableName TABLE_NAME = TableName.valueOf("atomicBulkLoad");
+
+ int millisToRun = 30000;
+ int numScanners = 50;
+
+ UTIL.startMiniCluster(1);
+ try {
+ Path logRootDir = UTIL.getHBaseCluster().getRegionServer(0).getLogRootDir();
+ Path rootDir = UTIL.getHBaseCluster().getRegionServer(0).getRootDir();
+ assertEquals(logRootDir, FSUtils.getLogRootDir(UTIL.getConfiguration()));
+ assertEquals(rootDir, FSUtils.getRootDir(UTIL.getConfiguration()));
+ WAL log = UTIL.getHBaseCluster().getRegionServer(0).getWAL(null);
+ FindBulkHBaseListener listener = new FindBulkHBaseListener();
+ log.registerWALActionsListener(listener);
+ runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners);
+ assertThat(listener.isFound(), is(true));
+ } finally {
+ UTIL.shutdownMiniCluster();
}
- }
}
- public boolean isFound() {
- return found;
- }
- }
}
-
-
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadBase.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadBase.java
new file mode 100644
index 0000000..50fd24d
--- /dev/null
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadBase.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Tests bulk loading of HFiles and shows the atomicity or lack of atomicity of
+ * the region server's bulkLoad functionality.
+ */
+@Category(LargeTests.class)
+public class TestHRegionServerBulkLoadBase {
+ private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoadBase.class);
+ protected static HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ private final static Configuration conf = UTIL.getConfiguration();
+ private final static byte[] QUAL = Bytes.toBytes("qual");
+ private final static int NUM_CFS = 10;
+ public static int BLOCKSIZE = 64 * 1024;
+ public static Algorithm COMPRESSION = Compression.Algorithm.NONE;
+
+ private final static byte[][] families = new byte[NUM_CFS][];
+ static {
+ for (int i = 0; i < NUM_CFS; i++) {
+ families[i] = Bytes.toBytes(family(i));
+ }
+ }
+
+ /**
+ * Create a rowkey compatible with
+ * {@link #createHFile(FileSystem, Path, byte[], byte[], byte[], int)}.
+ */
+ public static byte[] rowkey(int i) {
+ return Bytes.toBytes(String.format("row_%08d", i));
+ }
+
+ static String family(int i) {
+ return String.format("family_%04d", i);
+ }
+
+ /**
+ * Create an HFile with the given number of rows with a specified value.
+ */
+ public static void createHFile(FileSystem fs, Path path, byte[] family,
+ byte[] qualifier, byte[] value, int numRows) throws IOException {
+ HFileContext context = new HFileContextBuilder().withBlockSize(BLOCKSIZE)
+ .withCompression(COMPRESSION)
+ .build();
+ HFile.Writer writer = HFile
+ .getWriterFactory(conf, new CacheConfig(conf))
+ .withPath(fs, path)
+ .withFileContext(context)
+ .create();
+ long now = System.currentTimeMillis();
+ try {
+ // subtract 2 since iterateOnSplits doesn't include boundary keys
+ for (int i = 0; i < numRows; i++) {
+ KeyValue kv = new KeyValue(rowkey(i), family, qualifier, now, value);
+ writer.append(kv);
+ }
+ writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(now));
+ } finally {
+ writer.close();
+ }
+ }
+
+ /**
+ * Thread that repeatedly bulk loads HFiles into the table to exercise
+ * the atomicity of bulk loading.
+ *
+ * Each iteration of this loads 10 hdfs files, which occupies 50 open file
+ * handles. So every 10 iterations (500 file handles) it does a region
+ * compaction to reduce the number of open file handles.
+ */
+ public static class AtomicHFileLoader extends RepeatingTestThread {
+ final AtomicLong numBulkLoads = new AtomicLong();
+ final AtomicLong numCompactions = new AtomicLong();
+ private TableName tableName;
+
+ public AtomicHFileLoader(TableName tableName, TestContext ctx,
+ byte targetFamilies[][]) throws IOException {
+ super(ctx);
+ this.tableName = tableName;
+ }
+
+ public void doAnAction() throws Exception {
+ long iteration = numBulkLoads.getAndIncrement();
+ Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d",
+ iteration));
+
+ // create HFiles for different column families
+ FileSystem fs = UTIL.getTestFileSystem();
+ byte[] val = Bytes.toBytes(String.format("%010d", iteration));
+ final List<Pair<byte[], String>> famPaths = new ArrayList<Pair<byte[], String>>(
+ NUM_CFS);
+ for (int i = 0; i < NUM_CFS; i++) {
+ Path hfile = new Path(dir, family(i));
+ byte[] fam = Bytes.toBytes(family(i));
+ createHFile(fs, hfile, fam, QUAL, val, 1000);
+ famPaths.add(new Pair<byte[], String>(fam, hfile.toString()));
+ }
+
+ // bulk load HFiles
+ final HConnection conn = UTIL.getHBaseAdmin().getConnection();
+ RegionServerCallable<Void> callable =
+ new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
+ @Override
+ public Void call(int callTimeout) throws Exception {
+ LOG.debug("Going to connect to server " + getLocation() + " for row "
+ + Bytes.toStringBinary(getRow()));
+ byte[] regionName = getLocation().getRegionInfo().getRegionName();
+ BulkLoadHFileRequest request =
+ RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true);
+ getStub().bulkLoadHFile(null, request);
+ return null;
+ }
+ };
+ RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
+ RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
+ caller.callWithRetries(callable, Integer.MAX_VALUE);
+
+ // Periodically do compaction to reduce the number of open file handles.
+ if (numBulkLoads.get() % 10 == 0) {
+ // 10 * 50 = 500 open file handles!
+ callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
+ @Override
+ public Void call(int callTimeout) throws Exception {
+ LOG.debug("compacting " + getLocation() + " for row "
+ + Bytes.toStringBinary(getRow()));
+ AdminProtos.AdminService.BlockingInterface server =
+ conn.getAdmin(getLocation().getServerName());
+ CompactRegionRequest request =
+ RequestConverter.buildCompactRegionRequest(
+ getLocation().getRegionInfo().getRegionName(), true, null);
+ server.compactRegion(null, request);
+ numCompactions.incrementAndGet();
+ return null;
+ }
+ };
+ caller.callWithRetries(callable, Integer.MAX_VALUE);
+ }
+ }
+ }
+
+ /**
+ * Thread that does full scans of the table looking for any partially
+ * completed rows.
+ */
+ public static class AtomicScanReader extends RepeatingTestThread {
+ byte targetFamilies[][];
+ HTable table;
+ AtomicLong numScans = new AtomicLong();
+ AtomicLong numRowsScanned = new AtomicLong();
+ TableName TABLE_NAME;
+
+ public AtomicScanReader(TableName TABLE_NAME, TestContext ctx,
+ byte targetFamilies[][]) throws IOException {
+ super(ctx);
+ this.TABLE_NAME = TABLE_NAME;
+ this.targetFamilies = targetFamilies;
+ table = new HTable(conf, TABLE_NAME);
+ }
+
+ public void doAnAction() throws Exception {
+ Scan s = new Scan();
+ for (byte[] family : targetFamilies) {
+ s.addFamily(family);
+ }
+ ResultScanner scanner = table.getScanner(s);
+
+ for (Result res : scanner) {
+ byte[] lastRow = null, lastFam = null, lastQual = null;
+ byte[] gotValue = null;
+ for (byte[] family : targetFamilies) {
+ byte qualifier[] = QUAL;
+ byte thisValue[] = res.getValue(family, qualifier);
+ if (gotValue != null && thisValue != null
+ && !Bytes.equals(gotValue, thisValue)) {
+
+ StringBuilder msg = new StringBuilder();
+ msg.append("Failed on scan ").append(numScans)
+ .append(" after scanning ").append(numRowsScanned)
+ .append(" rows!\n");
+ msg.append("Current was " + Bytes.toString(res.getRow()) + "/"
+ + Bytes.toString(family) + ":" + Bytes.toString(qualifier)
+ + " = " + Bytes.toString(thisValue) + "\n");
+ msg.append("Previous was " + Bytes.toString(lastRow) + "/"
+ + Bytes.toString(lastFam) + ":" + Bytes.toString(lastQual)
+ + " = " + Bytes.toString(gotValue));
+ throw new RuntimeException(msg.toString());
+ }
+
+ lastFam = family;
+ lastQual = qualifier;
+ lastRow = res.getRow();
+ gotValue = thisValue;
+ }
+ numRowsScanned.getAndIncrement();
+ }
+ numScans.getAndIncrement();
+ }
+ }
+
+ /**
+ * Creates a table with given table name and specified number of column
+ * families if the table does not already exist.
+ */
+ private void setupTable(TableName table, int cfs) throws IOException {
+ try {
+ LOG.info("Creating table " + table);
+ HTableDescriptor htd = new HTableDescriptor(table);
+ for (int i = 0; i < 10; i++) {
+ htd.addFamily(new HColumnDescriptor(family(i)));
+ }
+
+ UTIL.getHBaseAdmin().createTable(htd);
+ } catch (TableExistsException tee) {
+ LOG.info("Table " + table + " already exists");
+ }
+ }
+
+ void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
+ throws Exception {
+ setupTable(tableName, 10);
+
+ TestContext ctx = new TestContext(UTIL.getConfiguration());
+
+ AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
+ ctx.addThread(loader);
+
+ List<AtomicScanReader> scanners = Lists.newArrayList();
+ for (int i = 0; i < numScanners; i++) {
+ AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
+ scanners.add(scanner);
+ ctx.addThread(scanner);
+ }
+
+ ctx.startThreads();
+ ctx.waitFor(millisToRun);
+ ctx.stop();
+
+ LOG.info("Loaders:");
+ LOG.info(" loaded " + loader.numBulkLoads.get());
+ LOG.info(" compations " + loader.numCompactions.get());
+
+ LOG.info("Scanners:");
+ for (AtomicScanReader scanner : scanners) {
+ LOG.info(" scanned " + scanner.numScans.get());
+ LOG.info(" verified " + scanner.numRowsScanned.get() + " rows");
+ }
+ }
+
+ /**
+ * Run test on an HBase instance for 5 minutes. This assumes that the table
+ * under test only has a single region.
+ */
+ public static void main(String args[]) throws Exception {
+ try {
+ Configuration c = HBaseConfiguration.create();
+ TestHRegionServerBulkLoadBase test = new TestHRegionServerBulkLoadBase();
+ test.setConf(c);
+ test.runAtomicBulkloadTest(TableName.valueOf("atomicTableTest"), 5 * 60 * 1000, 50);
+ } finally {
+ System.exit(0); // something hangs (believe it is lru threadpool)
+ }
+ }
+
+ private void setConf(Configuration c) {
+ UTIL = new HBaseTestingUtility(c);
+ }
+
+ static class FindBulkHBaseListener extends TestWALActionsListener.DummyWALActionsListener {
+ private boolean found = false;
+
+ @Override
+ public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {
+ for (Cell cell : logEdit.getCells()) {
+ KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+ for (Map.Entry entry : kv.toStringMap().entrySet()) {
+ if (entry.getValue().equals(Bytes.toString(WALEdit.BULK_LOAD))) {
+ found = true;
+ }
+ }
+ }
+ }
+
+ public boolean isFound() {
+ return found;
+ }
+ }
+}
+
+
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithLogDir.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithLogDir.java
new file mode 100644
index 0000000..3802b39
--- /dev/null
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithLogDir.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.junit.Test;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.*;
+
+public class TestHRegionServerBulkLoadWithLogDir extends TestHRegionServerBulkLoadBase {
+ /**
+ * Atomic bulk load.
+ */
+ @Test
+ public void testAtomicBulkLoadWithLogDir() throws Exception {
+ TableName TABLE_NAME = TableName.valueOf("atomicBulkLoad");
+
+ int millisToRun = 30000;
+ int numScanners = 50;
+
+ UTIL.startMiniClusterWithLogDir(1);
+ assertNotEquals(FSUtils.getLogRootDir(UTIL.getConfiguration()), FSUtils.getRootDir(UTIL.getConfiguration()));
+ try {
+ Path logRootDir = UTIL.getHBaseCluster().getRegionServer(0).getLogRootDir();
+ Path rootDir = UTIL.getHBaseCluster().getRegionServer(0).getRootDir();
+ assertEquals(logRootDir, FSUtils.getLogRootDir(UTIL.getConfiguration()));
+ assertEquals(rootDir, FSUtils.getRootDir(UTIL.getConfiguration()));
+ WAL log = UTIL.getHBaseCluster().getRegionServer(0).getWAL(null);
+ FindBulkHBaseListener listener = new FindBulkHBaseListener();
+ log.registerWALActionsListener(listener);
+ runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners);
+ assertThat(listener.isFound(), is(true));
+ } finally {
+ UTIL.shutdownMiniCluster();
+ }
+ }
+
+}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
index e09b621..ab5083d 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
@@ -88,6 +88,8 @@ public class TestFSHLog {
protected static Configuration conf;
protected static FileSystem fs;
protected static Path dir;
+ protected static Path rootdir;
+ protected static Path logRootdir;
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@Rule
@@ -99,8 +101,9 @@ public class TestFSHLog {
for (FileStatus dir : entries) {
fs.delete(dir.getPath(), true);
}
- final Path hbaseDir = TEST_UTIL.createRootDir();
- dir = new Path(hbaseDir, currentTest.getMethodName());
+ rootdir = TEST_UTIL.createRootDir();
+ logRootdir = TEST_UTIL.createLogRootDir();
+ dir = new Path(logRootdir, currentTest.getMethodName());
}
@After
@@ -133,6 +136,8 @@ public class TestFSHLog {
@AfterClass
public static void tearDownAfterClass() throws Exception {
+ fs.delete(rootdir, true);
+ fs.delete(logRootdir, true);
TEST_UTIL.shutdownMiniCluster();
}
@@ -144,7 +149,7 @@ public class TestFSHLog {
// test to see whether the coprocessor is loaded or not.
FSHLog log = null;
try {
- log = new FSHLog(fs, FSUtils.getRootDir(conf), dir.toString(),
+ log = new FSHLog(fs, logRootdir, dir.toString(),
HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
WALCoprocessorHost host = log.getCoprocessorHost();
Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName());
@@ -195,7 +200,7 @@ public class TestFSHLog {
FSHLog wal1 = null;
FSHLog walMeta = null;
try {
- wal1 = new FSHLog(fs, FSUtils.getRootDir(conf), dir.toString(),
+ wal1 = new FSHLog(fs, logRootdir, dir.toString(),
HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
LOG.debug("Log obtained is: " + wal1);
Comparator<Path> comp = wal1.LOG_NAME_COMPARATOR;
@@ -205,7 +210,7 @@ public class TestFSHLog {
assertTrue(comp.compare(p1, p1) == 0);
// comparing with different filenum.
assertTrue(comp.compare(p1, p2) < 0);
- walMeta = new FSHLog(fs, FSUtils.getRootDir(conf), dir.toString(),
+ walMeta = new FSHLog(fs, logRootdir, dir.toString(),
HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null,
DefaultWALProvider.META_WAL_PROVIDER_ID);
Comparator<Path> compMeta = walMeta.LOG_NAME_COMPARATOR;
@@ -253,7 +258,7 @@ public class TestFSHLog {
LOG.debug("testFindMemStoresEligibleForFlush");
Configuration conf1 = HBaseConfiguration.create(conf);
conf1.setInt("hbase.regionserver.maxlogs", 1);
- FSHLog wal = new FSHLog(fs, FSUtils.getRootDir(conf1), dir.toString(),
+ FSHLog wal = new FSHLog(fs, logRootdir, dir.toString(),
HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null);
HTableDescriptor t1 =
new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row"));
@@ -330,7 +335,7 @@ public class TestFSHLog {
@Test(expected=IOException.class)
public void testFailedToCreateWALIfParentRenamed() throws IOException {
final String name = "testFailedToCreateWALIfParentRenamed";
- FSHLog log = new FSHLog(fs, FSUtils.getRootDir(conf), name, HConstants.HREGION_OLDLOGDIR_NAME,
+ FSHLog log = new FSHLog(fs, logRootdir, name, HConstants.HREGION_OLDLOGDIR_NAME,
conf, null, true, null, null);
long filenum = System.currentTimeMillis();
Path path = log.computeFilename(filenum);
@@ -359,13 +364,13 @@ public class TestFSHLog {
final byte[] rowName = tableName.getName();
final HTableDescriptor htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor("f"));
- HRegion r = HRegion.createHRegion(hri, TEST_UTIL.getDefaultRootDirPath(),
+ HRegion r = HRegion.createHRegion(hri, rootdir,
TEST_UTIL.getConfiguration(), htd);
HRegion.closeHRegion(r);
final int countPerFamily = 10;
final MutableBoolean goslow = new MutableBoolean(false);
// subclass and doctor a method.
- FSHLog wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDefaultRootDirPath(),
+ FSHLog wal = new FSHLog(FileSystem.get(conf), logRootdir,
testName, conf) {
@Override
void atHeadOfRingBufferEventHandlerAppend() {
@@ -377,7 +382,7 @@ public class TestFSHLog {
}
};
HRegion region = HRegion.openHRegion(TEST_UTIL.getConfiguration(),
- TEST_UTIL.getTestFileSystem(), TEST_UTIL.getDefaultRootDirPath(), hri, htd, wal);
+ TEST_UTIL.getTestFileSystem(), rootdir, hri, htd, wal);
EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
try {
List<Put> puts = null;
@@ -430,7 +435,7 @@ public class TestFSHLog {
SecurityException, IllegalArgumentException, IllegalAccessException {
final String name = "testSyncRunnerIndexOverflow";
FSHLog log =
- new FSHLog(fs, FSUtils.getRootDir(conf), name, HConstants.HREGION_OLDLOGDIR_NAME, conf,
+ new FSHLog(fs, logRootdir, name, HConstants.HREGION_OLDLOGDIR_NAME, conf,
null, true, null, null);
try {
Field ringBufferEventHandlerField = FSHLog.class.getDeclaredField("ringBufferEventHandler");
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
index 440120e..b1170ca 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
@@ -70,8 +70,9 @@ public class TestLogRollAbort {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
/* For the split-then-roll test */
- private static final Path HBASEDIR = new Path("/hbase");
- private static final Path OLDLOGDIR = new Path(HBASEDIR, HConstants.HREGION_OLDLOGDIR_NAME);
+ private static final Path HBASE_DIR = new Path("/hbase");
+ private static final Path HBASE_LOG_DIR = new Path("/hbaselog");
+ private static final Path OLD_LOG_DIR = new Path(HBASE_LOG_DIR, HConstants.HREGION_OLDLOGDIR_NAME);
// Need to override this setup so we can edit the config before it gets sent
// to the HDFS & HBase cluster startup.
@@ -111,7 +112,8 @@ public class TestLogRollAbort {
// disable region rebalancing (interferes with log watching)
cluster.getMaster().balanceSwitch(false);
- FSUtils.setRootDir(conf, HBASEDIR);
+ FSUtils.setRootDir(conf, HBASE_DIR);
+ FSUtils.setLogRootDir(conf, HBASE_LOG_DIR);
}
@After
@@ -183,7 +185,7 @@ public class TestLogRollAbort {
public void testLogRollAfterSplitStart() throws IOException {
LOG.info("Verify wal roll after split starts will fail.");
String logName = "testLogRollAfterSplitStart";
- Path thisTestsDir = new Path(HBASEDIR, DefaultWALProvider.getWALDirectoryName(logName));
+ Path thisTestsDir = new Path(HBASE_LOG_DIR, DefaultWALProvider.getWALDirectoryName(logName));
final WALFactory wals = new WALFactory(conf, null, logName);
try {
@@ -219,7 +221,7 @@ public class TestLogRollAbort {
LOG.debug("Renamed region directory: " + rsSplitDir);
LOG.debug("Processing the old log files.");
- WALSplitter.split(HBASEDIR, rsSplitDir, OLDLOGDIR, fs, conf, wals);
+ WALSplitter.split(HBASE_LOG_DIR, rsSplitDir, OLD_LOG_DIR, fs, conf, wals);
LOG.debug("Trying to roll the WAL.");
try {
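After this change, split reads a server's WALs from under the log root and archives finished files there as well. A sketch of the invocation shape, with the six-argument WALSplitter.split signature taken from the calls in this patch; the helper and its parameters are illustrative:

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.wal.WALFactory;
    import org.apache.hadoop.hbase.wal.WALSplitter;

    final class SplitSketch {
      static List<Path> splitServerLogs(Configuration conf, FileSystem fs, WALFactory wals,
          Path logRoot, String serverWalDirName) throws IOException {
        Path serverLogDir = new Path(logRoot, serverWalDirName); // e.g. WALs/<server>-splitting
        Path oldLogDir = new Path(logRoot, HConstants.HREGION_OLDLOGDIR_NAME);
        // Recovered edits land under the log root; finished WALs move to oldLogDir.
        return WALSplitter.split(logRoot, serverLogDir, oldLogDir, fs, conf, wals);
      }
    }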
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
index bf746d4..16f67f2 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
@@ -55,19 +55,25 @@ public class TestWALActionsListener {
private final static byte[] SOME_BYTES = Bytes.toBytes("t");
private static FileSystem fs;
private static Configuration conf;
+ private static Path rootDir;
+ private static Path logRootDir;
+ private static FileSystem logFs;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf = TEST_UTIL.getConfiguration();
conf.setInt("hbase.regionserver.maxlogs", 5);
- fs = FileSystem.get(conf);
- FSUtils.setRootDir(conf, TEST_UTIL.getDataTestDir());
+ rootDir = TEST_UTIL.createRootDir();
+ logRootDir = TEST_UTIL.createLogRootDir();
+ fs = FSUtils.getRootDirFileSystem(conf);
+ logFs = FSUtils.getLogRootDirFileSystem(conf);
}
@Before
public void setUp() throws Exception {
fs.delete(new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_LOGDIR_NAME), true);
- fs.delete(new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME), true);
+ logFs.delete(new Path(logRootDir, HConstants.HREGION_LOGDIR_NAME), true);
+ logFs.delete(new Path(logRootDir, HConstants.HREGION_OLDLOGDIR_NAME), true);
}
@After
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index 4c34823..e869183 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -108,6 +108,7 @@ public class TestWALReplay {
static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
private Path hbaseRootDir = null;
+ private Path hbaseLogRootDir = null;
private String logName;
private Path oldLogDir;
private Path logDir;
@@ -129,8 +130,11 @@ public class TestWALReplay {
TEST_UTIL.startMiniCluster(3);
Path hbaseRootDir =
TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
- LOG.info("hbase.rootdir=" + hbaseRootDir);
+ Path hbaseLogRootDir = TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbaselog"));
+ LOG.info(HConstants.HBASE_DIR + "=" + hbaseRootDir);
+ LOG.info(HConstants.HBASE_LOG_DIR + "=" + hbaseLogRootDir);
FSUtils.setRootDir(conf, hbaseRootDir);
+ FSUtils.setLogRootDir(conf, hbaseLogRootDir);
}
@AfterClass
@@ -143,12 +147,16 @@ public class TestWALReplay {
this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
this.hbaseRootDir = FSUtils.getRootDir(this.conf);
- this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+ this.hbaseLogRootDir = FSUtils.getLogRootDir(this.conf);
+ this.oldLogDir = new Path(this.hbaseLogRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
this.logName = DefaultWALProvider.getWALDirectoryName(currentTest.getMethodName() + "-manual");
- this.logDir = new Path(this.hbaseRootDir, logName);
+ this.logDir = new Path(this.hbaseLogRootDir, logName);
if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
}
+ if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseLogRootDir)) {
+ TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseLogRootDir, true);
+ }
this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ?
RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING);
this.wals = new WALFactory(conf, null, currentTest.getMethodName());
@@ -158,6 +166,7 @@ public class TestWALReplay {
public void tearDown() throws Exception {
this.wals.close();
TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
+ TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseLogRootDir, true);
}
/*
@@ -943,7 +952,7 @@ public class TestWALReplay {
final byte[] rowName = tableName.getName();
final int countPerFamily = 10;
final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
- HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
+ HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, hbaseLogRootDir, this.conf, htd);
Path regionDir = region1.getRegionFileSystem().getRegionDir();
HBaseTestingUtility.closeRegionAndWAL(region1);
@@ -1030,9 +1039,9 @@ public class TestWALReplay {
static class MockWAL extends FSHLog {
boolean doCompleteCacheFlush = false;
- public MockWAL(FileSystem fs, Path rootDir, String logName, Configuration conf)
+ public MockWAL(FileSystem fs, Path logRootDir, String logName, Configuration conf)
throws IOException {
- super(fs, rootDir, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
+ super(fs, logRootDir, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null);
}
@Override
@@ -1052,7 +1061,7 @@ public class TestWALReplay {
}
private MockWAL createMockWAL() throws IOException {
- MockWAL wal = new MockWAL(fs, hbaseRootDir, logName, conf);
+ MockWAL wal = new MockWAL(fs, hbaseLogRootDir, logName, conf);
// Set down maximum recovery so we dfsclient doesn't linger retrying something
// long gone.
HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
@@ -1144,7 +1153,7 @@ public class TestWALReplay {
*/
private Path runWALSplit(final Configuration c) throws IOException {
List<Path> splits = WALSplitter.split(
- hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals);
+ hbaseLogRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals);
// Split should generate only 1 file since there's only 1 region
assertEquals("splits=" + splits, 1, splits.size());
// Make sure the file exists
@@ -1159,7 +1168,7 @@ public class TestWALReplay {
* @throws IOException
*/
private WAL createWAL(final Configuration c) throws IOException {
- FSHLog wal = new FSHLog(FileSystem.get(c), hbaseRootDir, logName, c);
+ FSHLog wal = new FSHLog(FileSystem.get(c), hbaseLogRootDir, logName, c);
// Set down maximum recovery so we dfsclient doesn't linger retrying something
// long gone.
HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1);
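The replay test now keeps regions under the data root while rooting the WAL at the log root. A minimal sketch of the WAL half of that pattern (the four-argument FSHLog constructor is used exactly as in createWAL above; the wrapper class is illustrative):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
    import org.apache.hadoop.hbase.wal.WAL;

    final class ReplayWalSketch {
      // Both writing and recovery resolve the WAL against the log root, not the data root.
      static WAL walUnderLogRoot(Configuration conf, Path logRoot, String logName)
          throws IOException {
        return new FSHLog(FileSystem.get(conf), logRoot, logName, conf);
      }
    }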
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
index 2699292..24d604f 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
@@ -381,6 +381,43 @@ public class TestFSUtils {
verifyFileInDirWithStoragePolicy("1772");
}
+ @Test
+ public void testSetLogRootDir() throws Exception {
+ HBaseTestingUtility htu = new HBaseTestingUtility();
+ Configuration conf = htu.getConfiguration();
+ Path p = new Path("file:///hbase/root");
+ FSUtils.setLogRootDir(conf, p);
+ assertEquals(p.toString(), conf.get(HConstants.HBASE_LOG_DIR));
+ }
+
+ @Test
+ public void testGetLogRootDir() throws Exception {
+ HBaseTestingUtility htu = new HBaseTestingUtility();
+ Configuration conf = htu.getConfiguration();
+ Path root = new Path("file:///hbase/root");
+ Path logRoot = new Path("file:///hbase/logroot");
+ FSUtils.setRootDir(conf, root);
+ assertEquals(FSUtils.getRootDir(conf), root);
+ assertEquals(FSUtils.getLogRootDir(conf), root);
+ FSUtils.setLogRootDir(conf, logRoot);
+ assertEquals(FSUtils.getLogRootDir(conf), logRoot);
+ }
+
+ @Test
+ public void testRemoveLogRootPath() throws Exception {
+ HBaseTestingUtility htu = new HBaseTestingUtility();
+ Configuration conf = htu.getConfiguration();
+ FSUtils.setRootDir(conf, new Path("file:///user/hbase"));
+ Path testFile = new Path(FSUtils.getRootDir(conf), "test/testfile");
+ Path tmpFile = new Path("file:///test/testfile");
+ assertEquals(FSUtils.removeLogRootPath(testFile, conf), "test/testfile");
+ assertEquals(FSUtils.removeLogRootPath(tmpFile, conf), tmpFile.toString());
+ FSUtils.setLogRootDir(conf, new Path("file:///user/hbaseLogDir"));
+ assertEquals(FSUtils.removeLogRootPath(testFile, conf), testFile.toString());
+ Path logFile = new Path(FSUtils.getLogRootDir(conf), "test/testlog");
+ assertEquals(FSUtils.removeLogRootPath(logFile, conf), "test/testlog");
+ }
+
private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
assertTrue(fileSys.exists(name));
assertTrue(fileSys.delete(name, true));
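Read together, the three new tests pin down the semantics of the FSUtils additions: the log root falls back to the data root until hbase.regionserver.hlog.dir is set, and removeLogRootPath relativizes only paths under the log root. A sketch of that contract (method names from this patch; the concrete paths are illustrative):

    import static org.junit.Assert.assertEquals;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.util.FSUtils;

    public class LogRootSemanticsSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        FSUtils.setRootDir(conf, new Path("file:///hbase/root"));
        // Unset log root falls back to the data root.
        assertEquals(FSUtils.getRootDir(conf), FSUtils.getLogRootDir(conf));
        FSUtils.setLogRootDir(conf, new Path("file:///hbase/logroot"));
        assertEquals(new Path("file:///hbase/logroot"), FSUtils.getLogRootDir(conf));
        // Paths under the log root are relativized; anything else comes back whole.
        Path wal = new Path(FSUtils.getLogRootDir(conf), "WALs/host,1,1/wal.0");
        assertEquals("WALs/host,1,1/wal.0", FSUtils.removeLogRootPath(wal, conf));
      }
    }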
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSVisitor.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSVisitor.java
index 3029601..771a81c 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSVisitor.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSVisitor.java
@@ -29,6 +29,7 @@ import java.util.UUID;
import java.util.Set;
import java.util.HashSet;
+import org.apache.hadoop.conf.Configuration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -60,15 +61,21 @@ public class TestFSVisitor {
private Set<String> serverLogs;
private FileSystem fs;
+ private FileSystem logFs;
private Path tableDir;
private Path logsDir;
private Path rootDir;
+ private Path logRootDir;
+ private Configuration conf;
@Before
public void setUp() throws Exception {
- fs = FileSystem.get(TEST_UTIL.getConfiguration());
- rootDir = TEST_UTIL.getDataTestDir("hbase");
- logsDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
+ conf = TEST_UTIL.getConfiguration();
+ rootDir = TEST_UTIL.createRootDir();
+ logRootDir = TEST_UTIL.createLogRootDir();
+ logsDir = new Path(logRootDir, HConstants.HREGION_LOGDIR_NAME);
+ fs = FSUtils.getRootDirFileSystem(conf);
+ logFs = FSUtils.getLogRootDirFileSystem(conf);
tableFamilies = new HashSet<String>();
tableRegions = new HashSet<String>();
@@ -85,6 +92,7 @@ public class TestFSVisitor {
@After
public void tearDown() throws Exception {
fs.delete(rootDir, true);
+ logFs.delete(logRootDir, true);
}
@Test
@@ -124,7 +132,7 @@ public class TestFSVisitor {
public void testVisitLogFiles() throws IOException {
final Set<String> servers = new HashSet<String>();
final Set<String> logs = new HashSet<String>();
- FSVisitor.visitLogFiles(fs, rootDir, new FSVisitor.LogFileVisitor() {
+ FSVisitor.visitLogFiles(logFs, logRootDir, new FSVisitor.LogFileVisitor() {
public void logFile (final String server, final String logfile) throws IOException {
servers.add(server);
logs.add(logfile);
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
index d2581a1..b1800f8 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
@@ -101,7 +101,7 @@ public class IOTestProvider implements WALProvider {
providerId = DEFAULT_PROVIDER_ID;
}
final String logPrefix = factory.factoryId + WAL_FILE_NAME_DELIMITER + providerId;
- log = new IOTestWAL(FileSystem.get(conf), FSUtils.getRootDir(conf),
+ log = new IOTestWAL(FSUtils.getLogRootDirFileSystem(conf), FSUtils.getLogRootDir(conf),
DefaultWALProvider.getWALDirectoryName(factory.factoryId),
HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners,
true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
index 4402909..ce8f2de 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
@@ -66,6 +66,9 @@ public class TestDefaultWALProvider {
protected static Configuration conf;
protected static FileSystem fs;
+ protected static FileSystem logFs;
+ protected static Path rootDir;
+ protected static Path logRootDir;
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
protected MultiVersionConcurrencyControl mvcc;
@@ -79,6 +82,8 @@ public class TestDefaultWALProvider {
for (FileStatus dir : entries) {
fs.delete(dir.getPath(), true);
}
+ fs.delete(rootDir, true);
+ logFs.delete(logRootDir, true);
}
@After
@@ -104,13 +109,17 @@ public class TestDefaultWALProvider {
TEST_UTIL.startMiniDFSCluster(3);
// Set up a working space for our tests.
- TEST_UTIL.createRootDir();
+ rootDir = TEST_UTIL.createRootDir();
+ logRootDir = TEST_UTIL.createLogRootDir();
conf = TEST_UTIL.getConfiguration();
- fs = TEST_UTIL.getDFSCluster().getFileSystem();
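+ // With a separate WAL root, the data root and log root can be backed by different filesystems.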
+ fs = FSUtils.getRootDirFileSystem(conf);
+ logFs = FSUtils.getLogRootDirFileSystem(conf);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
+ fs.delete(rootDir, true);
+ logFs.delete(logRootDir, true);
TEST_UTIL.shutdownMiniCluster();
}
@@ -121,13 +130,13 @@ public class TestDefaultWALProvider {
@Test
public void testGetServerNameFromWALDirectoryName() throws IOException {
ServerName sn = ServerName.valueOf("hn", 450, 1398);
- String hl = FSUtils.getRootDir(conf) + "/" +
+ String hl = FSUtils.getLogRootDir(conf) + "/" +
DefaultWALProvider.getWALDirectoryName(sn.toString());
// Must not throw exception
assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, null));
assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf,
- FSUtils.getRootDir(conf).toUri().toString()));
+ FSUtils.getLogRootDir(conf).toUri().toString()));
assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, ""));
assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, " "));
assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, hl));
@@ -136,7 +145,7 @@ public class TestDefaultWALProvider {
final String wals = "/WALs/";
ServerName parsed = DefaultWALProvider.getServerNameFromWALDirectoryName(conf,
- FSUtils.getRootDir(conf).toUri().toString() + wals + sn +
+ FSUtils.getLogRootDir(conf).toUri().toString() + wals + sn +
"/localhost%2C32984%2C1343316388997.1343316390417");
assertEquals("standard", sn, parsed);
@@ -144,7 +153,7 @@ public class TestDefaultWALProvider {
assertEquals("subdir", sn, parsed);
parsed = DefaultWALProvider.getServerNameFromWALDirectoryName(conf,
- FSUtils.getRootDir(conf).toUri().toString() + wals + sn +
+ FSUtils.getLogRootDir(conf).toUri().toString() + wals + sn +
"-splitting/localhost%3A57020.1340474893931");
assertEquals("split", sn, parsed);
}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALLogDir.java hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALLogDir.java
new file mode 100644
index 0000000..8e3aa31
--- /dev/null
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALLogDir.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+@Category(MediumTests.class)
+public class TestWALLogDir {
+ private static final Log LOG = LogFactory.getLog(TestWALLogDir.class);
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static Configuration conf;
+ private static FileSystem fs;
+ private static FileSystem logFs;
+ static final TableName tableName = TableName.valueOf("TestWALLogDir");
+ private static final byte [] rowName = Bytes.toBytes("row");
+ private static final byte [] family = Bytes.toBytes("column");
+ private static HTableDescriptor htd;
+ private static Path logRootDir;
+ private static Path rootDir;
+
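+ // Each test starts from empty data and WAL roots so the file counts asserted below are deterministic.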
+ @Before
+ public void setUp() throws Exception {
+ cleanup();
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ conf = TEST_UTIL.getConfiguration();
+ TEST_UTIL.startMiniDFSCluster(1);
+ rootDir = TEST_UTIL.createRootDir();
+ logRootDir = TEST_UTIL.createLogRootDir();
+ fs = FSUtils.getRootDirFileSystem(conf);
+ logFs = FSUtils.getLogRootDirFileSystem(conf);
+ htd = new HTableDescriptor(tableName);
+ htd.addFamily(new HColumnDescriptor(family));
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ cleanup();
+ TEST_UTIL.shutdownMiniDFSCluster();
+ }
+
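+ // Appends and rolls WAL entries for a region stored under rootDir while the WAL
+ // root points at a separate directory, then checks that WAL files only ever
+ // appear under logRootDir and the file count under rootDir stays unchanged.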
+ @Test
+ public void testWALRootDir() throws Exception {
+ HRegionInfo regionInfo = new HRegionInfo(tableName);
+ HRegion region = HRegion.createHRegion(regionInfo, rootDir, conf, htd);
+ assertEquals("Expect 2 file in rootDir, which are regionInfo file and recovered.edits", 2, getWALFiles(fs, rootDir).size());
+ WAL log = region.getWAL();
+ byte [] value = Bytes.toBytes("value");
+ WALEdit edit = new WALEdit();
+ edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
+ System.currentTimeMillis(), value));
+ long txid = log.append(htd, regionInfo, getWalKey(System.currentTimeMillis(), regionInfo), edit, true);
+ log.sync(txid);
+ assertEquals("Expect 1 log have been created", 1, getWALFiles(logFs, logRootDir).size());
+ log.rollWriter();
+ // rollWriter() created a new WAL and moved the previous one into the oldWALs dir
+ assertEquals(1, getWALFiles(logFs, new Path(logRootDir, HConstants.HREGION_OLDLOGDIR_NAME)).size());
+ assertEquals(1, getWALFiles(logFs, new Path(logRootDir, HConstants.HREGION_LOGDIR_NAME)).size());
+ edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
+ System.currentTimeMillis(), value));
+ txid = log.append(htd, regionInfo, getWalKey(System.currentTimeMillis(), regionInfo), edit, true);
+ log.sync(txid);
+ log.rollWriter();
+ log.shutdown();
+
+ assertEquals("Expect 2 logs in oldWALs dir", 2, getWALFiles(logFs, new Path(logRootDir, HConstants.HREGION_OLDLOGDIR_NAME)).size());
+ assertEquals("Expect 1 logs in WALs dir", 1, getWALFiles(logFs, new Path(logRootDir, HConstants.HREGION_LOGDIR_NAME)).size());
+ // The number of files in rootDir must not have changed
+ assertEquals("Expect rootDir to still contain 2 files", 2, getWALFiles(fs, rootDir).size());
+ }
+
+ protected WALKey getWalKey(final long time, HRegionInfo hri) {
+ return new WALKey(hri.getEncodedNameAsBytes(), tableName, time);
+ }
+
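+ // Recursively collect the non-hidden files under dir; the assertions above use
+ // this to count files in the data root and the WAL root independently.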
+ private List<FileStatus> getWALFiles(FileSystem fs, Path dir)
+ throws IOException {
+ List<FileStatus> result = new ArrayList<FileStatus>();
+ LOG.debug("Scanning " + dir.toString() + " for WAL files");
+
+ FileStatus[] files = fs.listStatus(dir);
+ if (files == null) return Collections.emptyList();
+ for (FileStatus file : files) {
+ if (file.isDirectory()) {
+ // recurse into sub directories
+ result.addAll(getWALFiles(fs, file.getPath()));
+ } else {
+ String name = file.getPath().getName();
+ if (!name.startsWith(".")) {
+ result.add(file);
+ }
+ }
+ }
+ return result;
+ }
+
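+ // Wipe both roots; called before each test and once after the class.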
+ private static void cleanup() throws Exception {
+ logFs.delete(logRootDir, true);
+ fs.delete(rootDir, true);
+ }
+
+}
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
index b599d1c..66a2069 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
@@ -304,7 +304,9 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
// First set the fs from configs. In case we are on hadoop1
FSUtils.setFsDefault(getConf(), FSUtils.getRootDir(getConf()));
FileSystem fs = FileSystem.get(getConf());
+ FileSystem logFs = FSUtils.getLogRootDirFileSystem(getConf());
LOG.info("FileSystem: " + fs);
+ LOG.info("Log FileSystem: " + logFs);
SpanReceiverHost receiverHost = trace ? SpanReceiverHost.getInstance(getConf()) : null;
final Sampler<?> sampler = trace ? Sampler.ALWAYS : Sampler.NEVER;
@@ -346,14 +348,14 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
}
if (verify) {
LOG.info("verifying written log entries.");
- Path dir = new Path(FSUtils.getRootDir(getConf()),
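+ // List the WALs through the log root's filesystem, which may differ from fs.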
+ Path dir = new Path(FSUtils.getLogRootDir(getConf()),
DefaultWALProvider.getWALDirectoryName("wals"));
long editCount = 0;
- FileStatus [] fsss = fs.listStatus(dir);
+ FileStatus [] fsss = logFs.listStatus(dir);
if (fsss.length == 0) throw new IllegalStateException("No WAL found");
for (FileStatus fss: fsss) {
Path p = fss.getPath();
- if (!fs.exists(p)) throw new IllegalStateException(p.toString());
+ if (!logFs.exists(p)) throw new IllegalStateException(p.toString());
editCount += verify(wals, p, verbose);
}
long expected = numIterations * numThreads;