diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index 0fa366e..02751a3 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -46,7 +46,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hbase.Chore; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SplitLogCounters; @@ -124,7 +123,7 @@ public class SplitLogManager extends ZooKeeperListener { private long resubmit_threshold; private long timeout; private long unassignedTimeout; - private long lastNodeCreateTime = Long.MAX_VALUE; + private long lastTaskCreateTime = Long.MAX_VALUE; public boolean ignoreZKDeleteForTesting = false; private volatile long lastRecoveringNodeCreationTime = 0; // When lastRecoveringNodeCreationTime is older than the following threshold, we'll check @@ -270,7 +269,7 @@ public class SplitLogManager extends ZooKeeperListener { * @throws IOException * if there was an error while splitting any log file * @return cumulative size of the logfiles split - * @throws IOException + * @throws IOException */ public long splitLogDistributed(final Path logDir) throws IOException { List logDirs = new ArrayList(); @@ -371,7 +370,7 @@ public class SplitLogManager extends ZooKeeperListener { } catch (IOException ioe) { FileStatus[] files = fs.listStatus(logDir); if (files != null && files.length > 0) { - LOG.warn("returning success without actually splitting and " + + LOG.warn("returning success without actually splitting and " + "deleting all the log files in path " + logDir); } else { LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe); @@ -389,7 +388,7 @@ public class SplitLogManager extends ZooKeeperListener { /** * Add a task entry to splitlog znode if it is not already there. - * + * * @param taskname the path of the log to be split * @param batch the batch this task belongs to * @return true if a new entry is created, false if it is already there. @@ -399,6 +398,7 @@ public class SplitLogManager extends ZooKeeperListener { // This is a znode path under the splitlog dir with the rest of the path made up of an // url encoding of the passed in log to split. String path = ZKSplitLog.getEncodedNodeName(watcher, taskname); + lastTaskCreateTime = EnvironmentEdgeManager.currentTimeMillis(); Task oldtask = createTaskIfAbsent(path, batch); if (oldtask == null) { // publish the task in zk @@ -528,7 +528,7 @@ public class SplitLogManager extends ZooKeeperListener { if(isMetaRecovery != null) { if ((isMetaRecovery && !region.equalsIgnoreCase(metaEncodeRegionName)) || (!isMetaRecovery && region.equalsIgnoreCase(metaEncodeRegionName))) { - // skip non-meta regions when recovering the meta region or + // skip non-meta regions when recovering the meta region or // skip the meta region when recovering user regions continue; } @@ -712,7 +712,6 @@ public class SplitLogManager extends ZooKeeperListener { } private void createNodeSuccess(String path) { - lastNodeCreateTime = EnvironmentEdgeManager.currentTimeMillis(); LOG.debug("put up splitlog task at znode " + path); getDataSetWatch(path, zkretries); } @@ -727,7 +726,7 @@ public class SplitLogManager extends ZooKeeperListener { private void getDataSetWatch(String path, Long retry_count) { this.watcher.getRecoverableZooKeeper().getZooKeeper(). getData(path, this.watcher, - new GetDataAsyncCallback(), retry_count); + new GetDataAsyncCallback(true), retry_count); SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet(); } @@ -735,7 +734,7 @@ public class SplitLogManager extends ZooKeeperListener { // A negative retry count will lead to ignoring all error processing. this.watcher.getRecoverableZooKeeper().getZooKeeper(). getData(path, this.watcher, - new GetDataAsyncCallback(), Long.valueOf(-1) /* retry count */); + new GetDataAsyncCallback(false), Long.valueOf(-1) /* retry count */); SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet(); } @@ -963,7 +962,7 @@ public class SplitLogManager extends ZooKeeperListener { /** * signal the workers that a task was resubmitted by creating the * RESCAN node. - * @throws KeeperException + * @throws KeeperException */ private void createRescanNode(long retries) { // The RESCAN node will be deleted almost immediately by the @@ -973,6 +972,7 @@ public class SplitLogManager extends ZooKeeperListener { // might miss the watch-trigger that creation of RESCAN node provides. // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks // therefore this behavior is safe. + lastTaskCreateTime = EnvironmentEdgeManager.currentTimeMillis(); SplitLogTask slt = new SplitLogTask.Done(this.serverName); this.watcher.getRecoverableZooKeeper().getZooKeeper(). create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(), @@ -981,7 +981,6 @@ public class SplitLogManager extends ZooKeeperListener { } private void createRescanSuccess(String path) { - lastNodeCreateTime = EnvironmentEdgeManager.currentTimeMillis(); SplitLogCounters.tot_mgr_rescan.incrementAndGet(); getDataSetWatch(path, zkretries); } @@ -1240,7 +1239,7 @@ public class SplitLogManager extends ZooKeeperListener { // when SplitLogWorker recovers a region by directly replaying unflushed WAL edits, // last flushed sequence Id changes when newly assigned RS flushes writes to the region. // If the newly assigned RS fails again(a chained RS failures scenario), the last flushed - // sequence Id name space (sequence Id only valid for a particular RS instance), changes + // sequence Id name space (sequence Id only valid for a particular RS instance), changes // when different newly assigned RS flushes the region. // Therefore, in this mode we need to fetch last sequence Ids from ZK where we keep history of // last flushed sequence Id for each failed RS instance. @@ -1433,7 +1432,7 @@ public class SplitLogManager extends ZooKeeperListener { // master should spawn both a manager and a worker thread to guarantee // that there is always one worker in the system if (tot > 0 && !found_assigned_task && - ((EnvironmentEdgeManager.currentTimeMillis() - lastNodeCreateTime) > + ((EnvironmentEdgeManager.currentTimeMillis() - lastTaskCreateTime) > unassignedTimeout)) { for (Map.Entry e : tasks.entrySet()) { String path = e.getKey(); @@ -1530,6 +1529,17 @@ public class SplitLogManager extends ZooKeeperListener { */ class GetDataAsyncCallback implements AsyncCallback.DataCallback { private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class); + private boolean completeTaskOnNoNode; + + /** + * @param completeTaskOnNoNode Complete the task if the znode cannot be found. + * Since in-memory task creation and znode creation are not atomic, there might be + * a race where there is a task in memory but the znode is not created yet (TimeoutMonitor). + * In this case completeTaskOnNoNode should be set to false. See HBASE-11217. + */ + public GetDataAsyncCallback(boolean completeTaskOnNoNode) { + this.completeTaskOnNoNode = completeTaskOnNoNode; + } @Override public void processResult(int rc, String path, Object ctx, byte[] data, @@ -1541,14 +1551,16 @@ public class SplitLogManager extends ZooKeeperListener { } if (rc == KeeperException.Code.NONODE.intValue()) { SplitLogCounters.tot_mgr_get_data_nonode.incrementAndGet(); - // The task znode has been deleted. Must be some pending delete - // that deleted the task. Assume success because a task-znode is - // is only deleted after TaskFinisher is successful. LOG.warn("task znode " + path + " vanished."); - try { - getDataSetWatchSuccess(path, null, Integer.MIN_VALUE); - } catch (DeserializationException e) { - LOG.warn("Deserialization problem", e); + if (completeTaskOnNoNode) { + // The task znode has been deleted. Must be some pending delete + // that deleted the task. Assume success because a task-znode is + // is only deleted after TaskFinisher is successful. + try { + getDataSetWatchSuccess(path, null, Integer.MIN_VALUE); + } catch (DeserializationException e) { + LOG.warn("Deserialization problem", e); + } } return; } @@ -1698,7 +1710,7 @@ public class SplitLogManager extends ZooKeeperListener { TerminationStatus(String msg) { statusMsg = msg; } - + @Override public String toString() { return statusMsg;