diff --git a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index c8f6e11..79a48ba 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -301,6 +301,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { if (this.catalogTracker != null) this.catalogTracker.stop(); if (this.serverManager != null) this.serverManager.stop(); if (this.assignmentManager != null) this.assignmentManager.stop(); + if (this.fileSystemManager != null) this.fileSystemManager.stop(); HConnectionManager.deleteConnection(this.conf, true); this.zooKeeper.close(); } diff --git a/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java b/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java index bbdd3d9..3c3c558 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java +++ b/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; import org.apache.hadoop.hbase.regionserver.wal.OrphanHLogAfterSplitException; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; /** @@ -71,6 +72,8 @@ public class MasterFileSystem { private final Path rootdir; // create the split log lock final Lock splitLogLock = new ReentrantLock(); + final boolean distributedLogSplitting; + final SplitLogManager splitLogManager; public MasterFileSystem(Server master, MasterMetrics metrics) throws IOException { @@ -88,6 +91,15 @@ public class MasterFileSystem { String fsUri = this.fs.getUri().toString(); conf.set("fs.default.name", fsUri); conf.set("fs.defaultFS", fsUri); + this.distributedLogSplitting = + conf.getBoolean("hbase.master.distributed.log.splitting", true); + if (this.distributedLogSplitting) { + this.splitLogManager = new SplitLogManager(master.getZooKeeper(), + master.getConfiguration(), master, master.getServerName()); + this.splitLogManager.finishInitialization(); + } else { + this.splitLogManager = null; + } // setup the filesystem variable // set up the archived logs path this.oldLogDir = new Path(this.rootdir, HConstants.HREGION_OLDLOGDIR_NAME); @@ -198,29 +210,47 @@ public class MasterFileSystem { } public void splitLog(final String serverName) { - this.splitLogLock.lock(); long splitTime = 0, splitLogSize = 0; Path logDir = new Path(this.rootdir, HLog.getHLogDirectoryName(serverName)); - try { - HLogSplitter splitter = HLogSplitter.createLogSplitter( - conf, rootdir, logDir, oldLogDir, this.fs); + if (distributedLogSplitting) { + splitTime = EnvironmentEdgeManager.currentTimeMillis(); + try { + try { + splitLogSize = splitLogManager.splitLogDistributed(logDir); + } catch (OrphanHLogAfterSplitException e) { + LOG.warn("Retrying distributed splitting for " + + serverName + "because of:", e); + splitLogManager.splitLogDistributed(logDir); + } + } catch (IOException e) { + LOG.error("Failed distributed splitting " + serverName, e); + } + splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime; + } else { + // splitLogLock ensures that dead region servers' logs are processed + // one at a time + this.splitLogLock.lock(); + try { - splitter.splitLog(); - } catch (OrphanHLogAfterSplitException e) { - LOG.warn("Retrying splitting because of:", e); - // An HLogSplitter 
instance can only be used once. Get new instance. - splitter = HLogSplitter.createLogSplitter(conf, rootdir, logDir, - oldLogDir, this.fs); - splitter.splitLog(); + HLogSplitter splitter = HLogSplitter.createLogSplitter( + conf, rootdir, logDir, oldLogDir, this.fs); + try { + splitter.splitLog(); + } catch (OrphanHLogAfterSplitException e) { + LOG.warn("Retrying splitting because of:", e); + // An HLogSplitter instance can only be used once. Get new instance. + splitter = HLogSplitter.createLogSplitter(conf, rootdir, logDir, + oldLogDir, this.fs); + splitter.splitLog(); + } + splitTime = splitter.getTime(); + splitLogSize = splitter.getSize(); + } catch (IOException e) { + LOG.error("Failed splitting " + logDir.toString(), e); + } finally { + this.splitLogLock.unlock(); + } } - splitTime = splitter.getTime(); - splitLogSize = splitter.getSize(); - } catch (IOException e) { - checkFileSystem(); - LOG.error("Failed splitting " + logDir.toString(), e); - } finally { - this.splitLogLock.unlock(); - } if (this.metrics != null) { this.metrics.addSplit(splitTime, splitLogSize); } @@ -335,4 +365,10 @@ public class MasterFileSystem { new Path(rootdir, region.getTableDesc().getNameAsString()), region.getEncodedName(), familyName), true); } + + public void stop() { + if (splitLogManager != null) { + this.splitLogManager.stop(); + } + } } diff --git a/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java new file mode 100644 index 0000000..fb80525 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -0,0 +1,914 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master; + +import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status; +import org.apache.hadoop.hbase.regionserver.SplitLogWorker; +import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; +import org.apache.hadoop.hbase.regionserver.wal.OrphanHLogAfterSplitException; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.util.StringUtils; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.data.Stat; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState; + +/** + * Distributes the task of log splitting to the available region servers. + * Coordination happens via zookeeper. For every log file that has to be split a + * znode is created under /hbase/splitlog. SplitLogWorkers race to grab a task. + * + * SplitLogManager monitors the task znodes that it creates using the + * {@link #timeoutMonitor} thread. If a task's progress is slow then + * {@link #resubmit(String)} will take away the task from the owner + * {@link SplitLogWorker} and the task will be + * upforgrabs again. When the task is done then the task's znode is deleted by + * SplitLogManager. + * + * Clients call {@link #splitLogDistrubuted(String)} to split a region server's + * log files. The caller thread waits in this method until all the log files + * have been split. + * + * All the zookeeper calls made by this class are asynchronous. This is mainly + * to help reduce response time seen by the callers. + * + * There is race in this design between the SplitLogManager and the + * SplitLogWorker. SplitLogManager might re-queue a task that has in reality + * already been completed by a SplitLogWorker. We rely on the idempotency of + * the log splitting task for correctness. + * + * It is also assumed that every log splitting task is unique and once + * completed (either with success or with error) it will be not be submitted + * again. If a task is resubmitted then there is a risk that old "delete task" + * can delete the re-submission. 
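A condensed sketch of the master-side call path this patch wires up (the calls are taken from the MasterFileSystem changes above; error handling omitted):

// MasterFileSystem constructor, when hbase.master.distributed.log.splitting is true:
SplitLogManager splitLogManager = new SplitLogManager(master.getZooKeeper(),
    master.getConfiguration(), master, master.getServerName());
splitLogManager.finishInitialization();   // start the timeout monitor, pick up orphan tasks

// MasterFileSystem.splitLog(serverName): hand the dead server's log directory to the
// manager and block until every file in it has been split by some worker.
Path logDir = new Path(rootdir, HLog.getHLogDirectoryName(serverName));
long splitLogSize = splitLogManager.splitLogDistributed(logDir);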
+ */ +public class SplitLogManager extends ZooKeeperListener { + private static final Log LOG = LogFactory.getLog(SplitLogManager.class); + + private final Stoppable stopper; + private final String serverName; + private final TaskFinisher taskFinisher; + private FileSystem fs; + private Configuration conf; + + private long zkretries; + private long resubmit_threshold; + private long timeout; + private long unassignedTimeout; + private long lastNodeCreateTime = Long.MAX_VALUE; + + private ConcurrentMap tasks = + new ConcurrentHashMap(); + private TimeoutMonitor timeoutMonitor; + + /** + * Its OK to construct this object even when region-servers are not online. It + * does lookup the orphan tasks in zk but it doesn't block for them to be + * done. + * + * @param zkw + * @param conf + * @param stopper + * @param serverName + * @param services + * @param service + */ + public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf, + Stoppable stopper, String serverName) { + this(zkw, conf, stopper, serverName, new TaskFinisher() { + @Override + public Status finish(String workerName, String logfile) { + String tmpname = + ZKSplitLog.getSplitLogDirTmpComponent(workerName, logfile); + try { + HLogSplitter.moveRecoveredEditsFromTemp(tmpname, logfile, conf); + } catch (IOException e) { + LOG.warn("Could not finish splitting of log file " + logfile); + return Status.ERR; + } + return Status.DONE; + } + }); + } + public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf, + Stoppable stopper, String serverName, TaskFinisher tf) { + super(zkw); + this.taskFinisher = tf; + this.conf = conf; + this.stopper = stopper; + this.zkretries = conf.getLong("hbase.splitlog.zk.retries", + ZKSplitLog.DEFAULT_ZK_RETRIES); + this.resubmit_threshold = conf.getLong("hbase.splitlog.max.resubmit", + ZKSplitLog.DEFAULT_MAX_RESUBMIT); + this.timeout = conf.getInt("hbase.splitlog.manager.timeout", + ZKSplitLog.DEFAULT_TIMEOUT); + this.unassignedTimeout = + conf.getInt("hbase.splitlog.manager.unassigned.timeout", + ZKSplitLog.DEFAULT_UNASSIGNED_TIMEOUT); + LOG.debug("timeout = " + timeout); + LOG.debug("unassigned timeout = " + unassignedTimeout); + + this.serverName = serverName; + this.timeoutMonitor = new TimeoutMonitor( + conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", + 1000), + stopper); + } + + public void finishInitialization() { + Threads.setDaemonThreadRunning(timeoutMonitor, serverName + + ".splitLogManagerTimeoutMonitor"); + this.watcher.registerListener(this); + lookForOrphans(); + } + + /** + * The caller will block until all the log files of the given region server + * have been processed - successfully split or an error is encountered - by an + * available worker region server. This method must only be called after the + * region servers have been brought online. + * + * @param serverName + * region server name + * @throws IOException + * if there was an error while splitting any log file + * @return cumulative size of the logfiles split + */ + public long splitLogDistributed(final Path logDir) throws IOException { + this.fs = logDir.getFileSystem(conf); + if (!fs.exists(logDir)) { + LOG.warn(logDir + " doesn't exist. Nothing to do!"); + return 0; + } + FileStatus[] logfiles = fs.listStatus(logDir); // XXX filter filenames? 
+ if (logfiles == null || logfiles.length == 0) { + LOG.info(logDir + " is empty dir, no logs to split"); + return 0; + } + tot_mgr_log_split_batch_start.incrementAndGet(); + LOG.info("started splitting logs in " + logDir); + long t = EnvironmentEdgeManager.currentTimeMillis(); + long totalSize = 0; + TaskBatch batch = new TaskBatch(); + for (FileStatus lf : logfiles) { + // XXX If the log file is still being written to - which is most likely + // the case for the last log file - then its length will show up here + // as zero. The size of such a file can only be retrieved after after + // recover-lease is done. totalSize will be under in most cases and the + // metrics that it drives will also be under-reported. + totalSize += lf.getLen(); + if (installTask(lf.getPath().toString(), batch) == false) { + throw new IOException("duplicate log split scheduled for " + + lf.getPath()); + } + } + waitTasks(batch); + if (batch.done != batch.installed) { + stopTrackingTasks(batch); + tot_mgr_log_split_batch_err.incrementAndGet(); + LOG.warn("error while splitting logs in " + logDir + + " installed = " + batch.installed + " but only " + batch.done + " done"); + throw new IOException("error or interrupt while splitting logs in " + + logDir + " Task = " + batch); + } + if (anyNewLogFiles(logDir, logfiles)) { + tot_mgr_new_unexpected_hlogs.incrementAndGet(); + LOG.warn("new hlogs were produced while logs in " + logDir + + " were being split"); + throw new OrphanHLogAfterSplitException(); + } + tot_mgr_log_split_batch_success.incrementAndGet(); + if (!fs.delete(logDir, true)) { + throw new IOException("Unable to delete src dir: " + logDir); + } + LOG.info("finished splitting (more than or equal to) " + totalSize + + " bytes in " + batch.installed + " log files in " + logDir + " in " + + (EnvironmentEdgeManager.currentTimeMillis() - t) + "ms"); + return totalSize; + } + + boolean installTask(String taskname, TaskBatch batch) { + tot_mgr_log_split_start.incrementAndGet(); + String path = ZKSplitLog.getNodeName(watcher, taskname); + Task oldtask = createTaskIfAbsent(path, batch); + if (oldtask == null) { + // publish the task in zk + createNode(path, zkretries); + return true; + } + assert false; + LOG.warn(path + "is already being split. " + + "Two threads cannot wait for the same task"); + return false; + } + + private void waitTasks(TaskBatch batch) { + synchronized (batch) { + while ((batch.done + batch.error) != batch.installed) { + try { + batch.wait(100); + if (stopper.isStopped()) { + LOG.warn("Stopped while waiting for log splits to be completed"); + return; + } + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for log splits to be completed"); + Thread.currentThread().interrupt(); + return; + } + } + } + } + + private void setDone(String path, boolean err) { + if (!ZKSplitLog.isRescanNode(watcher, path)) { + if (!err) { + tot_mgr_log_split_success.incrementAndGet(); + LOG.info("Done splitting " + path); + } else { + tot_mgr_log_split_err.incrementAndGet(); + LOG.warn("Error splitting " + path); + } + } + Task task = tasks.get(path); + if (task == null) { + if (!ZKSplitLog.isRescanNode(watcher, path)) { + tot_mgr_unacquired_orphan_done.incrementAndGet(); + LOG.debug("unacquired orphan task is done " + path); + } + } else { + // if in stopTrackingTasks() we were to make tasks orphan instead of + // forgetting about them then we will have to handle the race when + // accessing task.batch here. 
+ if (!task.isOrphan()) { + synchronized (task.batch) { + if (!err) { + task.batch.done++; + } else { + task.batch.error++; + } + if ((task.batch.done + task.batch.error) == task.batch.installed) { + task.batch.notify(); + } + } + } + task.deleted = true; + } + // delete the task node in zk. Keep trying indefinitely - its an async + // call and no one is blocked waiting for this node to be deleted. All + // task names are unique (log.) there is no risk of deleting + // a future task. + deleteNode(path, Long.MAX_VALUE); + return; + } + + private void createNode(String path, Long retry_count) { + ZKUtil.asyncCreate(this.watcher, path, + TaskState.TASK_UNASSIGNED.get(serverName), new CreateAsyncCallback(), + retry_count); + tot_mgr_node_create_queued.incrementAndGet(); + return; + } + + private void createNodeSuccess(String path) { + lastNodeCreateTime = EnvironmentEdgeManager.currentTimeMillis(); + LOG.debug("put up splitlog task at znode " + path); + getDataSetWatch(path, zkretries); + } + + private void createNodeFailure(String path) { + // XXX the Manger should split the log locally instead of giving up + LOG.warn("failed to create task node" + path); + setDone(path, true); + } + + + private void getDataSetWatch(String path, Long retry_count) { + this.watcher.getZooKeeper().getData(path, this.watcher, + new GetDataAsyncCallback(), retry_count); + tot_mgr_get_data_queued.incrementAndGet(); + } + + private void getDataSetWatchSuccess(String path, byte[] data, int version) { + if (data == null) { + tot_mgr_null_data.incrementAndGet(); + LOG.fatal("logic error - got null data " + path); + setDone(path, true); + return; + } + // LOG.debug("set watch on " + path + " got data " + new String(data)); + if (TaskState.TASK_UNASSIGNED.equals(data)) { + LOG.debug("task not yet acquired " + path + " ver = " + version); + handleUnassignedTask(path); + } else if (TaskState.TASK_OWNED.equals(data)) { + registerHeartbeat(path, version, + TaskState.TASK_OWNED.getWriterName(data)); + } else if (TaskState.TASK_RESIGNED.equals(data)) { + LOG.info("task " + path + " entered state " + new String(data)); + resubmit(path, true); + } else if (TaskState.TASK_DONE.equals(data)) { + LOG.info("task " + path + " entered state " + new String(data)); + if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) { + if (taskFinisher.finish(TaskState.TASK_DONE.getWriterName(data), + ZKSplitLog.getFileName(path)) == Status.DONE) { + setDone(path, false); // success + } else { + resubmit(path, false); // err + } + } else { + setDone(path, false); // success + } + } else if (TaskState.TASK_ERR.equals(data)) { + LOG.info("task " + path + " entered state " + new String(data)); + resubmit(path, false); + } else { + LOG.fatal("logic error - unexpected zk state for path = " + path + + " data = " + new String(data)); + setDone(path, true); + } + } + + private void getDataSetWatchFailure(String path) { + LOG.warn("failed to set data watch " + path); + setDone(path, true); + } + + /** + * It is possible for a task to stay in UNASSIGNED state indefinitely - say + * SplitLogManager wants to resubmit a task. It forces the task to UNASSIGNED + * state but it dies before it could create the RESCAN task node to signal + * the SplitLogWorkers to pick up the task. To prevent this scenario the + * SplitLogManager resubmits all orphan and UNASSIGNED tasks at startup. 
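The state dispatch above is driven entirely by the byte payload stored in each task znode. A minimal sketch of that round trip, using only the ZKSplitLog.TaskState calls that appear in this patch (the exact byte encoding stays internal to TaskState):

// manager publishes a task: the UNASSIGNED state stamped with its own server name
ZKUtil.asyncCreate(watcher, path, TaskState.TASK_UNASSIGNED.get(serverName),
    new CreateAsyncCallback(), zkretries);

// when a data watch fires, the manager decodes the same payload
if (TaskState.TASK_OWNED.equals(data)) {
  String worker = TaskState.TASK_OWNED.getWriterName(data);  // which worker holds it
  registerHeartbeat(path, version, worker);
} else if (TaskState.TASK_DONE.equals(data)) {
  // run the TaskFinisher to commit the worker's output, then delete the znode
}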
+ * + * @param path + */ + private void handleUnassignedTask(String path) { + if (ZKSplitLog.isRescanNode(watcher, path)) { + return; + } + Task task = findOrCreateOrphanTask(path); + if (task.isOrphan() && (task.incarnation == 0)) { + LOG.info("resubmitting unassigned orphan task " + path); + // ignore failure to resubmit. The timeout-monitor will handle it later + // albeit in a more crude fashion + resubmit(path, task, true); + } + } + + private void registerHeartbeat(String path, int new_version, + String workerName) { + Task task = findOrCreateOrphanTask(path); + if (new_version != task.last_version) { + if (task.isUnassigned()) { + LOG.info("task " + path + " acquired by " + workerName); + } + // very noisy + //LOG.debug("heartbeat for " + path + " last_version=" + task.last_version + + // " last_update=" + task.last_update + " new_version=" + + // new_version); + task.last_update = EnvironmentEdgeManager.currentTimeMillis(); + task.last_version = new_version; + tot_mgr_heartbeat.incrementAndGet(); + } else { + assert false; + LOG.warn("got dup heartbeat for " + path + " ver = " + new_version); + } + return; + } + + private boolean resubmit(String path, Task task, boolean force) { + // its ok if this thread misses the update to task.deleted. It will + // fail later + if (task.deleted) { + return false; + } + int version; + if (!force) { + if ((EnvironmentEdgeManager.currentTimeMillis() - task.last_update) < + timeout) { + return false; + } + if (task.unforcedResubmits >= resubmit_threshold) { + if (task.unforcedResubmits == resubmit_threshold) { + tot_mgr_resubmit_threshold_reached.incrementAndGet(); + LOG.info("Skipping resubmissions of task " + path + + " because threshold " + resubmit_threshold + " reached"); + } + return false; + } + // race with registerHeartBeat that might be changing last_version + version = task.last_version; + } else { + version = -1; + } + LOG.info("resubmitting task " + path); + task.incarnation++; + try { + // blocking zk call but this is done from the timeout thread + if (ZKUtil.setData(this.watcher, path, + TaskState.TASK_UNASSIGNED.get(serverName), + version) == false) { + LOG.debug("failed to resubmit task " + path + + " version changed"); + return false; + } + } catch (NoNodeException e) { + LOG.debug("failed to resubmit " + path + " task done"); + return false; + } catch (KeeperException e) { + tot_mgr_resubmit_failed.incrementAndGet(); + LOG.warn("failed to resubmit " + path, e); + return false; + } + // don't count forced resubmits + if (!force) { + task.unforcedResubmits++; + } + task.setUnassigned(); + createRescanNode(Long.MAX_VALUE); + tot_mgr_resubmit.incrementAndGet(); + return true; + } + + private void resubmit(String path, boolean force) { + if (resubmit(path, findOrCreateOrphanTask(path), force) == false) { + setDone(path, true); // error + } + } + + private void deleteNode(String path, Long retries) { + tot_mgr_node_delete_queued.incrementAndGet(); + this.watcher.getZooKeeper().delete(path, -1, new DeleteAsyncCallback(), + retries); + } + + private void deleteNodeSuccess(String path) { + Task task; + task = tasks.remove(path); + if (task == null) { + if (ZKSplitLog.isRescanNode(watcher, path)) { + tot_mgr_rescan_deleted.incrementAndGet(); + } + tot_mgr_missing_state_in_delete.incrementAndGet(); + LOG.debug("deleted task without in memory state " + path); + return; + } + tot_mgr_task_deleted.incrementAndGet(); + } + + private void deleteNodeFailure(String path) { + LOG.fatal("logic failure, failing to delete a node should never happen " 
+ + "because delete has infinite retries"); + return; + } + + /** + * signal the workers that a task was resubmitted by creating the + * RESCAN node. + */ + private void createRescanNode(long retries) { + watcher.getZooKeeper().create(ZKSplitLog.getRescanNode(watcher), + TaskState.TASK_UNASSIGNED.get(serverName), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT_SEQUENTIAL, + new CreateRescanAsyncCallback(), new Long(retries)); + } + + private void createRescanSuccess(String path) { + tot_mgr_rescan.incrementAndGet(); + getDataSetWatch(path, zkretries); + } + + private void createRescanFailure() { + LOG.fatal("logic failure, rescan failure must not happen"); + } + + /** + * @param path + * @param batch + * @return null on success, existing task on error + */ + private Task createTaskIfAbsent(String path, TaskBatch batch) { + Task oldtask; + oldtask = tasks.putIfAbsent(path, new Task(batch)); + if (oldtask != null && oldtask.isOrphan()) { + LOG.info("Previously orphan task " + path + + " is now being waited upon"); + oldtask.setBatch(batch); + return (null); + } + return oldtask; + } + + /** + * This function removes any knowledge of this batch's tasks from the + * manager. It doesn't actually stop the active tasks. If the tasks are + * resubmitted then the active tasks will be reacquired and monitored by the + * manager. It is important to call this function when batch processing + * terminates prematurely, otherwise if the tasks are re-submitted + * then they might fail. + *

+ * There is a slight race here. Even after a task has been removed from + * {@link #tasks}, someone who had acquired a reference to it will continue to + * process the task. That is OK since we don't actually change the task and + * the batch objects. + *

+ * XXX It's probably better to convert these to orphan tasks, but then we + * have to deal with race conditions as we nullify Task's batch pointer, etc. + *

+ * @param batch + */ + void stopTrackingTasks(TaskBatch batch) { + for (Map.Entry e : tasks.entrySet()) { + String path = e.getKey(); + Task t = e.getValue(); + if (t.batch == batch) { // == is correct. equals not necessary. + tasks.remove(path); + } + } + } + + Task findOrCreateOrphanTask(String path) { + Task orphanTask = new Task(null); + Task task; + task = tasks.putIfAbsent(path, orphanTask); + if (task == null) { + LOG.info("creating orphan task " + path); + tot_mgr_orphan_task_acquired.incrementAndGet(); + task = orphanTask; + } + return task; + } + + @Override + public void nodeDataChanged(String path) { + if (tasks.get(path) != null || ZKSplitLog.isRescanNode(watcher, path)) { + getDataSetWatch(path, zkretries); + } + } + + public void stop() { + if (timeoutMonitor != null) { + timeoutMonitor.interrupt(); + } + } + + private void lookForOrphans() { + List orphans; + try { + orphans = ZKUtil.listChildrenNoWatch(this.watcher, + this.watcher.splitLogZNode); + if (orphans == null) { + LOG.warn("could not get children of " + this.watcher.splitLogZNode); + return; + } + } catch (KeeperException e) { + LOG.warn("could not get children of " + this.watcher.splitLogZNode + + " " + StringUtils.stringifyException(e)); + return; + } + LOG.info("found " + orphans.size() + " orphan tasks"); + for (String path : orphans) { + LOG.info("found orphan task " + path); + getDataSetWatch(ZKSplitLog.getNodeName(watcher, path), zkretries); + } + } + + /** + * Keeps track of the batch of tasks submitted together by a caller in + * splitLogDistributed(). Clients threads use this object to wait for all + * their tasks to be done. + *
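Distilled, the contract between the caller and the asynchronous callbacks is a plain wait/notify over these three counters (a restatement of waitTasks() and setDone() above; InterruptedException handling omitted):

// caller thread, inside waitTasks():
synchronized (batch) {
  while (batch.done + batch.error != batch.installed) {
    batch.wait(100);               // woken early by setDone(); re-checked so a stop is noticed
  }
}

// zookeeper callback thread, inside setDone(), once per finished task:
synchronized (task.batch) {
  if (err) task.batch.error++; else task.batch.done++;
  if (task.batch.done + task.batch.error == task.batch.installed) {
    task.batch.notify();           // wake the waiting caller
  }
}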

+ * All access is synchronized. + */ + static class TaskBatch { + int installed; + int done; + int error; + + @Override + public String toString() { + return ("installed = " + installed + " done = " + done + " error = " + + error); + } + } + + /** + * in memory state of an active task. + */ + static class Task { + long last_update; + int last_version; + TaskBatch batch; + boolean deleted; + int incarnation; + int unforcedResubmits; + + @Override + public String toString() { + return ("last_update = " + last_update + + " last_version = " + last_version + + " deleted = " + deleted + + " incarnation = " + incarnation + + " resubmits = " + unforcedResubmits + + " batch = " + batch); + } + + Task(TaskBatch tb) { + incarnation = 0; + last_version = -1; + deleted = false; + setBatch(tb); + setUnassigned(); + } + + public void setBatch(TaskBatch batch) { + if (batch != null && this.batch != null) { + LOG.fatal("logic error - batch being overwritten"); + } + this.batch = batch; + if (batch != null) { + batch.installed++; + } + } + + public boolean isOrphan() { + return (batch == null); + } + + public boolean isUnassigned() { + return (last_update == -1); + } + + public void setUnassigned() { + last_update = -1; + } + } + + /** + * Periodically checks all active tasks and resubmits the ones that have timed + * out + */ + private class TimeoutMonitor extends Chore { + public TimeoutMonitor(final int period, Stoppable stopper) { + super("SplitLogManager Timeout Monitor", period, stopper); + } + + @Override + protected void chore() { + int resubmitted = 0; + int unassigned = 0; + int tot = 0; + boolean found_assigned_task = false; + + for (Map.Entry e : tasks.entrySet()) { + String path = e.getKey(); + Task task = e.getValue(); + tot++; + // don't easily resubmit a task which hasn't been picked up yet. It + // might be a long while before a SplitLogWorker is free to pick up a + // task. This is because a SplitLogWorker picks up a task one at a + // time. If we want progress when there are no region servers then we + // will have to run a SplitLogWorker thread in the Master. + if (task.isUnassigned()) { + unassigned++; + continue; + } + found_assigned_task = true; + if (resubmit(path, task, false)) { + resubmitted++; + } + } + if (tot > 0) { + LOG.debug("total tasks = " + tot + " unassigned = " + unassigned); + } + if (resubmitted > 0) { + LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks"); + } + // If there are pending tasks and all of them have been unassigned for + // some time then put up a RESCAN node to ping the workers. + // ZKSplitlog.DEFAULT_UNASSIGNED_TIMEOUT is of the order of minutes + // because a. it is very unlikely that every worker had a + // transient error when trying to grab the task b. if there are no + // workers then all tasks wills stay unassigned indefinitely and the + // manager will be indefinitely creating RESCAN nodes. 
XXX may be the + // master should spawn both a manager and a worker thread to guarantee + // that there is always one worker in the system + if (tot > 0 && !found_assigned_task && + ((EnvironmentEdgeManager.currentTimeMillis() - lastNodeCreateTime) > + unassignedTimeout)) { + createRescanNode(Long.MAX_VALUE); + tot_mgr_resubmit_unassigned.incrementAndGet(); + LOG.debug("resubmitting unassigned task(s) after timeout"); + } + } + } + + class CreateAsyncCallback implements AsyncCallback.StringCallback { + private final Log LOG = LogFactory.getLog(CreateAsyncCallback.class); + + @Override + public void processResult(int rc, String path, Object ctx, String name) { + tot_mgr_node_create_result.incrementAndGet(); + if (rc != 0) { + if (rc == KeeperException.Code.NODEEXISTS.intValue()) { + LOG.debug("found pre-existing znode " + path); + tot_mgr_node_already_exists.incrementAndGet(); + } else { + LOG.warn("create rc =" + KeeperException.Code.get(rc) + " for "+ path); + Long retry_count = (Long)ctx; + if (retry_count == 0) { + tot_mgr_node_create_err.incrementAndGet(); + createNodeFailure(path); + } else { + tot_mgr_node_create_retry.incrementAndGet(); + createNode(path, retry_count - 1); + } + return; + } + } + createNodeSuccess(path); + } + } + + class GetDataAsyncCallback implements AsyncCallback.DataCallback { + private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class); + + @Override + public void processResult(int rc, String path, Object ctx, byte[] data, + Stat stat) { + tot_mgr_get_data_result.incrementAndGet(); + if (rc != 0) { + LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " "+ path); + Long retry_count = (Long) ctx; + if (retry_count == 0) { + tot_mgr_get_data_err.incrementAndGet(); + getDataSetWatchFailure(path); + } else { + tot_mgr_get_data_retry.incrementAndGet(); + getDataSetWatch(path, retry_count - 1); + } + return; + } + getDataSetWatchSuccess(path, data, stat.getVersion()); + return; + } + } + + class DeleteAsyncCallback implements AsyncCallback.VoidCallback { + private final Log LOG = LogFactory.getLog(DeleteAsyncCallback.class); + + @Override + public void processResult(int rc, String path, Object ctx) { + tot_mgr_node_delete_result.incrementAndGet(); + if (rc != 0) { + if (rc != KeeperException.Code.NONODE.intValue()) { + tot_mgr_node_delete_err.incrementAndGet(); + LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " + path); + Long retry_count = (Long) ctx; + if (retry_count == 0) { + LOG.warn("delete failed " + path); + deleteNodeFailure(path); + } else { + deleteNode(path, retry_count - 1); + } + return; + } else { + LOG.debug(path + + " does not exist, either was never created or was deleted" + + " in earlier rounds, zkretries = " + (Long) ctx); + } + } else { + LOG.debug("deleted " + path); + } + deleteNodeSuccess(path); + } + } + + class CreateRescanAsyncCallback implements AsyncCallback.StringCallback { + private final Log LOG = LogFactory.getLog(CreateRescanAsyncCallback.class); + + @Override + public void processResult(int rc, String path, Object ctx, String name) { + if (rc != 0) { + LOG.warn("rc =" + KeeperException.Code.get(rc) + " for "+ path); + Long retry_count = (Long)ctx; + if (retry_count == 0) { + createRescanFailure(); + } else { + createRescanNode(retry_count - 1); + } + return; + } + // path is the original arg, name is the actual name that was created + createRescanSuccess(name); + } + } + + /** + * checks whether any new files have appeared in logDir which were + * not present in the original logfiles set + * @param 
logdir + * @param logfiles + * @return True if a new log file is found + * @throws IOException + */ + public boolean anyNewLogFiles(Path logdir, FileStatus[] logfiles) + throws IOException { + if (logdir == null) { + return false; + } + LOG.debug("re-listing " + logdir); + tot_mgr_relist_logdir.incrementAndGet(); + FileStatus[] newfiles = fs.listStatus(logdir); + if (newfiles == null) { + return false; + } + boolean matched; + for (FileStatus newfile : newfiles) { + matched = false; + for (FileStatus origfile : logfiles) { + if (origfile.equals(newfile)) { + matched = true; + break; + } + } + if (matched == false) { + LOG.warn("Discovered orphan hlog " + newfile + " after split." + + " Maybe HRegionServer was not dead when we started"); + return true; + } + } + return false; + } + + /** + * {@link SplitLogManager} can use objects implementing this interface to + * finish off a partially done task by {@link SplitLogWorker}. This provides + * a serialization point at the end of the task processing. + */ + static public interface TaskFinisher { + /** + * status that can be returned finish() + */ + static public enum Status { + /** + * task completed successfully + */ + DONE(), + /** + * task completed with error + */ + ERR(); + } + /** + * finish the partially done task. workername provides clue to where the + * partial results of the partially done tasks are present. taskname is the + * name of the task that was put up in zookeeper. + *
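Because the second SplitLogManager constructor takes a TaskFinisher, this commit step is pluggable. A hypothetical finisher (not part of this patch) that skips the temp-directory move, for example in a test:

SplitLogManager slm = new SplitLogManager(zkw, conf, stopper, serverName,
    new TaskFinisher() {
      @Override
      public Status finish(String workerName, String logfile) {
        // nothing to move or merge in this hypothetical setup
        return Status.DONE;
      }
    });
slm.finishInitialization();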

+ * @param workerName + * @param taskname + * @return DONE if task completed successfully, ERR otherwise + */ + public Status finish(String workerName, String taskname); + } +} \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 85b4144..a830522 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -258,6 +258,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, // Cluster Status Tracker private ClusterStatusTracker clusterStatusTracker; + // Log Splitting Worker + private SplitLogWorker splitLogWorker; + // A sleeper that sleeps for msgInterval. private final Sleeper sleeper; @@ -377,7 +380,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, qosMap.put(m.getName(), p.priority()); } } - + annotatedQos = qosMap; } @@ -397,7 +400,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, Invocation inv = (Invocation) from; String methodName = inv.getMethodName(); - + Integer priorityByAnnotation = annotatedQos.get(methodName); if (priorityByAnnotation != null) { return priorityByAnnotation; @@ -510,6 +513,11 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection, this, this.conf.getInt("hbase.regionserver.catalog.timeout", Integer.MAX_VALUE)); catalogTracker.start(); + + // Create the log splitting worker and start it + this.splitLogWorker = new SplitLogWorker(this.zooKeeper, + this.getConfiguration(), this.getServerName()); + splitLogWorker.start(); } /** @@ -636,6 +644,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, } this.leases.closeAfterLeasesExpire(); this.server.stop(); + if (this.splitLogWorker != null) { + splitLogWorker.stop(); + } if (this.infoServer != null) { LOG.info("Stopping infoServer"); try { @@ -2810,6 +2821,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, return this.zooKeeper; } + // // Main program and support routines // diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java new file mode 100644 index 0000000..164768a --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java @@ -0,0 +1,553 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.master.SplitLogManager; +import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; +import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.util.StringUtils; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; + +/** + * This worker is spawned in every regionserver (should we also spawn one in + * the master?). The Worker waits for log splitting tasks to be put up by the + * {@link SplitLogManager} running in the master and races with other workers + * in other serves to acquire those tasks. The coordination is done via + * zookeeper. All the action takes place at /hbase/splitlog znode. + *

+ * If a worker has successfully moved the task from state UNASSIGNED to + * OWNED then it owns the task. It keeps heartbeating the manager by + * periodically moving the task from OWNED to OWNED state. On success it + * moves the task to DONE. On unrecoverable error it moves the task state to + * ERR. If it cannot continue but wants the master to retry the task then it + * moves the task state to RESIGNED. + *
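Put together, the life of a single task znode under /hbase/splitlog typically runs as follows (the writer of each state is shown in parentheses; the limits are the hbase.splitlog.* settings read by SplitLogManager):

UNASSIGNED (manager)  - task created for one log file, up for grabs
OWNED      (worker)   - a worker wins the race with a version-checked setData
OWNED      (worker)   - periodic heartbeat; each OWNED -> OWNED write bumps the znode version
DONE       (worker)   - manager runs the TaskFinisher, then deletes the znode
ERR        (worker)   - manager tries to resubmit it as UNASSIGNED, bounded by
                        hbase.splitlog.max.resubmit; otherwise the task is marked failed
RESIGNED   (worker)   - manager resubmits it immediately (forced)
no heartbeat within hbase.splitlog.manager.timeout
                      - the TimeoutMonitor forces the task back to UNASSIGNED and puts up a
                        RESCAN node so idle workers take another look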

+ * The manager can take a task away from a worker by moving the task from + * OWNED to UNASSIGNED. In the absence of a global lock there is a + * unavoidable race here - a worker might have just finished its task when it + * is stripped of its ownership. Here we rely on the idempotency of the log + * splitting task for correctness + */ +public class SplitLogWorker extends ZooKeeperListener implements Runnable { + private static final Log LOG = LogFactory.getLog(SplitLogWorker.class); + + Thread worker; + private final String serverName; + private final TaskExecutor executor; + private long zkretries; + + private Object taskReadyLock = new Object(); + volatile int taskReadySeq = 0; + private volatile String currentTask = null; + private int currentVersion; + private volatile boolean exitWorker; + private Object grabTaskLock = new Object(); + private boolean workerInGrabTask = false; + + + public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, + String serverName, TaskExecutor executor) { + super(watcher); + this.serverName = serverName; + this.executor = executor; + this.zkretries = conf.getLong("hbase.splitlog.zk.retries", 3); + } + + public SplitLogWorker(ZooKeeperWatcher watcher, final Configuration conf, + final String serverName) { + this(watcher, conf, serverName, new TaskExecutor () { + @Override + public Status exec(String filename, CancelableProgressable p) { + Path rootdir; + FileSystem fs; + try { + rootdir = FSUtils.getRootDir(conf); + fs = rootdir.getFileSystem(conf); + } catch (IOException e) { + LOG.warn("could not find root dir or fs", e); + return Status.RESIGNED; + } + // XXX have to correctly figure out when log splitting has been + // interrupted or has encountered a transient error and when it has + // encountered a bad non-retry-able persistent error. + try { + String tmpname = + ZKSplitLog.getSplitLogDirTmpComponent(serverName, filename); + if (HLogSplitter.splitLogFileToTemp(rootdir, tmpname, + fs.getFileStatus(new Path(filename)), fs, conf, p) == false) { + return Status.PREEMPTED; + } + } catch (InterruptedIOException iioe) { + LOG.warn("log splitting of " + filename + " interrupted, resigning", + iioe); + return Status.RESIGNED; + } catch (IOException e) { + Throwable cause = e.getCause(); + if (cause instanceof InterruptedException) { + LOG.warn("log splitting of " + filename + " interrupted, resigning", + e); + return Status.RESIGNED; + } + LOG.warn("log splitting of " + filename + " failed, returning error", + e); + return Status.ERR; + } + return Status.DONE; + } + }); + } + + @Override + public void run() { + LOG.info("SplitLogWorker starting"); + this.watcher.registerListener(this); + int res; + // wait for master to create the splitLogZnode + res = -1; + while (res == -1) { + try { + res = ZKUtil.checkExists(watcher, watcher.splitLogZNode); + } catch (KeeperException e) { + // ignore + LOG.warn("Exception when checking for " + watcher.splitLogZNode + + " ... retrying", e); + } + if (exitWorker) { + assert Thread.currentThread().isInterrupted(); + return; + } + if (res == -1) { + try { + LOG.info(watcher.splitLogZNode + " znode does not exist," + + " waiting for master to create one"); + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.debug("Interrupted while waiting for " + watcher.splitLogZNode); + assert exitWorker == true; + } + } + } + + taskLoop(); + + LOG.info("SplitLogWorker exiting"); + } + + /** + * Wait for tasks to become available at /hbase/splitlog zknode. Grab a task + * one at a time. 
This policy puts an upper limit on the number of + * log-splitting tasks that can run simultaneously in a cluster. + *
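The race between workers is softened by scanning the task list from a random starting offset, so idle workers do not all pile onto the same znode first (this mirrors the loop in taskLoop() below):

List<String> paths = getTaskList();
int offset = (int) (Math.random() * paths.size());
for (int i = 0; i < paths.size(); i++) {
  int idx = (i + offset) % paths.size();   // each worker starts its scan at a different task
  grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
}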

+ * Synchronization using {@link #task_ready_signal_seq} ensures that it will + * try to grab every task that has been put up + */ + private void taskLoop() { + while (true) { + int seq_start = taskReadySeq; + List paths = getTaskList(); + if (paths == null) { + LOG.warn("Could not get tasks, did someone remove " + + this.watcher.splitLogZNode + " ... worker thread exiting."); + return; + } + int offset = (int)(Math.random() * paths.size()); + for (int i = 0; i < paths.size(); i ++) { + int idx = (i + offset) % paths.size(); + // don't call ZKSplitLog.getNodeName() because that will lead to + // double encoding of the path name + grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx))); + if (exitWorker == true) { + return; + } + } + synchronized (taskReadyLock) { + while (seq_start == taskReadySeq) { + try { + taskReadyLock.wait(); + } catch (InterruptedException e) { + LOG.warn("SplitLogWorker inteurrpted while waiting for task," + + " exiting", e); + assert exitWorker == true; + return; + } + } + } + } + } + + /** + * try to grab a 'lock' on the task zk node to own and execute the task. + *

+ * @param path zk node for the task + */ + private void grabTask(String path) { + Stat stat = new Stat(); + long t = -1; + byte[] data; + synchronized (grabTaskLock) { + currentTask = path; + workerInGrabTask = true; + if (Thread.interrupted()) { + return; + } + } + try { + try { + if ((data = ZKUtil.getDataNoWatch(this.watcher, path, stat)) == null) { + tot_wkr_failed_to_grab_task_no_data.incrementAndGet(); + return; + } + } catch (KeeperException e) { + LOG.warn("Failed to get data for znode " + path, e); + tot_wkr_failed_to_grab_task_exception.incrementAndGet(); + return; + } + if (TaskState.TASK_UNASSIGNED.equals(data) == false) { + tot_wkr_failed_to_grab_task_owned.incrementAndGet(); + return; + } + + currentVersion = stat.getVersion(); + if (ownTask() == false) { + tot_wkr_failed_to_grab_task_lost_race.incrementAndGet(); + return; + } + + if (ZKSplitLog.isRescanNode(watcher, currentTask)) { + endTask(TaskState.TASK_DONE, tot_wkr_task_acquired_rescan); + return; + } + LOG.info("worker " + serverName + " acquired task " + path); + tot_wkr_task_acquired.incrementAndGet(); + getDataSetWatchAsync(); + + t = System.currentTimeMillis(); + TaskExecutor.Status status; + + status = executor.exec(ZKSplitLog.getFileName(currentTask), + new CancelableProgressable() { + + @Override + public boolean progress() { + if (ownTask() == false) { + LOG.warn("Failed to heartbeat the task" + currentTask); + return false; + } + return true; + } + }); + switch (status) { + case DONE: + endTask(TaskState.TASK_DONE, tot_wkr_task_done); + break; + case PREEMPTED: + tot_wkr_preempt_task.incrementAndGet(); + LOG.warn("task execution prempted " + path); + break; + case ERR: + if (!exitWorker) { + endTask(TaskState.TASK_ERR, tot_wkr_task_err); + break; + } + // if the RS is exiting then there is probably a tons of stuff + // that can go wrong. Resign instead of signaling error. + //$FALL-THROUGH$ + case RESIGNED: + if (exitWorker) { + tot_wkr_task_resigned.incrementAndGet(); + LOG.info("task execution interrupted because worker is exiting " + + path); + endTask(TaskState.TASK_RESIGNED, tot_wkr_task_resigned); + } else { + tot_wkr_preempt_task.incrementAndGet(); + LOG.info("task execution interrupted via zk by manager " + + path); + } + break; + } + } finally { + if (t > 0) { + LOG.info("worker " + serverName + " done with task " + path + + " in " + (System.currentTimeMillis() - t) + "ms"); + } + synchronized (grabTaskLock) { + workerInGrabTask = false; + // clear the interrupt from stopTask() otherwise the next task will + // suffer + Thread.interrupted(); + } + } + return; + } + + /** + * Try to own the task by transitioning the zk node data from UNASSIGNED to + * OWNED. + *

+ * This method is also used to periodically heartbeat the task progress by + * transitioning the node from OWNED to OWNED. + *
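That heartbeat doubles as a fence: it is a setData conditioned on the znode version, so a worker that has lost the task (for example, because the manager reset it to UNASSIGNED and bumped the version) gets a BadVersionException instead of silently overwriting. A minimal sketch of what ownTask() does against the raw ZooKeeper handle:

try {
  Stat stat = watcher.getZooKeeper().setData(currentTask,
      TaskState.TASK_OWNED.get(serverName), currentVersion);
  currentVersion = stat.getVersion();      // remember the new version for the next heartbeat
} catch (KeeperException.BadVersionException e) {
  // the node changed under us: the task is no longer ours
} catch (KeeperException e) {
  // other ZK trouble: count it as a missed heartbeat
} catch (InterruptedException e) {
  Thread.currentThread().interrupt();
}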

+ * @return true if task path is successfully locked + */ + private boolean ownTask() { + try { + Stat stat = this.watcher.getZooKeeper().setData(currentTask, + TaskState.TASK_OWNED.get(serverName), currentVersion); + if (stat == null) { + return (false); + } + currentVersion = stat.getVersion(); + if (LOG.isDebugEnabled()) { + LOG.debug ("hearbeat for path " + currentTask + + " successful, version = " + currentVersion); + } + tot_wkr_task_heartbeat.incrementAndGet(); + return (true); + } catch (KeeperException e) { + // either Bad Version or Node has been removed + LOG.warn("failed to assert ownership for " + currentTask, e); + } catch (InterruptedException e1) { + LOG.warn("Interrupted while trying to assert ownership of " + + currentTask + " " + StringUtils.stringifyException(e1)); + Thread.currentThread().interrupt(); + } + tot_wkr_task_heartbeat_failed.incrementAndGet(); + return (false); + } + + /** + * endTask() can fail and the only way to recover out of it is for the + * {@link SplitLogManager} to timeout the task node. + * @param ts + * @param ctr + */ + private void endTask(ZKSplitLog.TaskState ts, AtomicLong ctr) { + String path = currentTask; + currentTask = null; + try { + if (ZKUtil.setData(this.watcher, path, ts.get(serverName), + currentVersion)) { + LOG.info("successfully transitioned task " + path + + " to final state " + ts); + ctr.incrementAndGet(); + return; + } + LOG.warn("failed to transistion task " + path + " to end state " + ts + + " because of version mismatch "); + } catch (KeeperException.BadVersionException bve) { + LOG.warn("transisition task " + path + " to " + ts + + " failed because of version mismatch", bve); + } catch (KeeperException.NoNodeException e) { + LOG.fatal("logic error - end task " + path + " " + ts + + " failed because task doesn't exist", e); + } catch (KeeperException e) { + LOG.warn("failed to end task, " + path + " " + ts, e); + } + tot_wkr_final_transistion_failed.incrementAndGet(); + return; + } + + void getDataSetWatchAsync() { + this.watcher.getZooKeeper().getData(currentTask, this.watcher, + new GetDataAsyncCallback(), null); + tot_wkr_get_data_queued.incrementAndGet(); + } + + void getDataSetWatchSuccess(String path, byte[] data) { + synchronized (grabTaskLock) { + if (workerInGrabTask) { + // currentTask can change but that's ok + String taskpath = currentTask; + if (taskpath != null && taskpath.equals(path)) { + // have to compare data. cannot compare version because then there + // will be race with ownTask() + // cannot just check whether the node has been transitioned to + // UNASSIGNED because by the time this worker sets the data watch + // the node might have made two transitions - from owned by this + // worker to unassigned to owned by another worker + // XXX is comparing against serverName OK? don't we need encoded + // region-server-name? + if (! TaskState.TASK_OWNED.equals(data, serverName) && + ! TaskState.TASK_DONE.equals(data, serverName) && + ! TaskState.TASK_ERR.equals(data, serverName) && + ! TaskState.TASK_RESIGNED.equals(data, serverName)) { + LOG.info("task " + taskpath + " preempted from server " + + serverName + " ... 
current task state and owner - " + + new String(data)); + stopTask(); + } + } + } + } + } + + void getDataSetWatchFailure(String path) { + synchronized (grabTaskLock) { + if (workerInGrabTask) { + // currentTask can change but that's ok + String taskpath = currentTask; + if (taskpath != null && taskpath.equals(path)) { + LOG.info("retrying data watch on " + path); + tot_wkr_get_data_retry.incrementAndGet(); + getDataSetWatchAsync(); + } else { + // no point setting a watch on the task which this worker is not + // working upon anymore + } + } + } + } + + + + + @Override + public void nodeDataChanged(String path) { + // there will be a self generated dataChanged event every time ownTask() + // heartbeats the task znode by upping its version + synchronized (grabTaskLock) { + if (workerInGrabTask) { + // currentTask can change + String taskpath = currentTask; + if (taskpath!= null && taskpath.equals(path)) { + getDataSetWatchAsync(); + } + } + } + } + + + private List getTaskList() { + for (int i = 0; i < zkretries; i++) { + try { + return (ZKUtil.listChildrenAndWatchForNewChildren(this.watcher, + this.watcher.splitLogZNode)); + } catch (KeeperException e) { + LOG.warn("Could not get children of znode " + + this.watcher.splitLogZNode, e); + try { + Thread.sleep(1000); + } catch (InterruptedException e1) { + LOG.warn("Interrupted while trying to get task list ...", e1); + Thread.currentThread().interrupt(); + return null; + } + } + } + LOG.warn("Tried " + zkretries + " times, still couldn't fetch " + + "children of " + watcher.splitLogZNode + " giving up"); + return null; + } + + + @Override + public void nodeChildrenChanged(String path) { + if(path.equals(watcher.splitLogZNode)) { + LOG.debug("tasks arrived or departed"); + synchronized (taskReadyLock) { + taskReadySeq++; + taskReadyLock.notify(); + } + } + } + + /** + * If the worker is doing a task i.e. splitting a log file then stop the task. + * It doesn't exit the worker thread. + */ + void stopTask() { + LOG.info("Sending interrupt to stop the worker thread"); + worker.interrupt(); + } + + + /** + * start the SplitLogWorker thread + */ + public void start() { + worker = new Thread(null, this, "SplitLogWorker-" + serverName); + exitWorker = false; + worker.start(); + return; + } + + /** + * stop the SplitLogWorker thread + */ + public void stop() { + exitWorker = true; + stopTask(); + } + + class GetDataAsyncCallback implements AsyncCallback.DataCallback { + private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class); + + @Override + public void processResult(int rc, String path, Object ctx, byte[] data, + Stat stat) { + tot_wkr_get_data_result.incrementAndGet(); + if (rc != 0) { + LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path); + getDataSetWatchFailure(path); + return; + } + getDataSetWatchSuccess(path, data); + return; + } + } + + /** + * Objects implementing this interface actually do the task that has been + * acquired by a {@link SplitLogWorker}. 
Since there isn't a water-tight + * guarantee that two workers will not be executing the same task therefore it + * is better to have workers prepare the task and then have the + * {@link SplitLogManager} commit the work in + * {@link SplitLogManager.TaskFinisher} + */ + static public interface TaskExecutor { + static public enum Status { + DONE(), + ERR(), + RESIGNED(), + PREEMPTED(); + } + public Status exec(String name, CancelableProgressable p); + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java index 1435598..411fdd3 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java @@ -24,6 +24,7 @@ import static org.apache.hadoop.hbase.util.FSUtils.recoverFileLease; import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InterruptedIOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.text.ParseException; @@ -44,6 +45,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.io.HeapSize; +import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.RemoteExceptionHandler; @@ -52,8 +54,12 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry; import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader; import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.io.MultipleIOException; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -73,7 +79,7 @@ public class HLogSplitter { */ public static final String RECOVERED_EDITS = "recovered.edits"; - + static final Log LOG = LogFactory.getLog(HLogSplitter.class); private boolean hasSplit = false; @@ -87,7 +93,7 @@ public class HLogSplitter { protected final Path oldLogDir; protected final FileSystem fs; protected final Configuration conf; - + // Major subcomponents of the split process. // These are separated into inner classes to make testing easier. OutputSink outputSink; @@ -106,12 +112,13 @@ public class HLogSplitter { * Create a new HLogSplitter using the given {@link Configuration} and the * hbase.hlog.splitter.impl property to derived the instance * class to use. - * + *

* @param conf * @param rootDir hbase directory * @param srcDir logs directory * @param oldLogDir directory where processed logs are archived to * @param fs FileSystem + * @return New HLogSplitter instance */ public static HLogSplitter createLogSplitter(Configuration conf, final Path rootDir, final Path srcDir, @@ -151,18 +158,18 @@ public class HLogSplitter { this.srcDir = srcDir; this.oldLogDir = oldLogDir; this.fs = fs; - + entryBuffers = new EntryBuffers( conf.getInt("hbase.regionserver.hlog.splitlog.buffersize", 128*1024*1024)); outputSink = new OutputSink(); } - + /** * Split up a bunch of regionserver commit log files that are no longer being * written to, into new files, one per region for region to replay on startup. * Delete the old log files when finished. - * + * * @throws IOException will throw if corrupted hlogs aren't tolerated * @return the list of splits */ @@ -172,7 +179,7 @@ public class HLogSplitter { "An HLogSplitter instance may only be used once"); hasSplit = true; - long startTime = System.currentTimeMillis(); + long startTime = EnvironmentEdgeManager.currentTimeMillis(); List splits = null; if (!fs.exists(srcDir)) { // Nothing to do @@ -186,20 +193,20 @@ public class HLogSplitter { LOG.info("Splitting " + logfiles.length + " hlog(s) in " + srcDir.toString()); splits = splitLog(logfiles); - - splitTime = System.currentTimeMillis() - startTime; + + splitTime = EnvironmentEdgeManager.currentTimeMillis() - startTime; LOG.info("hlog file splitting completed in " + splitTime + " ms for " + srcDir.toString()); return splits; } - + /** * @return time that this split took */ public long getTime() { return this.splitTime; } - + /** * @return aggregate size of hlogs that were split */ @@ -215,12 +222,12 @@ public class HLogSplitter { Preconditions.checkState(hasSplit); return outputSink.getOutputCounts(); } - + /** * Splits the HLog edits in the given list of logfiles (that are a mix of edits * on multiple regions) by region and then splits them per region directories, * in batches of (hbase.hlog.split.batch.size) - * + *

* This process is split into multiple threads. In the main thread, we loop * through the logs to be split. For each log, we: *

    @@ -228,13 +235,13 @@ public class HLogSplitter { *
 *   <li> Read each edit (see {@link #parseHLog}</li>
 *   <li> Mark as "processed" or "corrupt" depending on outcome</li>
 * </ul>
- * + *

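 * Whether a log that fails to parse fails the whole split or is merely set
 * aside is governed by the hbase.hlog.split.skip.errors setting used
 * throughout this class (an assumed configuration example):
 *
 *   // when true, unparseable hlogs are collected as "corrupted" and moved to
 *   // the .corrupt directory instead of aborting the split
 *   conf.setBoolean("hbase.hlog.split.skip.errors", true);
 *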
* Each edit is passed into the EntryBuffers instance, which takes care of * memory accounting and splitting the edits by region. - * + *

* The OutputSink object then manages N other WriterThreads which pull chunks * of edits from EntryBuffers and write them to the output region directories. - * + *

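 * In outline, the flow is (an illustrative sketch built from the inner
 * classes defined later in this file, not a verbatim excerpt):
 *
 *   entryBuffers = new EntryBuffers(bufferSize);    // bounded by max heap usage
 *   outputSink = new OutputSink();
 *   outputSink.startWriterThreads(entryBuffers);    // start the consumer threads
 *   for (FileStatus log : logfiles) {
 *     while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) {
 *       entryBuffers.appendEntry(entry);            // blocks when over threshold
 *     }
 *   }
 *   splits = outputSink.finishWritingAndClose();    // join writers, close files
 *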
* After the process is complete, the log files are archived to a separate * directory. */ @@ -248,7 +255,7 @@ public class HLogSplitter { splitSize = 0; outputSink.startWriterThreads(entryBuffers); - + try { int i = 0; for (FileStatus log : logfiles) { @@ -257,36 +264,24 @@ public class HLogSplitter { splitSize += logLength; LOG.debug("Splitting hlog " + (i++ + 1) + " of " + logfiles.length + ": " + logPath + ", length=" + logLength); + Reader in; try { - recoverFileLease(fs, logPath, conf); - parseHLog(log, entryBuffers, fs, conf); - processedLogs.add(logPath); - } catch (EOFException eof) { - // truncated files are expected if a RS crashes (see HBASE-2643) - LOG.info("EOF from hlog " + logPath + ". Continuing"); - processedLogs.add(logPath); - } catch (FileNotFoundException fnfe) { - // A file may be missing if the region server was able to archive it - // before shutting down. This means the edits were persisted already - LOG.info("A log was missing " + logPath + - ", probably because it was moved by the" + - " now dead region server. Continuing"); - processedLogs.add(logPath); - } catch (IOException e) { - // If the IOE resulted from bad file format, - // then this problem is idempotent and retrying won't help - if (e.getCause() instanceof ParseException) { - LOG.warn("Parse exception from hlog " + logPath + ". continuing", e); - processedLogs.add(logPath); - } else { - if (skipErrors) { - LOG.info("Got while parsing hlog " + logPath + - ". Marking as corrupted", e); - corruptedLogs.add(logPath); - } else { - throw e; + in = getReader(fs, log, conf, skipErrors); + if (in != null) { + parseHLog(in, logPath, entryBuffers, fs, conf, skipErrors); + try { + in.close(); + } catch (IOException e) { + LOG.warn("Close log reader threw exception -- continuing", + e); } } + processedLogs.add(logPath); + } catch (CorruptedLogFileException e) { + LOG.info("Got while parsing hlog " + logPath + + ". Marking as corrupted", e); + corruptedLogs.add(logPath); + continue; } } if (fs.listStatus(srcDir).length > processedLogs.size() @@ -295,7 +290,7 @@ public class HLogSplitter { "Discovered orphan hlog after split. Maybe the " + "HRegionServer was not dead when we started"); } - archiveLogs(srcDir, corruptedLogs, processedLogs, oldLogDir, fs, conf); + archiveLogs(srcDir, corruptedLogs, processedLogs, oldLogDir, fs, conf); } finally { splits = outputSink.finishWritingAndClose(); } @@ -303,10 +298,212 @@ public class HLogSplitter { } /** + * Splits a HLog file into a temporary staging area. tmpname is used to build + * the name of the staging area where the recovered-edits will be separated + * out by region and stored. + *

+ * If the log file has N regions then N recovered.edits files will be + * produced. There is no buffering in this code. Instead it relies on the + * buffering in the SequenceFileWriter. + *

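+ * For illustration (the worker and file names are assumed; the helpers are
+ * the ones added to ZKSplitLog by this patch):
+ *
+ *   String tmpname = ZKSplitLog.getSplitLogDirTmpComponent(
+ *       "worker1", "hlog.1234");                    // "worker1_hlog.1234"
+ *   Path staging = ZKSplitLog.getSplitLogDir(rootDir, tmpname);
+ *   // recovered edits for each region R touched by the log then land under
+ *   //   <staging>/<table>/<R>/recovered.edits/<sequenceid>
+ *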
+ * @param rootDir + * @param tmpname + * @param logfile + * @param fs + * @param conf + * @param reporter + * @return false if it is interrupted by the progress-able. + * @throws IOException + */ + static public boolean splitLogFileToTemp(Path rootDir, String tmpname, + FileStatus logfile, FileSystem fs, + Configuration conf, CancelableProgressable reporter) throws IOException { + HLogSplitter s = new HLogSplitter(conf, rootDir, null, null /* oldLogDir */, + fs); + return s.splitLogFileToTemp(logfile, tmpname, reporter); + } + + public boolean splitLogFileToTemp(FileStatus logfile, String tmpname, + CancelableProgressable reporter) throws IOException { + final Map logWriters = Collections. + synchronizedMap(new TreeMap(Bytes.BYTES_COMPARATOR)); + boolean isCorrupted = false; + + Object BAD_WRITER = new Object(); + + boolean progress_failed = false; + + boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", false); + int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024); + // How often to send a progress report (default 1/2 master timeout) + int period = conf.getInt("hbase.splitlog.report.period", + conf.getInt("hbase.splitlog.manager.timeout", + ZKSplitLog.DEFAULT_TIMEOUT) / 2); + Path logPath = logfile.getPath(); + long logLength = logfile.getLen(); + LOG.info("Splitting hlog: " + logPath + ", length=" + logLength); + Reader in = null; + try { + in = getReader(fs, logfile, conf, skipErrors); + } catch (CorruptedLogFileException e) { + ZKSplitLog.markCorrupted(rootDir, tmpname, fs); + isCorrupted = true; + } + if (in == null) { + LOG.warn("Nothing to split in log file " + logPath); + return true; + } + long t = EnvironmentEdgeManager.currentTimeMillis(); + long last_report_at = t; + if (reporter != null && reporter.progress() == false) { + return false; + } + int editsCount = 0; + Entry entry; + try { + while ((entry = getNextLogLine(in,logPath, skipErrors)) != null) { + byte[] region = entry.getKey().getEncodedRegionName(); + Object o = logWriters.get(region); + if (o == BAD_WRITER) { + continue; + } + WriterAndPath wap = (WriterAndPath)o; + if (wap == null) { + wap = createWAP(region, entry, rootDir, tmpname, fs, conf); + if (wap == null) { + logWriters.put(region, BAD_WRITER); + } else { + logWriters.put(region, wap); + } + } + wap.w.append(entry); + editsCount++; + if (editsCount % interval == 0) { + long t1 = EnvironmentEdgeManager.currentTimeMillis(); + if ((t1 - last_report_at) > period) { + last_report_at = t; + if (reporter != null && reporter.progress() == false) { + progress_failed = true; + return false; + } + } + } + } + } catch (CorruptedLogFileException e) { + ZKSplitLog.markCorrupted(rootDir, tmpname, fs); + isCorrupted = true; + } catch (IOException e) { + e = RemoteExceptionHandler.checkIOException(e); + throw e; + } finally { + int n = 0; + for (Object o : logWriters.values()) { + long t1 = EnvironmentEdgeManager.currentTimeMillis(); + if ((t1 - last_report_at) > period) { + last_report_at = t; + if ((progress_failed == false) && (reporter != null) && + (reporter.progress() == false)) { + progress_failed = true; + } + } + if (o == BAD_WRITER) { + continue; + } + n++; + WriterAndPath wap = (WriterAndPath)o; + wap.w.close(); + LOG.debug("Closed " + wap.p); + } + LOG.info("processed " + editsCount + " edits across " + n + " regions" + + " threw away edits for " + (logWriters.size() - n) + " regions" + + " log file = " + logPath + + " is corrupted = " + isCorrupted); + } + return true; + } + + /** + * Completes the work done by 
splitLogFileToTemp by moving the
+ * recovered.edits from the staging area to the respective region
+ * directories under the hbase root directory. + *

+ * It is invoked by SplitLogManager once it knows that one of the
+ * SplitLogWorkers has completed the splitLogFileToTemp() part. If the
+ * master crashes then this function might get called multiple times.
+ *

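+ * An illustrative sketch of the commit call on the manager side (this is the
+ * overload below that derives the root and old-log directories from the
+ * configuration):
+ *
+ *   HLogSplitter.moveRecoveredEditsFromTemp(tmpname, logfile, conf);
+ *
+ * Re-running it after a master crash is tolerated because an existing
+ * destination file is deleted before the rename.
+ *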
+ * @param tmpname + * @param conf + * @throws IOException + */ + public static void moveRecoveredEditsFromTemp(String tmpname, + String logfile, Configuration conf) + throws IOException{ + Path rootdir = FSUtils.getRootDir(conf); + Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME); + moveRecoveredEditsFromTemp(tmpname, rootdir, oldLogDir, logfile, conf); + } + + public static void moveRecoveredEditsFromTemp(String tmpname, + Path rootdir, Path oldLogDir, + String logfile, Configuration conf) + throws IOException{ + List processedLogs = new ArrayList(); + List corruptedLogs = new ArrayList(); + FileSystem fs; + fs = rootdir.getFileSystem(conf); + Path logPath = new Path(logfile); + if (ZKSplitLog.isCorrupted(rootdir, tmpname, fs)) { + corruptedLogs.add(logPath); + } else { + processedLogs.add(logPath); + } + Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, tmpname); + List files = listAll(fs, stagingDir); + for (FileStatus f : files) { + Path src = f.getPath(); + Path dst = ZKSplitLog.stripSplitLogTempDir(rootdir, src); + if (ZKSplitLog.isCorruptFlagFile(dst)) { + continue; + } + if (fs.exists(dst)) { + fs.delete(dst, false); + } else { + Path dstdir = dst.getParent(); + if (!fs.exists(dstdir)) { + if (!fs.mkdirs(dstdir)) LOG.warn("mkdir failed on " + dstdir); + } + } + fs.rename(src, dst); + LOG.debug(" moved " + src + " => " + dst); + } + archiveLogs(null, corruptedLogs, processedLogs, + oldLogDir, fs, conf); + fs.delete(stagingDir, true); + return; + } + + private static List listAll(FileSystem fs, Path dir) + throws IOException { + List fset = new ArrayList(100); + FileStatus [] files = fs.listStatus(dir); + if (files != null) { + for (FileStatus f : files) { + if (f.isDir()) { + fset.addAll(listAll(fs, f.getPath())); + } else { + fset.add(f); + } + } + } + return fset; + } + + + /** * Moves processed logs to a oldLogDir after successful processing Moves * corrupted logs (any log that couldn't be successfully parsed to corruptDir * (.corrupt) for later investigation - * + * * @param corruptedLogs * @param processedLogs * @param oldLogDir @@ -329,7 +526,7 @@ public class HLogSplitter { for (Path corrupted : corruptedLogs) { Path p = new Path(corruptDir, corrupted.getName()); - if (!fs.rename(corrupted, p)) { + if (!fs.rename(corrupted, p)) { LOG.info("Unable to move corrupted log " + corrupted + " to " + p); } else { LOG.info("Moving corrupted log " + corrupted + " to " + p); @@ -344,8 +541,8 @@ public class HLogSplitter { LOG.info("Archived processed log " + p + " to " + newPath); } } - - if (!fs.delete(srcDir, true)) { + + if (srcDir != null && !fs.delete(srcDir, true)) { throw new IOException("Unable to delete src dir: " + srcDir); } } @@ -363,19 +560,21 @@ public class HLogSplitter { * @throws IOException */ static Path getRegionSplitEditsPath(final FileSystem fs, - final Entry logEntry, final Path rootDir) throws IOException { + final Entry logEntry, final Path rootDir, boolean isCreate) + throws IOException { Path tableDir = HTableDescriptor.getTableDir(rootDir, logEntry.getKey() .getTablename()); Path regiondir = HRegion.getRegionDir(tableDir, Bytes.toString(logEntry.getKey().getEncodedRegionName())); + Path dir = HLog.getRegionDirRecoveredEditsDir(regiondir); + if (!fs.exists(regiondir)) { LOG.info("This region's directory doesn't exist: " + regiondir.toString() + ". 
It is very likely that it was" + " already split so it's safe to discard those edits."); return null; } - Path dir = HLog.getRegionDirRecoveredEditsDir(regiondir); - if (!fs.exists(dir)) { + if (isCreate && !fs.exists(dir)) { if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir); } return new Path(dir, formatRecoveredEditsFileName(logEntry.getKey() @@ -385,7 +584,7 @@ public class HLogSplitter { static String formatRecoveredEditsFileName(final long seqid) { return String.format("%019d", seqid); } - + /* * Parse a single hlog and put the edits in @splitLogsMap * @@ -394,61 +593,115 @@ public class HLogSplitter { * list of edits as values * @param fs the filesystem * @param conf the configuration - * @throws IOException if hlog is corrupted, or can't be open + * @throws IOException + * @throws CorruptedLogFileException if hlog is corrupted */ - private void parseHLog(final FileStatus logfile, + private void parseHLog(final Reader in, Path path, EntryBuffers entryBuffers, final FileSystem fs, - final Configuration conf) - throws IOException { - // Check for possibly empty file. With appends, currently Hadoop reports a - // zero length even if the file has been sync'd. Revisit if HDFS-376 or - // HDFS-878 is committed. - long length = logfile.getLen(); - if (length <= 0) { - LOG.warn("File " + logfile.getPath() + " might be still open, length is 0"); - } - Path path = logfile.getPath(); - Reader in; + final Configuration conf, boolean skipErrors) + throws IOException, CorruptedLogFileException { int editsCount = 0; try { - in = getReader(fs, path, conf); - } catch (EOFException e) { - if (length <= 0) { - //TODO should we ignore an empty, not-last log file if skip.errors is false? - //Either way, the caller should decide what to do. E.g. ignore if this is the last - //log in sequence. - //TODO is this scenario still possible if the log has been recovered (i.e. closed) - LOG.warn("Could not open " + path + " for reading. File is empty" + e); - return; - } else { - throw e; - } - } - try { Entry entry; - while ((entry = in.next()) != null) { + while ((entry = getNextLogLine(in, path, skipErrors)) != null) { entryBuffers.appendEntry(entry); editsCount++; } } catch (InterruptedException ie) { - throw new RuntimeException(ie); + IOException t = new InterruptedIOException(); + t.initCause(ie); + throw t; } finally { LOG.debug("Pushed=" + editsCount + " entries from " + path); + } + } + + /** + * Create a new {@link Reader} for reading logs to split. + * + * @param fs + * @param file + * @param conf + * @return A new Reader instance + * @throws IOException + * @throws CorruptedLogFile + */ + protected Reader getReader(FileSystem fs, FileStatus file, Configuration conf, + boolean skipErrors) + throws IOException, CorruptedLogFileException { + Path path = file.getPath(); + long length = file.getLen(); + Reader in; + + + // Check for possibly empty file. With appends, currently Hadoop reports a + // zero length even if the file has been sync'd. Revisit if HDFS-376 or + // HDFS-878 is committed. + if (length <= 0) { + LOG.warn("File " + path + " might be still open, length is 0"); + } + + try { + recoverFileLease(fs, path, conf); try { - if (in != null) { - in.close(); + in = getReader(fs, path, conf); + } catch (EOFException e) { + if (length <= 0) { + // TODO should we ignore an empty, not-last log file if skip.errors + // is false? Either way, the caller should decide what to do. E.g. + // ignore if this is the last log in sequence. 
+ // TODO is this scenario still possible if the log has been + // recovered (i.e. closed) + LOG.warn("Could not open " + path + " for reading. File is empty", e); + return null; + } else { + // EOFException being ignored + return null; } - } catch (IOException e) { - LOG.warn("Close log reader in finally threw exception -- continuing", - e); } + } catch (IOException e) { + if (!skipErrors) { + throw e; + } + LOG.warn("skipErrors=true Could not open hlog " + path + " ignoring", e); + CorruptedLogFileException t = new CorruptedLogFileException(); + t.initCause(e); + throw t; } + return in; } + static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors) + throws CorruptedLogFileException, IOException { + try { + return in.next(); + } catch (EOFException eof) { + // truncated files are expected if a RS crashes (see HBASE-2643) + LOG.info("EOF from hlog " + path + ". continuing"); + return null; + } catch (IOException e) { + // If the IOE resulted from bad file format, + // then this problem is idempotent and retrying won't help + if (e.getCause() instanceof ParseException) { + LOG.warn("ParseException from hlog " + path + ". continuing"); + return null; + } + if (!skipErrors) { + throw e; + } + LOG.warn("skipErrors=true Ignoring exception while parsing hlog " + + path + ". Marking as corrupted", e); + CorruptedLogFileException t = new CorruptedLogFileException(); + t.initCause(e); + throw t; + } + } + + private void writerThreadError(Throwable t) { thrown.compareAndSet(null, t); } - + /** * Check for errors in the writer threads. If any is found, rethrow it. */ @@ -477,26 +730,25 @@ public class HLogSplitter { return HLog.getReader(fs, curLogFile, conf); } - /** * Class which accumulates edits and separates them into a buffer per region * while simultaneously accounting RAM usage. Blocks if the RAM usage crosses * a predefined threshold. - * + * * Writer threads then pull region-specific buffers from this class. */ class EntryBuffers { Map buffers = new TreeMap(Bytes.BYTES_COMPARATOR); - + /* Track which regions are currently in the middle of writing. We don't allow an IO thread to pick up bytes from a region if we're already writing - data for that region in a different IO thread. */ + data for that region in a different IO thread. */ Set currentlyWriting = new TreeSet(Bytes.BYTES_COMPARATOR); long totalBuffered = 0; long maxHeapUsage; - + EntryBuffers(long maxHeapUsage) { this.maxHeapUsage = maxHeapUsage; } @@ -504,13 +756,13 @@ public class HLogSplitter { /** * Append a log entry into the corresponding region buffer. * Blocks if the total heap usage has crossed the specified threshold. 
- * + * * @throws InterruptedException - * @throws IOException + * @throws IOException */ void appendEntry(Entry entry) throws InterruptedException, IOException { HLogKey key = entry.getKey(); - + RegionEntryBuffer buffer; synchronized (this) { buffer = buffers.get(key.getEncodedRegionName()); @@ -566,7 +818,7 @@ public class HLogSplitter { dataAvailable.notifyAll(); } } - + synchronized boolean isRegionCurrentlyWriting(byte[] region) { return currentlyWriting.contains(region); } @@ -614,11 +866,11 @@ public class HLogSplitter { class WriterThread extends Thread { private volatile boolean shouldStop = false; - + WriterThread(int i) { super("WriterThread-" + i); } - + public void run() { try { doRun(); @@ -627,7 +879,7 @@ public class HLogSplitter { writerThreadError(t); } } - + private void doRun() throws IOException { LOG.debug("Writer thread " + this + ": starting"); while (true) { @@ -646,7 +898,7 @@ public class HLogSplitter { } continue; } - + assert buffer != null; try { writeBuffer(buffer); @@ -655,16 +907,16 @@ public class HLogSplitter { } } } - + private void writeBuffer(RegionEntryBuffer buffer) throws IOException { - List entries = buffer.entryBuffer; + List entries = buffer.entryBuffer; if (entries.isEmpty()) { LOG.warn(this.getName() + " got an empty buffer, skipping"); return; } WriterAndPath wap = null; - + long startTime = System.nanoTime(); try { int editsCount = 0; @@ -690,12 +942,74 @@ public class HLogSplitter { throw e; } } - + void finish() { shouldStop = true; } } + private WriterAndPath createWAP(byte[] region, Entry entry, + Path rootdir, String tmpname, FileSystem fs, Configuration conf) + throws IOException { + Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, + tmpname==null); + if (regionedits == null) { + return null; + } + if ((tmpname == null) && fs.exists(regionedits)) { + LOG.warn("Found existing old edits file. It could be the " + + "result of a previous failed split attempt. Deleting " + + regionedits + ", length=" + + fs.getFileStatus(regionedits).getLen()); + if (!fs.delete(regionedits, false)) { + LOG.warn("Failed delete of old " + regionedits); + } + } + Path editsfile; + if (tmpname != null) { + // During distributed log splitting the output by each + // SplitLogWorker is written to a temporary area. + editsfile = convertRegionEditsToTemp(rootdir, regionedits, tmpname); + } else { + editsfile = regionedits; + } + Writer w = createWriter(fs, editsfile, conf); + LOG.debug("Creating writer path=" + editsfile + " region=" + + Bytes.toStringBinary(region)); + return (new WriterAndPath(editsfile, w)); + } + + Path convertRegionEditsToTemp(Path rootdir, Path edits, String tmpname) { + List components = new ArrayList(10); + do { + components.add(edits.getName()); + edits = edits.getParent(); + } while (edits.depth() > rootdir.depth()); + Path ret = ZKSplitLog.getSplitLogDir(rootdir, tmpname); + for (int i = components.size() - 1; i >= 0; i--) { + ret = new Path(ret, components.get(i)); + } + try { + if (fs.exists(ret)) { + LOG.warn("Found existing old temporary edits file. It could be the " + + "result of a previous failed split attempt. 
Deleting " + + ret + ", length=" + + fs.getFileStatus(ret).getLen()); + if (!fs.delete(ret, false)) { + LOG.warn("Failed delete of old " + ret); + } + } + Path dir = ret.getParent(); + if (!fs.exists(dir)) { + if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir); + } + } catch (IOException e) { + LOG.warn("Could not prepare temp staging area ", e); + // ignore, exceptions will be thrown elsewhere + } + return ret; + } + /** * Class that manages the output streams from the log splitting process. */ @@ -703,13 +1017,13 @@ public class HLogSplitter { private final Map logWriters = Collections.synchronizedMap( new TreeMap(Bytes.BYTES_COMPARATOR)); private final List writerThreads = Lists.newArrayList(); - + /* Set of regions which we've decided should not output edits */ private final Set blacklistedRegions = Collections.synchronizedSet( new TreeSet(Bytes.BYTES_COMPARATOR)); - - private boolean hasClosed = false; - + + private boolean hasClosed = false; + /** * Start the threads that will pump data from the entryBuffers * to the output files. @@ -730,7 +1044,7 @@ public class HLogSplitter { writerThreads.add(t); } } - + List finishWritingAndClose() throws IOException { LOG.info("Waiting for split writer threads to finish"); for (WriterThread t : writerThreads) { @@ -745,7 +1059,7 @@ public class HLogSplitter { checkForErrors(); } LOG.info("Split writers finished"); - + return closeStreams(); } @@ -755,10 +1069,10 @@ public class HLogSplitter { */ private List closeStreams() throws IOException { Preconditions.checkState(!hasClosed); - + List paths = new ArrayList(); List thrown = Lists.newArrayList(); - + for (WriterAndPath wap : logWriters.values()) { try { wap.w.close(); @@ -774,67 +1088,40 @@ public class HLogSplitter { if (!thrown.isEmpty()) { throw MultipleIOException.createIOException(thrown); } - + hasClosed = true; return paths; } /** * Get a writer and path for a log starting at the given entry. - * + * * This function is threadsafe so long as multiple threads are always * acting on different regions. - * + * * @return null if this region shouldn't output any logs */ WriterAndPath getWriterAndPath(Entry entry) throws IOException { - byte region[] = entry.getKey().getEncodedRegionName(); WriterAndPath ret = logWriters.get(region); if (ret != null) { return ret; } - // If we already decided that this region doesn't get any output // we don't need to check again. if (blacklistedRegions.contains(region)) { return null; } - - // Need to create writer - Path regionedits = getRegionSplitEditsPath(fs, - entry, rootDir); - if (regionedits == null) { - // Edits dir doesn't exist + ret = createWAP(region, entry, rootDir, null, fs, conf); + if (ret == null) { blacklistedRegions.add(region); return null; } - deletePreexistingOldEdits(regionedits); - Writer w = createWriter(fs, regionedits, conf); - ret = new WriterAndPath(regionedits, w); logWriters.put(region, ret); - LOG.debug("Creating writer path=" + regionedits + " region=" - + Bytes.toStringBinary(region)); - return ret; } /** - * If the specified path exists, issue a warning and delete it. - */ - private void deletePreexistingOldEdits(Path regionedits) throws IOException { - if (fs.exists(regionedits)) { - LOG.warn("Found existing old edits file. It could be the " - + "result of a previous failed split attempt. 
Deleting " - + regionedits + ", length=" - + fs.getFileStatus(regionedits).getLen()); - if (!fs.delete(regionedits, false)) { - LOG.warn("Failed delete of old " + regionedits); - } - } - } - - /** * @return a map from encoded region ID to the number of edits written out * for that region. */ @@ -850,6 +1137,8 @@ public class HLogSplitter { } } + + /** * Private data structure that wraps a Writer and its Path, * also collecting statistics about the data written to this @@ -877,4 +1166,8 @@ public class HLogSplitter { nanosSpent += nanos; } } + + static class CorruptedLogFileException extends Exception { + private static final long serialVersionUID = 1L; + } } diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java new file mode 100644 index 0000000..a45e5e1 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java @@ -0,0 +1,265 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.zookeeper; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.lang.reflect.Field; +import java.net.URLDecoder; +import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HServerInfo; +import org.apache.hadoop.hbase.util.Bytes; + +public class ZKSplitLog { + private static final Log LOG = LogFactory.getLog(ZKSplitLog.class); + + public static final int DEFAULT_TIMEOUT = 25000; // 25 sec + public static final int DEFAULT_ZK_RETRIES = 3; + public static final int DEFAULT_MAX_RESUBMIT = 3; + public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); //3 min + + /** + * Gets the full path node name for the log file being split + * @param zkw zk reference + * @param filename log file name (only the basename) + */ + public static String getNodeName(ZooKeeperWatcher zkw, String filename) { + return ZKUtil.joinZNode(zkw.splitLogZNode, encode(filename)); + } + + public static String getFileName(String node) { + String basename = node.substring(node.lastIndexOf('/') + 1); + return decode(basename); + } + + + public static String encode(String s) { + try { + return URLEncoder.encode(s, "UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException("URLENCODER doesn't support UTF-8"); + } + } + + public static String decode(String s) { + try { + return URLDecoder.decode(s, "UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException("URLDecoder doesn't support UTF-8"); + } + } + + public static String getRescanNode(ZooKeeperWatcher zkw) { + return getNodeName(zkw, "RESCAN"); + } + + public static boolean isRescanNode(ZooKeeperWatcher zkw, String path) { + String prefix = getRescanNode(zkw); + if (path.length() <= prefix.length()) { + return false; + } + for (int i = 0; i < prefix.length(); i++) { + if (prefix.charAt(i) != path.charAt(i)) { + return false; + } + } + return true; + } + + public static boolean isTaskPath(ZooKeeperWatcher zkw, String path) { + String dirname = path.substring(0, path.lastIndexOf('/')); + return dirname.equals(zkw.splitLogZNode); + } + + public static enum TaskState { + TASK_UNASSIGNED("unassigned"), + TASK_OWNED("owned"), + TASK_RESIGNED("resigned"), + TASK_DONE("done"), + TASK_ERR("err"); + + private final byte[] state; + private TaskState(String s) { + state = s.getBytes(); + } + + public byte[] get(String serverName) { + return (Bytes.add(state, " ".getBytes(), serverName.getBytes())); + } + + public String getWriterName(byte[] data) { + String str = Bytes.toString(data); + return str.substring(str.indexOf(' ') + 1); + } + + + /** + * @param s + * @return True if {@link #state} is a prefix of s. False otherwise. 
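+ * For example (illustrative values):
+ *
+ *   byte[] data = TaskState.TASK_UNASSIGNED.get("master1"); // "unassigned master1"
+ *   TaskState.TASK_UNASSIGNED.equals(data);                 // true: prefix match
+ *   TaskState.TASK_UNASSIGNED.getWriterName(data);          // "master1"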
+ */ + public boolean equals(byte[] s) { + if (s.length < state.length) { + return (false); + } + for (int i = 0; i < state.length; i++) { + if (state[i] != s[i]) { + return (false); + } + } + return (true); + } + + public boolean equals(byte[] s, String serverName) { + return (Arrays.equals(s, get(serverName))); + } + @Override + public String toString() { + return new String(state); + } + } + + public static Path getSplitLogDir(Path rootdir, String tmpname) { + return new Path(new Path(rootdir, "splitlog"), tmpname); + } + + public static Path stripSplitLogTempDir(Path rootdir, Path file) { + int skipDepth = rootdir.depth() + 2; + List components = new ArrayList(10); + do { + components.add(file.getName()); + file = file.getParent(); + } while (file.depth() > skipDepth); + Path ret = rootdir; + for (int i = components.size() - 1; i >= 0; i--) { + ret = new Path(ret, components.get(i)); + } + return ret; + } + + public static String getSplitLogDirTmpComponent(String worker, String file) { + return (worker + "_" + ZKSplitLog.encode(file)); + } + + public static void markCorrupted(Path rootdir, String tmpname, + FileSystem fs) { + Path file = new Path(getSplitLogDir(rootdir, tmpname), "corrupt"); + try { + fs.createNewFile(file); + } catch (IOException e) { + LOG.warn("Could not flag a log file as corrupted. Failed to create " + + file, e); + } + } + + public static boolean isCorrupted(Path rootdir, String tmpname, + FileSystem fs) throws IOException { + Path file = new Path(getSplitLogDir(rootdir, tmpname), "corrupt"); + boolean isCorrupt; + isCorrupt = fs.exists(file); + return isCorrupt; + } + + public static boolean isCorruptFlagFile(Path file) { + return file.getName().equals("corrupt"); + } + + + public static class Counters { + //SplitLogManager counters + public static AtomicLong tot_mgr_log_split_batch_start = new AtomicLong(0); + public static AtomicLong tot_mgr_log_split_batch_success = + new AtomicLong(0); + public static AtomicLong tot_mgr_log_split_batch_err = new AtomicLong(0); + public static AtomicLong tot_mgr_new_unexpected_hlogs = new AtomicLong(0); + public static AtomicLong tot_mgr_log_split_start = new AtomicLong(0); + public static AtomicLong tot_mgr_log_split_success = new AtomicLong(0); + public static AtomicLong tot_mgr_log_split_err = new AtomicLong(0); + public static AtomicLong tot_mgr_node_create_queued = new AtomicLong(0); + public static AtomicLong tot_mgr_node_create_result = new AtomicLong(0); + public static AtomicLong tot_mgr_node_already_exists = new AtomicLong(0); + public static AtomicLong tot_mgr_node_create_err = new AtomicLong(0); + public static AtomicLong tot_mgr_node_create_retry = new AtomicLong(0); + public static AtomicLong tot_mgr_get_data_queued = new AtomicLong(0); + public static AtomicLong tot_mgr_get_data_result = new AtomicLong(0); + public static AtomicLong tot_mgr_get_data_err = new AtomicLong(0); + public static AtomicLong tot_mgr_get_data_retry = new AtomicLong(0); + public static AtomicLong tot_mgr_node_delete_queued = new AtomicLong(0); + public static AtomicLong tot_mgr_node_delete_result = new AtomicLong(0); + public static AtomicLong tot_mgr_node_delete_err = new AtomicLong(0); + public static AtomicLong tot_mgr_resubmit = new AtomicLong(0); + public static AtomicLong tot_mgr_resubmit_failed = new AtomicLong(0); + public static AtomicLong tot_mgr_null_data = new AtomicLong(0); + public static AtomicLong tot_mgr_orphan_task_acquired = new AtomicLong(0); + public static AtomicLong tot_mgr_unacquired_orphan_done = new 
AtomicLong(0); + public static AtomicLong tot_mgr_resubmit_threshold_reached = + new AtomicLong(0); + public static AtomicLong tot_mgr_missing_state_in_delete = + new AtomicLong(0); + public static AtomicLong tot_mgr_heartbeat = new AtomicLong(0); + public static AtomicLong tot_mgr_rescan = new AtomicLong(0); + public static AtomicLong tot_mgr_rescan_deleted = new AtomicLong(0); + public static AtomicLong tot_mgr_task_deleted = new AtomicLong(0); + public static AtomicLong tot_mgr_resubmit_unassigned = new AtomicLong(0); + public static AtomicLong tot_mgr_relist_logdir = new AtomicLong(0); + + + + // SplitLogWorker counters + public static AtomicLong tot_wkr_failed_to_grab_task_no_data = + new AtomicLong(0); + public static AtomicLong tot_wkr_failed_to_grab_task_exception = + new AtomicLong(0); + public static AtomicLong tot_wkr_failed_to_grab_task_owned = + new AtomicLong(0); + public static AtomicLong tot_wkr_failed_to_grab_task_lost_race = + new AtomicLong(0); + public static AtomicLong tot_wkr_task_acquired = new AtomicLong(0); + public static AtomicLong tot_wkr_task_resigned = new AtomicLong(0); + public static AtomicLong tot_wkr_task_done = new AtomicLong(0); + public static AtomicLong tot_wkr_task_err = new AtomicLong(0); + public static AtomicLong tot_wkr_task_heartbeat = new AtomicLong(0); + public static AtomicLong tot_wkr_task_acquired_rescan = new AtomicLong(0); + public static AtomicLong tot_wkr_get_data_queued = new AtomicLong(0); + public static AtomicLong tot_wkr_get_data_result = new AtomicLong(0); + public static AtomicLong tot_wkr_get_data_retry = new AtomicLong(0); + public static AtomicLong tot_wkr_preempt_task = new AtomicLong(0); + public static AtomicLong tot_wkr_task_heartbeat_failed = new AtomicLong(0); + public static AtomicLong tot_wkr_final_transistion_failed = + new AtomicLong(0); + + public static void resetCounters() throws Exception { + Class cl = (new Counters()).getClass(); + Field[] flds = cl.getDeclaredFields(); + for (Field fld : flds) { + ((AtomicLong)fld.get(null)).set(0); + } + } + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index 827073c..6c80670 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -575,7 +575,7 @@ public class ZKUtil { * * @param zkw zk reference * @param znode path of node - * @param stat node status to set if node exists + * @param stat node status to get if node exists * @return data of the specified znode, or null if node does not exist * @throws KeeperException if unexpected zookeeper exception */ @@ -583,7 +583,7 @@ public class ZKUtil { Stat stat) throws KeeperException { try { - byte [] data = zkw.getZooKeeper().getData(znode, zkw, stat); + byte [] data = zkw.getZooKeeper().getData(znode, null, stat); logRetrievedMsg(zkw, znode, data, false); return data; } catch (KeeperException.NoNodeException e) { @@ -874,13 +874,10 @@ public class ZKUtil { * @param data data of node to create * @param cb * @param ctx - * @throws KeeperException if unexpected zookeeper exception - * @throws KeeperException.NodeExistsException if node already exists */ public static void asyncCreate(ZooKeeperWatcher zkw, String znode, byte [] data, final AsyncCallback.StringCallback cb, - final Object ctx) - throws KeeperException, KeeperException.NodeExistsException { + final Object ctx) { zkw.getZooKeeper().create(znode, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, 
cb, ctx); } diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java index 2380c66..dc471c4 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java @@ -89,6 +89,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable { public String tableZNode; // znode containing the unique cluster ID public String clusterIdZNode; + // znode used for log splitting work assignment + public String splitLogZNode; private final Configuration conf; @@ -165,6 +167,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable { ZKUtil.createAndFailSilent(this, assignmentZNode); ZKUtil.createAndFailSilent(this, rsZNode); ZKUtil.createAndFailSilent(this, tableZNode); + ZKUtil.createAndFailSilent(this, splitLogZNode); } catch (KeeperException e) { throw new ZooKeeperConnectionException( prefix("Unexpected KeeperException creating base node"), e); @@ -210,6 +213,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable { conf.get("zookeeper.znode.tableEnableDisable", "table")); clusterIdZNode = ZKUtil.joinZNode(baseZNode, conf.get("zookeeper.znode.clusterId", "hbaseid")); + splitLogZNode = ZKUtil.joinZNode(baseZNode, + conf.get("zookeeper.znode.splitlog", "splitlog")); } /** @@ -247,7 +252,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable { /** * Method called from ZooKeeper for events and connection status. - * + *

* Valid events are passed along to listeners. Connection status changes * are dealt with locally. */ @@ -302,12 +307,12 @@ public class ZooKeeperWatcher implements Watcher, Abortable { /** * Called when there is a connection-related event via the Watcher callback. - * + *

* If Disconnected or Expired, this should shutdown the cluster. But, since * we send a KeeperException.SessionExpiredException along with the abort * call, it's possible for the Abortable to catch it and try to create a new * session with ZooKeeper. This is what the client does in HCM. - * + *

* @param event */ private void connectionEvent(WatchedEvent event) { @@ -376,11 +381,11 @@ public class ZooKeeperWatcher implements Watcher, Abortable { /** * Handles KeeperExceptions in client calls. - * + *

* This may be temporary but for now this gives one place to deal with these. - * + *

* TODO: Currently this method rethrows the exception to let the caller handle - * + *

* @param ke * @throws KeeperException */ @@ -392,13 +397,13 @@ public class ZooKeeperWatcher implements Watcher, Abortable { /** * Handles InterruptedExceptions in client calls. - * + *

* This may be temporary but for now this gives one place to deal with these. - * + *

* TODO: Currently, this method does nothing. * Is this ever expected to happen? Do we abort or can we let it run? * Maybe this should be logged as WARN? It shouldn't happen? - * + *

* @param ie */ public void interruptedException(InterruptedException ie) { diff --git a/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java new file mode 100644 index 0000000..4b2d8eb --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java @@ -0,0 +1,443 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.NavigableSet; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.regionserver.wal.OrphanHLogAfterSplitException; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hadoop.hbase.zookeeper.ZKAssign; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.zookeeper.KeeperException; +import org.junit.Before; +import org.junit.After; +import org.junit.Test; + +public class TestDistributedLogSplitting { + private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class); + static { + Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG); + } + + // Start a cluster with 2 masters and 3 
regionservers + final int NUM_MASTERS = 2; + final int NUM_RS = 6; + + MiniHBaseCluster cluster; + HMaster master; + Configuration conf; + HBaseTestingUtility TEST_UTIL; + + @Before + public void before() throws Exception { + + } + + private void startCluster(int num_rs) throws Exception{ + ZKSplitLog.Counters.resetCounters(); + LOG.info("Starting cluster"); + conf = HBaseConfiguration.create(); + conf.getLong("hbase.splitlog.max.resubmit", 0); + TEST_UTIL = new HBaseTestingUtility(conf); + TEST_UTIL.startMiniCluster(NUM_MASTERS, num_rs); + cluster = TEST_UTIL.getHBaseCluster(); + LOG.info("Waiting for active/ready master"); + cluster.waitForActiveAndReadyMaster(); + master = cluster.getMaster(); + } + + @After + public void after() throws Exception { + cluster.shutdown(); + } + + @Test + public void testThreeRSAbort() throws Exception { + LOG.info("testThreeRSAbort"); + final int NUM_REGIONS_TO_CREATE = 40; + final int NUM_ROWS_PER_REGION = 100; + + startCluster(NUM_RS); + + ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, + "distributed log splitting test", null); + + HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE); + populateDataInTable(NUM_ROWS_PER_REGION, "family"); + + + List rsts = cluster.getLiveRegionServerThreads(); + assertEquals(NUM_RS, rsts.size()); + rsts.get(0).getRegionServer().abort("testing"); + rsts.get(1).getRegionServer().abort("testing"); + rsts.get(2).getRegionServer().abort("testing"); + + long start = EnvironmentEdgeManager.currentTimeMillis(); + while (cluster.getLiveRegionServerThreads().size() > (NUM_RS - 3)) { + if (EnvironmentEdgeManager.currentTimeMillis() - start > 30000) { + assertTrue(false); + } + Thread.sleep(200); + } + + start = EnvironmentEdgeManager.currentTimeMillis(); + while (getAllOnlineRegions(cluster).size() < (NUM_REGIONS_TO_CREATE + 2)) { + if (EnvironmentEdgeManager.currentTimeMillis() - start > 30000) { + assertTrue(false); + } + Thread.sleep(200); + } + + assertEquals(NUM_REGIONS_TO_CREATE * NUM_ROWS_PER_REGION, + TEST_UTIL.countRows(ht)); + } + + @Test(expected=OrphanHLogAfterSplitException.class) + public void testOrphanLogCreation() throws Exception { + LOG.info("testOrphanLogCreation"); + startCluster(NUM_RS); + final SplitLogManager slm = master.getMasterFileSystem().splitLogManager; + final FileSystem fs = master.getMasterFileSystem().getFileSystem(); + + List rsts = cluster.getLiveRegionServerThreads(); + HRegionServer hrs = rsts.get(0).getRegionServer(); + Path rootdir = FSUtils.getRootDir(conf); + final Path logDir = new Path(rootdir, + HLog.getHLogDirectoryName(hrs.getServerName())); + + installTable(new ZooKeeperWatcher(conf, "table-creation", null), + "table", "family", 40); + + makeHLog(hrs.getWAL(), hrs.getOnlineRegions(), "table", + 1000, 100); + + new Thread() { + public void run() { + while (true) { + int i = 0; + try { + while(ZKSplitLog.Counters.tot_mgr_log_split_batch_start.get() == + 0) { + Thread.yield(); + } + fs.createNewFile(new Path(logDir, "foo" + i++)); + } catch (Exception e) { + LOG.debug("file creation failed", e); + return; + } + } + } + }.start(); + slm.splitLogDistributed(logDir); + FileStatus[] files = fs.listStatus(logDir); + if (files != null) { + for (FileStatus file : files) { + LOG.debug("file still there " + file.getPath()); + } + } + } + + @Test + public void testRecoveredEdits() throws Exception { + LOG.info("testRecoveredEdits"); + startCluster(NUM_RS); + final int NUM_LOG_LINES = 1000; + final SplitLogManager slm = master.getMasterFileSystem().splitLogManager; + // turn 
off load balancing to prevent regions from moving around otherwise + // they will consume recovered.edits + master.balanceSwitch(false); + FileSystem fs = master.getMasterFileSystem().getFileSystem(); + + List rsts = cluster.getLiveRegionServerThreads(); + HRegionServer hrs = rsts.get(0).getRegionServer(); + Path rootdir = FSUtils.getRootDir(conf); + final Path logDir = new Path(rootdir, + HLog.getHLogDirectoryName(hrs.getServerName())); + + installTable(new ZooKeeperWatcher(conf, "table-creation", null), + "table", "family", 40); + byte[] table = Bytes.toBytes("table"); + List regions = hrs.getOnlineRegions(); + LOG.info("#regions = " + regions.size()); + Iterator it = regions.iterator(); + while (it.hasNext()) { + HRegionInfo region = it.next(); + if (region.isMetaRegion() || region.isRootRegion()) { + it.remove(); + } + } + makeHLog(hrs.getWAL(), regions, "table", + NUM_LOG_LINES, 100); + + slm.splitLogDistributed(logDir); + + int count = 0; + for (HRegionInfo hri : regions) { + + Path tdir = HTableDescriptor.getTableDir(rootdir, table); + Path editsdir = + HLog.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, + hri.getEncodedName())); + LOG.debug("checking edits dir " + editsdir); + FileStatus[] files = fs.listStatus(editsdir); + assertEquals(1, files.length); + int c = countHLog(files[0].getPath(), fs, conf); + count += c; + LOG.info(c + " edits in " + files[0].getPath()); + } + assertEquals(NUM_LOG_LINES, count); + } + + @Test + public void testWorkerAbort() throws Exception { + LOG.info("testWorkerAbort"); + startCluster(1); + final int NUM_LOG_LINES = 10000; + final SplitLogManager slm = master.getMasterFileSystem().splitLogManager; + FileSystem fs = master.getMasterFileSystem().getFileSystem(); + + final List rsts = cluster.getLiveRegionServerThreads(); + HRegionServer hrs = rsts.get(0).getRegionServer(); + Path rootdir = FSUtils.getRootDir(conf); + final Path logDir = new Path(rootdir, + HLog.getHLogDirectoryName(hrs.getServerName())); + + installTable(new ZooKeeperWatcher(conf, "table-creation", null), + "table", "family", 40); + byte[] table = Bytes.toBytes("table"); + makeHLog(hrs.getWAL(), hrs.getOnlineRegions(), "table", + NUM_LOG_LINES, 100); + + new Thread() { + public void run() { + waitForCounter(tot_wkr_task_acquired, 0, 1, 1000); + for (RegionServerThread rst : rsts) { + rst.getRegionServer().abort("testing"); + } + } + }.start(); + // slm.splitLogDistributed(logDir); + FileStatus[] logfiles = fs.listStatus(logDir); + TaskBatch batch = new TaskBatch(); + slm.installTask(logfiles[0].getPath().toString(), batch); + //waitForCounter but for one of the 2 counters + long curt = System.currentTimeMillis(); + long endt = curt + 30000; + while (curt < endt) { + if ((tot_wkr_task_resigned.get() + tot_wkr_task_err.get()) == 0) { + Thread.yield(); + curt = System.currentTimeMillis(); + } else { + assertEquals(1, (tot_wkr_task_resigned.get() + tot_wkr_task_err.get())); + return; + } + } + assertEquals(1, batch.done); + fail("region server completed the split before aborting"); + return; + } + + HTable installTable(ZooKeeperWatcher zkw, String tname, String fname, + int nrs ) throws Exception { + // Create a table with regions + byte [] table = Bytes.toBytes(tname); + byte [] family = Bytes.toBytes(fname); + LOG.info("Creating table with " + nrs + " regions"); + HTable ht = TEST_UTIL.createTable(table, family); + int numRegions = TEST_UTIL.createMultiRegions(conf, ht, family, nrs); + assertEquals(nrs, numRegions); + LOG.info("Waiting for no more RIT\n"); + 
blockUntilNoRIT(zkw, master); + // disable-enable cycle to get rid of table's dead regions left behind + // by createMultiRegions + LOG.debug("Disabling table\n"); + TEST_UTIL.getHBaseAdmin().disableTable(table); + LOG.debug("Waiting for no more RIT\n"); + blockUntilNoRIT(zkw, master); + NavigableSet regions = getAllOnlineRegions(cluster); + LOG.debug("Verifying only catalog regions are assigned\n"); + if (regions.size() != 2) { + for (String oregion : regions) + LOG.debug("Region still online: " + oregion); + } + assertEquals(2, regions.size()); + LOG.debug("Enabling table\n"); + TEST_UTIL.getHBaseAdmin().enableTable(table); + LOG.debug("Waiting for no more RIT\n"); + blockUntilNoRIT(zkw, master); + LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n"); + regions = getAllOnlineRegions(cluster); + assertEquals(numRegions + 2, regions.size()); + return ht; + } + + void populateDataInTable(int nrows, String fname) throws Exception { + byte [] family = Bytes.toBytes(fname); + + List rsts = cluster.getLiveRegionServerThreads(); + assertEquals(NUM_RS, rsts.size()); + + for (RegionServerThread rst : rsts) { + HRegionServer hrs = rst.getRegionServer(); + List hris = hrs.getOnlineRegions(); + for (HRegionInfo hri : hris) { + if (hri.isMetaRegion() || hri.isRootRegion()) { + continue; + } + LOG.debug("adding data to rs = " + rst.getName() + + " region = "+ hri.getRegionNameAsString()); + HRegion region = hrs.getOnlineRegion(hri.getRegionName()); + assertTrue(region != null); + putData(region, hri.getStartKey(), nrows, Bytes.toBytes("q"), family); + } + } + } + + public void makeHLog(HLog log, + List hris, String tname, + int num_edits, int edit_size) throws IOException { + + byte[] table = Bytes.toBytes(tname); + byte[] value = new byte[edit_size]; + for (int i = 0; i < edit_size; i++) { + value[i] = (byte)('a' + (i % 26)); + } + int n = hris.size(); + int[] counts = new int[n]; + int j = 0; + for (int i = 0; i < num_edits; i += 1) { + WALEdit e = new WALEdit(); + byte [] row = Bytes.toBytes("r" + Integer.toString(i)); + byte [] family = Bytes.toBytes("f"); + byte [] qualifier = Bytes.toBytes("c" + Integer.toString(i)); + e.add(new KeyValue(row, family, qualifier, + System.currentTimeMillis(), value)); + // LOG.info("Region " + i + ": " + e); + j++; + log.append(hris.get(j % n), table, e, System.currentTimeMillis()); + counts[j % n] += 1; + // if ((i % 8096) == 0) { + // log.sync(); + // } + } + log.sync(); + log.close(); + for (int i = 0; i < n; i++) { + LOG.info("region " + hris.get(i).getRegionNameAsString() + + " has " + counts[i] + " edits"); + } + return; + } + + private int countHLog(Path log, FileSystem fs, Configuration conf) + throws IOException { + int count = 0; + HLog.Reader in = HLog.getReader(fs, log, conf); + while (in.next() != null) { + count++; + } + return count; + } + + private void blockUntilNoRIT(ZooKeeperWatcher zkw, HMaster master) + throws KeeperException, InterruptedException { + ZKAssign.blockUntilNoRIT(zkw); + master.assignmentManager.waitUntilNoRegionsInTransition(60000); + } + + private void blockUntilRIT(ZooKeeperWatcher zkw) + throws KeeperException, InterruptedException { + ZKAssign.blockUntilRIT(zkw); + } + + private void putData(HRegion region, byte[] startRow, int numRows, byte [] qf, + byte [] ...families) + throws IOException { + for(int i = 0; i < numRows; i++) { + Put put = new Put(Bytes.add(startRow, Bytes.toBytes(i))); + for(byte [] family : families) { + put.add(family, qf, null); + } + region.put(put); + } + } + + private NavigableSet 
getAllOnlineRegions(MiniHBaseCluster cluster) { + NavigableSet online = new TreeSet(); + for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) { + for (HRegionInfo region : rst.getRegionServer().getOnlineRegions()) { + online.add(region.getRegionNameAsString()); + } + } + return online; + } + + private void waitForCounter(AtomicLong ctr, long oldval, long newval, + long timems) { + long curt = System.currentTimeMillis(); + long endt = curt + timems; + while (curt < endt) { + if (ctr.get() == oldval) { + Thread.yield(); + curt = System.currentTimeMillis(); + } else { + assertEquals(newval, ctr.get()); + return; + } + } + assertTrue(false); + } +} diff --git a/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java b/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java new file mode 100644 index 0000000..744fcfa --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java @@ -0,0 +1,433 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master; + +import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.*; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.master.SplitLogManager.Task; +import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch; +import org.apache.hadoop.hbase.regionserver.TestMasterAddressManager.NodeCreationListener; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs.Ids; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + + +public class TestSplitLogManager { + private static final Log LOG = LogFactory.getLog(TestSplitLogManager.class); + static { + Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG); + } + + private ZooKeeperWatcher zkw; + private static boolean stopped = false; + private SplitLogManager slm; + private Configuration conf; + + + private final static HBaseTestingUtility TEST_UTIL = + new HBaseTestingUtility(); + + static Stoppable stopper = new Stoppable() { + @Override + public void stop(String why) { + stopped = true; + } + + @Override + public boolean isStopped() { + return stopped; + } + + }; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + } + + @Before + public void setup() throws Exception { + TEST_UTIL.startMiniZKCluster(); + conf = TEST_UTIL.getConfiguration(); + zkw = new ZooKeeperWatcher(conf, "split-log-manager-tests", null); + ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode); + ZKUtil.createAndFailSilent(zkw, zkw.baseZNode); + assertTrue(ZKUtil.checkExists(zkw, zkw.baseZNode) != -1); + LOG.debug(zkw.baseZNode + " created"); + ZKUtil.createAndFailSilent(zkw, zkw.splitLogZNode); + assertTrue(ZKUtil.checkExists(zkw, zkw.splitLogZNode) != -1); + LOG.debug(zkw.splitLogZNode + " created"); + + stopped = false; + resetCounters(); + } + + @After + public void teardown() throws IOException, KeeperException { + stopper.stop(""); + slm.stop(); + TEST_UTIL.shutdownMiniZKCluster(); + } + + private void waitForCounter(AtomicLong ctr, long oldval, long newval, + long timems) { + long curt = System.currentTimeMillis(); + long endt = curt + timems; + while (curt < endt) { + if (ctr.get() == oldval) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + } + curt = System.currentTimeMillis(); + } else { + assertEquals(newval, ctr.get()); + return; + } + } + assertTrue(false); + } + + private int numRescanPresent() throws KeeperException { + int num = 0; + List nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode); + for (String node : nodes) { + if (ZKSplitLog.isRescanNode(zkw, ZKSplitLog.getNodeName(zkw, node))) { + num++; + } + } + return num; + 
} + + private void setRescanNodeDone(int count) throws KeeperException { + List nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode); + for (String node : nodes) { + if (ZKSplitLog.isRescanNode(zkw, ZKSplitLog.getNodeName(zkw, node))) { + ZKUtil.setData(zkw, ZKSplitLog.getNodeName(zkw, node), + TaskState.TASK_DONE.get("some-worker")); + count--; + } + } + assertEquals(0, count); + } + + private String submitTaskAndWait(TaskBatch batch, String name) + throws KeeperException, InterruptedException { + String tasknode = ZKSplitLog.getNodeName(zkw, name); + NodeCreationListener listener = new NodeCreationListener(zkw, tasknode); + zkw.registerListener(listener); + ZKUtil.watchAndCheckExists(zkw, tasknode); + + slm.installTask(name, batch); + assertEquals(1, batch.installed); + assertTrue(slm.findOrCreateOrphanTask(tasknode).batch == batch); + assertEquals(1L, tot_mgr_node_create_queued.get()); + + LOG.debug("waiting for task node creation"); + listener.waitForCreation(); + LOG.debug("task created"); + return tasknode; + } + + /** + * Test whether the split log manager correctly creates a task in zookeeper + * @throws Exception + */ + @Test + public void testTaskCreation() throws Exception { + LOG.info("TestTaskCreation - test the creation of a task in zk"); + + slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm.finishInitialization(); + TaskBatch batch = new TaskBatch(); + + String tasknode = submitTaskAndWait(batch, "foo"); + + byte[] data = ZKUtil.getData(zkw, tasknode); + LOG.info("Task node created " + new String(data)); + assertTrue(TaskState.TASK_UNASSIGNED.equals(data, "dummy-master")); + } + + @Test + public void testOrphanTaskAcquisition() throws Exception { + LOG.info("TestOrphanTaskAcquisition"); + + String tasknode = ZKSplitLog.getNodeName(zkw, "orphan"); + zkw.getZooKeeper().create(tasknode, + TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + int to = 1000; + conf.setInt("hbase.splitlog.manager.timeout", to); + conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); + to = to + 2 * 100; + + + slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm.finishInitialization(); + waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100); + Task task = slm.findOrCreateOrphanTask(tasknode); + assertTrue(task.isOrphan()); + waitForCounter(tot_mgr_heartbeat, 0, 1, 100); + assertFalse(task.isUnassigned()); + long curt = System.currentTimeMillis(); + assertTrue((task.last_update <= curt) && + (task.last_update > (curt - 1000))); + LOG.info("waiting for manager to resubmit the orphan task"); + waitForCounter(tot_mgr_resubmit, 0, 1, to + 100); + assertTrue(task.isUnassigned()); + waitForCounter(tot_mgr_rescan, 0, 1, to + 100); + assertEquals(1, numRescanPresent()); + } + + @Test + public void testUnassignedOrphan() throws Exception { + LOG.info("TestUnassignedOrphan - an unassigned task is resubmitted at" + + " startup"); + String tasknode = ZKSplitLog.getNodeName(zkw, "orphan"); + // create an unassigned orphan task + zkw.getZooKeeper().create(tasknode, + TaskState.TASK_UNASSIGNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + int version = ZKUtil.checkExists(zkw, tasknode); + + slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm.finishInitialization(); + waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100); + Task task = slm.findOrCreateOrphanTask(tasknode); + assertTrue(task.isOrphan()); + assertTrue(task.isUnassigned()); + // wait for RESCAN node to be 
created + waitForCounter(tot_mgr_rescan, 0, 1, 500); + Task task2 = slm.findOrCreateOrphanTask(tasknode); + assertTrue(task == task2); + LOG.debug("task = " + task); + assertEquals(1L, tot_mgr_resubmit.get()); + assertEquals(1, task.incarnation); + assertEquals(0, task.unforcedResubmits); + assertTrue(task.isOrphan()); + assertTrue(task.isUnassigned()); + assertTrue(ZKUtil.checkExists(zkw, tasknode) > version); + assertEquals(1, numRescanPresent()); + } + + @Test + public void testMultipleResubmits() throws Exception { + LOG.info("TestMultipleResubmits - no indefinite resubmissions"); + + int to = 1000; + conf.setInt("hbase.splitlog.manager.timeout", to); + conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); + to = to + 2 * 100; + + conf.setInt("hbase.splitlog.max.resubmit", 2); + slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm.finishInitialization(); + TaskBatch batch = new TaskBatch(); + + String tasknode = submitTaskAndWait(batch, "foo"); + int version = ZKUtil.checkExists(zkw, tasknode); + + ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker1")); + waitForCounter(tot_mgr_heartbeat, 0, 1, 1000); + waitForCounter(tot_mgr_resubmit, 0, 1, to + 100); + int version1 = ZKUtil.checkExists(zkw, tasknode); + assertTrue(version1 > version); + ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker2")); + waitForCounter(tot_mgr_heartbeat, 1, 2, 1000); + waitForCounter(tot_mgr_resubmit, 1, 2, to + 100); + int version2 = ZKUtil.checkExists(zkw, tasknode); + assertTrue(version2 > version1); + ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker3")); + waitForCounter(tot_mgr_heartbeat, 1, 2, 1000); + waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + 100); + assertEquals(2, numRescanPresent()); + Thread.sleep(to + 100); + assertEquals(2L, tot_mgr_resubmit.get()); + } + + @Test + public void testRescanCleanup() throws Exception { + LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up"); + + int to = 1000; + conf.setInt("hbase.splitlog.manager.timeout", to); + conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); + to = to + 2 * 100; + slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm.finishInitialization(); + TaskBatch batch = new TaskBatch(); + + String tasknode = submitTaskAndWait(batch, "foo"); + int version = ZKUtil.checkExists(zkw, tasknode); + + ZKUtil.setData(zkw, tasknode, TaskState.TASK_OWNED.get("worker1")); + waitForCounter(tot_mgr_heartbeat, 0, 1, 1000); + waitForCounter(tot_mgr_resubmit, 0, 1, to + 100); + int version1 = ZKUtil.checkExists(zkw, tasknode); + assertTrue(version1 > version); + assertEquals(1, numRescanPresent()); + byte[] taskstate = ZKUtil.getData(zkw, tasknode); + assertTrue(Arrays.equals(TaskState.TASK_UNASSIGNED.get("dummy-master"), + taskstate)); + + setRescanNodeDone(1); + + waitForCounter(tot_mgr_rescan_deleted, 0, 1, 1000); + + assertEquals(0, numRescanPresent()); + return; + } + + @Test + public void testTaskDone() throws Exception { + LOG.info("TestTaskDone - cleanup task node once in DONE state"); + + slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm.finishInitialization(); + TaskBatch batch = new TaskBatch(); + String tasknode = submitTaskAndWait(batch, "foo"); + ZKUtil.setData(zkw, tasknode, TaskState.TASK_DONE.get("worker")); + synchronized (batch) { + while (batch.installed != batch.done) { + batch.wait(); + } + } + waitForCounter(tot_mgr_task_deleted, 0, 1, 1000); + assertTrue(ZKUtil.checkExists(zkw, 
tasknode) == -1); + } + + @Test + public void testTaskErr() throws Exception { + LOG.info("TestTaskErr - cleanup task node once in ERR state"); + + conf.setInt("hbase.splitlog.max.resubmit", 0); + slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm.finishInitialization(); + TaskBatch batch = new TaskBatch(); + + String tasknode = submitTaskAndWait(batch, "foo"); + ZKUtil.setData(zkw, tasknode, TaskState.TASK_ERR.get("worker")); + synchronized (batch) { + while (batch.installed != batch.error) { + batch.wait(); + } + } + waitForCounter(tot_mgr_task_deleted, 0, 1, 1000); + assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1); + conf.setInt("hbase.splitlog.max.resubmit", ZKSplitLog.DEFAULT_MAX_RESUBMIT); + } + + @Test + public void testTaskResigned() throws Exception { + LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state"); + + slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm.finishInitialization(); + TaskBatch batch = new TaskBatch(); + String tasknode = submitTaskAndWait(batch, "foo"); + ZKUtil.setData(zkw, tasknode, TaskState.TASK_RESIGNED.get("worker")); + int version = ZKUtil.checkExists(zkw, tasknode); + + waitForCounter(tot_mgr_resubmit, 0, 1, 1000); + int version1 = ZKUtil.checkExists(zkw, tasknode); + assertTrue(version1 > version); + assertEquals(1, numRescanPresent()); + + byte[] taskstate = ZKUtil.getData(zkw, tasknode); + assertTrue(Arrays.equals(taskstate, + TaskState.TASK_UNASSIGNED.get("dummy-master"))); + } + + @Test + public void testUnassignedTimeout() throws Exception { + LOG.info("TestUnassignedTimeout - iff all tasks are unassigned then" + + " resubmit"); + + // create an orphan task in OWNED state + String tasknode1 = ZKSplitLog.getNodeName(zkw, "orphan"); + zkw.getZooKeeper().create(tasknode1, + TaskState.TASK_OWNED.get("dummy-worker"), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + int to = 1000; + conf.setInt("hbase.splitlog.manager.timeout", to); + conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * to); + conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 100); + + + slm = new SplitLogManager(zkw, conf, stopper, "dummy-master", null); + slm.finishInitialization(); + waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, 100); + + + // submit another task which will stay in unassigned mode + TaskBatch batch = new TaskBatch(); + submitTaskAndWait(batch, "foo"); + + // keep updating the orphan owned node every to/2 seconds + for (int i = 0; i < (3 * to)/100; i++) { + Thread.sleep(100); + ZKUtil.setData(zkw, tasknode1, + TaskState.TASK_OWNED.get("dummy-worker")); + } + + // since all the nodes in the system are not unassigned the + // unassigned_timeout must not have kicked in + assertEquals(0, numRescanPresent()); + + // since we have stopped heartbeating the owned node therefore it should + // get resubmitted + LOG.info("waiting for manager to resubmit the orphan task"); + waitForCounter(tot_mgr_resubmit, 0, 1, to + 500); + assertEquals(1, numRescanPresent()); + + // now all the nodes are unassigned. 
manager should post another rescan + waitForCounter(tot_mgr_resubmit_unassigned, 0, 1, 2 * to + 500); + assertEquals(2, numRescanPresent()); + } +} diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java new file mode 100644 index 0000000..8177daf --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java @@ -0,0 +1,280 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs.Ids; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + + + +public class TestSplitLogWorker { + private static final Log LOG = LogFactory.getLog(TestSplitLogWorker.class); + static { + Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG); + } + private final static HBaseTestingUtility TEST_UTIL = + new HBaseTestingUtility(); + private ZooKeeperWatcher zkw; + private SplitLogWorker slw; + + private void waitForCounter(AtomicLong ctr, long oldval, long newval, + long timems) { + long curt = System.currentTimeMillis(); + long endt = curt + timems; + while (curt < endt) { + if (ctr.get() == oldval) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + } + curt = System.currentTimeMillis(); + } else { + assertEquals(newval, ctr.get()); + return; + } + } + assertTrue(false); + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + } + + @Before + public void setup() throws Exception { + slw = null; + TEST_UTIL.startMiniZKCluster(); + zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), + "split-log-worker-tests", null); + ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode); + ZKUtil.createAndFailSilent(zkw, 
zkw.baseZNode); + assertTrue(ZKUtil.checkExists(zkw, zkw.baseZNode) != -1); + LOG.debug(zkw.baseZNode + " created"); + ZKUtil.createAndFailSilent(zkw, zkw.splitLogZNode); + assertTrue(ZKUtil.checkExists(zkw, zkw.splitLogZNode) != -1); + LOG.debug(zkw.splitLogZNode + " created"); + resetCounters(); + + } + + @After + public void teardown() throws Exception { + if (slw != null) { + slw.stop(); + slw.worker.join(3000); + if (slw.worker.isAlive()) { + assertTrue("could not stop the worker thread", false); + } + } + TEST_UTIL.shutdownMiniZKCluster(); + } + + SplitLogWorker.TaskExecutor neverEndingTask = + new SplitLogWorker.TaskExecutor() { + + @Override + public Status exec(String name, CancelableProgressable p) { + while (true) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + return Status.PREEMPTED; + } + if (!p.progress()) { + return Status.PREEMPTED; + } + } + } + + }; + + @Test + public void testAcquireTaskAtStartup() throws Exception { + LOG.info("testAcquireTaskAtStartup"); + + zkw.getZooKeeper().create(ZKSplitLog.getNodeName(zkw, "tatas"), + TaskState.TASK_UNASSIGNED.get("mgr"), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), "rs", + neverEndingTask); + slw.start(); + waitForCounter(tot_wkr_task_acquired, 0, 1, 100); + assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw, + ZKSplitLog.getNodeName(zkw, "tatas")), "rs")); + } + + @Test + public void testRaceForTask() throws Exception { + LOG.info("testRaceForTask"); + + zkw.getZooKeeper().create(ZKSplitLog.getNodeName(zkw, "trft"), + TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + SplitLogWorker slw1 = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), + "svr1", neverEndingTask); + SplitLogWorker slw2 = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), + "svr2", neverEndingTask); + slw1.start(); + slw2.start(); + waitForCounter(tot_wkr_task_acquired, 0, 1, 100); + waitForCounter(tot_wkr_failed_to_grab_task_lost_race, 0, 1, 100); + assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw, + ZKSplitLog.getNodeName(zkw, "trft")), "svr1") || + TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw, + ZKSplitLog.getNodeName(zkw, "trft")), "svr2")); + slw1.stop(); + slw2.stop(); + slw1.worker.join(); + slw2.worker.join(); + } + + @Test + public void testPreemptTask() throws Exception { + LOG.info("testPreemptTask"); + + slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), + "tpt_svr", neverEndingTask); + slw.start(); + Thread.yield(); // let the worker start + Thread.sleep(100); + + // this time create a task node after starting the splitLogWorker + zkw.getZooKeeper().create(ZKSplitLog.getNodeName(zkw, "tpt_task"), + TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + waitForCounter(tot_wkr_task_acquired, 0, 1, 1000); + assertEquals(1, slw.taskReadySeq); + assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw, + ZKSplitLog.getNodeName(zkw, "tpt_task")), "tpt_svr")); + + ZKUtil.setData(zkw, ZKSplitLog.getNodeName(zkw, "tpt_task"), + TaskState.TASK_UNASSIGNED.get("manager")); + waitForCounter(tot_wkr_preempt_task, 0, 1, 1000); + } + + @Test + public void testMultipleTasks() throws Exception { + LOG.info("testMultipleTasks"); + slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), + "tmt_svr", neverEndingTask); + slw.start(); + Thread.yield(); // let the worker start + Thread.sleep(100); + + zkw.getZooKeeper().create(ZKSplitLog.getNodeName(zkw, 
"tmt_task"), + TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + waitForCounter(tot_wkr_task_acquired, 0, 1, 1000); + // now the worker is busy doing the above task + + // create another task + zkw.getZooKeeper().create(ZKSplitLog.getNodeName(zkw, "tmt_task_2"), + TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + // preempt the first task, have it owned by another worker + ZKUtil.setData(zkw, ZKSplitLog.getNodeName(zkw, "tmt_task"), + TaskState.TASK_OWNED.get("another-worker")); + waitForCounter(tot_wkr_preempt_task, 0, 1, 1000); + + waitForCounter(tot_wkr_task_acquired, 1, 2, 1000); + assertEquals(2, slw.taskReadySeq); + assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw, + ZKSplitLog.getNodeName(zkw, "tmt_task_2")), "tmt_svr")); + } + + @Test + public void testRescan() throws Exception { + LOG.info("testRescan"); + slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), + "svr", neverEndingTask); + slw.start(); + Thread.yield(); // let the worker start + Thread.sleep(100); + + zkw.getZooKeeper().create(ZKSplitLog.getNodeName(zkw, "task"), + TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + waitForCounter(tot_wkr_task_acquired, 0, 1, 1000); + // now the worker is busy doing the above task + + // preempt the task, have it owned by another worker + ZKUtil.setData(zkw, ZKSplitLog.getNodeName(zkw, "task"), + TaskState.TASK_UNASSIGNED.get("manager")); + waitForCounter(tot_wkr_preempt_task, 0, 1, 1000); + + // create a RESCAN node + zkw.getZooKeeper().create(ZKSplitLog.getNodeName(zkw, "RESCAN"), + TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT_SEQUENTIAL); + + waitForCounter(tot_wkr_task_acquired, 1, 2, 1000); + // RESCAN node might not have been processed if the worker became busy + // with the above task. preempt the task again so that now the RESCAN + // node is processed + ZKUtil.setData(zkw, ZKSplitLog.getNodeName(zkw, "task"), + TaskState.TASK_UNASSIGNED.get("manager")); + waitForCounter(tot_wkr_preempt_task, 1, 2, 1000); + waitForCounter(tot_wkr_task_acquired_rescan, 0, 1, 1000); + + List nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode); + LOG.debug(nodes); + int num = 0; + for (String node : nodes) { + num++; + if (node.startsWith("RESCAN")) { + assertTrue(TaskState.TASK_DONE.equals(ZKUtil.getData(zkw, + ZKSplitLog.getNodeName(zkw, node)), "svr")); + } + } + assertEquals(2, num); + } +} diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java index 5555d32..ed0ab13 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java @@ -1,5 +1,5 @@ /** - * Copyright 2010 The Apache Software Foundation + * Copyright 2011 The Apache Software Foundation * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; import org.apache.hadoop.ipc.RemoteException; @@ -150,7 +151,7 @@ public class TestHLogSplit { } /** - * @throws IOException + * @throws IOException * @see https://issues.apache.org/jira/browse/HBASE-3020 */ @Test public void testRecoveredEditsPathForMeta() throws IOException { @@ -164,7 +165,7 @@ public class TestHLogSplit { HLog.Entry entry = new HLog.Entry(new HLogKey(encoded, HConstants.META_TABLE_NAME, 1, now), new WALEdit()); - Path p = HLogSplitter.getRegionSplitEditsPath(fs, entry, hbaseDir); + Path p = HLogSplitter.getRegionSplitEditsPath(fs, entry, hbaseDir, true); String parentOfParent = p.getParent().getParent().getName(); assertEquals(parentOfParent, HRegionInfo.FIRST_META_REGIONINFO.getEncodedName()); } @@ -173,13 +174,13 @@ public class TestHLogSplit { public void testSplitFailsIfNewHLogGetsCreatedAfterSplitStarted() throws IOException { AtomicBoolean stop = new AtomicBoolean(false); - + FileStatus[] stats = fs.listStatus(new Path("/hbase/t1")); assertTrue("Previous test should clean up table dir", stats == null || stats.length == 0); generateHLogs(-1); - + try { (new ZombieNewLogWriterRegionServer(stop)).start(); HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf, @@ -456,7 +457,7 @@ public class TestHLogSplit { FileStatus[] archivedLogs = fs.listStatus(corruptDir); assertEquals(archivedLogs.length, 0); } - + @Test public void testLogsGetArchivedAfterSplit() throws IOException { conf.setBoolean(HBASE_SKIP_ERRORS, false); @@ -507,7 +508,7 @@ public class TestHLogSplit { // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null } } -/* DISABLED for now. TODO: HBASE-2645 +/* DISABLED for now. TODO: HBASE-2645 @Test public void testLogCannotBeWrittenOnceParsed() throws IOException { AtomicLong counter = new AtomicLong(0); @@ -545,7 +546,7 @@ public class TestHLogSplit { generateHLogs(-1); fs.initialize(fs.getUri(), conf); Thread zombie = new ZombieNewLogWriterRegionServer(stop); - + try { zombie.start(); try { @@ -644,10 +645,10 @@ public class TestHLogSplit { HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf, hbaseDir, hlogDir, oldLogDir, fs); logSplitter.splitLog(); - + assertFalse(fs.exists(regiondir)); } - + @Test public void testIOEOnOutputThread() throws Exception { conf.setBoolean(HBASE_SKIP_ERRORS, false); @@ -663,7 +664,7 @@ public class TestHLogSplit { HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class); Mockito.doThrow(new IOException("Injected")).when(mockWriter).append(Mockito.any()); return mockWriter; - + } }; try { @@ -700,7 +701,7 @@ public class TestHLogSplit { fail("There shouldn't be any exception but: " + e.toString()); } } - + /** * Test log split process with fake data and lots of edits to trigger threading * issues. @@ -709,7 +710,7 @@ public class TestHLogSplit { public void testThreading() throws Exception { doTestThreading(20000, 128*1024*1024, 0); } - + /** * Test blocking behavior of the log split process if writers are writing slower * than the reader is reading. 
@@ -718,7 +719,7 @@ public class TestHLogSplit { public void testThreadingSlowWriterSmallBuffer() throws Exception { doTestThreading(200, 1024, 50); } - + /** * Sets up a log splitter with a mock reader and writer. The mock reader generates * a specified number of edits spread across 5 regions. The mock writer optionally @@ -726,7 +727,7 @@ public class TestHLogSplit { * * * After the split is complete, verifies that the statistics show the correct number * of edits output into each region. - * + * * @param numFakeEdits number of fake edits to push through pipeline * @param bufferSize size of in-memory buffer * @param writerSlowness writer threads will sleep this many ms per edit @@ -743,20 +744,20 @@ public class TestHLogSplit { out.close(); // Make region dirs for our destination regions so the output doesn't get skipped - final List regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4"); + final List regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4"); makeRegionDirs(fs, regions); // Create a splitter that reads and writes the data without touching disk HLogSplitter logSplitter = new HLogSplitter( localConf, hbaseDir, hlogDir, oldLogDir, fs) { - + /* Produce a mock writer that doesn't write anywhere */ protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf) throws IOException { HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class); Mockito.doAnswer(new Answer() { int expectedIndex = 0; - + @Override public Void answer(InvocationOnMock invocation) { if (writerSlowness > 0) { @@ -771,17 +772,17 @@ public class TestHLogSplit { List keyValues = edit.getKeyValues(); assertEquals(1, keyValues.size()); KeyValue kv = keyValues.get(0); - + // Check that the edits come in the right order. assertEquals(expectedIndex, Bytes.toInt(kv.getRow())); expectedIndex++; return null; } }).when(mockWriter).append(Mockito.any()); - return mockWriter; + return mockWriter; } - - + + /* Produce a mock reader that generates fake entries */ protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf) throws IOException { @@ -792,11 +793,11 @@ public class TestHLogSplit { @Override public HLog.Entry answer(InvocationOnMock invocation) throws Throwable { if (index >= numFakeEdits) return null; - + // Generate r0 through r4 in round robin fashion int regionIdx = index % regions.size(); byte region[] = new byte[] {(byte)'r', (byte) (0x30 + regionIdx)}; - + HLog.Entry ret = createTestEntry(TABLE_NAME, region, Bytes.toBytes((int)(index / regions.size())), FAMILY, QUALIFIER, VALUE, index); @@ -807,22 +808,22 @@ public class TestHLogSplit { return mockReader; } }; - + logSplitter.splitLog(); - + // Verify number of written edits per region Map outputCounts = logSplitter.getOutputCounts(); for (Map.Entry entry : outputCounts.entrySet()) { - LOG.info("Got " + entry.getValue() + " output edits for region " + + LOG.info("Got " + entry.getValue() + " output edits for region " + Bytes.toString(entry.getKey())); - + assertEquals((long)entry.getValue(), numFakeEdits / regions.size()); } assertEquals(regions.size(), outputCounts.size()); } - - + + /** * This thread will keep writing to the file after the split process has started @@ -849,7 +850,7 @@ public class TestHLogSplit { while (true) { try { String region = "juliet"; - + fs.mkdirs(new Path(new Path(hbaseDir, region), region)); appendEntry(lastLogWriter, TABLE_NAME, region.getBytes(), ("r" + editsCount).getBytes(), FAMILY, QUALIFIER, VALUE, 0); @@ -896,22 +897,22 @@ public class TestHLogSplit { return; } Path tableDir = 
new Path(hbaseDir, new String(TABLE_NAME)); - Path regionDir = new Path(tableDir, regions.get(0)); + Path regionDir = new Path(tableDir, regions.get(0)); Path recoveredEdits = new Path(regionDir, HLogSplitter.RECOVERED_EDITS); String region = "juliet"; Path julietLog = new Path(hlogDir, HLOG_FILE_PREFIX + ".juliet"); try { - + while (!fs.exists(recoveredEdits) && !stop.get()) { flushToConsole("Juliet: split not started, sleeping a bit..."); Threads.sleep(10); } - + fs.mkdirs(new Path(tableDir, region)); HLog.Writer writer = HLog.createWriter(fs, - julietLog, conf); + julietLog, conf); appendEntry(writer, "juliet".getBytes(), ("juliet").getBytes(), - ("r").getBytes(), FAMILY, QUALIFIER, VALUE, 0); + ("r").getBytes(), FAMILY, QUALIFIER, VALUE, 0); writer.close(); flushToConsole("Juliet file creator: created file " + julietLog); } catch (IOException e1) { @@ -920,6 +921,78 @@ public class TestHLogSplit { } } + private CancelableProgressable reporter = new CancelableProgressable() { + int count = 0; + + @Override + public boolean progress() { + count++; + LOG.debug("progress = " + count); + return true; + } + }; + + @Test + public void testSplitLogFileWithOneRegion() throws IOException { + LOG.info("testSplitLogFileWithOneRegion"); + final String REGION = "region__1"; + regions.removeAll(regions); + regions.add(REGION); + + + generateHLogs(1, 10, -1); + FileStatus logfile = fs.listStatus(hlogDir)[0]; + fs.initialize(fs.getUri(), conf); + HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs, + conf, reporter); + HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir, + logfile.getPath().toString(), conf); + + + Path originalLog = (fs.listStatus(oldLogDir))[0].getPath(); + Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, REGION); + + + assertEquals(true, logsAreEqual(originalLog, splitLog)); + } + + @Test + public void testSplitLogFileEmpty() throws IOException { + LOG.info("testSplitLogFileEmpty"); + injectEmptyFile(".empty", true); + FileStatus logfile = fs.listStatus(hlogDir)[0]; + + fs.initialize(fs.getUri(), conf); + + HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs, + conf, reporter); + HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir, + logfile.getPath().toString(), conf); + Path tdir = HTableDescriptor.getTableDir(hbaseDir, TABLE_NAME); + FileStatus [] files = this.fs.listStatus(tdir); + assertTrue(files == null || files.length == 0); + + assertEquals(0, countHLog(fs.listStatus(oldLogDir)[0].getPath(), fs, conf)); + } + + @Test + public void testSplitLogFileMultipleRegions() throws IOException { + LOG.info("testSplitLogFileMultipleRegions"); + generateHLogs(1, 10, -1); + FileStatus logfile = fs.listStatus(hlogDir)[0]; + fs.initialize(fs.getUri(), conf); + + HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs, + conf, reporter); + HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir, + logfile.getPath().toString(), conf); + for (String region : regions) { + Path recovered = getLogForRegion(hbaseDir, TABLE_NAME, region); + assertEquals(10, countHLog(recovered, fs, conf)); + } + } + + private void flushToConsole(String s) { System.out.println(s); System.out.flush(); @@ -1001,13 +1074,13 @@ public class TestHLogSplit { out.write(corrupted_bytes, middle, corrupted_bytes.length - middle); closeOrFlush(close, out); break; - + case TRUNCATE: fs.delete(path, false); out = fs.create(path); out.write(corrupted_bytes, 0, fileSize-32); closeOrFlush(close, out); - + break; } @@ -1052,7 +1125,7 @@ public 
class TestHLogSplit { writer.sync(); return seq; } - + private HLog.Entry createTestEntry( byte[] table, byte[] region, byte[] row, byte[] family, byte[] qualifier, @@ -1085,7 +1158,7 @@ public class TestHLogSplit { FileStatus[] f2 = fs.listStatus(p2); assertNotNull("Path " + p1 + " doesn't exist", f1); assertNotNull("Path " + p2 + " doesn't exist", f2); - + System.out.println("Files in " + p1 + ": " + Joiner.on(",").join(FileUtil.stat2Paths(f1))); System.out.println("Files in " + p2 + ": " +