Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/SplitLogWorkerConsensus.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/SplitLogWorkerConsensus.java (revision ) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/SplitLogWorkerConsensus.java (revision ) @@ -0,0 +1,59 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.consensus; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.SplitLogTask; + +/** + * Consensus operations related to WAL splitting. + * + * The goal is to abstract out 2 types of direct usage of low-level + * ZooKeeper API: + * + * - executing actions upon receiving callback + * - operations on transient shared state. + */ +@InterfaceAudience.Private +public interface SplitLogWorkerConsensus { + + /** + * Notify consensus engine that splitting task has completed. + * + * @param slt See {@link SplitLogTask} + * @param ctr counter to be updated + * @param splitTaskDetails details about log split task (specific to + * consensus engine being used). + */ + void endTask(SplitLogTask slt, AtomicLong ctr, SplitTaskDetails splitTaskDetails); + + /** + * Interface for log-split tasks Used to carry implementation details in encapsulated way + * through Handlers to the consensus API. + */ + static interface SplitTaskDetails { + + /** + * @return full file path in HDFS for the WAL file to be split. + */ + String getTaskFile(); + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (revision 485dd36ffdd7d1dfe1920bc8df79bb1a9fbea037) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (revision ) @@ -45,6 +45,8 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.master.SplitLogManager; +import org.apache.hadoop.hbase.regionserver.consensus.SplitLogWorkerConsensus; +import org.apache.hadoop.hbase.regionserver.consensus.zk.ZkSplitLogWorkerConsensus; import org.apache.hadoop.hbase.regionserver.handler.HLogSplitterHandler; import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; @@ -108,6 +110,8 @@ protected final AtomicInteger tasksInProgress = new AtomicInteger(0); private int maxConcurrentTasks = 0; + private final SplitLogWorkerConsensus consensus; + public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, RegionServerServices server, TaskExecutor splitTaskExecutor) { super(watcher); @@ -120,6 +124,7 @@ this.executorService = this.server.getExecutorService(); this.maxConcurrentTasks = conf.getInt("hbase.regionserver.wal.max.splitters", DEFAULT_MAX_SPLITTERS); + consensus = new ZkSplitLogWorkerConsensus(watcher); } public SplitLogWorker(final ZooKeeperWatcher watcher, final Configuration conf, @@ -342,8 +347,13 @@ } if (ZKSplitLog.isRescanNode(watcher, currentTask)) { - HLogSplitterHandler.endTask(watcher, new SplitLogTask.Done(this.serverName), - SplitLogCounters.tot_wkr_task_acquired_rescan, currentTask, currentVersion); + ZkSplitLogWorkerConsensus.ZkSplitTaskDetails splitTaskDetails = + new ZkSplitLogWorkerConsensus.ZkSplitTaskDetails(); + splitTaskDetails.setTaskNode(currentTask); + splitTaskDetails.setCurTaskZKVersion(new MutableInt(currentVersion)); + + consensus.endTask(new SplitLogTask.Done(this.serverName), + SplitLogCounters.tot_wkr_task_acquired_rescan, splitTaskDetails); return; } @@ -469,9 +479,14 @@ } }; + ZkSplitLogWorkerConsensus.ZkSplitTaskDetails splitTaskDetails = + new ZkSplitLogWorkerConsensus.ZkSplitTaskDetails(); + splitTaskDetails.setTaskNode(curTask); + splitTaskDetails.setCurTaskZKVersion(zkVersion); + HLogSplitterHandler hsh = - new HLogSplitterHandler(this.server, curTask, zkVersion, reporter, this.tasksInProgress, - this.splitTaskExecutor); + new HLogSplitterHandler(this.server, splitTaskDetails, this.consensus, + reporter, this.tasksInProgress, this.splitTaskExecutor); this.executorService.submit(hsh); } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/zk/ZkSplitLogWorkerConsensus.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/zk/ZkSplitLogWorkerConsensus.java (revision ) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/consensus/zk/ZkSplitLogWorkerConsensus.java (revision ) @@ -0,0 +1,121 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.consensus.zk; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.lang.mutable.MutableInt; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.SplitLogCounters; +import org.apache.hadoop.hbase.SplitLogTask; +import org.apache.hadoop.hbase.regionserver.consensus.SplitLogWorkerConsensus; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +/** + * ZooKeeper-based implementation of consensus for WAL splitting. + */ +public class ZkSplitLogWorkerConsensus implements SplitLogWorkerConsensus { + private static final Log LOG = LogFactory.getLog(ZkSplitLogWorkerConsensus.class); + private final ZooKeeperWatcher watcher; + + /** + * Construct consensus implementation. + * + * @param watcher + */ + public ZkSplitLogWorkerConsensus(ZooKeeperWatcher watcher) { + this.watcher = watcher; + } + + /** + * endTask() can fail and the only way to recover out of it is for the + * {@link org.apache.hadoop.hbase.master.SplitLogManager} to timeout the task node. + * See {@link SplitLogWorkerConsensus#endTask} + */ + @Override + public void endTask(SplitLogTask slt, AtomicLong ctr, SplitTaskDetails splitTaskDetails) { + ZkSplitTaskDetails zttbe = (ZkSplitTaskDetails) splitTaskDetails; + String task = zttbe.getTaskNode(); + int taskZKVersion = zttbe.getCurTaskZKVersion().intValue(); + try { + if (ZKUtil.setData(watcher, task, slt.toByteArray(), taskZKVersion)) { + LOG.info("successfully transitioned task " + task + " to final state " + slt); + ctr.incrementAndGet(); + return; + } + LOG.warn("failed to transistion task " + task + " to end state " + slt + + " because of version mismatch "); + } catch (KeeperException.BadVersionException bve) { + LOG.warn("transisition task " + task + " to " + slt + + " failed because of version mismatch", bve); + } catch (KeeperException.NoNodeException e) { + LOG.fatal( + "logic error - end task " + task + " " + slt + + " failed because task doesn't exist", e); + } catch (KeeperException e) { + LOG.warn("failed to end task, " + task + " " + slt, e); + } + SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet(); + } + + /** + * When ZK-based implementation wants to complete the task, it needs to know + * task znode and current znode cversion (needed for subsequent update operation). + */ + public static class ZkSplitTaskDetails implements SplitTaskDetails { + private String taskNode; + private MutableInt curTaskZKVersion; + + public ZkSplitTaskDetails() { + } + + public ZkSplitTaskDetails(String taskNode, MutableInt curTaskZKVersion) { + this.taskNode = taskNode; + this.curTaskZKVersion = curTaskZKVersion; + } + + public String getTaskNode() { + return taskNode; + } + + public void setTaskNode(String taskNode) { + this.taskNode = taskNode; + } + + public MutableInt getCurTaskZKVersion() { + return curTaskZKVersion; + } + + public void setCurTaskZKVersion(MutableInt curTaskZKVersion) { + this.curTaskZKVersion = curTaskZKVersion; + } + + /** + * See {@link SplitTaskDetails#getTaskFile()} + */ + @Override + public String getTaskFile() { + return ZKSplitLog.getFileName(taskNode); + } + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java (revision 485dd36ffdd7d1dfe1920bc8df79bb1a9fbea037) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java (revision ) @@ -20,9 +20,7 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.commons.lang.mutable.MutableInt; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -32,14 +30,10 @@ import org.apache.hadoop.hbase.SplitLogTask; import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.EventType; -import org.apache.hadoop.hbase.master.SplitLogManager; import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor; import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status; +import org.apache.hadoop.hbase.regionserver.consensus.SplitLogWorkerConsensus; import org.apache.hadoop.hbase.util.CancelableProgressable; -import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.zookeeper.KeeperException; /** * Handles log splitting a wal @@ -48,27 +42,24 @@ public class HLogSplitterHandler extends EventHandler { private static final Log LOG = LogFactory.getLog(HLogSplitterHandler.class); private final ServerName serverName; - private final String curTask; - private final String wal; - private final ZooKeeperWatcher zkw; private final CancelableProgressable reporter; private final AtomicInteger inProgressTasks; - private final MutableInt curTaskZKVersion; private final TaskExecutor splitTaskExecutor; + private final SplitLogWorkerConsensus.SplitTaskDetails splitTaskDetails; + private final SplitLogWorkerConsensus consensus; - public HLogSplitterHandler(final Server server, String curTask, - final MutableInt curTaskZKVersion, + public HLogSplitterHandler(final Server server, + SplitLogWorkerConsensus.SplitTaskDetails splitTaskDetails, + SplitLogWorkerConsensus consensus, CancelableProgressable reporter, AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor) { super(server, EventType.RS_LOG_REPLAY); - this.curTask = curTask; - this.wal = ZKSplitLog.getFileName(curTask); + this.consensus = consensus; + this.splitTaskDetails = splitTaskDetails; this.reporter = reporter; this.inProgressTasks = inProgressTasks; this.inProgressTasks.incrementAndGet(); this.serverName = server.getServerName(); - this.zkw = server.getZooKeeper(); - this.curTaskZKVersion = curTaskZKVersion; this.splitTaskExecutor = splitTaskExecutor; } @@ -76,20 +67,21 @@ public void process() throws IOException { long startTime = System.currentTimeMillis(); try { - Status status = this.splitTaskExecutor.exec(wal, reporter); + Status status = this.splitTaskExecutor.exec( + splitTaskDetails.getTaskFile(), reporter); switch (status) { case DONE: - endTask(zkw, new SplitLogTask.Done(this.serverName), - SplitLogCounters.tot_wkr_task_done, curTask, curTaskZKVersion.intValue()); + consensus.endTask(new SplitLogTask.Done(this.serverName), + SplitLogCounters.tot_wkr_task_done, splitTaskDetails); break; case PREEMPTED: SplitLogCounters.tot_wkr_preempt_task.incrementAndGet(); - LOG.warn("task execution prempted " + wal); + LOG.warn("task execution prempted " + splitTaskDetails.getTaskFile()); break; case ERR: if (server != null && !server.isStopped()) { - endTask(zkw, new SplitLogTask.Err(this.serverName), - SplitLogCounters.tot_wkr_task_err, curTask, curTaskZKVersion.intValue()); + consensus.endTask(new SplitLogTask.Err(this.serverName), + SplitLogCounters.tot_wkr_task_err, splitTaskDetails); break; } // if the RS is exiting then there is probably a tons of stuff @@ -97,45 +89,18 @@ //$FALL-THROUGH$ case RESIGNED: if (server != null && server.isStopped()) { - LOG.info("task execution interrupted because worker is exiting " + curTask); + LOG.info("task execution interrupted because worker is exiting " + + splitTaskDetails.toString()); } - endTask(zkw, new SplitLogTask.Resigned(this.serverName), - SplitLogCounters.tot_wkr_task_resigned, curTask, curTaskZKVersion.intValue()); + consensus.endTask(new SplitLogTask.Resigned(this.serverName), + SplitLogCounters.tot_wkr_task_resigned, splitTaskDetails); break; } } finally { - LOG.info("worker " + serverName + " done with task " + curTask + " in " + LOG.info("worker " + serverName + " done with task " + + splitTaskDetails.toString() + " in " - + (System.currentTimeMillis() - startTime) + "ms"); + + (System.currentTimeMillis() - startTime) + "ms"); this.inProgressTasks.decrementAndGet(); } - } - - /** - * endTask() can fail and the only way to recover out of it is for the - * {@link SplitLogManager} to timeout the task node. - * @param slt - * @param ctr - */ - public static void endTask(ZooKeeperWatcher zkw, SplitLogTask slt, AtomicLong ctr, String task, - int taskZKVersion) { - try { - if (ZKUtil.setData(zkw, task, slt.toByteArray(), taskZKVersion)) { - LOG.info("successfully transitioned task " + task + " to final state " + slt); - ctr.incrementAndGet(); - return; - } - LOG.warn("failed to transistion task " + task + " to end state " + slt - + " because of version mismatch "); - } catch (KeeperException.BadVersionException bve) { - LOG.warn("transisition task " + task + " to " + slt - + " failed because of version mismatch", bve); - } catch (KeeperException.NoNodeException e) { - LOG.fatal( - "logic error - end task " + task + " " + slt - + " failed because task doesn't exist", e); - } catch (KeeperException e) { - LOG.warn("failed to end task, " + task + " " + slt, e); - } - SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet(); } }