diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index c5ae3f0..67fd5f2 100644 --- hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -269,6 +269,15 @@ public final class HConstants { /** Parameter name for HBase instance root directory */ public static final String HBASE_DIR = "hbase.rootdir"; + /** Parameter name for HBase instance log directory permission*/ + public static final String HBASE_DIR_PERMS = "hbase.rootdir.perms"; + + /** Parameter name for HBase instance log directory */ + public static final String HBASE_LOG_DIR = "hbase.regionserver.hlog.dir"; + + /** Parameter name for HBase instance log directory permission*/ + public static final String HBASE_LOG_DIR_PERMS = "hbase.regionserver.hlog.dir.perms"; + /** Parameter name for HBase client IPC pool type */ public static final String HBASE_CLIENT_IPC_POOL_TYPE = "hbase.client.ipc.pool.type"; diff --git hbase-common/src/main/resources/hbase-default.xml hbase-common/src/main/resources/hbase-default.xml index dbdcc30..598ee8e 100644 --- hbase-common/src/main/resources/hbase-default.xml +++ hbase-common/src/main/resources/hbase-default.xml @@ -1145,6 +1145,13 @@ possible configurations would overwhelm and obscure the important. if it does not match. + hbase.regionserver.hlog.dir.perms + 700 + FS Permissions for the log directory in a secure(kerberos) setup. + When master starts, it creates the logdir with this permissions or sets the permissions + if it does not match. + + hbase.data.umask.enable false Enable, if true, that file permissions should be assigned diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/io/WALLink.java hbase-server/src/main/java/org/apache/hadoop/hbase/io/WALLink.java index 344d496..c2ad71a 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/io/WALLink.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/io/WALLink.java @@ -45,17 +45,17 @@ public class WALLink extends FileLink { */ public WALLink(final Configuration conf, final String serverName, final String logName) throws IOException { - this(FSUtils.getRootDir(conf), serverName, logName); + this(FSUtils.getLogRootDir(conf), serverName, logName); } /** - * @param rootDir Path to the root directory where hbase files are stored + * @param logRootDir Path to the root directory where hbase files are stored * @param serverName Region Server owner of the log * @param logName WAL file name */ - public WALLink(final Path rootDir, final String serverName, final String logName) { - final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); - final Path logDir = new Path(new Path(rootDir, HConstants.HREGION_LOGDIR_NAME), serverName); + public WALLink(final Path logRootDir, final String serverName, final String logName) { + final Path oldLogDir = new Path(logRootDir, HConstants.HREGION_OLDLOGDIR_NAME); + final Path logDir = new Path(new Path(logRootDir, HConstants.HREGION_LOGDIR_NAME), serverName); setLocations(new Path(logDir, logName), new Path(oldLogDir, logName)); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 29eacf0..ae4ff8b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -583,15 +583,15 @@ public class AssignmentManager extends ZooKeeperListener { Set queuedDeadServers = serverManager.getRequeuedDeadServers().keySet(); if (!queuedDeadServers.isEmpty()) { Configuration conf = server.getConfiguration(); - Path rootdir = FSUtils.getRootDir(conf); - FileSystem fs = rootdir.getFileSystem(conf); + Path logRootdir = FSUtils.getLogRootDir(conf); + FileSystem logFs = FSUtils.getLogRootDirFileSystem(conf); for (ServerName serverName: queuedDeadServers) { // In the case of a clean exit, the shutdown handler would have presplit any WALs and // removed empty directories. - Path logDir = new Path(rootdir, + Path logDir = new Path(logRootdir, DefaultWALProvider.getWALDirectoryName(serverName.toString())); Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT); - if (fs.exists(logDir) || fs.exists(splitDir)) { + if (logFs.exists(logDir) || logFs.exists(splitDir)) { LOG.debug("Found queued dead server " + serverName); failover = true; break; diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 59a9faa..a777fb1 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -1103,7 +1103,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server { int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000); this.logCleaner = new LogCleaner(cleanerInterval, - this, conf, getMasterFileSystem().getFileSystem(), + this, conf, getMasterFileSystem().getOldLogDir().getFileSystem(conf), getMasterFileSystem().getOldLogDir()); getChoreService().scheduleChore(logCleaner); @@ -1169,10 +1169,10 @@ public class HMaster extends HRegionServer implements MasterServices, Server { private void startProcedureExecutor() throws IOException { final MasterProcedureEnv procEnv = new MasterProcedureEnv(this); - final Path logDir = new Path(fileSystemManager.getRootDir(), + final Path logDir = new Path(FSUtils.getLogRootDir(this.conf), MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR); - procedureStore = new WALProcedureStore(conf, fileSystemManager.getFileSystem(), logDir, + procedureStore = new WALProcedureStore(conf, logDir.getFileSystem(conf), logDir, new MasterProcedureEnv.WALStoreLeaseRecovery(this)); procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this)); procedureExecutor = new ProcedureExecutor(conf, procEnv, procedureStore, diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java index dc43d8c..60fde9b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java @@ -78,12 +78,15 @@ public class MasterFileSystem { private ClusterId clusterId; // Keep around for convenience. private final FileSystem fs; + private final FileSystem logFs; // Is the fileystem ok? - private volatile boolean fsOk = true; + private volatile boolean logFsOk = true; // The Path to the old logs dir private final Path oldLogDir; // root hbase directory on the FS private final Path rootdir; + // logroot hbase directory on the FS + private final Path logRootDir; // hbase temp directory used for table construction and deletion private final Path tempdir; // create the split log lock @@ -120,9 +123,12 @@ public class MasterFileSystem { // Cover both bases, the old way of setting default fs and the new. // We're supposed to run on 0.20 and 0.21 anyways. this.fs = this.rootdir.getFileSystem(conf); + this.logRootDir = FSUtils.getLogRootDir(conf); + this.logFs = FSUtils.getLogRootDirFileSystem(conf); FSUtils.setFsDefault(conf, new Path(this.fs.getUri())); // make sure the fs has the same conf fs.setConf(conf); + logFs.setConf(conf); // setup the filesystem variable // set up the archived logs path this.oldLogDir = createInitialFileSystemLayout(); @@ -150,16 +156,19 @@ public class MasterFileSystem { */ private Path createInitialFileSystemLayout() throws IOException { // check if the root directory exists - checkRootDir(this.rootdir, conf, this.fs); - + checkRootDir(this.rootdir, conf, this.fs, HConstants.HBASE_DIR, HConstants.HBASE_DIR_PERMS); + // if the log directory is different from root, check if it exists + if (!this.logRootDir.equals(this.rootdir)) { + checkRootDir(this.logRootDir, conf, this.logFs, HConstants.HBASE_LOG_DIR, HConstants.HBASE_LOG_DIR_PERMS); + } // check if temp directory exists and clean it checkTempDir(this.tempdir, conf, this.fs); - Path oldLogDir = new Path(this.rootdir, HConstants.HREGION_OLDLOGDIR_NAME); + Path oldLogDir = new Path(this.logRootDir, HConstants.HREGION_OLDLOGDIR_NAME); // Make sure the region servers can archive their old logs - if(!this.fs.exists(oldLogDir)) { - this.fs.mkdirs(oldLogDir); + if(!this.logFs.exists(oldLogDir)) { + this.logFs.mkdirs(oldLogDir); } return oldLogDir; @@ -169,6 +178,7 @@ public class MasterFileSystem { return this.fs; } + public FileSystem getLogFileSystem() { return this.logFs; } /** * Get the directory where old logs go * @return the dir @@ -183,16 +193,16 @@ public class MasterFileSystem { * @return false if file system is not available */ public boolean checkFileSystem() { - if (this.fsOk) { + if (this.logFsOk) { try { - FSUtils.checkFileSystemAvailable(this.fs); + FSUtils.checkFileSystemAvailable(this.logFs); FSUtils.checkDfsSafeMode(this.conf); } catch (IOException e) { master.abort("Shutting down HBase cluster: file system not available", e); - this.fsOk = false; + this.logFsOk = false; } } - return this.fsOk; + return this.logFsOk; } /** @@ -203,6 +213,13 @@ public class MasterFileSystem { } /** + * @return HBase root log dir. + */ + public Path getLogRootDir() { + return this.logRootDir; + } + + /** * @return HBase temp dir. */ public Path getTempDir() { @@ -225,7 +242,7 @@ public class MasterFileSystem { WALSplitter.SPLIT_SKIP_ERRORS_DEFAULT); Set serverNames = new HashSet(); - Path logsDirPath = new Path(this.rootdir, HConstants.HREGION_LOGDIR_NAME); + Path logsDirPath = new Path(this.logRootDir, HConstants.HREGION_LOGDIR_NAME); do { if (master.isStopped()) { @@ -233,8 +250,8 @@ public class MasterFileSystem { break; } try { - if (!this.fs.exists(logsDirPath)) return serverNames; - FileStatus[] logFolders = FSUtils.listStatus(this.fs, logsDirPath, null); + if (!this.logFs.exists(logsDirPath)) return serverNames; + FileStatus[] logFolders = FSUtils.listStatus(this.logFs, logsDirPath, null); // Get online servers after getting log folders to avoid log folder deletion of newly // checked in region servers . see HBASE-5916 Set onlineServers = ((HMaster) master).getServerManager().getOnlineServers() @@ -245,7 +262,7 @@ public class MasterFileSystem { return serverNames; } for (FileStatus status : logFolders) { - FileStatus[] curLogFiles = FSUtils.listStatus(this.fs, status.getPath(), null); + FileStatus[] curLogFiles = FSUtils.listStatus(this.logFs, status.getPath(), null); if (curLogFiles == null || curLogFiles.length == 0) { // Empty log folder. No recovery needed continue; @@ -326,17 +343,17 @@ public class MasterFileSystem { } try { for (ServerName serverName : serverNames) { - Path logDir = new Path(this.rootdir, + Path logDir = new Path(this.logRootDir, DefaultWALProvider.getWALDirectoryName(serverName.toString())); Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT); // Rename the directory so a rogue RS doesn't create more WALs - if (fs.exists(logDir)) { - if (!this.fs.rename(logDir, splitDir)) { + if (logFs.exists(logDir)) { + if (!this.logFs.rename(logDir, splitDir)) { throw new IOException("Failed fs.rename for log split: " + logDir); } logDir = splitDir; LOG.debug("Renamed region directory: " + splitDir); - } else if (!fs.exists(splitDir)) { + } else if (!logFs.exists(splitDir)) { LOG.info("Log dir for server " + serverName + " does not exist"); continue; } @@ -418,19 +435,19 @@ public class MasterFileSystem { */ @SuppressWarnings("deprecation") private Path checkRootDir(final Path rd, final Configuration c, - final FileSystem fs) + final FileSystem fs, final String dirConfKey, final String dirPermsConfName) throws IOException { // If FS is in safe mode wait till out of it. FSUtils.waitOnSafeMode(c, c.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000)); boolean isSecurityEnabled = "kerberos".equalsIgnoreCase(c.get("hbase.security.authentication")); - FsPermission rootDirPerms = new FsPermission(c.get("hbase.rootdir.perms", "700")); + FsPermission dirPerms = new FsPermission(c.get(dirPermsConfName, "700")); - // Filesystem is good. Go ahead and check for hbase.rootdir. + // Filesystem is good. Go ahead and check for root directory. try { if (!fs.exists(rd)) { if (isSecurityEnabled) { - fs.mkdirs(rd, rootDirPerms); + fs.mkdirs(rd, dirPerms); } else { fs.mkdirs(rd); } @@ -448,15 +465,15 @@ public class MasterFileSystem { if (!fs.isDirectory(rd)) { throw new IllegalArgumentException(rd.toString() + " is not a directory"); } - if (isSecurityEnabled && !rootDirPerms.equals(fs.getFileStatus(rd).getPermission())) { + if (isSecurityEnabled && !dirPerms.equals(fs.getFileStatus(rd).getPermission())) { // check whether the permission match LOG.warn("Found rootdir permissions NOT matching expected \"hbase.rootdir.perms\" for " - + "rootdir=" + rd.toString() + " permissions=" + fs.getFileStatus(rd).getPermission() - + " and \"hbase.rootdir.perms\" configured as " - + c.get("hbase.rootdir.perms", "700") + ". Automatically setting the permissions. You" - + " can change the permissions by setting \"hbase.rootdir.perms\" in hbase-site.xml " + + "rd=" + rd.toString() + " permissions=" + fs.getFileStatus(rd).getPermission() + + " and \"" + dirPermsConfName + "\" configured as " + + c.get(dirPermsConfName, "700") + ". Automatically setting the permissions. You" + + " can change the permissions by setting \"" + dirPermsConfName + "\" in hbase-site.xml " + "and restarting the master"); - fs.setPermission(rd, rootDirPerms); + fs.setPermission(rd, dirPerms); } // as above FSUtils.checkVersion(fs, rd, true, c.getInt(HConstants.THREAD_WAKE_FREQUENCY, @@ -464,39 +481,40 @@ public class MasterFileSystem { HConstants.DEFAULT_VERSION_FILE_WRITE_ATTEMPTS)); } } catch (DeserializationException de) { - LOG.fatal("Please fix invalid configuration for " + HConstants.HBASE_DIR, de); + LOG.fatal("Please fix invalid configuration for " + dirConfKey, de); IOException ioe = new IOException(); ioe.initCause(de); throw ioe; } catch (IllegalArgumentException iae) { LOG.fatal("Please fix invalid configuration for " - + HConstants.HBASE_DIR + " " + rd.toString(), iae); + + dirConfKey + " " + rd.toString(), iae); throw iae; } - // Make sure cluster ID exists - if (!FSUtils.checkClusterIdExists(fs, rd, c.getInt( - HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000))) { - FSUtils.setClusterId(fs, rd, new ClusterId(), c.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000)); - } - clusterId = FSUtils.getClusterId(fs, rd); - - // Make sure the meta region directory exists! - if (!FSUtils.metaRegionExists(fs, rd)) { - bootstrap(rd, c); - } else { - // Migrate table descriptor files if necessary - org.apache.hadoop.hbase.util.FSTableDescriptorMigrationToSubdir - .migrateFSTableDescriptorsIfNecessary(fs, rd); - } + if(dirConfKey.equals(HConstants.HBASE_DIR)) { + // Make sure cluster ID exists + if (!FSUtils.checkClusterIdExists(fs, rd, c.getInt( + HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000))) { + FSUtils.setClusterId(fs, rd, new ClusterId(), c.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000)); + } + clusterId = FSUtils.getClusterId(fs, rd); - // Create tableinfo-s for hbase:meta if not already there. + // Make sure the meta region directory exists! + if (!FSUtils.metaRegionExists(fs, rd)) { + bootstrap(rd, c); + } else { + // Migrate table descriptor files if necessary + org.apache.hadoop.hbase.util.FSTableDescriptorMigrationToSubdir + .migrateFSTableDescriptorsIfNecessary(fs, rd); + } - // meta table is a system table, so descriptors are predefined, - // we should get them from registry. - FSTableDescriptors fsd = new FSTableDescriptors(c, fs, rd); - fsd.createTableDescriptor( - new HTableDescriptor(fsd.get(TableName.META_TABLE_NAME))); + // Create tableinfo-s for hbase:meta if not already there. + // meta table is a system table, so descriptors are predefined, + // we should get them from registry. + FSTableDescriptors fsd = new FSTableDescriptors(c, fs, rd); + fsd.createTableDescriptor( + new HTableDescriptor(fsd.get(TableName.META_TABLE_NAME))); + } return rd; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index 852b6c4..47c2b7a 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -266,7 +266,7 @@ public class SplitLogManager { // recover-lease is done. totalSize will be under in most cases and the // metrics that it drives will also be under-reported. totalSize += lf.getLen(); - String pathToLog = FSUtils.removeRootPath(lf.getPath(), conf); + String pathToLog = FSUtils.removeLogRootPath(lf.getPath(), conf); if (!enqueueSplitTask(pathToLog, batch)) { throw new IOException("duplicate log split scheduled for " + lf.getPath()); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 9fa0a69..b9d5fe9 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -285,6 +285,7 @@ public class HRegionServer extends HasThread implements // If false, the file system has become unavailable protected volatile boolean fsOk; protected HFileSystem fs; + protected HFileSystem logFs; // Set when a report to the master comes back with a message asking us to // shutdown. Also set by call to stop when debugging or running unit tests @@ -306,6 +307,7 @@ public class HRegionServer extends HasThread implements protected final Configuration conf; private Path rootDir; + private Path logRootDir; protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -601,13 +603,16 @@ public class HRegionServer extends HasThread implements } private void initializeFileSystem() throws IOException { + // Get fs instance used by this RS. Do we use checksum verification in the hbase? If hbase + // checksum verification enabled, then automatically switch off hdfs checksum verification. + boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true); + FSUtils.setFsDefault(this.conf, FSUtils.getLogRootDir(this.conf)); + this.logFs = new HFileSystem(this.conf, useHBaseChecksum); + this.logRootDir = FSUtils.getLogRootDir(this.conf); // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else // underlying hadoop hdfs accessors will be going against wrong filesystem // (unless all is set to defaults). FSUtils.setFsDefault(this.conf, FSUtils.getRootDir(this.conf)); - // Get fs instance used by this RS. Do we use checksum verification in the hbase? If hbase - // checksum verification enabled, then automatically switch off hdfs checksum verification. - boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true); this.fs = new HFileSystem(this.conf, useHBaseChecksum); this.rootDir = FSUtils.getRootDir(this.conf); this.tableDescriptors = new FSTableDescriptors( @@ -1625,19 +1630,19 @@ public class HRegionServer extends HasThread implements */ private WALFactory setupWALAndReplication() throws IOException { // TODO Replication make assumptions here based on the default filesystem impl - final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); + final Path oldLogDir = new Path(logRootDir, HConstants.HREGION_OLDLOGDIR_NAME); final String logName = DefaultWALProvider.getWALDirectoryName(this.serverName.toString()); - Path logdir = new Path(rootDir, logName); - if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir); - if (this.fs.exists(logdir)) { + Path logDir = new Path(logRootDir, logName); + if (LOG.isDebugEnabled()) LOG.debug("logDir=" + logDir); + if (this.logFs.exists(logDir)) { throw new RegionServerRunningException("Region server has already " + "created directory at " + this.serverName.toString()); } // Instantiate replication manager if replication enabled. Pass it the // log directories. - createNewReplicationInstance(conf, this, this.fs, logdir, oldLogDir); + createNewReplicationInstance(conf, this, this.logFs, logDir, oldLogDir); // listeners the wal factory will add to wals it creates. final List listeners = new ArrayList(); @@ -2537,6 +2542,20 @@ public class HRegionServer extends HasThread implements return fs; } + /** + * @return Return the logRootDir. + */ + protected Path getLogRootDir() { + return logRootDir; + } + + /** + * @return Return the logFs. + */ + public FileSystem getLogFileSystem() { + return logFs; + } + @Override public String toString() { return getServerName().toString(); @@ -2603,7 +2622,7 @@ public class HRegionServer extends HasThread implements * Load the replication service objects, if any */ static private void createNewReplicationInstance(Configuration conf, - HRegionServer server, FileSystem fs, Path logDir, Path oldLogDir) throws IOException{ + HRegionServer server, FileSystem logFs, Path logDir, Path oldLogDir) throws IOException{ // If replication is not enabled, then return immediately. if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, @@ -2624,21 +2643,21 @@ public class HRegionServer extends HasThread implements if (sourceClassname.equals(sinkClassname)) { server.replicationSourceHandler = (ReplicationSourceService) newReplicationInstance(sourceClassname, - conf, server, fs, logDir, oldLogDir); + conf, server, logFs, logDir, oldLogDir); server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler; } else { server.replicationSourceHandler = (ReplicationSourceService) newReplicationInstance(sourceClassname, - conf, server, fs, logDir, oldLogDir); + conf, server, logFs, logDir, oldLogDir); server.replicationSinkHandler = (ReplicationSinkService) newReplicationInstance(sinkClassname, - conf, server, fs, logDir, oldLogDir); + conf, server, logFs, logDir, oldLogDir); } } static private ReplicationService newReplicationInstance(String classname, - Configuration conf, HRegionServer server, FileSystem fs, Path logDir, + Configuration conf, HRegionServer server, FileSystem logFs, Path logDir, Path oldLogDir) throws IOException{ Class clazz = null; @@ -2652,7 +2671,7 @@ public class HRegionServer extends HasThread implements // create an instance of the replication object. ReplicationService service = (ReplicationService) ReflectionUtils.newInstance(clazz, conf); - service.initialize(server, fs, logDir, oldLogDir); + service.initialize(server, logFs, logDir, oldLogDir); return service; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java index eeffa8b..5324eae 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java @@ -88,11 +88,11 @@ public class SplitLogWorker implements Runnable { this(server, conf, server, new TaskExecutor() { @Override public Status exec(String filename, RecoveryMode mode, CancelableProgressable p) { - Path rootdir; + Path logDir; FileSystem fs; try { - rootdir = FSUtils.getRootDir(conf); - fs = rootdir.getFileSystem(conf); + logDir = FSUtils.getLogRootDir(conf); + fs = FSUtils.getLogRootDirFileSystem(conf); } catch (IOException e) { LOG.warn("could not find root dir or fs", e); return Status.RESIGNED; @@ -101,7 +101,7 @@ public class SplitLogWorker implements Runnable { // interrupted or has encountered a transient error and when it has // encountered a bad non-retry-able persistent error. try { - if (!WALSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)), + if (!WALSplitter.splitLogFile(logDir, fs.getFileStatus(new Path(logDir, filename)), fs, conf, p, sequenceIdChecker, server.getCoordinatedStateManager(), mode, factory)) { return Status.PREEMPTED; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 459d598..6a14929 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -1573,7 +1573,7 @@ public class FSHLog implements WAL { private static void split(final Configuration conf, final Path p) throws IOException { - FileSystem fs = FileSystem.get(conf); + FileSystem fs = FSUtils.getLogRootDirFileSystem(conf); if (!fs.exists(p)) { throw new FileNotFoundException(p.toString()); } @@ -1581,7 +1581,7 @@ public class FSHLog implements WAL { throw new IOException(p + " is not a directory"); } - final Path baseDir = FSUtils.getRootDir(conf); + final Path baseDir = FSUtils.getLogRootDir(conf); final Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME); WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf)); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index be16d01..85118b2 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -750,9 +750,9 @@ public class ReplicationSource extends Thread // to look at) List deadRegionServers = this.replicationQueueInfo.getDeadRegionServers(); LOG.info("NB dead servers : " + deadRegionServers.size()); - final Path rootDir = FSUtils.getRootDir(conf); + final Path logDir = FSUtils.getLogRootDir(conf); for (String curDeadServerName : deadRegionServers) { - final Path deadRsDirectory = new Path(rootDir, + final Path deadRsDirectory = new Path(logDir, DefaultWALProvider.getWALDirectoryName(curDeadServerName)); Path[] locs = new Path[] { new Path(deadRsDirectory, currentPath.getName()), diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java index 8d38b09..64bed53 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java @@ -76,7 +76,7 @@ public class ReplicationSyncUp extends Configured implements Tool { Replication replication; ReplicationSourceManager manager; FileSystem fs; - Path oldLogDir, logDir, rootDir; + Path oldLogDir, logDir, rootLogDir; ZooKeeperWatcher zkw; Abortable abortable = new Abortable() { @@ -94,10 +94,10 @@ public class ReplicationSyncUp extends Configured implements Tool { new ZooKeeperWatcher(conf, "syncupReplication" + System.currentTimeMillis(), abortable, true); - rootDir = FSUtils.getRootDir(conf); - fs = FileSystem.get(conf); - oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); - logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME); + rootLogDir = FSUtils.getRootDir(conf); + fs = FSUtils.getLogRootDirFileSystem(conf); + oldLogDir = new Path(rootLogDir, HConstants.HREGION_OLDLOGDIR_NAME); + logDir = new Path(rootLogDir, HConstants.HREGION_LOGDIR_NAME); System.out.println("Start Replication Server start"); replication = new Replication(new DummyServer(zkw), fs, logDir, oldLogDir); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java index 8717948..764c8e1 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java @@ -83,6 +83,9 @@ import org.apache.hadoop.util.StringUtils; import com.google.common.primitives.Ints; +import static org.apache.hadoop.hbase.HConstants.HBASE_DIR; +import static org.apache.hadoop.hbase.HConstants.HBASE_LOG_DIR; + /** * Utility methods for interacting with the underlying file system. */ @@ -932,22 +935,22 @@ public abstract class FSUtils { return root; } catch (URISyntaxException e) { IOException io = new IOException("Root directory path is not a valid " + - "URI -- check your " + HConstants.HBASE_DIR + " configuration"); + "URI -- check your " + HBASE_DIR + " configuration"); io.initCause(e); throw io; } } /** - * Checks for the presence of the root path (using the provided conf object) in the given path. If + * Checks for the presence of the WAL log root path (using the provided conf object) in the given path. If * it exists, this method removes it and returns the String representation of remaining relative path. * @param path * @param conf * @return String representation of the remaining relative path * @throws IOException */ - public static String removeRootPath(Path path, final Configuration conf) throws IOException { - Path root = FSUtils.getRootDir(conf); + public static String removeLogRootPath(Path path, final Configuration conf) throws IOException { + Path root = FSUtils.getLogRootDir(conf); String pathStr = path.toString(); // check that the path is absolute... it has the root path in it. if (!pathStr.startsWith(root.toString())) return pathStr; @@ -994,18 +997,44 @@ public abstract class FSUtils { /** * @param c configuration - * @return Path to hbase root directory: i.e. hbase.rootdir from + * @return {@link Path} to hbase root directory: i.e. {@value Hconstants#HBASE_DIR} from * configuration as a qualified Path. * @throws IOException e */ public static Path getRootDir(final Configuration c) throws IOException { - Path p = new Path(c.get(HConstants.HBASE_DIR)); + Path p = new Path(c.get(HBASE_DIR)); FileSystem fs = p.getFileSystem(c); return p.makeQualified(fs); } public static void setRootDir(final Configuration c, final Path root) throws IOException { - c.set(HConstants.HBASE_DIR, root.toString()); + c.set(HBASE_DIR, root.toString()); + } + + /** + * @param c configuration + * @return {@link Path} to hbase log root directory: i.e. {@value Hconstants#HBASE_LOG_DIR} from + * configuration as a qualified Path. Defaults to {@value Hconstants#HBASE_DIR} + * @throws IOException e + */ + public static Path getLogRootDir(final Configuration c) throws IOException { + Path p = new Path(c.get(HBASE_LOG_DIR, c.get(HBASE_DIR))); + FileSystem fs = p.getFileSystem(c); + return p.makeQualified(fs); + } + + public static void setLogRootDir(final Configuration c, final Path root) throws IOException { + c.set(HBASE_LOG_DIR, root.toString()); + } + + public static FileSystem getLogRootDirFileSystem(final Configuration c) throws IOException { + Path p = getLogRootDir(c); + return p.getFileSystem(c); + } + + public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException { + Path p = getRootDir(c); + return p.getFileSystem(c); } public static void setFsDefault(final Configuration c, final Path root) throws IOException { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java index c1fa079..c5de395 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java @@ -47,8 +47,8 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; * A WAL Provider that returns a single thread safe WAL that writes to HDFS. * By default, this implementation picks a directory in HDFS based on a combination of *
    - *
  • the HBase root directory - *
  • HConstants.HREGION_LOGDIR_NAME + *
  • the HBase root log directory (can be changed via {@value HConstants#HBASE_LOG_DIR}) + *
  • {@value HConstants#HREGION_LOGDIR_NAME} *
  • the given factory's factoryId (usually identifying the regionserver by host:port) *
* It also uses the providerId to diffentiate among files. @@ -94,7 +94,7 @@ public class DefaultWALProvider implements WALProvider { providerId = DEFAULT_PROVIDER_ID; } final String logPrefix = factory.factoryId + WAL_FILE_NAME_DELIMITER + providerId; - log = new FSHLog(FileSystem.get(conf), FSUtils.getRootDir(conf), + log = new FSHLog(FSUtils.getLogRootDirFileSystem(conf), FSUtils.getLogRootDir(conf), getWALDirectoryName(factory.factoryId), HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners, true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null); } @@ -266,14 +266,14 @@ public class DefaultWALProvider implements WALProvider { throw new IllegalArgumentException("parameter conf must be set"); } - final String rootDir = conf.get(HConstants.HBASE_DIR); - if (rootDir == null || rootDir.isEmpty()) { - throw new IllegalArgumentException(HConstants.HBASE_DIR - + " key not found in conf."); + final String logDir = conf.get(HConstants.HBASE_LOG_DIR, conf.get(HConstants.HBASE_DIR)); + if (logDir == null || logDir.isEmpty()) { + throw new IllegalArgumentException(String.format("%s key not found in conf.", + HConstants.HBASE_DIR)); } - final StringBuilder startPathSB = new StringBuilder(rootDir); - if (!rootDir.endsWith("/")) + final StringBuilder startPathSB = new StringBuilder(logDir); + if (!logDir.endsWith("/")) startPathSB.append('/'); startPathSB.append(HConstants.HREGION_LOGDIR_NAME); if (!HConstants.HREGION_LOGDIR_NAME.endsWith("/")) diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index 33785a6..f82ff1f 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -62,7 +62,7 @@ class DisabledWALProvider implements WALProvider { if (null == providerId) { providerId = "defaultDisabled"; } - disabled = new DisabledWAL(new Path(FSUtils.getRootDir(conf), providerId), conf, null); + disabled = new DisabledWAL(new Path(FSUtils.getLogRootDir(conf), providerId), conf, null); } @Override diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index 05d00de..6f035be 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -240,17 +240,17 @@ public class WALSplitter { // log splitting. Used by tools and unit tests. It should be package private. // It is public only because UpgradeTo96 and TestWALObserver are in different packages, // which uses this method to do log splitting. - public static List split(Path rootDir, Path logDir, Path oldLogDir, + public static List split(Path logRootDir, Path logDir, Path oldLogDir, FileSystem fs, Configuration conf, final WALFactory factory) throws IOException { final FileStatus[] logfiles = SplitLogManager.getFileList(conf, Collections.singletonList(logDir), null); List splits = new ArrayList(); if (logfiles != null && logfiles.length > 0) { for (FileStatus logfile: logfiles) { - WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null, + WALSplitter s = new WALSplitter(factory, conf, logRootDir, fs, null, null, RecoveryMode.LOG_SPLITTING); if (s.splitLogFile(logfile, null)) { - finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf); + finishSplitLogFile(logRootDir, oldLogDir, logfile.getPath(), conf); if (s.outputSink.splits != null) { splits.addAll(s.outputSink.splits); } @@ -438,7 +438,7 @@ public class WALSplitter { */ public static void finishSplitLogFile(String logfile, Configuration conf) throws IOException { - Path rootdir = FSUtils.getRootDir(conf); + Path rootdir = FSUtils.getLogRootDir(conf); Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME); Path logPath; if (FSUtils.isStartingWithPath(rootdir, logfile)) { @@ -481,7 +481,7 @@ public class WALSplitter { final List corruptedLogs, final List processedLogs, final Path oldLogDir, final FileSystem fs, final Configuration conf) throws IOException { - final Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get( + final Path corruptDir = new Path(FSUtils.getLogRootDir(conf), conf.get( "hbase.regionserver.hlog.splitlog.corrupt.dir", HConstants.CORRUPT_DIR_NAME)); if (!fs.mkdirs(corruptDir)) { diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index e15a9f6..94e09b4 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -746,7 +746,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { } private MiniZooKeeperCluster startMiniZKCluster(final File dir) - throws Exception { + throws Exception { return startMiniZKCluster(dir, 1, null); } @@ -806,6 +806,16 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { return startMiniCluster(1, 1); } + /** + * Start up a minicluster of hbase, dfs, and zookeeper where WAL's logDir is created separately. + * @throws Exception + * @return Mini hbase cluster instance created. + * @see {@link #shutdownMiniDFSCluster()} + */ + public MiniHBaseCluster startMiniClusterWithLogDir() throws Exception { + return startMiniClusterWithLogDir(1, 1); + } + /** * Start up a minicluster of hbase, dfs, and zookeeper. * Set the create flag to create root or data directory path or not @@ -815,7 +825,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * @see {@link #shutdownMiniDFSCluster()} */ public MiniHBaseCluster startMiniCluster(final int numSlaves, boolean create) - throws Exception { + throws Exception { return startMiniCluster(1, numSlaves, create); } @@ -833,10 +843,15 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * @return Mini hbase cluster instance created. */ public MiniHBaseCluster startMiniCluster(final int numSlaves) - throws Exception { + throws Exception { return startMiniCluster(1, numSlaves, false); } + public MiniHBaseCluster startMiniClusterWithLogDir(final int numSlaves) + throws Exception { + return startMiniClusterWithLogDir(1, numSlaves, false); + } + /** * Start minicluster. Whether to create a new root or data dir path even if such a path * has been created earlier is decided based on flag create @@ -846,10 +861,17 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { */ public MiniHBaseCluster startMiniCluster(final int numMasters, final int numSlaves, boolean create) - throws Exception { - return startMiniCluster(numMasters, numSlaves, null, create); + throws Exception { + return startMiniCluster(numMasters, numSlaves, null, create); } + public MiniHBaseCluster startMiniClusterWithLogDir(final int numMasters, + final int numSlaves, boolean create) + throws Exception { + return startMiniClusterWithLogDir(numMasters, numSlaves, null, create); + } + + /** * start minicluster * @throws Exception @@ -857,11 +879,17 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * @return Mini hbase cluster instance created. */ public MiniHBaseCluster startMiniCluster(final int numMasters, - final int numSlaves) - throws Exception { + final int numSlaves) + throws Exception { return startMiniCluster(numMasters, numSlaves, null, false); } + public MiniHBaseCluster startMiniClusterWithLogDir(final int numMasters, + final int numSlaves) + throws Exception { + return startMiniClusterWithLogDir(numMasters, numSlaves, null, false); + } + public MiniHBaseCluster startMiniCluster(final int numMasters, final int numSlaves, final String[] dataNodeHosts, boolean create) throws Exception { @@ -869,6 +897,13 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { null, null, create); } + public MiniHBaseCluster startMiniClusterWithLogDir(final int numMasters, + final int numSlaves, final String[] dataNodeHosts, boolean create) + throws Exception { + return startMiniClusterWithLogDir(numMasters, numSlaves, numSlaves, dataNodeHosts, + null, null, create); + } + /** * Start up a minicluster of hbase, optionally dfs, and zookeeper. * Modifies Configuration. Homes the cluster data directory under a random @@ -994,12 +1029,52 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { // Start the MiniHBaseCluster return startMiniHBaseCluster(numMasters, numSlaves, masterClass, - regionserverClass, create); + regionserverClass, create, false); + } + + /** + * Same as {@link #startMiniClusterWithLogDir(int, int, String[], Class, Class)}, but with custom + * number of datanodes. + * @param numDataNodes Number of data nodes. + * @param create Set this flag to create a new + * root or data directory path or not (will overwrite if exists already). + */ + public MiniHBaseCluster startMiniClusterWithLogDir(final int numMasters, + final int numSlaves, int numDataNodes, final String[] dataNodeHosts, + Class masterClass, + Class regionserverClass, + boolean create) + throws Exception { + if (dataNodeHosts != null && dataNodeHosts.length != 0) { + numDataNodes = dataNodeHosts.length; + } + + LOG.info("Starting up minicluster with " + numMasters + " master(s) and " + + numSlaves + " regionserver(s) and " + numDataNodes + " datanode(s)"); + // If we already started a cluster, fail. + if (miniClusterRunning) { + throw new IllegalStateException("A mini-cluster is already running"); + } + miniClusterRunning = true; + setupClusterTestDir(); + System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestDir.getPath()); + // Bring up mini dfs cluster. This spews a bunch of warnings about missing + // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'. + if(this.dfsCluster == null) { + dfsCluster = startMiniDFSCluster(numDataNodes, dataNodeHosts); + } + // Start up a zk cluster. + if (this.zkCluster == null) { + startMiniZKCluster(clusterTestDir); + } + // Start the MiniHBaseCluster + return startMiniHBaseCluster(numMasters, numSlaves, masterClass, + regionserverClass, create, true); } public MiniHBaseCluster startMiniHBaseCluster(final int numMasters, final int numSlaves) throws IOException, InterruptedException{ - return startMiniHBaseCluster(numMasters, numSlaves, null, null, false); + return startMiniHBaseCluster(numMasters, numSlaves, null, null, false, false); } /** @@ -1018,11 +1093,13 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { public MiniHBaseCluster startMiniHBaseCluster(final int numMasters, final int numSlaves, Class masterClass, Class regionserverClass, - boolean create) - throws IOException, InterruptedException { + boolean create, boolean withLogDir) + throws IOException, InterruptedException { // Now do the mini hbase cluster. Set the hbase.rootdir in config. createRootDir(create); - + if (withLogDir) { + createLogRootDir(create); + } // These settings will make the server waits until this exact number of // regions servers are connected. if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) { @@ -1205,7 +1282,39 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { public Path createRootDir() throws IOException { return createRootDir(false); } - + /** + * Creates a hbase logdir in the user's home directory. Also creates hbase + * version file. Normally you won't make use of this method. Root hbaselogdir + * is created for you as part of mini cluster startup. You'd only use this + * method if you were doing manual operation. + * + * @param create This flag decides whether to get a new + * root or data directory path or not, if it has been fetched already. + * Note : Directory will be made irrespective of whether path has been fetched or not. + * If directory already exists, it will be overwritten + * @return Fully qualified path to hbase root dir + * @throws IOException + */ + public Path createLogRootDir(boolean create) throws IOException { + FileSystem fs = FileSystem.get(this.conf); + String randomStr = UUID.randomUUID().toString(); + Path logBase = new Path(getDefaultRootDirPath(create).getParent(), randomStr); + Path hbaseLogRootdir = new Path(logBase, "logRootDir"); + FSUtils.setLogRootDir(this.conf, hbaseLogRootdir); + fs.mkdirs(hbaseLogRootdir); + FSUtils.setVersion(fs, hbaseLogRootdir); + return hbaseLogRootdir; + } + /** + * Same as {@link HBaseTestingUtility#createLogRootDir(boolean create)} + * except that create flag is false. + * + * @return Fully qualified path to hbase root dir + * @throws IOException + */ + public Path createLogRootDir() throws IOException { + return createLogRootDir(false); + } private void setHBaseFsTmpDir() throws IOException { String hbaseFsTmpDirInString = this.conf.get("hbase.fs.tmp.dir"); @@ -1782,12 +1891,14 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { /** * Create an unmanaged WAL. Be sure to close it when you're through. */ - public static WAL createWal(final Configuration conf, final Path rootDir, final HRegionInfo hri) + public static WAL createWal(final Configuration conf, final Path rootDir, final Path logRootDir, + final HRegionInfo hri) throws IOException { // The WAL subsystem will use the default rootDir rather than the passed in rootDir // unless I pass along via the conf. Configuration confForWAL = new Configuration(conf); confForWAL.set(HConstants.HBASE_DIR, rootDir.toString()); + confForWAL.set(HConstants.HBASE_LOG_DIR, logRootDir.toString()); return (new WALFactory(confForWAL, Collections.singletonList(new MetricsWAL()), "hregion-" + RandomStringUtils.randomNumeric(8))). @@ -1799,8 +1910,17 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources. */ public static HRegion createRegionAndWAL(final HRegionInfo info, final Path rootDir, + final Path logRootDir, final Configuration conf, final HTableDescriptor htd) throws IOException { + return createRegionAndWAL(info, rootDir, logRootDir, conf, htd, true); + } + + /** + * Create a region with a WAL under the rootDir. Be sure to call + * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources. + */ + public static HRegion createRegionAndWAL(final HRegionInfo info, final Path rootDir, final Configuration conf, final HTableDescriptor htd) throws IOException { - return createRegionAndWAL(info, rootDir, conf, htd, true); + return createRegionAndWAL(info, rootDir, rootDir, conf, htd, true); } /** @@ -1808,9 +1928,9 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources. */ public static HRegion createRegionAndWAL(final HRegionInfo info, final Path rootDir, - final Configuration conf, final HTableDescriptor htd, boolean initialize) + final Path logRootDir, final Configuration conf, final HTableDescriptor htd, boolean initialize) throws IOException { - WAL wal = createWal(conf, rootDir, info); + WAL wal = createWal(conf, rootDir, logRootDir, info); return HRegion.createHRegion(info, rootDir, conf, htd, wal, initialize); } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java index a771c21..b9ddc97 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java @@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore; -import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad; +import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoadBase; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; @@ -324,7 +324,7 @@ public class TestReplicaWithCluster { final List> famPaths = new ArrayList>(); for (HColumnDescriptor col : hdt.getColumnFamilies()) { Path hfile = new Path(dir, col.getNameAsString()); - TestHRegionServerBulkLoad.createHFile(HTU.getTestFileSystem(), hfile, col.getName(), + TestHRegionServerBulkLoadBase.createHFile(HTU.getTestFileSystem(), hfile, col.getName(), qual, val, numRows); famPaths.add(new Pair(col.getName(), hfile.toString())); } @@ -334,7 +334,7 @@ public class TestReplicaWithCluster { @SuppressWarnings("deprecation") final HConnection conn = HTU.getHBaseAdmin().getConnection(); RegionServerCallable callable = new RegionServerCallable( - conn, hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0)) { + conn, hdt.getTableName(), TestHRegionServerBulkLoadBase.rowkey(0)) { @Override public Void call(int timeout) throws Exception { LOG.debug("Going to connect to server " + getLocation() + " for row " @@ -353,7 +353,7 @@ public class TestReplicaWithCluster { // verify we can read them from the primary LOG.debug("Verifying data load"); for (int i = 0; i < numRows; i++) { - byte[] row = TestHRegionServerBulkLoad.rowkey(i); + byte[] row = TestHRegionServerBulkLoadBase.rowkey(i); Get g = new Get(row); Result r = table.get(g); Assert.assertFalse(r.isStale()); @@ -364,7 +364,7 @@ public class TestReplicaWithCluster { try { SlowMeCopro.cdl.set(new CountDownLatch(1)); for (int i = 0; i < numRows; i++) { - byte[] row = TestHRegionServerBulkLoad.rowkey(i); + byte[] row = TestHRegionServerBulkLoadBase.rowkey(i); Get g = new Get(row); g.setConsistency(Consistency.TIMELINE); Result r = table.get(g); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java index 256c0eb..fcb69f6 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java @@ -97,6 +97,7 @@ public class TestWALObserver { private FileSystem fs; private Path dir; private Path hbaseRootDir; + private Path hbaseLogRootDir; private String logName; private Path oldLogDir; private Path logDir; @@ -115,8 +116,11 @@ public class TestWALObserver { TEST_UTIL.startMiniCluster(1); Path hbaseRootDir = TEST_UTIL.getDFSCluster().getFileSystem() .makeQualified(new Path("/hbase")); + Path hbaseLogRootDir = TEST_UTIL.getDFSCluster().getFileSystem() + .makeQualified(new Path("/hbaseLog")); LOG.info("hbase.rootdir=" + hbaseRootDir); FSUtils.setRootDir(conf, hbaseRootDir); + FSUtils.setLogRootDir(conf, hbaseLogRootDir); } @AfterClass @@ -130,16 +134,20 @@ public class TestWALObserver { // this.cluster = TEST_UTIL.getDFSCluster(); this.fs = TEST_UTIL.getDFSCluster().getFileSystem(); this.hbaseRootDir = FSUtils.getRootDir(conf); - this.dir = new Path(this.hbaseRootDir, TestWALObserver.class.getName()); - this.oldLogDir = new Path(this.hbaseRootDir, + this.hbaseLogRootDir = FSUtils.getLogRootDir(conf); + this.dir = new Path(this.hbaseLogRootDir, TestWALObserver.class.getName()); + this.oldLogDir = new Path(this.hbaseLogRootDir, HConstants.HREGION_OLDLOGDIR_NAME); - this.logDir = new Path(this.hbaseRootDir, + this.logDir = new Path(this.hbaseLogRootDir, DefaultWALProvider.getWALDirectoryName(currentTest.getMethodName())); this.logName = HConstants.HREGION_LOGDIR_NAME; if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) { TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); } + if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseLogRootDir)) { + TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseLogRootDir, true); + } this.wals = new WALFactory(conf, null, currentTest.getMethodName()); } @@ -153,6 +161,7 @@ public class TestWALObserver { LOG.debug("details of failure to close wal factory.", exception); } TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); + TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseLogRootDir, true); } /** @@ -512,7 +521,7 @@ public class TestWALObserver { private Path runWALSplit(final Configuration c) throws IOException { List splits = WALSplitter.split( - hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals); + hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals); // Split should generate only 1 file since there's only 1 region assertEquals(1, splits.size()); // Make sure the file exists diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterFromRegionSide.java hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterFromRegionSide.java index c574a95..cf39095 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterFromRegionSide.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilterFromRegionSide.java @@ -84,7 +84,7 @@ public class TestFilterFromRegionSide { } HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); REGION = HBaseTestingUtility - .createRegionAndWAL(info, TEST_UTIL.getDataTestDir(), TEST_UTIL.getConfiguration(), htd); + .createRegionAndWAL(info, TEST_UTIL.getDataTestDir(), TEST_UTIL.getConfiguration(), htd); for(Put put:createPuts(ROWS, FAMILIES, QUALIFIERS, VALUE)){ REGION.put(put); } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorder.java hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorder.java index 1aa75a1..ac6e80f 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorder.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorder.java @@ -88,6 +88,8 @@ public class TestBlockReorder { private static final String host1 = "host1"; private static final String host2 = "host2"; private static final String host3 = "host3"; + private static Path rootDir; + private static Path logRootDir; @Before public void setUp() throws Exception { @@ -101,10 +103,14 @@ public class TestBlockReorder { conf = htu.getConfiguration(); cluster = htu.getDFSCluster(); dfs = (DistributedFileSystem) FileSystem.get(conf); + rootDir = htu.createRootDir(); + logRootDir = htu.createLogRootDir(); } @After public void tearDownAfterClass() throws Exception { + dfs.delete(rootDir, true); + dfs.delete(logRootDir, true); htu.shutdownMiniCluster(); } @@ -277,7 +283,7 @@ public class TestBlockReorder { // Now we need to find the log file, its locations, and look at it - String rootDir = new Path(FSUtils.getRootDir(conf) + "/" + HConstants.HREGION_LOGDIR_NAME + + String logRootDir = new Path(FSUtils.getLogRootDir(conf) + "/" + HConstants.HREGION_LOGDIR_NAME + "/" + targetRs.getServerName().toString()).toUri().getPath(); DistributedFileSystem mdfs = (DistributedFileSystem) @@ -321,7 +327,7 @@ public class TestBlockReorder { p.add(sb, sb, sb); h.put(p); - DirectoryListing dl = dfs.getClient().listPaths(rootDir, HdfsFileStatus.EMPTY_NAME); + DirectoryListing dl = dfs.getClient().listPaths(logRootDir, HdfsFileStatus.EMPTY_NAME); HdfsFileStatus[] hfs = dl.getPartialListing(); // As we wrote a put, we should have at least one log file. @@ -329,8 +335,8 @@ public class TestBlockReorder { for (HdfsFileStatus hf : hfs) { // Because this is a live cluster, log files might get archived while we're processing try { - LOG.info("Log file found: " + hf.getLocalName() + " in " + rootDir); - String logFile = rootDir + "/" + hf.getLocalName(); + LOG.info("Log file found: " + hf.getLocalName() + " in " + logRootDir); + String logFile = logRootDir + "/" + hf.getLocalName(); FileStatus fsLog = rfs.getFileStatus(new Path(logFile)); LOG.info("Checking log file: " + logFile); @@ -457,7 +463,7 @@ public class TestBlockReorder { // Should be reordered, as we pretend to be a file name with a compliant stuff Assert.assertNotNull(conf.get(HConstants.HBASE_DIR)); Assert.assertFalse(conf.get(HConstants.HBASE_DIR).isEmpty()); - String pseudoLogFile = conf.get(HConstants.HBASE_DIR) + "/" + + String pseudoLogFile = conf.get(HConstants.HBASE_LOG_DIR) + "/" + HConstants.HREGION_LOGDIR_NAME + "/" + host1 + ",6977,6576" + "/mylogfile"; // Check that it will be possible to extract a ServerName from our construction diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java index 26583f3..293e3f3 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java @@ -60,7 +60,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest; import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad; +import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoadBase; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; @@ -80,7 +80,7 @@ import com.google.protobuf.ServiceException; */ @Category(LargeTests.class) public class TestLoadIncrementalHFilesSplitRecovery { - private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoad.class); + private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoadBase.class); static HBaseTestingUtility util; //used by secure subclass @@ -115,7 +115,7 @@ public class TestLoadIncrementalHFilesSplitRecovery { for (int i = 0; i < NUM_CFS; i++) { Path testIn = new Path(dir, family(i)); - TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i), + TestHRegionServerBulkLoadBase.createHFile(fs, new Path(testIn, "hfile_" + i), Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT); } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java index 343fc64..62b683f 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java @@ -30,6 +30,7 @@ import java.io.PrintStream; import java.util.ArrayList; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; @@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.WALPlayer.WALKeyValueMapper; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -67,16 +69,28 @@ import org.mockito.stubbing.Answer; public class TestWALPlayer { private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static MiniHBaseCluster cluster; + private static Path rootDir; + private static Path logRootDir; + private static FileSystem fs; + private static FileSystem logFs; + private static Configuration conf; @BeforeClass public static void beforeClass() throws Exception { TEST_UTIL.setJobWithoutMRCluster(); + conf= TEST_UTIL.getConfiguration(); + rootDir = TEST_UTIL.createRootDir(); + logRootDir = TEST_UTIL.createLogRootDir(); + fs = FSUtils.getRootDirFileSystem(conf); + logFs = FSUtils.getLogRootDirFileSystem(conf); cluster = TEST_UTIL.startMiniCluster(); } @AfterClass public static void afterClass() throws Exception { TEST_UTIL.shutdownMiniCluster(); + fs.delete(rootDir, true); + logFs.delete(logRootDir, true); } /** @@ -108,7 +122,7 @@ public class TestWALPlayer { WAL log = cluster.getRegionServer(0).getWAL(null); log.rollWriter(); String walInputDir = new Path(cluster.getMaster().getMasterFileSystem() - .getRootDir(), HConstants.HREGION_LOGDIR_NAME).toString(); + .getLogRootDir(), HConstants.HREGION_LOGDIR_NAME).toString(); Configuration configuration= TEST_UTIL.getConfiguration(); WALPlayer player = new WALPlayer(configuration); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java index 201ffe6..7bb00e6 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALKeyRecordReader; import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; @@ -64,7 +65,9 @@ public class TestWALRecordReader { private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static Configuration conf; private static FileSystem fs; + private static FileSystem logFs; private static Path hbaseDir; + private static Path logRootDir; // visible for TestHLogRecordReader static final TableName tableName = TableName.valueOf(getName()); private static final byte [] rowName = tableName.getName(); @@ -84,11 +87,8 @@ public class TestWALRecordReader { @Before public void setUp() throws Exception { mvcc = new MultiVersionConcurrencyControl(); - FileStatus[] entries = fs.listStatus(hbaseDir); - for (FileStatus dir : entries) { - fs.delete(dir.getPath(), true); - } - + fs.delete(hbaseDir, true); + logFs.delete(logRootDir, true); } @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -102,8 +102,9 @@ public class TestWALRecordReader { fs = TEST_UTIL.getDFSCluster().getFileSystem(); hbaseDir = TEST_UTIL.createRootDir(); - - logDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME); + logRootDir = TEST_UTIL.createLogRootDir(); + logFs = FSUtils.getLogRootDirFileSystem(conf); + logDir = new Path(logRootDir, HConstants.HREGION_LOGDIR_NAME); htd = new HTableDescriptor(tableName); htd.addFamily(new HColumnDescriptor(family)); @@ -111,6 +112,8 @@ public class TestWALRecordReader { @AfterClass public static void tearDownAfterClass() throws Exception { + fs.delete(hbaseDir, true); + logFs.delete(logRootDir, true); TEST_UTIL.shutdownMiniCluster(); } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFileSystemWithLogDir.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFileSystemWithLogDir.java new file mode 100644 index 0000000..c0f7e50 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFileSystemWithLogDir.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.FSUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + + +import static org.junit.Assert.*; + +/** + * Test the master filesystem in a local cluster + */ +@Category(MediumTests.class) +public class TestMasterFileSystemWithLogDir { + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void setupTest() throws Exception { + UTIL.startMiniClusterWithLogDir(); + } + + @AfterClass + public static void teardownTest() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Test + public void testFsUriSetProperly() throws Exception { + HMaster master = UTIL.getMiniHBaseCluster().getMaster(); + MasterFileSystem fs = master.getMasterFileSystem(); + Path masterRoot = FSUtils.getRootDir(fs.conf); + Path rootDir = FSUtils.getRootDir(fs.getFileSystem().getConf()); + assertEquals(masterRoot, rootDir); + assertEquals(FSUtils.getLogRootDir(UTIL.getConfiguration()), fs.getLogRootDir()); + } + +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java index 118270a..8e35461 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java @@ -1,383 +1,40 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package org.apache.hadoop.hbase.regionserver; -import com.google.common.collect.Lists; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread; -import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext; -import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.RegionServerCallable; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.RpcRetryingCaller; -import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; -import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.io.hfile.HFileContext; -import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; -import org.apache.hadoop.hbase.protobuf.RequestConverter; -import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; -import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest; -import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.WAL; -import org.apache.hadoop.hbase.wal.WALKey; import org.junit.Test; -import org.junit.experimental.categories.Category; - -import com.google.common.collect.Lists; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; -/** - * Tests bulk loading of HFiles and shows the atomicity or lack of atomicity of - * the region server's bullkLoad functionality. - */ -@Category(LargeTests.class) -public class TestHRegionServerBulkLoad { - private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoad.class); - private static HBaseTestingUtility UTIL = new HBaseTestingUtility(); - private final static Configuration conf = UTIL.getConfiguration(); - private final static byte[] QUAL = Bytes.toBytes("qual"); - private final static int NUM_CFS = 10; - public static int BLOCKSIZE = 64 * 1024; - public static Algorithm COMPRESSION = Compression.Algorithm.NONE; - - private final static byte[][] families = new byte[NUM_CFS][]; - static { - for (int i = 0; i < NUM_CFS; i++) { - families[i] = Bytes.toBytes(family(i)); - } - } - - /** - * Create a rowkey compatible with - * {@link #createHFile(FileSystem, Path, byte[], byte[], byte[], int)}. - */ - public static byte[] rowkey(int i) { - return Bytes.toBytes(String.format("row_%08d", i)); - } - - static String family(int i) { - return String.format("family_%04d", i); - } - - /** - * Create an HFile with the given number of rows with a specified value. - */ - public static void createHFile(FileSystem fs, Path path, byte[] family, - byte[] qualifier, byte[] value, int numRows) throws IOException { - HFileContext context = new HFileContextBuilder().withBlockSize(BLOCKSIZE) - .withCompression(COMPRESSION) - .build(); - HFile.Writer writer = HFile - .getWriterFactory(conf, new CacheConfig(conf)) - .withPath(fs, path) - .withFileContext(context) - .create(); - long now = System.currentTimeMillis(); - try { - // subtract 2 since iterateOnSplits doesn't include boundary keys - for (int i = 0; i < numRows; i++) { - KeyValue kv = new KeyValue(rowkey(i), family, qualifier, now, value); - writer.append(kv); - } - writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(now)); - } finally { - writer.close(); - } - } - - /** - * Thread that does full scans of the table looking for any partially - * completed rows. - * - * Each iteration of this loads 10 hdfs files, which occupies 5 file open file - * handles. So every 10 iterations (500 file handles) it does a region - * compaction to reduce the number of open file handles. - */ - public static class AtomicHFileLoader extends RepeatingTestThread { - final AtomicLong numBulkLoads = new AtomicLong(); - final AtomicLong numCompactions = new AtomicLong(); - private TableName tableName; - - public AtomicHFileLoader(TableName tableName, TestContext ctx, - byte targetFamilies[][]) throws IOException { - super(ctx); - this.tableName = tableName; - } - - public void doAnAction() throws Exception { - long iteration = numBulkLoads.getAndIncrement(); - Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d", - iteration)); - - // create HFiles for different column families - FileSystem fs = UTIL.getTestFileSystem(); - byte[] val = Bytes.toBytes(String.format("%010d", iteration)); - final List> famPaths = new ArrayList>( - NUM_CFS); - for (int i = 0; i < NUM_CFS; i++) { - Path hfile = new Path(dir, family(i)); - byte[] fam = Bytes.toBytes(family(i)); - createHFile(fs, hfile, fam, QUAL, val, 1000); - famPaths.add(new Pair(fam, hfile.toString())); - } - - // bulk load HFiles - final HConnection conn = UTIL.getHBaseAdmin().getConnection(); - RegionServerCallable callable = - new RegionServerCallable(conn, tableName, Bytes.toBytes("aaa")) { - @Override - public Void call(int callTimeout) throws Exception { - LOG.debug("Going to connect to server " + getLocation() + " for row " - + Bytes.toStringBinary(getRow())); - byte[] regionName = getLocation().getRegionInfo().getRegionName(); - BulkLoadHFileRequest request = - RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true); - getStub().bulkLoadHFile(null, request); - return null; - } - }; - RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf); - RpcRetryingCaller caller = factory. newCaller(); - caller.callWithRetries(callable, Integer.MAX_VALUE); - - // Periodically do compaction to reduce the number of open file handles. - if (numBulkLoads.get() % 10 == 0) { - // 10 * 50 = 500 open file handles! - callable = new RegionServerCallable(conn, tableName, Bytes.toBytes("aaa")) { - @Override - public Void call(int callTimeout) throws Exception { - LOG.debug("compacting " + getLocation() + " for row " - + Bytes.toStringBinary(getRow())); - AdminProtos.AdminService.BlockingInterface server = - conn.getAdmin(getLocation().getServerName()); - CompactRegionRequest request = - RequestConverter.buildCompactRegionRequest( - getLocation().getRegionInfo().getRegionName(), true, null); - server.compactRegion(null, request); - numCompactions.incrementAndGet(); - return null; - } - }; - caller.callWithRetries(callable, Integer.MAX_VALUE); - } - } - } - - /** - * Thread that does full scans of the table looking for any partially - * completed rows. - */ - public static class AtomicScanReader extends RepeatingTestThread { - byte targetFamilies[][]; - HTable table; - AtomicLong numScans = new AtomicLong(); - AtomicLong numRowsScanned = new AtomicLong(); - TableName TABLE_NAME; - - public AtomicScanReader(TableName TABLE_NAME, TestContext ctx, - byte targetFamilies[][]) throws IOException { - super(ctx); - this.TABLE_NAME = TABLE_NAME; - this.targetFamilies = targetFamilies; - table = new HTable(conf, TABLE_NAME); - } - - public void doAnAction() throws Exception { - Scan s = new Scan(); - for (byte[] family : targetFamilies) { - s.addFamily(family); - } - ResultScanner scanner = table.getScanner(s); - - for (Result res : scanner) { - byte[] lastRow = null, lastFam = null, lastQual = null; - byte[] gotValue = null; - for (byte[] family : targetFamilies) { - byte qualifier[] = QUAL; - byte thisValue[] = res.getValue(family, qualifier); - if (gotValue != null && thisValue != null - && !Bytes.equals(gotValue, thisValue)) { - - StringBuilder msg = new StringBuilder(); - msg.append("Failed on scan ").append(numScans) - .append(" after scanning ").append(numRowsScanned) - .append(" rows!\n"); - msg.append("Current was " + Bytes.toString(res.getRow()) + "/" - + Bytes.toString(family) + ":" + Bytes.toString(qualifier) - + " = " + Bytes.toString(thisValue) + "\n"); - msg.append("Previous was " + Bytes.toString(lastRow) + "/" - + Bytes.toString(lastFam) + ":" + Bytes.toString(lastQual) - + " = " + Bytes.toString(gotValue)); - throw new RuntimeException(msg.toString()); - } - - lastFam = family; - lastQual = qualifier; - lastRow = res.getRow(); - gotValue = thisValue; - } - numRowsScanned.getAndIncrement(); - } - numScans.getAndIncrement(); - } - } - - /** - * Creates a table with given table name and specified number of column - * families if the table does not already exist. - */ - private void setupTable(TableName table, int cfs) throws IOException { - try { - LOG.info("Creating table " + table); - HTableDescriptor htd = new HTableDescriptor(table); - for (int i = 0; i < 10; i++) { - htd.addFamily(new HColumnDescriptor(family(i))); - } - - UTIL.getHBaseAdmin().createTable(htd); - } catch (TableExistsException tee) { - LOG.info("Table " + table + " already exists"); - } - } - - /** - * Atomic bulk load. - */ - @Test - public void testAtomicBulkLoad() throws Exception { - TableName TABLE_NAME = TableName.valueOf("atomicBulkLoad"); - - int millisToRun = 30000; - int numScanners = 50; - - UTIL.startMiniCluster(1); - try { - WAL log = UTIL.getHBaseCluster().getRegionServer(0).getWAL(null); - FindBulkHBaseListener listener = new FindBulkHBaseListener(); - log.registerWALActionsListener(listener); - runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners); - assertThat(listener.isFound(), is(true)); - } finally { - UTIL.shutdownMiniCluster(); - } - } - - void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners) - throws Exception { - setupTable(tableName, 10); - - TestContext ctx = new TestContext(UTIL.getConfiguration()); - - AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null); - ctx.addThread(loader); - - List scanners = Lists.newArrayList(); - for (int i = 0; i < numScanners; i++) { - AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families); - scanners.add(scanner); - ctx.addThread(scanner); - } - - ctx.startThreads(); - ctx.waitFor(millisToRun); - ctx.stop(); - - LOG.info("Loaders:"); - LOG.info(" loaded " + loader.numBulkLoads.get()); - LOG.info(" compations " + loader.numCompactions.get()); - - LOG.info("Scanners:"); - for (AtomicScanReader scanner : scanners) { - LOG.info(" scanned " + scanner.numScans.get()); - LOG.info(" verified " + scanner.numRowsScanned.get() + " rows"); - } - } - - /** - * Run test on an HBase instance for 5 minutes. This assumes that the table - * under test only has a single region. - */ - public static void main(String args[]) throws Exception { - try { - Configuration c = HBaseConfiguration.create(); - TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad(); - test.setConf(c); - test.runAtomicBulkloadTest(TableName.valueOf("atomicTableTest"), 5 * 60 * 1000, 50); - } finally { - System.exit(0); // something hangs (believe it is lru threadpool) - } - } - - private void setConf(Configuration c) { - UTIL = new HBaseTestingUtility(c); - } - - static class FindBulkHBaseListener extends TestWALActionsListener.DummyWALActionsListener { - private boolean found = false; - - @Override - public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) { - for (Cell cell : logEdit.getCells()) { - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - for (Map.Entry entry : kv.toStringMap().entrySet()) { - if (entry.getValue().equals(Bytes.toString(WALEdit.BULK_LOAD))) { - found = true; - } +public class TestHRegionServerBulkLoad extends TestHRegionServerBulkLoadBase{ + /** + * Atomic bulk load. + */ + @Test + public void testAtomicBulkLoad() throws Exception { + TableName TABLE_NAME = TableName.valueOf("atomicBulkLoad"); + + int millisToRun = 30000; + int numScanners = 50; + + UTIL.startMiniCluster(1); + try { + Path logRootDir = UTIL.getHBaseCluster().getRegionServer(0).getLogRootDir(); + Path rootDir = UTIL.getHBaseCluster().getRegionServer(0).getRootDir(); + assertEquals(logRootDir, FSUtils.getLogRootDir(UTIL.getConfiguration())); + assertEquals(rootDir, FSUtils.getRootDir(UTIL.getConfiguration())); + WAL log = UTIL.getHBaseCluster().getRegionServer(0).getWAL(null); + FindBulkHBaseListener listener = new FindBulkHBaseListener(); + log.registerWALActionsListener(listener); + runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners); + assertThat(listener.isFound(), is(true)); + } finally { + UTIL.shutdownMiniCluster(); } - } } - public boolean isFound() { - return found; - } - } } - - diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadBase.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadBase.java new file mode 100644 index 0000000..50fd24d --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadBase.java @@ -0,0 +1,339 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import com.google.common.collect.Lists; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread; +import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.protobuf.RequestConverter; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest; +import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.WALKey; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Tests bulk loading of HFiles and shows the atomicity or lack of atomicity of + * the region server's bullkLoad functionality. + */ +@Category(LargeTests.class) +public class TestHRegionServerBulkLoadBase { + private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoadBase.class); + protected static HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private final static Configuration conf = UTIL.getConfiguration(); + private final static byte[] QUAL = Bytes.toBytes("qual"); + private final static int NUM_CFS = 10; + public static int BLOCKSIZE = 64 * 1024; + public static Algorithm COMPRESSION = Compression.Algorithm.NONE; + + private final static byte[][] families = new byte[NUM_CFS][]; + static { + for (int i = 0; i < NUM_CFS; i++) { + families[i] = Bytes.toBytes(family(i)); + } + } + + /** + * Create a rowkey compatible with + * {@link #createHFile(FileSystem, Path, byte[], byte[], byte[], int)}. + */ + public static byte[] rowkey(int i) { + return Bytes.toBytes(String.format("row_%08d", i)); + } + + static String family(int i) { + return String.format("family_%04d", i); + } + + /** + * Create an HFile with the given number of rows with a specified value. + */ + public static void createHFile(FileSystem fs, Path path, byte[] family, + byte[] qualifier, byte[] value, int numRows) throws IOException { + HFileContext context = new HFileContextBuilder().withBlockSize(BLOCKSIZE) + .withCompression(COMPRESSION) + .build(); + HFile.Writer writer = HFile + .getWriterFactory(conf, new CacheConfig(conf)) + .withPath(fs, path) + .withFileContext(context) + .create(); + long now = System.currentTimeMillis(); + try { + // subtract 2 since iterateOnSplits doesn't include boundary keys + for (int i = 0; i < numRows; i++) { + KeyValue kv = new KeyValue(rowkey(i), family, qualifier, now, value); + writer.append(kv); + } + writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(now)); + } finally { + writer.close(); + } + } + + /** + * Thread that does full scans of the table looking for any partially + * completed rows. + * + * Each iteration of this loads 10 hdfs files, which occupies 5 file open file + * handles. So every 10 iterations (500 file handles) it does a region + * compaction to reduce the number of open file handles. + */ + public static class AtomicHFileLoader extends RepeatingTestThread { + final AtomicLong numBulkLoads = new AtomicLong(); + final AtomicLong numCompactions = new AtomicLong(); + private TableName tableName; + + public AtomicHFileLoader(TableName tableName, TestContext ctx, + byte targetFamilies[][]) throws IOException { + super(ctx); + this.tableName = tableName; + } + + public void doAnAction() throws Exception { + long iteration = numBulkLoads.getAndIncrement(); + Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d", + iteration)); + + // create HFiles for different column families + FileSystem fs = UTIL.getTestFileSystem(); + byte[] val = Bytes.toBytes(String.format("%010d", iteration)); + final List> famPaths = new ArrayList>( + NUM_CFS); + for (int i = 0; i < NUM_CFS; i++) { + Path hfile = new Path(dir, family(i)); + byte[] fam = Bytes.toBytes(family(i)); + createHFile(fs, hfile, fam, QUAL, val, 1000); + famPaths.add(new Pair(fam, hfile.toString())); + } + + // bulk load HFiles + final HConnection conn = UTIL.getHBaseAdmin().getConnection(); + RegionServerCallable callable = + new RegionServerCallable(conn, tableName, Bytes.toBytes("aaa")) { + @Override + public Void call(int callTimeout) throws Exception { + LOG.debug("Going to connect to server " + getLocation() + " for row " + + Bytes.toStringBinary(getRow())); + byte[] regionName = getLocation().getRegionInfo().getRegionName(); + BulkLoadHFileRequest request = + RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true); + getStub().bulkLoadHFile(null, request); + return null; + } + }; + RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf); + RpcRetryingCaller caller = factory. newCaller(); + caller.callWithRetries(callable, Integer.MAX_VALUE); + + // Periodically do compaction to reduce the number of open file handles. + if (numBulkLoads.get() % 10 == 0) { + // 10 * 50 = 500 open file handles! + callable = new RegionServerCallable(conn, tableName, Bytes.toBytes("aaa")) { + @Override + public Void call(int callTimeout) throws Exception { + LOG.debug("compacting " + getLocation() + " for row " + + Bytes.toStringBinary(getRow())); + AdminProtos.AdminService.BlockingInterface server = + conn.getAdmin(getLocation().getServerName()); + CompactRegionRequest request = + RequestConverter.buildCompactRegionRequest( + getLocation().getRegionInfo().getRegionName(), true, null); + server.compactRegion(null, request); + numCompactions.incrementAndGet(); + return null; + } + }; + caller.callWithRetries(callable, Integer.MAX_VALUE); + } + } + } + + /** + * Thread that does full scans of the table looking for any partially + * completed rows. + */ + public static class AtomicScanReader extends RepeatingTestThread { + byte targetFamilies[][]; + HTable table; + AtomicLong numScans = new AtomicLong(); + AtomicLong numRowsScanned = new AtomicLong(); + TableName TABLE_NAME; + + public AtomicScanReader(TableName TABLE_NAME, TestContext ctx, + byte targetFamilies[][]) throws IOException { + super(ctx); + this.TABLE_NAME = TABLE_NAME; + this.targetFamilies = targetFamilies; + table = new HTable(conf, TABLE_NAME); + } + + public void doAnAction() throws Exception { + Scan s = new Scan(); + for (byte[] family : targetFamilies) { + s.addFamily(family); + } + ResultScanner scanner = table.getScanner(s); + + for (Result res : scanner) { + byte[] lastRow = null, lastFam = null, lastQual = null; + byte[] gotValue = null; + for (byte[] family : targetFamilies) { + byte qualifier[] = QUAL; + byte thisValue[] = res.getValue(family, qualifier); + if (gotValue != null && thisValue != null + && !Bytes.equals(gotValue, thisValue)) { + + StringBuilder msg = new StringBuilder(); + msg.append("Failed on scan ").append(numScans) + .append(" after scanning ").append(numRowsScanned) + .append(" rows!\n"); + msg.append("Current was " + Bytes.toString(res.getRow()) + "/" + + Bytes.toString(family) + ":" + Bytes.toString(qualifier) + + " = " + Bytes.toString(thisValue) + "\n"); + msg.append("Previous was " + Bytes.toString(lastRow) + "/" + + Bytes.toString(lastFam) + ":" + Bytes.toString(lastQual) + + " = " + Bytes.toString(gotValue)); + throw new RuntimeException(msg.toString()); + } + + lastFam = family; + lastQual = qualifier; + lastRow = res.getRow(); + gotValue = thisValue; + } + numRowsScanned.getAndIncrement(); + } + numScans.getAndIncrement(); + } + } + + /** + * Creates a table with given table name and specified number of column + * families if the table does not already exist. + */ + private void setupTable(TableName table, int cfs) throws IOException { + try { + LOG.info("Creating table " + table); + HTableDescriptor htd = new HTableDescriptor(table); + for (int i = 0; i < 10; i++) { + htd.addFamily(new HColumnDescriptor(family(i))); + } + + UTIL.getHBaseAdmin().createTable(htd); + } catch (TableExistsException tee) { + LOG.info("Table " + table + " already exists"); + } + } + + void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners) + throws Exception { + setupTable(tableName, 10); + + TestContext ctx = new TestContext(UTIL.getConfiguration()); + + AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null); + ctx.addThread(loader); + + List scanners = Lists.newArrayList(); + for (int i = 0; i < numScanners; i++) { + AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families); + scanners.add(scanner); + ctx.addThread(scanner); + } + + ctx.startThreads(); + ctx.waitFor(millisToRun); + ctx.stop(); + + LOG.info("Loaders:"); + LOG.info(" loaded " + loader.numBulkLoads.get()); + LOG.info(" compations " + loader.numCompactions.get()); + + LOG.info("Scanners:"); + for (AtomicScanReader scanner : scanners) { + LOG.info(" scanned " + scanner.numScans.get()); + LOG.info(" verified " + scanner.numRowsScanned.get() + " rows"); + } + } + + /** + * Run test on an HBase instance for 5 minutes. This assumes that the table + * under test only has a single region. + */ + public static void main(String args[]) throws Exception { + try { + Configuration c = HBaseConfiguration.create(); + TestHRegionServerBulkLoadBase test = new TestHRegionServerBulkLoadBase(); + test.setConf(c); + test.runAtomicBulkloadTest(TableName.valueOf("atomicTableTest"), 5 * 60 * 1000, 50); + } finally { + System.exit(0); // something hangs (believe it is lru threadpool) + } + } + + private void setConf(Configuration c) { + UTIL = new HBaseTestingUtility(c); + } + + static class FindBulkHBaseListener extends TestWALActionsListener.DummyWALActionsListener { + private boolean found = false; + + @Override + public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) { + for (Cell cell : logEdit.getCells()) { + KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + for (Map.Entry entry : kv.toStringMap().entrySet()) { + if (entry.getValue().equals(Bytes.toString(WALEdit.BULK_LOAD))) { + found = true; + } + } + } + } + + public boolean isFound() { + return found; + } + } +} + + diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWIthLogDir.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWIthLogDir.java new file mode 100644 index 0000000..3802b39 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWIthLogDir.java @@ -0,0 +1,40 @@ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.WAL; +import org.junit.Test; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.*; + +public class TestHRegionServerBulkLoadWIthLogDir extends TestHRegionServerBulkLoadBase{ + /** + * Atomic bulk load. + */ + @Test + public void testAtomicBulkLoadWithLogDir() throws Exception { + TableName TABLE_NAME = TableName.valueOf("atomicBulkLoad"); + + int millisToRun = 30000; + int numScanners = 50; + + UTIL.startMiniClusterWithLogDir(1); + assertNotEquals(FSUtils.getLogRootDir(UTIL.getConfiguration()),FSUtils.getRootDir(UTIL.getConfiguration())); + try { + Path logRootDir = UTIL.getHBaseCluster().getRegionServer(0).getLogRootDir(); + Path rootDir = UTIL.getHBaseCluster().getRegionServer(0).getRootDir(); + assertEquals(logRootDir, FSUtils.getLogRootDir(UTIL.getConfiguration())); + assertEquals(rootDir, FSUtils.getRootDir(UTIL.getConfiguration())); + WAL log = UTIL.getHBaseCluster().getRegionServer(0).getWAL(null); + FindBulkHBaseListener listener = new FindBulkHBaseListener(); + log.registerWALActionsListener(listener); + runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners); + assertThat(listener.isFound(), is(true)); + } finally { + UTIL.shutdownMiniCluster(); + } + } + +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java index e09b621..ab5083d 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java @@ -88,6 +88,8 @@ public class TestFSHLog { protected static Configuration conf; protected static FileSystem fs; protected static Path dir; + protected static Path rootdir; + protected static Path logRootdir; protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @Rule @@ -99,8 +101,9 @@ public class TestFSHLog { for (FileStatus dir : entries) { fs.delete(dir.getPath(), true); } - final Path hbaseDir = TEST_UTIL.createRootDir(); - dir = new Path(hbaseDir, currentTest.getMethodName()); + rootdir = TEST_UTIL.createRootDir(); + logRootdir = TEST_UTIL.createLogRootDir(); + dir = new Path(logRootdir, currentTest.getMethodName()); } @After @@ -133,6 +136,8 @@ public class TestFSHLog { @AfterClass public static void tearDownAfterClass() throws Exception { + fs.delete(rootdir, true); + fs.delete(logRootdir, true); TEST_UTIL.shutdownMiniCluster(); } @@ -144,7 +149,7 @@ public class TestFSHLog { // test to see whether the coprocessor is loaded or not. FSHLog log = null; try { - log = new FSHLog(fs, FSUtils.getRootDir(conf), dir.toString(), + log = new FSHLog(fs, logRootdir, dir.toString(), HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null); WALCoprocessorHost host = log.getCoprocessorHost(); Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class.getName()); @@ -195,7 +200,7 @@ public class TestFSHLog { FSHLog wal1 = null; FSHLog walMeta = null; try { - wal1 = new FSHLog(fs, FSUtils.getRootDir(conf), dir.toString(), + wal1 = new FSHLog(fs, logRootdir, dir.toString(), HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null); LOG.debug("Log obtained is: " + wal1); Comparator comp = wal1.LOG_NAME_COMPARATOR; @@ -205,7 +210,7 @@ public class TestFSHLog { assertTrue(comp.compare(p1, p1) == 0); // comparing with different filenum. assertTrue(comp.compare(p1, p2) < 0); - walMeta = new FSHLog(fs, FSUtils.getRootDir(conf), dir.toString(), + walMeta = new FSHLog(fs, logRootdir, dir.toString(), HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, DefaultWALProvider.META_WAL_PROVIDER_ID); Comparator compMeta = walMeta.LOG_NAME_COMPARATOR; @@ -253,7 +258,7 @@ public class TestFSHLog { LOG.debug("testFindMemStoresEligibleForFlush"); Configuration conf1 = HBaseConfiguration.create(conf); conf1.setInt("hbase.regionserver.maxlogs", 1); - FSHLog wal = new FSHLog(fs, FSUtils.getRootDir(conf1), dir.toString(), + FSHLog wal = new FSHLog(fs, logRootdir, dir.toString(), HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null); HTableDescriptor t1 = new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row")); @@ -330,7 +335,7 @@ public class TestFSHLog { @Test(expected=IOException.class) public void testFailedToCreateWALIfParentRenamed() throws IOException { final String name = "testFailedToCreateWALIfParentRenamed"; - FSHLog log = new FSHLog(fs, FSUtils.getRootDir(conf), name, HConstants.HREGION_OLDLOGDIR_NAME, + FSHLog log = new FSHLog(fs, logRootdir, name, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null); long filenum = System.currentTimeMillis(); Path path = log.computeFilename(filenum); @@ -359,13 +364,13 @@ public class TestFSHLog { final byte[] rowName = tableName.getName(); final HTableDescriptor htd = new HTableDescriptor(tableName); htd.addFamily(new HColumnDescriptor("f")); - HRegion r = HRegion.createHRegion(hri, TEST_UTIL.getDefaultRootDirPath(), + HRegion r = HRegion.createHRegion(hri, rootdir, TEST_UTIL.getConfiguration(), htd); HRegion.closeHRegion(r); final int countPerFamily = 10; final MutableBoolean goslow = new MutableBoolean(false); // subclass and doctor a method. - FSHLog wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDefaultRootDirPath(), + FSHLog wal = new FSHLog(FileSystem.get(conf), logRootdir, testName, conf) { @Override void atHeadOfRingBufferEventHandlerAppend() { @@ -377,7 +382,7 @@ public class TestFSHLog { } }; HRegion region = HRegion.openHRegion(TEST_UTIL.getConfiguration(), - TEST_UTIL.getTestFileSystem(), TEST_UTIL.getDefaultRootDirPath(), hri, htd, wal); + TEST_UTIL.getTestFileSystem(), rootdir, hri, htd, wal); EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate(); try { List puts = null; @@ -430,7 +435,7 @@ public class TestFSHLog { SecurityException, IllegalArgumentException, IllegalAccessException { final String name = "testSyncRunnerIndexOverflow"; FSHLog log = - new FSHLog(fs, FSUtils.getRootDir(conf), name, HConstants.HREGION_OLDLOGDIR_NAME, conf, + new FSHLog(fs, FSUtils.getLogRootDir(conf), name, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null); try { Field ringBufferEventHandlerField = FSHLog.class.getDeclaredField("ringBufferEventHandler"); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java index 440120e..b1170ca 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java @@ -70,8 +70,9 @@ public class TestLogRollAbort { private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); /* For the split-then-roll test */ - private static final Path HBASEDIR = new Path("/hbase"); - private static final Path OLDLOGDIR = new Path(HBASEDIR, HConstants.HREGION_OLDLOGDIR_NAME); + private static final Path HBASE_DIR = new Path("/hbase"); + private static final Path HBASE_LOG_DIR = new Path("/hbaselog"); + private static final Path OLD_LOG_DIR = new Path(HBASE_LOG_DIR, HConstants.HREGION_OLDLOGDIR_NAME); // Need to override this setup so we can edit the config before it gets sent // to the HDFS & HBase cluster startup. @@ -111,7 +112,8 @@ public class TestLogRollAbort { // disable region rebalancing (interferes with log watching) cluster.getMaster().balanceSwitch(false); - FSUtils.setRootDir(conf, HBASEDIR); + FSUtils.setRootDir(conf, HBASE_DIR); + FSUtils.setLogRootDir(conf, HBASE_LOG_DIR); } @After @@ -183,7 +185,7 @@ public class TestLogRollAbort { public void testLogRollAfterSplitStart() throws IOException { LOG.info("Verify wal roll after split starts will fail."); String logName = "testLogRollAfterSplitStart"; - Path thisTestsDir = new Path(HBASEDIR, DefaultWALProvider.getWALDirectoryName(logName)); + Path thisTestsDir = new Path(HBASE_LOG_DIR, DefaultWALProvider.getWALDirectoryName(logName)); final WALFactory wals = new WALFactory(conf, null, logName); try { @@ -219,7 +221,7 @@ public class TestLogRollAbort { LOG.debug("Renamed region directory: " + rsSplitDir); LOG.debug("Processing the old log files."); - WALSplitter.split(HBASEDIR, rsSplitDir, OLDLOGDIR, fs, conf, wals); + WALSplitter.split(HBASE_LOG_DIR, rsSplitDir, OLD_LOG_DIR, fs, conf, wals); LOG.debug("Trying to roll the WAL."); try { diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java index bf746d4..16f67f2 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java @@ -55,19 +55,25 @@ public class TestWALActionsListener { private final static byte[] SOME_BYTES = Bytes.toBytes("t"); private static FileSystem fs; private static Configuration conf; + private static Path rootDir; + private static Path logRootDir; + private static FileSystem logFs; @BeforeClass public static void setUpBeforeClass() throws Exception { conf = TEST_UTIL.getConfiguration(); conf.setInt("hbase.regionserver.maxlogs", 5); - fs = FileSystem.get(conf); - FSUtils.setRootDir(conf, TEST_UTIL.getDataTestDir()); + rootDir = TEST_UTIL.createRootDir(); + logRootDir = TEST_UTIL.createLogRootDir(); + fs = FSUtils.getRootDirFileSystem(conf); + logFs = FSUtils.getLogRootDirFileSystem(conf); } @Before public void setUp() throws Exception { fs.delete(new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_LOGDIR_NAME), true); - fs.delete(new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME), true); + logFs.delete(new Path(logRootDir, HConstants.HREGION_LOGDIR_NAME), true); + logFs.delete(new Path(logRootDir, HConstants.HREGION_OLDLOGDIR_NAME), true); } @After diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index 4c34823..e869183 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -108,6 +108,7 @@ public class TestWALReplay { static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate(); private Path hbaseRootDir = null; + private Path hbaseLogRootDir = null; private String logName; private Path oldLogDir; private Path logDir; @@ -129,8 +130,11 @@ public class TestWALReplay { TEST_UTIL.startMiniCluster(3); Path hbaseRootDir = TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase")); - LOG.info("hbase.rootdir=" + hbaseRootDir); + Path hbaseLogRootDir = TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbaselog")); + LOG.info(HConstants.HBASE_DIR + "=" + hbaseRootDir); + LOG.info(HConstants.HBASE_LOG_DIR + "=" + hbaseLogRootDir); FSUtils.setRootDir(conf, hbaseRootDir); + FSUtils.setRootDir(conf, hbaseLogRootDir); } @AfterClass @@ -143,12 +147,16 @@ public class TestWALReplay { this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); this.fs = TEST_UTIL.getDFSCluster().getFileSystem(); this.hbaseRootDir = FSUtils.getRootDir(this.conf); - this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME); + this.hbaseLogRootDir = FSUtils.getLogRootDir(this.conf); + this.oldLogDir = new Path(this.hbaseLogRootDir, HConstants.HREGION_OLDLOGDIR_NAME); this.logName = DefaultWALProvider.getWALDirectoryName(currentTest.getMethodName() + "-manual"); - this.logDir = new Path(this.hbaseRootDir, logName); + this.logDir = new Path(this.hbaseLogRootDir, logName); if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) { TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); } + if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseLogRootDir)) { + TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseLogRootDir, true); + } this.mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING); this.wals = new WALFactory(conf, null, currentTest.getMethodName()); @@ -158,6 +166,7 @@ public class TestWALReplay { public void tearDown() throws Exception { this.wals.close(); TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); + TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseLogRootDir, true); } /* @@ -943,7 +952,7 @@ public class TestWALReplay { final byte[] rowName = tableName.getName(); final int countPerFamily = 10; final HTableDescriptor htd = createBasic1FamilyHTD(tableName); - HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); + HRegion region1 = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, hbaseLogRootDir, this.conf, htd); Path regionDir = region1.getRegionFileSystem().getRegionDir(); HBaseTestingUtility.closeRegionAndWAL(region1); @@ -1030,9 +1039,9 @@ public class TestWALReplay { static class MockWAL extends FSHLog { boolean doCompleteCacheFlush = false; - public MockWAL(FileSystem fs, Path rootDir, String logName, Configuration conf) + public MockWAL(FileSystem fs, Path logRootDir, String logName, Configuration conf) throws IOException { - super(fs, rootDir, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null); + super(fs, logRootDir, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null); } @Override @@ -1052,7 +1061,7 @@ public class TestWALReplay { } private MockWAL createMockWAL() throws IOException { - MockWAL wal = new MockWAL(fs, hbaseRootDir, logName, conf); + MockWAL wal = new MockWAL(fs, hbaseLogRootDir, logName, conf); // Set down maximum recovery so we dfsclient doesn't linger retrying something // long gone. HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1); @@ -1144,7 +1153,7 @@ public class TestWALReplay { */ private Path runWALSplit(final Configuration c) throws IOException { List splits = WALSplitter.split( - hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals); + hbaseLogRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals); // Split should generate only 1 file since there's only 1 region assertEquals("splits=" + splits, 1, splits.size()); // Make sure the file exists @@ -1159,7 +1168,7 @@ public class TestWALReplay { * @throws IOException */ private WAL createWAL(final Configuration c) throws IOException { - FSHLog wal = new FSHLog(FileSystem.get(c), hbaseRootDir, logName, c); + FSHLog wal = new FSHLog(FileSystem.get(c), hbaseLogRootDir, logName, c); // Set down maximum recovery so we dfsclient doesn't linger retrying something // long gone. HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java index 2699292..24d604f 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java @@ -381,6 +381,43 @@ public class TestFSUtils { verifyFileInDirWithStoragePolicy("1772"); } + @Test + public void testSetLogRootDir() throws Exception { + HBaseTestingUtility htu = new HBaseTestingUtility(); + Configuration conf = htu.getConfiguration(); + Path p = new Path("file:///hbase/root"); + FSUtils.setLogRootDir(conf, p); + assertEquals(p.toString(), conf.get(HConstants.HBASE_LOG_DIR)); + } + + @Test + public void testGetLogRootDir() throws Exception { + HBaseTestingUtility htu = new HBaseTestingUtility(); + Configuration conf = htu.getConfiguration(); + Path root = new Path("file:///hbase/root"); + Path logRoot = new Path("file:///hbase/logroot"); + FSUtils.setRootDir(conf, root); + assertEquals(FSUtils.getRootDir(conf), root); + assertEquals(FSUtils.getLogRootDir(conf), root); + FSUtils.setLogRootDir(conf, logRoot); + assertEquals(FSUtils.getLogRootDir(conf), logRoot); + } + + @Test + public void testRemoveLogRootPath() throws Exception { + HBaseTestingUtility htu = new HBaseTestingUtility(); + Configuration conf = htu.getConfiguration(); + FSUtils.setRootDir(conf, new Path("file:///user/hbase")); + Path testFile = new Path(FSUtils.getRootDir(conf), "test/testfile"); + Path tmpFile = new Path("file:///test/testfile"); + assertEquals(FSUtils.removeLogRootPath(testFile, conf), "test/testfile"); + assertEquals(FSUtils.removeLogRootPath(tmpFile, conf), tmpFile.toString()); + FSUtils.setLogRootDir(conf, new Path("file:///user/hbaseLogDir")); + assertEquals(FSUtils.removeLogRootPath(testFile, conf), testFile.toString()); + Path logFile = new Path(FSUtils.getLogRootDir(conf), "test/testlog"); + assertEquals(FSUtils.removeLogRootPath(logFile, conf), "test/testlog"); + } + private void cleanupFile(FileSystem fileSys, Path name) throws IOException { assertTrue(fileSys.exists(name)); assertTrue(fileSys.delete(name, true)); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSVisitor.java hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSVisitor.java index 3029601..771a81c 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSVisitor.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSVisitor.java @@ -29,6 +29,7 @@ import java.util.UUID; import java.util.Set; import java.util.HashSet; +import org.apache.hadoop.conf.Configuration; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FSDataOutputStream; @@ -60,15 +61,21 @@ public class TestFSVisitor { private Set serverLogs; private FileSystem fs; + private FileSystem logFs; private Path tableDir; private Path logsDir; private Path rootDir; + private Path logRootDir; + private Configuration conf; @Before public void setUp() throws Exception { - fs = FileSystem.get(TEST_UTIL.getConfiguration()); - rootDir = TEST_UTIL.getDataTestDir("hbase"); - logsDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME); + conf = TEST_UTIL.getConfiguration(); + rootDir = TEST_UTIL.createRootDir(); + logRootDir = TEST_UTIL.createLogRootDir(); + logsDir = new Path(logRootDir, HConstants.HREGION_LOGDIR_NAME); + fs = FSUtils.getRootDirFileSystem(conf); + logFs = FSUtils.getLogRootDirFileSystem(conf); tableFamilies = new HashSet(); tableRegions = new HashSet(); @@ -85,6 +92,7 @@ public class TestFSVisitor { @After public void tearDown() throws Exception { fs.delete(rootDir, true); + logFs.delete(logRootDir, true); } @Test @@ -124,7 +132,7 @@ public class TestFSVisitor { public void testVisitLogFiles() throws IOException { final Set servers = new HashSet(); final Set logs = new HashSet(); - FSVisitor.visitLogFiles(fs, rootDir, new FSVisitor.LogFileVisitor() { + FSVisitor.visitLogFiles(logFs, logRootDir, new FSVisitor.LogFileVisitor() { public void logFile (final String server, final String logfile) throws IOException { servers.add(server); logs.add(logfile); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java index d2581a1..b1800f8 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java @@ -101,7 +101,7 @@ public class IOTestProvider implements WALProvider { providerId = DEFAULT_PROVIDER_ID; } final String logPrefix = factory.factoryId + WAL_FILE_NAME_DELIMITER + providerId; - log = new IOTestWAL(FileSystem.get(conf), FSUtils.getRootDir(conf), + log = new IOTestWAL(FSUtils.getLogRootDirFileSystem(conf), FSUtils.getLogRootDir(conf), DefaultWALProvider.getWALDirectoryName(factory.factoryId), HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners, true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java index 4402909..ce8f2de 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java @@ -66,6 +66,9 @@ public class TestDefaultWALProvider { protected static Configuration conf; protected static FileSystem fs; + protected static FileSystem logFs; + protected static Path rootDir; + protected static Path logRootDir; protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); protected MultiVersionConcurrencyControl mvcc; @@ -79,6 +82,8 @@ public class TestDefaultWALProvider { for (FileStatus dir : entries) { fs.delete(dir.getPath(), true); } + fs.delete(rootDir, true); + logFs.delete(logRootDir, true); } @After @@ -104,13 +109,17 @@ public class TestDefaultWALProvider { TEST_UTIL.startMiniDFSCluster(3); // Set up a working space for our tests. - TEST_UTIL.createRootDir(); + rootDir = TEST_UTIL.createRootDir(); + logRootDir = TEST_UTIL.createLogRootDir(); conf = TEST_UTIL.getConfiguration(); - fs = TEST_UTIL.getDFSCluster().getFileSystem(); + fs = FSUtils.getRootDirFileSystem(conf); + logFs = FSUtils.getLogRootDirFileSystem(conf); } @AfterClass public static void tearDownAfterClass() throws Exception { + fs.delete(rootDir, true); + logFs.delete(logRootDir, true); TEST_UTIL.shutdownMiniCluster(); } @@ -121,13 +130,13 @@ public class TestDefaultWALProvider { @Test public void testGetServerNameFromWALDirectoryName() throws IOException { ServerName sn = ServerName.valueOf("hn", 450, 1398); - String hl = FSUtils.getRootDir(conf) + "/" + + String hl = FSUtils.getLogRootDir(conf) + "/" + DefaultWALProvider.getWALDirectoryName(sn.toString()); // Must not throw exception assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, null)); assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, - FSUtils.getRootDir(conf).toUri().toString())); + FSUtils.getLogRootDir(conf).toUri().toString())); assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, "")); assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, " ")); assertNull(DefaultWALProvider.getServerNameFromWALDirectoryName(conf, hl)); @@ -136,7 +145,7 @@ public class TestDefaultWALProvider { final String wals = "/WALs/"; ServerName parsed = DefaultWALProvider.getServerNameFromWALDirectoryName(conf, - FSUtils.getRootDir(conf).toUri().toString() + wals + sn + + FSUtils.getLogRootDir(conf).toUri().toString() + wals + sn + "/localhost%2C32984%2C1343316388997.1343316390417"); assertEquals("standard", sn, parsed); @@ -144,7 +153,7 @@ public class TestDefaultWALProvider { assertEquals("subdir", sn, parsed); parsed = DefaultWALProvider.getServerNameFromWALDirectoryName(conf, - FSUtils.getRootDir(conf).toUri().toString() + wals + sn + + FSUtils.getLogRootDir(conf).toUri().toString() + wals + sn + "-splitting/localhost%3A57020.1340474893931"); assertEquals("split", sn, parsed); } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALLogDir.java hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALLogDir.java new file mode 100644 index 0000000..8e3aa31 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALLogDir.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +@Category(MediumTests.class) +public class TestWALLogDir { + private static final Log LOG = LogFactory.getLog(TestWALLogDir.class); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static Configuration conf; + private static FileSystem fs; + private static FileSystem logFs; + static final TableName tableName = TableName.valueOf("TestWALLogDir"); + private static final byte [] rowName = Bytes.toBytes("row"); + private static final byte [] family = Bytes.toBytes("column"); + private static HTableDescriptor htd; + private static Path logRootDir; + private static Path rootDir; + + @Before + public void setUp() throws Exception { + cleanup(); + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf = TEST_UTIL.getConfiguration(); + TEST_UTIL.startMiniDFSCluster(1); + rootDir = TEST_UTIL.createRootDir(); + logRootDir = TEST_UTIL.createLogRootDir(); + fs = FSUtils.getRootDirFileSystem(conf); + logFs = FSUtils.getLogRootDirFileSystem(conf); + htd = new HTableDescriptor(tableName); + htd.addFamily(new HColumnDescriptor(family)); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + cleanup(); + TEST_UTIL.shutdownMiniDFSCluster(); + } + + @Test + public void testWALRootDir() throws Exception { + FileSystem logFs = logRootDir.getFileSystem(conf); + HRegionInfo regionInfo = new HRegionInfo(tableName); + HRegion region = HRegion.createHRegion(regionInfo, rootDir, this.conf, htd); + assertEquals("Expect 2 file in rootDir, which are regionInfo file and recovered.edits", 2, getWALFiles(fs, rootDir).size()); + WAL log = region.getWAL(); + byte [] value = Bytes.toBytes("value"); + WALEdit edit = new WALEdit(); + edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), + System.currentTimeMillis(), value)); + long txid = log.append(htd, regionInfo, getWalKey(System.currentTimeMillis(), regionInfo), edit, true); + log.sync(txid); + assertEquals("Expect 1 log have been created", 1, getWALFiles(logFs, logRootDir).size()); + log.rollWriter(); + //Create 1 more WAL, and put old WAL into oldwal dir + assertEquals(1, getWALFiles(logFs, new Path(logRootDir, HConstants.HREGION_OLDLOGDIR_NAME)).size()); + assertEquals(1, getWALFiles(logFs, new Path(logRootDir, HConstants.HREGION_LOGDIR_NAME)).size()); + edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), + System.currentTimeMillis(), value)); + txid = log.append(htd, regionInfo, getWalKey(System.currentTimeMillis(), regionInfo), edit, true); + log.sync(txid); + log.rollWriter(); + log.shutdown(); + + assertEquals("Expect 2 logs in oldWALs dir", 2, getWALFiles(logFs, new Path(logRootDir, HConstants.HREGION_OLDLOGDIR_NAME)).size()); + assertEquals("Expect 1 logs in WALs dir", 1, getWALFiles(logFs, new Path(logRootDir, HConstants.HREGION_LOGDIR_NAME)).size()); + // No changes in number of files in rootDir + assertEquals("Should have no changes in number of files in rootDir, still be 2", 2, getWALFiles(fs, rootDir).size()); + } + + protected WALKey getWalKey(final long time, HRegionInfo hri) { + return new WALKey(hri.getEncodedNameAsBytes(), tableName, time); + } + + private List getWALFiles(FileSystem fs, Path dir) + throws IOException { + List result = new ArrayList(); + LOG.debug("Scanning " + dir.toString() + " for WAL files"); + + FileStatus[] files = fs.listStatus(dir); + if (files == null) return Collections.emptyList(); + for (FileStatus file : files) { + if (file.isDirectory()) { + // recurse into sub directories + result.addAll(getWALFiles(fs, file.getPath())); + } else { + String name = file.getPath().toString(); + if (!name.startsWith(".")) { + result.add(file); + } + } + } + return result; + } + + private static void cleanup() throws Exception{ + logFs.delete(logRootDir, true); + fs.delete(rootDir, true); + } + +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java index b599d1c..66a2069 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java @@ -304,7 +304,9 @@ public final class WALPerformanceEvaluation extends Configured implements Tool { // First set the fs from configs. In case we are on hadoop1 FSUtils.setFsDefault(getConf(), FSUtils.getRootDir(getConf())); FileSystem fs = FileSystem.get(getConf()); + FileSystem logFs = FSUtils.getLogRootDirFileSystem(getConf()); LOG.info("FileSystem: " + fs); + LOG.info("Log FileSystem: " + logFs); SpanReceiverHost receiverHost = trace ? SpanReceiverHost.getInstance(getConf()) : null; final Sampler sampler = trace ? Sampler.ALWAYS : Sampler.NEVER; @@ -346,14 +348,14 @@ public final class WALPerformanceEvaluation extends Configured implements Tool { } if (verify) { LOG.info("verifying written log entries."); - Path dir = new Path(FSUtils.getRootDir(getConf()), + Path dir = new Path(FSUtils.getLogRootDir(getConf()), DefaultWALProvider.getWALDirectoryName("wals")); long editCount = 0; - FileStatus [] fsss = fs.listStatus(dir); + FileStatus [] fsss = logFs.listStatus(dir); if (fsss.length == 0) throw new IllegalStateException("No WAL found"); for (FileStatus fss: fsss) { Path p = fss.getPath(); - if (!fs.exists(p)) throw new IllegalStateException(p.toString()); + if (!logFs.exists(p)) throw new IllegalStateException(p.toString()); editCount += verify(wals, p, verbose); } long expected = numIterations * numThreads;