diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java index e01bb00..a964ba8 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java @@ -45,6 +45,8 @@ import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.master.SplitLogManager; +import org.apache.hadoop.hbase.regionserver.consensus.SplitLogWorkerConsensus; +import org.apache.hadoop.hbase.regionserver.consensus.zk.ZkSplitLogWorkerConsensus; import org.apache.hadoop.hbase.regionserver.handler.HLogSplitterHandler; import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; @@ -108,6 +110,8 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { protected final AtomicInteger tasksInProgress = new AtomicInteger(0); private int maxConcurrentTasks = 0; + private final SplitLogWorkerConsensus consensus; + public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, RegionServerServices server, TaskExecutor splitTaskExecutor) { super(watcher); @@ -120,6 +124,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { this.executorService = this.server.getExecutorService(); this.maxConcurrentTasks = conf.getInt("hbase.regionserver.wal.max.splitters", DEFAULT_MAX_SPLITTERS); + consensus = new ZkSplitLogWorkerConsensus(watcher); } public SplitLogWorker(final ZooKeeperWatcher watcher, final Configuration conf, @@ -342,8 +347,13 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { } if (ZKSplitLog.isRescanNode(watcher, currentTask)) { - HLogSplitterHandler.endTask(watcher, new SplitLogTask.Done(this.serverName), - SplitLogCounters.tot_wkr_task_acquired_rescan, currentTask, currentVersion); + ZkSplitLogWorkerConsensus.ZkSplitTaskDetails splitTaskDetails = + new ZkSplitLogWorkerConsensus.ZkSplitTaskDetails(); + splitTaskDetails.setTaskNode(currentTask); + splitTaskDetails.setCurTaskZKVersion(new MutableInt(currentVersion)); + + consensus.endTask(new SplitLogTask.Done(this.serverName), + SplitLogCounters.tot_wkr_task_acquired_rescan, splitTaskDetails); return; } @@ -469,9 +479,14 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable { } }; + ZkSplitLogWorkerConsensus.ZkSplitTaskDetails splitTaskDetails = + new ZkSplitLogWorkerConsensus.ZkSplitTaskDetails(); + splitTaskDetails.setTaskNode(curTask); + splitTaskDetails.setCurTaskZKVersion(zkVersion); + HLogSplitterHandler hsh = - new HLogSplitterHandler(this.server, curTask, zkVersion, reporter, this.tasksInProgress, - this.splitTaskExecutor); + new HLogSplitterHandler(this.server, splitTaskDetails, this.consensus, + reporter, this.tasksInProgress, this.splitTaskExecutor); this.executorService.submit(hsh); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/SplitLogWorkerConsensus.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/SplitLogWorkerConsensus.java index 36a99b5..5eb0434 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/SplitLogWorkerConsensus.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/SplitLogWorkerConsensus.java @@ -43,17 +43,17 @@ public interface SplitLogWorkerConsensus { * @param splitTaskDetails details about log split task (specific to * consensus engine being used). */ - public void endTask(SplitLogTask slt, AtomicLong ctr, SplitTaskDetails splitTaskDetails); + void endTask(SplitLogTask slt, AtomicLong ctr, SplitTaskDetails splitTaskDetails); /** * Interface for log-split tasks Used to carry implementation details in encapsulated way * through Handlers to the consensus API. */ - public static interface SplitTaskDetails { + static interface SplitTaskDetails { /** * @return full file path in HDFS for the WAL file to be split. */ - public String getTaskFile(); + String getTaskFile(); } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java index 4ac800e..44ffa79 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java @@ -20,9 +20,7 @@ package org.apache.hadoop.hbase.regionserver.handler; import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.commons.lang.mutable.MutableInt; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -32,14 +30,10 @@ import org.apache.hadoop.hbase.SplitLogCounters; import org.apache.hadoop.hbase.SplitLogTask; import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.EventType; -import org.apache.hadoop.hbase.master.SplitLogManager; import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor; import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status; +import org.apache.hadoop.hbase.regionserver.consensus.SplitLogWorkerConsensus; import org.apache.hadoop.hbase.util.CancelableProgressable; -import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.zookeeper.KeeperException; /** * Handles log splitting a wal @@ -48,27 +42,24 @@ import org.apache.zookeeper.KeeperException; public class HLogSplitterHandler extends EventHandler { private static final Log LOG = LogFactory.getLog(HLogSplitterHandler.class); private final ServerName serverName; - private final String curTask; - private final String wal; - private final ZooKeeperWatcher zkw; private final CancelableProgressable reporter; private final AtomicInteger inProgressTasks; - private final MutableInt curTaskZKVersion; private final TaskExecutor splitTaskExecutor; + private final SplitLogWorkerConsensus.SplitTaskDetails splitTaskDetails; + private final SplitLogWorkerConsensus consensus; - public HLogSplitterHandler(final Server server, String curTask, - final MutableInt curTaskZKVersion, + public HLogSplitterHandler(final Server server, + SplitLogWorkerConsensus.SplitTaskDetails splitTaskDetails, + SplitLogWorkerConsensus consensus, CancelableProgressable reporter, AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor) { super(server, EventType.RS_LOG_REPLAY); - this.curTask = curTask; - this.wal = ZKSplitLog.getFileName(curTask); + this.consensus = consensus; + this.splitTaskDetails = splitTaskDetails; this.reporter = reporter; this.inProgressTasks = inProgressTasks; this.inProgressTasks.incrementAndGet(); this.serverName = server.getServerName(); - this.zkw = server.getZooKeeper(); - this.curTaskZKVersion = curTaskZKVersion; this.splitTaskExecutor = splitTaskExecutor; } @@ -76,20 +67,21 @@ public class HLogSplitterHandler extends EventHandler { public void process() throws IOException { long startTime = System.currentTimeMillis(); try { - Status status = this.splitTaskExecutor.exec(wal, reporter); + Status status = this.splitTaskExecutor.exec( + splitTaskDetails.getTaskFile(), reporter); switch (status) { case DONE: - endTask(zkw, new SplitLogTask.Done(this.serverName), - SplitLogCounters.tot_wkr_task_done, curTask, curTaskZKVersion.intValue()); + consensus.endTask(new SplitLogTask.Done(this.serverName), + SplitLogCounters.tot_wkr_task_done, splitTaskDetails); break; case PREEMPTED: SplitLogCounters.tot_wkr_preempt_task.incrementAndGet(); - LOG.warn("task execution prempted " + wal); + LOG.warn("task execution prempted " + splitTaskDetails.getTaskFile()); break; case ERR: if (server != null && !server.isStopped()) { - endTask(zkw, new SplitLogTask.Err(this.serverName), - SplitLogCounters.tot_wkr_task_err, curTask, curTaskZKVersion.intValue()); + consensus.endTask(new SplitLogTask.Err(this.serverName), + SplitLogCounters.tot_wkr_task_err, splitTaskDetails); break; } // if the RS is exiting then there is probably a tons of stuff @@ -97,45 +89,18 @@ public class HLogSplitterHandler extends EventHandler { //$FALL-THROUGH$ case RESIGNED: if (server != null && server.isStopped()) { - LOG.info("task execution interrupted because worker is exiting " + curTask); + LOG.info("task execution interrupted because worker is exiting " + + splitTaskDetails.toString()); } - endTask(zkw, new SplitLogTask.Resigned(this.serverName), - SplitLogCounters.tot_wkr_task_resigned, curTask, curTaskZKVersion.intValue()); + consensus.endTask(new SplitLogTask.Resigned(this.serverName), + SplitLogCounters.tot_wkr_task_resigned, splitTaskDetails); break; } } finally { - LOG.info("worker " + serverName + " done with task " + curTask + " in " - + (System.currentTimeMillis() - startTime) + "ms"); + LOG.info("worker " + serverName + " done with task " + + splitTaskDetails.toString() + " in " + + (System.currentTimeMillis() - startTime) + "ms"); this.inProgressTasks.decrementAndGet(); } } - - /** - * endTask() can fail and the only way to recover out of it is for the - * {@link SplitLogManager} to timeout the task node. - * @param slt - * @param ctr - */ - public static void endTask(ZooKeeperWatcher zkw, SplitLogTask slt, AtomicLong ctr, String task, - int taskZKVersion) { - try { - if (ZKUtil.setData(zkw, task, slt.toByteArray(), taskZKVersion)) { - LOG.info("successfully transitioned task " + task + " to final state " + slt); - ctr.incrementAndGet(); - return; - } - LOG.warn("failed to transistion task " + task + " to end state " + slt - + " because of version mismatch "); - } catch (KeeperException.BadVersionException bve) { - LOG.warn("transisition task " + task + " to " + slt - + " failed because of version mismatch", bve); - } catch (KeeperException.NoNodeException e) { - LOG.fatal( - "logic error - end task " + task + " " + slt - + " failed because task doesn't exist", e); - } catch (KeeperException e) { - LOG.warn("failed to end task, " + task + " " + slt, e); - } - SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet(); - } }