Index: hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java	(revision 1544881)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java	(working copy)
@@ -90,7 +90,6 @@
       thenReturn(false).thenReturn(false).thenReturn(true);
     Mockito.when(dfs.isFileClosed(FILE)).thenReturn(true);
     assertTrue(this.fsHDFSUtils.recoverDFSFileLease(dfs, FILE, HTU.getConfiguration(), reporter));
-    Mockito.verify(dfs, Mockito.times(2)).recoverLease(FILE);
     Mockito.verify(dfs, Mockito.times(1)).isFileClosed(FILE);
   }
 
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java	(revision 1544881)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java	(working copy)
@@ -33,7 +33,12 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
@@ -61,12 +66,14 @@
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.FSHDFSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
@@ -121,6 +128,7 @@
   private long zkretries;
   private long resubmit_threshold;
+  private int maxThreads; // max number of threads for submitting lease recovery
   private long timeout;
   private long unassignedTimeout;
   private long lastNodeCreateTime = Long.MAX_VALUE;
@@ -220,6 +228,7 @@
     this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES);
     this.resubmit_threshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT);
     this.timeout = conf.getInt("hbase.splitlog.manager.timeout", DEFAULT_TIMEOUT);
+    this.maxThreads = conf.getInt("hbase.splitlog.manager.max.threads", 6);
     this.unassignedTimeout =
       conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
     this.distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
@@ -328,63 +337,104 @@
     SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet();
     LOG.info("started splitting " + logfiles.length + " logs in " + logDirs);
     long t = EnvironmentEdgeManager.currentTimeMillis();
-    long totalSize = 0;
-    TaskBatch batch = new TaskBatch();
+    final AtomicLong totalSize = new AtomicLong(0);
+    final AtomicInteger count = new AtomicInteger(0);
+    final TaskBatch batch = new TaskBatch();
     Boolean isMetaRecovery = (filter == null) ? null : false;
-    for (FileStatus lf : logfiles) {
-      // TODO If the log file is still being written to - which is most likely
-      // the case for the last log file - then its length will show up here
-      // as zero. The size of such a file can only be retrieved after
-      // recover-lease is done. totalSize will be under in most cases and the
-      // metrics that it drives will also be under-reported.
-      totalSize += lf.getLen();
-      String pathToLog = FSUtils.removeRootPath(lf.getPath(), conf);
-      if (!enqueueSplitTask(pathToLog, batch)) {
-        throw new IOException("duplicate log split scheduled for " + lf.getPath());
+    ExecutorService service = null;
+    List results = null;
+    final List exceptions = new ArrayList();
+    if (logfiles.length > 0) {
+      service = Executors.newFixedThreadPool(this.maxThreads);
+      results = new ArrayList(logfiles.length);
+    }
+    try {
+      for (final FileStatus lf : logfiles) {
+        // TODO If the log file is still being written to - which is most likely
+        // the case for the last log file - then its length will show up here
+        // as zero. The size of such a file can only be retrieved after
+        // recover-lease is done. totalSize will be under in most cases and the
+        // metrics that it drives will also be under-reported.
+        Runnable runnable = new Runnable() {
+          @Override
+          public void run() {
+            count.getAndIncrement();
+            totalSize.getAndAdd(lf.getLen());
+            try {
+              FileSystem fs = lf.getPath().getFileSystem(conf);
+              if (fs instanceof DistributedFileSystem) {
+                ((FSHDFSUtils)FSUtils.getInstance(fs, conf))
+                  .recoverLease((DistributedFileSystem)fs, 0, lf.getPath(), 0L);
+              }
+              String pathToLog = FSUtils.removeRootPath(lf.getPath(), conf);
+              if (!enqueueSplitTask(pathToLog, batch)) {
+                throw new IOException("duplicate log split scheduled for " + lf.getPath());
+              }
+            } catch (IOException ioe) {
+              exceptions.add(ioe);
+            }
+          }
+        };
+        results.add(service.submit(runnable));
       }
-    }
-    waitForSplittingCompletion(batch, status);
-    // remove recovering regions from ZK
-    if (filter == MasterFileSystem.META_FILTER /* reference comparison */) {
-      // we split meta regions and user regions separately therefore logfiles are either all for
-      // meta or user regions but won't for both( we could have mixed situations in tests)
-      isMetaRecovery = true;
-    }
-    this.removeRecoveringRegionsFromZK(serverNames, isMetaRecovery);
+      if (results != null) {
+        for (Future f : results) {
+          try {
+            f.get();
+          } catch (ExecutionException ee) {
+            throw new IOException(ee.getCause());
+          } catch (InterruptedException ie) {
+            LOG.warn("Interrupted while waiting for log splits submission");
+            Thread.currentThread().interrupt();
+          }
+        }
+      }
+      if (!exceptions.isEmpty()) throw exceptions.get(0);
+      waitForSplittingCompletion(batch, status);
+      // remove recovering regions from ZK
+      if (filter == MasterFileSystem.META_FILTER /* reference comparison */) {
+        // we split meta regions and user regions separately therefore logfiles are either all for
+        // meta or user regions but won't for both( we could have mixed situations in tests)
+        isMetaRecovery = true;
+      }
+      this.removeRecoveringRegionsFromZK(serverNames, isMetaRecovery);
 
-    if (batch.done != batch.installed) {
-      batch.isDead = true;
-      SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet();
-      LOG.warn("error while splitting logs in " + logDirs +
-        " installed = " + batch.installed + " but only " + batch.done + " done");
-      String msg = "error or interrupted while splitting logs in "
-        + logDirs + " Task = " + batch;
-      status.abort(msg);
-      throw new IOException(msg);
-    }
-    for(Path logDir: logDirs){
-      status.setStatus("Cleaning up log directory...");
-      try {
-        if (fs.exists(logDir) && !fs.delete(logDir, false)) {
-          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
+      if (batch.done != batch.installed) {
+        batch.isDead = true;
+        SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet();
+        LOG.warn("error while splitting logs in " + logDirs +
+          " installed = " + batch.installed + " but only " + batch.done + " done");
+        String msg = "error or interrupted while splitting logs in "
+          + logDirs + " Task = " + batch;
+        status.abort(msg);
+        throw new IOException(msg);
+      }
+      for (Path logDir: logDirs){
+        status.setStatus("Cleaning up log directory...");
+        try {
+          if (fs.exists(logDir) && !fs.delete(logDir, false)) {
+            LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
+          }
+        } catch (IOException ioe) {
+          FileStatus[] files = fs.listStatus(logDir);
+          if (files != null && files.length > 0) {
+            LOG.warn("returning success without actually splitting and " +
+              "deleting all the log files in path " + logDir);
+          } else {
+            LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
+          }
         }
-      } catch (IOException ioe) {
-        FileStatus[] files = fs.listStatus(logDir);
-        if (files != null && files.length > 0) {
-          LOG.warn("returning success without actually splitting and " +
-            "deleting all the log files in path " + logDir);
-        } else {
-          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
-        }
+        SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet();
       }
-      SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet();
+      String msg = "finished splitting (more than or equal to) " + totalSize +
+        " bytes in " + batch.installed + " log files in " + logDirs + " in " +
+        (EnvironmentEdgeManager.currentTimeMillis() - t) + "ms";
+      status.markComplete(msg);
+      LOG.info(msg);
+    } finally {
+      if (service != null) service.shutdown();
     }
-    String msg = "finished splitting (more than or equal to) " + totalSize +
-      " bytes in " + batch.installed + " log files in " + logDirs + " in " +
-      (EnvironmentEdgeManager.currentTimeMillis() - t) + "ms";
-    status.markComplete(msg);
-    LOG.info(msg);
-    return totalSize;
+    return totalSize.longValue();
   }
 
   /**
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java	(revision 1544881)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java	(working copy)
@@ -100,9 +100,25 @@
     // whether we need to look for isFileClosed method
     boolean findIsFileClosedMeth = true;
     boolean recovered = false;
+    if (findIsFileClosedMeth) {
+      try {
+        isFileClosedMeth = dfs.getClass().getMethod("isFileClosed",
+          new Class[]{ Path.class });
+      } catch (NoSuchMethodException nsme) {
+        LOG.debug("isFileClosed not available");
+      } finally {
+        findIsFileClosedMeth = false;
+      }
+    }
+    if (isFileClosedMeth != null && isFileClosed(dfs, isFileClosedMeth, p)) {
+      return true;
+    }
     // We break the loop if we succeed the lease recovery, timeout, or we throw an exception.
     for (int nbAttempt = 0; !recovered; nbAttempt++) {
-      recovered = recoverLease(dfs, nbAttempt, p, startWaiting);
+      // The very first attempt was made by SplitLogManager
+      if (nbAttempt != 0) {
+        recovered = recoverLease(dfs, nbAttempt, p, startWaiting);
+      }
       if (recovered) break;
       checkIfCancelled(reporter);
       if (checkIfTimedout(conf, recoveryTimeout, nbAttempt, p, startWaiting)) break;
@@ -117,16 +133,6 @@
           while ((EnvironmentEdgeManager.currentTimeMillis() - localStartWaiting) <
               subsequentPause) {
             Thread.sleep(conf.getInt("hbase.lease.recovery.pause", 1000));
-            if (findIsFileClosedMeth) {
-              try {
-                isFileClosedMeth = dfs.getClass().getMethod("isFileClosed",
-                  new Class[]{ Path.class });
-              } catch (NoSuchMethodException nsme) {
-                LOG.debug("isFileClosed not available");
-              } finally {
-                findIsFileClosedMeth = false;
-              }
-            }
             if (isFileClosedMeth != null && isFileClosed(dfs, isFileClosedMeth, p)) {
               recovered = true;
               break;
@@ -164,7 +170,7 @@
    * @return True if dfs#recoverLease came by true.
    * @throws FileNotFoundException
    */
-  boolean recoverLease(final DistributedFileSystem dfs, final int nbAttempt, final Path p,
+  public boolean recoverLease(final DistributedFileSystem dfs, final int nbAttempt, final Path p,
       final long startWaiting)
   throws FileNotFoundException {
     boolean recovered = false;
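
Note (illustrative only, not part of the patch): below is a minimal, self-contained sketch of the submission pattern the SplitLogManager change uses, in which per-file work is handed to a bounded thread pool, the Futures are drained, and the first recorded IOException is rethrown once all submissions finish. The class and method names (ParallelSubmitSketch, processLog, submitAll) are illustrative stand-ins, and the sketch uses a synchronized list where the patch itself uses a plain ArrayList.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelSubmitSketch {
  // Hypothetical stand-in for the per-log-file work (lease recovery + enqueue of a split task).
  static void processLog(String log) throws IOException {
    if (log.isEmpty()) {
      throw new IOException("bad log: " + log);
    }
  }

  static void submitAll(List<String> logs, int maxThreads) throws IOException {
    ExecutorService service = Executors.newFixedThreadPool(maxThreads);
    List<Future<?>> results = new ArrayList<Future<?>>(logs.size());
    // Worker threads record failures here; the caller surfaces the first one.
    final List<IOException> exceptions =
        Collections.synchronizedList(new ArrayList<IOException>());
    try {
      for (final String log : logs) {
        results.add(service.submit(new Runnable() {
          @Override
          public void run() {
            try {
              processLog(log);
            } catch (IOException ioe) {
              exceptions.add(ioe);
            }
          }
        }));
      }
      for (Future<?> f : results) {
        try {
          f.get(); // wait for each submission to finish
        } catch (ExecutionException ee) {
          throw new IOException(ee.getCause());
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
        }
      }
      if (!exceptions.isEmpty()) {
        throw exceptions.get(0); // surface the first recorded failure
      }
    } finally {
      service.shutdown();
    }
  }

  public static void main(String[] args) throws IOException {
    submitAll(Arrays.asList("wal-1", "wal-2"), 6);
  }
}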