diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/wal/FSWALIdentity.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/wal/FSWALIdentity.java new file mode 100644 index 0000000000..70fd66d8d1 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/wal/FSWALIdentity.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.wal; + +import org.apache.hadoop.fs.Path; +import org.apache.yetus.audience.InterfaceAudience; + +/* + * This is distributed FS oriented implementation for WALIdentity + */ +@InterfaceAudience.Private +public class FSWALIdentity implements WALIdentity{ + private String name; + private Path path; + + public FSWALIdentity(String name) { + this.path = new Path(name); + this.name = path.getName(); + } + + public FSWALIdentity(Path path) { + this.path = path; + if (path !=null) { + this.name = path.getName(); + } + } + + @Override + public String getName() { + return name; + } + + /** + * @return {@link Path} object of the name encapsulated in WALIdentity + */ + public Path getPath() { + return path; + } + + @Override + public int compareTo(WALIdentity o) { + FSWALIdentity that = (FSWALIdentity)o; + return this.path.compareTo(that.getPath()); + } + + @Override + public String toString() { + return this.path.toString(); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof FSWALIdentity)) { + return false; + } + FSWALIdentity that = (FSWALIdentity) obj; + return this.path.equals(that.getPath()); + } + @Override + public int hashCode() { + return this.path.hashCode(); + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/wal/WALIdentity.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/wal/WALIdentity.java new file mode 100644 index 0000000000..fa7d2fa041 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/wal/WALIdentity.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * This interface defines the identification of WAL for both stream based and distributed FileSystem + * based environments. + * See {@link #getName()} method. + */ +@InterfaceAudience.Private +public interface WALIdentity extends Comparable { + + /** + * WALIdentity is uniquely identifying a WAL stored in this WALProvider. + * This name can be thought of as a human-readable, serialized form of the WALIdentity. + * + * The same value should be returned across calls to this method. 
+ * + * @return name of the wal + */ + String getName(); +} diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon index 7dc1c7f663..e1aceea0c7 100644 --- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon +++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/ReplicationStatusTmpl.jamon @@ -71,7 +71,7 @@ <% entry.getValue().getPeerId() %> <% entry.getValue().getWalGroup() %> - <% entry.getValue().getCurrentPath() %> + <% entry.getValue().getCurrentWalId() %> <% StringUtils.humanSize(entry.getValue().getFileSize()) %> <% entry.getValue().getQueueSize() %> <% StringUtils.humanSize(entry.getValue().getCurrentPosition()) %> @@ -96,7 +96,7 @@ <% entry.getValue().getPeerId() %> <% entry.getValue().getWalGroup() %> - <% entry.getValue().getCurrentPath() %> + <% entry.getValue().getCurrentWalId() %> <% StringUtils.humanTimeDiff(entry.getValue().getAgeOfLastShippedOp()) %> <% entry.getValue().getReplicationDelay() == Long.MAX_VALUE ? 
"UNKNOWN" : StringUtils.humanTimeDiff(entry.getValue().getReplicationDelay()) %> diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java index b2fa7ca477..436f3c7dca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java @@ -21,10 +21,10 @@ package org.apache.hadoop.hbase.coprocessor; import java.io.IOException; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -92,18 +92,18 @@ public interface WALObserver { /** * Called before rolling the current WAL - * @param oldPath the path of the current wal that we are replacing - * @param newPath the path of the wal we are going to create + * @param oldPath the identity of the current wal that we are replacing + * @param newPath the identity of the wal we are going to create */ default void preWALRoll(ObserverContext ctx, - Path oldPath, Path newPath) throws IOException {} + WALIdentity oldPath, WALIdentity newPath) throws IOException {} /** * Called after rolling the current WAL - * @param oldPath the path of the wal that we replaced - * @param newPath the path of the wal we have created and now is the current + * @param oldPath the identity of the wal that we replaced + * @param newPath the identity of the wal we have created and now is the current */ default void postWALRoll(ObserverContext ctx, - Path oldPath, Path newPath) throws IOException {} + WALIdentity oldPath, WALIdentity newPath) throws IOException {} } diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 7915ac3cef..ab58b6700c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -65,9 +65,11 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.FSWALIdentity; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALPrettyPrinter; import org.apache.hadoop.hbase.wal.WALProvider.WriterBase; @@ -544,11 +546,11 @@ public abstract class AbstractFSWAL implements WAL { */ private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath) throws IOException { - coprocessorHost.preWALRoll(oldPath, newPath); + coprocessorHost.preWALRoll(new FSWALIdentity(oldPath), new FSWALIdentity(newPath)); if (!this.listeners.isEmpty()) { for (WALActionsListener i : this.listeners) { - i.preLogRoll(oldPath, newPath); + i.preLogRoll(new FSWALIdentity(oldPath), new FSWALIdentity(newPath)); } } } @@ -560,11 +562,11 @@ public abstract class AbstractFSWAL implements WAL { throws IOException { if (!this.listeners.isEmpty()) { for (WALActionsListener i : this.listeners) { - i.postLogRoll(oldPath, newPath); + i.postLogRoll(new FSWALIdentity(oldPath), new FSWALIdentity(newPath)); } } - coprocessorHost.postWALRoll(oldPath, newPath); + coprocessorHost.postWALRoll(new FSWALIdentity(oldPath), new FSWALIdentity(newPath)); } // public only until class moves to o.a.h.h.wal @@ 
-650,7 +652,7 @@ public abstract class AbstractFSWAL implements WAL { // Tell our listeners that a log is going to be archived. if (!this.listeners.isEmpty()) { for (WALActionsListener i : this.listeners) { - i.preLogArchive(p, newPath); + i.preLogArchive(new FSWALIdentity(p), new FSWALIdentity(newPath)); } } LOG.info("Archiving " + p + " to " + newPath); @@ -660,7 +662,7 @@ public abstract class AbstractFSWAL implements WAL { // Tell our listeners that a log has been archived. if (!this.listeners.isEmpty()) { for (WALActionsListener i : this.listeners) { - i.postLogArchive(p, newPath); + i.postLogArchive(new FSWALIdentity(p), new FSWALIdentity(newPath)); } } } @@ -836,7 +838,7 @@ public abstract class AbstractFSWAL implements WAL { // Tell our listeners that a log is going to be archived. if (!this.listeners.isEmpty()) { for (WALActionsListener i : this.listeners) { - i.preLogArchive(file.getPath(), p); + i.preLogArchive(new FSWALIdentity(file.getPath()), new FSWALIdentity(p)); } } @@ -846,7 +848,7 @@ public abstract class AbstractFSWAL implements WAL { // Tell our listeners that a log was archived. if (!this.listeners.isEmpty()) { for (WALActionsListener i : this.listeners) { - i.postLogArchive(file.getPath(), p); + i.postLogArchive(new FSWALIdentity(file.getPath()), new FSWALIdentity(p)); } } } @@ -994,11 +996,11 @@ public abstract class AbstractFSWAL implements WAL { * https://issues.apache.org/jira/browse/HBASE-14004 for more details. */ @Override - public OptionalLong getLogFileSizeIfBeingWritten(Path path) { + public OptionalLong getLogFileSizeIfBeingWritten(WALIdentity walId) { rollWriterLock.lock(); try { - Path currentPath = getOldPath(); - if (path.equals(currentPath)) { + FSWALIdentity currentPath = new FSWALIdentity(getOldPath()); + if (walId.equals(currentPath)) { W writer = this.writer; return writer != null ? 
OptionalLong.of(writer.getLength()) : OptionalLong.empty(); } else { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java index 13ffac7562..6617f3b131 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.yetus.audience.InterfaceAudience; @@ -35,32 +36,32 @@ public interface WALActionsListener { /** * The WAL is going to be rolled. The oldPath can be null if this is * the first log file from the regionserver. - * @param oldPath the path to the old wal - * @param newPath the path to the new wal + * @param oldWalId the identity to the old wal + * @param newWalId the identity to the new wal */ - default void preLogRoll(Path oldPath, Path newPath) throws IOException {} + default void preLogRoll(WALIdentity oldWalId, WALIdentity newWalId) throws IOException {} /** * The WAL has been rolled. The oldPath can be null if this is * the first log file from the regionserver. - * @param oldPath the path to the old wal - * @param newPath the path to the new wal + * @param oldWalId the identity to the old wal + * @param newWalId the identity to the new wal */ - default void postLogRoll(Path oldPath, Path newPath) throws IOException {} + default void postLogRoll(WALIdentity oldWalId, WALIdentity newWalId) throws IOException {} /** * The WAL is going to be archived. 
- * @param oldPath the path to the old wal - * @param newPath the path to the new wal + * @param oldWalId the identity to the old wal + * @param newWalId the identity to the new wal */ - default void preLogArchive(Path oldPath, Path newPath) throws IOException {} + default void preLogArchive(WALIdentity oldWalId, WALIdentity newWalId) throws IOException {} /** * The WAL has been archived. - * @param oldPath the path to the old wal - * @param newPath the path to the new wal + * @param oldWalId the identity to the old wal + * @param newWalId the identity to the new wal */ - default void postLogArchive(Path oldPath, Path newPath) throws IOException {} + default void postLogArchive(WALIdentity oldWalId, WALIdentity newWalId) throws IOException {} /** * A request was made that the WAL be rolled. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java index 40d6d0fc94..de8a1bf66c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.lang.reflect.InvocationTargetException; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.coprocessor.BaseEnvironment; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; @@ -35,6 +34,7 @@ import org.apache.hadoop.hbase.coprocessor.WALObserver; import org.apache.hadoop.hbase.metrics.MetricRegistry; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -170,28 +170,28 @@ public class 
WALCoprocessorHost /** * Called before rolling the current WAL - * @param oldPath the path of the current wal that we are replacing - * @param newPath the path of the wal we are going to create + * @param oldWalId the identity of the current wal that we are replacing + * @param newWalId the identity of the wal we are going to create */ - public void preWALRoll(Path oldPath, Path newPath) throws IOException { + public void preWALRoll(WALIdentity oldWalId, WALIdentity newWalId) throws IOException { execOperation(coprocEnvironments.isEmpty() ? null : new WALObserverOperation() { @Override protected void call(WALObserver observer) throws IOException { - observer.preWALRoll(this, oldPath, newPath); + observer.preWALRoll(this, oldWalId, newWalId); } }); } /** * Called after rolling the current WAL - * @param oldPath the path of the wal that we replaced - * @param newPath the path of the wal we have created and now is the current + * @param oldWalId the identity of the wal that we replaced + * @param newWalId the identity of the wal we have created and now is the current */ - public void postWALRoll(Path oldPath, Path newPath) throws IOException { + public void postWALRoll(WALIdentity oldWalId, WALIdentity newWalId) throws IOException { execOperation(coprocEnvironments.isEmpty() ? 
null : new WALObserverOperation() { @Override protected void call(WALObserver observer) throws IOException { - observer.postWALRoll(this, oldPath, newPath); + observer.postWALRoll(this, oldWalId, newWalId); } }); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index f1bb5387ff..231bf89d8b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.FSWALIdentity; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,17 +60,17 @@ public class RecoveredReplicationSource extends ReplicationSource { @Override protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId, - PriorityBlockingQueue queue) { + PriorityBlockingQueue queue) { return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, queueStorage); } - public void locateRecoveredPaths(PriorityBlockingQueue queue) throws IOException { + public void locateRecoveredWalIds(PriorityBlockingQueue queue) throws IOException { boolean hasPathChanged = false; - PriorityBlockingQueue newPaths = - new PriorityBlockingQueue(queueSizePerGroup, new LogsComparator()); - pathsLoop: for (Path path : queue) { - if (fs.exists(path)) { // still in same location, don't need to do anything - newPaths.add(path); + PriorityBlockingQueue newWalIds = + new 
PriorityBlockingQueue(queueSizePerGroup, new LogsComparator()); + pathsLoop: for (WALIdentity walId : queue) { + if (fs.exists(((FSWALIdentity)walId).getPath())) { // still in same location, don't need to do anything + newWalIds.add(walId); continue; } // Path changed - try to find the right path. @@ -76,8 +78,8 @@ public class RecoveredReplicationSource extends ReplicationSource { if (server instanceof ReplicationSyncUp.DummyServer) { // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data // from .logs to .oldlogs. Loop into .logs folders and check whether a match exists - Path newPath = getReplSyncUpPath(path); - newPaths.add(newPath); + Path newPath = getReplSyncUpPath(((FSWALIdentity)walId).getPath()); + newWalIds.add(new FSWALIdentity(newPath)); continue; } else { // See if Path exists in the dead RS folder (there could be a chain of failures @@ -89,27 +91,27 @@ public class RecoveredReplicationSource extends ReplicationSource { final Path deadRsDirectory = new Path(walDir, AbstractFSWALProvider.getWALDirectoryName(curDeadServerName .getServerName())); - Path[] locs = new Path[] { new Path(deadRsDirectory, path.getName()), new Path( - deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), path.getName()) }; + Path[] locs = new Path[] { new Path(deadRsDirectory, walId.getName()), new Path( + deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), walId.getName()) }; for (Path possibleLogLocation : locs) { LOG.info("Possible location " + possibleLogLocation.toUri().toString()); if (manager.getFs().exists(possibleLogLocation)) { // We found the right new location - LOG.info("Log " + path + " still exists at " + possibleLogLocation); - newPaths.add(possibleLogLocation); + LOG.info("Log " + walId + " still exists at " + possibleLogLocation); + newWalIds.add(new FSWALIdentity(possibleLogLocation)); continue pathsLoop; } } } // didn't find a new location LOG.error( - String.format("WAL Path %s doesn't exist and couldn't find 
its new location", path)); - newPaths.add(path); + String.format("WAL Path %s doesn't exist and couldn't find its new location", walId)); + newWalIds.add(walId); } } if (hasPathChanged) { - if (newPaths.size() != queue.size()) { // this shouldn't happen + if (newWalIds.size() != queue.size()) { // this shouldn't happen LOG.error("Recovery queue size is incorrect"); throw new IOException("Recovery queue size error"); } @@ -117,8 +119,8 @@ public class RecoveredReplicationSource extends ReplicationSource { // since this is a recovered queue with no new incoming logs, // there shouldn't be any concurrency issues queue.clear(); - for (Path path : newPaths) { - queue.add(path); + for (WALIdentity walId : newWalIds) { + queue.add(walId); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java index b0d4db087b..dbf1296392 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java @@ -19,11 +19,12 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import java.util.concurrent.PriorityBlockingQueue; + import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +41,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper private final ReplicationQueueStorage 
replicationQueues; public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId, - PriorityBlockingQueue queue, RecoveredReplicationSource source, + PriorityBlockingQueue queue, RecoveredReplicationSource source, ReplicationQueueStorage queueStorage) { super(conf, walGroupId, queue, source); this.source = source; @@ -58,7 +59,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper int numRetries = 0; while (numRetries <= maxRetriesMultiplier) { try { - source.locateRecoveredPaths(queue); + source.locateRecoveredWalIds(queue); break; } catch (IOException e) { LOG.error("Error while locating recovered queue paths, attempt #" + numRetries); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 10fa50f553..318f69dcc2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -61,6 +61,8 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.FSWALIdentity; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -86,7 +88,7 @@ public class ReplicationSource implements ReplicationSourceInterface { private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class); // Queues of logs to process, entry in format of walGroupId->queue, // each presents a queue for one wal group - private Map> queues = new HashMap<>(); + private Map> queues = new HashMap<>(); // per group queue size, keep no more than this 
number of logs in each wal group protected int queueSizePerGroup; protected ReplicationQueueStorage queueStorage; @@ -166,10 +168,9 @@ public class ReplicationSource implements ReplicationSourceInterface { this.queueStorage = queueStorage; this.replicationPeer = replicationPeer; this.manager = manager; - this.fs = fs; this.metrics = metrics; this.clusterId = clusterId; - + this.fs = fs; this.queueId = queueId; this.replicationQueueInfo = new ReplicationQueueInfo(queueId); this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2); @@ -191,9 +192,9 @@ public class ReplicationSource implements ReplicationSourceInterface { } @Override - public void enqueueLog(Path log) { + public void enqueueLog(WALIdentity log) { String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log.getName()); - PriorityBlockingQueue queue = queues.get(logPrefix); + PriorityBlockingQueue queue = queues.get(logPrefix); if (queue == null) { queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator()); // make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise @@ -300,7 +301,7 @@ public class ReplicationSource implements ReplicationSourceInterface { this.walEntryFilter = new ChainWALEntryFilter(filters); } - private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue queue) { + private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue queue) { ReplicationSourceShipper worker = createNewShipper(walGroupId, queue); ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker); if (extant != null) { @@ -328,9 +329,9 @@ public class ReplicationSource implements ReplicationSourceInterface { int queueSize = queues.get(walGroupId).size(); replicationDelay = ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize); - Path currentPath = shipper.getCurrentPath(); + WALIdentity currentPath = shipper.getCurrentWALIdentity(); try { - fileSize = 
getFileSize(currentPath); + fileSize = getFileSize(((FSWALIdentity)currentPath).getPath()); } catch (IOException e) { LOG.warn("Ignore the exception as the file size of HLog only affects the web ui", e); fileSize = -1; @@ -339,7 +340,7 @@ public class ReplicationSource implements ReplicationSourceInterface { statusBuilder.withPeerId(this.getPeerId()) .withQueueSize(queueSize) .withWalGroup(walGroupId) - .withCurrentPath(currentPath) + .withCurrentWalId(currentPath) .withCurrentPosition(shipper.getCurrentPosition()) .withFileSize(fileSize) .withAgeOfLastShippedOp(ageOfLastShippedOp) @@ -361,12 +362,12 @@ public class ReplicationSource implements ReplicationSourceInterface { } protected ReplicationSourceShipper createNewShipper(String walGroupId, - PriorityBlockingQueue queue) { + PriorityBlockingQueue queue) { return new ReplicationSourceShipper(conf, walGroupId, queue, this); } private ReplicationSourceWALReader createNewWALReader(String walGroupId, - PriorityBlockingQueue queue, long startPosition) { + PriorityBlockingQueue queue, long startPosition) { return replicationPeer.getPeerConfig().isSerial() ? 
new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this) : new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this); @@ -374,7 +375,7 @@ public class ReplicationSource implements ReplicationSourceInterface { protected final void uncaughtException(Thread t, Throwable e) { RSRpcServices.exitIfOOME(e); - LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentPath(), e); + LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentWALIdentity(), e); server.abort("Unexpected exception in " + t.getName(), e); } @@ -497,9 +498,9 @@ public class ReplicationSource implements ReplicationSourceInterface { initializeWALEntryFilter(peerClusterId); // start workers - for (Map.Entry> entry : queues.entrySet()) { + for (Map.Entry> entry : queues.entrySet()) { String walGroupId = entry.getKey(); - PriorityBlockingQueue queue = entry.getValue(); + PriorityBlockingQueue queue = entry.getValue(); tryStartNewShipper(walGroupId, queue); } } @@ -593,11 +594,11 @@ public class ReplicationSource implements ReplicationSourceInterface { } @Override - public Path getCurrentPath() { + public WALIdentity getCurrentWALIdentity() { // only for testing for (ReplicationSourceShipper worker : workerThreads.values()) { - if (worker.getCurrentPath() != null) { - return worker.getCurrentPath(); + if (worker.getCurrentWALIdentity() != null) { + return worker.getCurrentWALIdentity(); } } return null; @@ -611,10 +612,10 @@ public class ReplicationSource implements ReplicationSourceInterface { /** * Comparator used to compare logs together based on their start time */ - public static class LogsComparator implements Comparator { + public static class LogsComparator implements Comparator { @Override - public int compare(Path o1, Path o2) { + public int compare(WALIdentity o1, WALIdentity o2) { return Long.compare(getTS(o1), getTS(o2)); } @@ -628,7 +629,7 @@ public class ReplicationSource implements 
ReplicationSourceInterface { * @param p path to split * @return start time */ - private static long getTS(Path p) { + private static long getTS(WALIdentity p) { return AbstractFSWALProvider.getWALStartTimeFromWALName(p.getName()); } } @@ -642,7 +643,7 @@ public class ReplicationSource implements ReplicationSourceInterface { String walGroupId = entry.getKey(); ReplicationSourceShipper worker = entry.getValue(); long position = worker.getCurrentPosition(); - Path currentPath = worker.getCurrentPath(); + WALIdentity currentPath = worker.getCurrentWALIdentity(); sb.append("walGroup [").append(walGroupId).append("]: "); if (currentPath != null) { sb.append("currently replicating from: ").append(currentPath).append(" at position: ") diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index df7a8cc7b2..3058fcc5a6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.yetus.audience.InterfaceAudience; /** @@ -60,7 +61,7 @@ public interface ReplicationSourceInterface { * Add a log to the list of logs to replicate * @param log path to the log to replicate */ - void enqueueLog(Path log); + void enqueueLog(WALIdentity log); /** * Add hfile names to the queue to be replicated. 
@@ -95,7 +96,7 @@ public interface ReplicationSourceInterface { * Get the current log that's replicated * @return the current log */ - Path getCurrentPath(); + WALIdentity getCurrentWALIdentity(); /** * Get the queue id that the source is replicating to diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 20c1215950..d626967f19 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -41,6 +41,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -63,16 +64,17 @@ import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.FSWALIdentity; import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider; +import org.apache.hadoop.hbase.wal.WALIdentity; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.collect.Sets; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -import 
org.apache.hbase.thirdparty.com.google.common.collect.Sets; -import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; - /** * This class is responsible to manage all the replication sources. There are two classes of * sources: @@ -114,7 +116,7 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto * {@link ReplicationSourceManager.NodeFailoverWorker#run()} and {@link #removePeer(String)}, there * is already synchronized on {@link #oldsources}. So no need synchronized on * {@link #walsByIdRecoveredQueues}. - *
  • Need synchronized on {@link #latestPaths} to avoid the new open source miss new log.
  • + *
  • Need synchronized on {@link #latestWalIds} to avoid the new open source miss new log.
  • *
  • Need synchronized on {@link #oldsources} to avoid adding recovered source for the * to-be-removed peer.
  • * @@ -148,7 +150,7 @@ public class ReplicationSourceManager implements ReplicationListener { private final Configuration conf; private final FileSystem fs; // The paths to the latest log of each wal group, for new coming peers - private final Map latestPaths; + private final Map latestWalIds; // Path to the wals directories private final Path logDir; // Path to the wal archive @@ -216,7 +218,7 @@ public class ReplicationSourceManager implements ReplicationListener { tfb.setNameFormat("ReplicationExecutor-%d"); tfb.setDaemon(true); this.executor.setThreadFactory(tfb.build()); - this.latestPaths = new HashMap<>(); + this.latestWalIds = new HashMap<>(); this.replicationForBulkLoadDataEnabled = conf.getBoolean( HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000); @@ -365,22 +367,22 @@ public class ReplicationSourceManager implements ReplicationListener { ReplicationSourceInterface addSource(String peerId) throws IOException { ReplicationPeer peer = replicationPeers.getPeer(peerId); ReplicationSourceInterface src = createSource(peerId, peer); - // synchronized on latestPaths to avoid missing the new log - synchronized (this.latestPaths) { + // synchronized on latestWalIds to avoid missing the new log + synchronized (this.latestWalIds) { this.sources.put(peerId, src); Map> walsByGroup = new HashMap<>(); this.walsById.put(peerId, walsByGroup); // Add the latest wal to that source's queue - if (!latestPaths.isEmpty()) { - for (Map.Entry walPrefixAndPath : latestPaths.entrySet()) { - Path walPath = walPrefixAndPath.getValue(); + if (!latestWalIds.isEmpty()) { + for (Map.Entry walPrefixAndId : latestWalIds.entrySet()) { + WALIdentity walId = walPrefixAndId.getValue(); NavigableSet wals = new TreeSet<>(); - wals.add(walPath.getName()); - walsByGroup.put(walPrefixAndPath.getKey(), wals); + wals.add(walId.getName()); + 
walsByGroup.put(walPrefixAndId.getKey(), wals); // Abort RS and throw exception to make add peer failed abortAndThrowIOExceptionWhenFail( - () -> this.queueStorage.addWAL(server.getServerName(), peerId, walPath.getName())); - src.enqueueLog(walPath); + () -> this.queueStorage.addWAL(server.getServerName(), peerId, walId.getName())); + src.enqueueLog(walId); } } } @@ -417,7 +419,7 @@ public class ReplicationSourceManager implements ReplicationListener { // walsById. ReplicationSourceInterface toRemove; Map> wals = new HashMap<>(); - synchronized (latestPaths) { + synchronized (latestWalIds) { toRemove = sources.put(peerId, src); if (toRemove != null) { LOG.info("Terminate replication source for " + toRemove.getPeerId()); @@ -476,15 +478,15 @@ public class ReplicationSourceManager implements ReplicationListener { " state or config changed. Will close the previous replication source and open a new one"; ReplicationPeer peer = replicationPeers.getPeer(peerId); ReplicationSourceInterface src = createSource(peerId, peer); - // synchronized on latestPaths to avoid missing the new log - synchronized (this.latestPaths) { + // synchronized on latestWalIds to avoid missing the new log + synchronized (this.latestWalIds) { ReplicationSourceInterface toRemove = this.sources.put(peerId, src); if (toRemove != null) { LOG.info("Terminate replication source for " + toRemove.getPeerId()); toRemove.terminate(terminateMessage); } for (NavigableSet walsByGroup : walsById.get(peerId).values()) { - walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal))); + walsByGroup.forEach(wal -> src.enqueueLog(new FSWALIdentity(new Path(this.logDir, wal)))); } } LOG.info("Startup replication source for " + src.getPeerId()); @@ -505,7 +507,7 @@ public class ReplicationSourceManager implements ReplicationListener { ReplicationSourceInterface replicationSource = createSource(queueId, peer); this.oldsources.add(replicationSource); for (SortedSet walsByGroup : 
walsByIdRecoveredQueues.get(queueId).values()) { - walsByGroup.forEach(wal -> src.enqueueLog(new Path(wal))); + walsByGroup.forEach(wal -> src.enqueueLog(new FSWALIdentity(wal))); } toStartup.add(replicationSource); } @@ -617,7 +619,7 @@ public class ReplicationSourceManager implements ReplicationListener { */ public void logPositionAndCleanOldLogs(ReplicationSourceInterface source, WALEntryBatch entryBatch) { - String fileName = entryBatch.getLastWalPath().getName(); + String fileName = entryBatch.getLastWalId().getName(); interruptOrAbortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(), source.getQueueId(), fileName, entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds())); cleanOldLogs(fileName, entryBatch.isEndOfFile(), source); @@ -735,11 +737,11 @@ public class ReplicationSourceManager implements ReplicationListener { // public because of we call it in TestReplicationEmptyWALRecovery @VisibleForTesting - public void preLogRoll(Path newLog) throws IOException { + public void preLogRoll(WALIdentity newLog) throws IOException { String logName = newLog.getName(); String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName); - // synchronized on latestPaths to avoid the new open source miss the new log - synchronized (this.latestPaths) { + // synchronized on latestWalIds to avoid the new open source miss the new log + synchronized (this.latestWalIds) { // Add log to queue storage for (ReplicationSourceInterface source : this.sources.values()) { // If record log to queue storage failed, abort RS and throw exception to make log roll @@ -778,14 +780,14 @@ public class ReplicationSourceManager implements ReplicationListener { } } - // Add to latestPaths - latestPaths.put(logPrefix, newLog); + // Add to latestWalIds + latestWalIds.put(logPrefix, newLog); } } // public because of we call it in TestReplicationEmptyWALRecovery @VisibleForTesting - public void postLogRoll(Path newLog) throws IOException { + public void 
postLogRoll(WALIdentity newLog) throws IOException { // This only updates the sources we own, not the recovered ones for (ReplicationSourceInterface source : this.sources.values()) { source.enqueueLog(newLog); @@ -961,7 +963,7 @@ public class ReplicationSourceManager implements ReplicationListener { } oldsources.add(src); for (String wal : walsSet) { - src.enqueueLog(new Path(oldLogDir, wal)); + src.enqueueLog(new FSWALIdentity(new Path(oldLogDir, wal))); } src.startup(); } @@ -1038,16 +1040,16 @@ public class ReplicationSourceManager implements ReplicationListener { } @VisibleForTesting - int getSizeOfLatestPath() { - synchronized (latestPaths) { - return latestPaths.size(); + int getSizeOfLatestWalId() { + synchronized (latestWalIds) { + return latestWalIds.size(); } } @VisibleForTesting - Set getLastestPath() { - synchronized (latestPaths) { - return Sets.newHashSet(latestPaths.values()); + Set getLastestWalIds() { + synchronized (latestWalIds) { + return Sets.newHashSet(latestWalIds.values()); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 5d6198e96e..8ecd5bdc8e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.PriorityBlockingQueue; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.TableName; @@ -32,6 +31,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL.Entry; import 
org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,14 +56,14 @@ public class ReplicationSourceShipper extends Thread { private final Configuration conf; protected final String walGroupId; - protected final PriorityBlockingQueue queue; + protected final PriorityBlockingQueue queue; private final ReplicationSource source; // Last position in the log that we sent to ZooKeeper // It will be accessed by the stats thread so make it volatile private volatile long currentPosition = -1; // Path of the current log - private Path currentPath; + private WALIdentity currentWalId; // Current state of the worker thread private volatile WorkerState state; protected ReplicationSourceWALReader entryReader; @@ -76,7 +76,7 @@ public class ReplicationSourceShipper extends Thread { private final int getEntriesTimeout; public ReplicationSourceShipper(Configuration conf, String walGroupId, - PriorityBlockingQueue queue, ReplicationSource source) { + PriorityBlockingQueue queue, ReplicationSource source) { this.conf = conf; this.walGroupId = walGroupId; this.queue = queue; @@ -269,7 +269,7 @@ public class ReplicationSourceShipper extends Thread { // record on zk, so let's call it. The last wal position maybe zero if end of file is true and // there is no entry in the batch. It is OK because that the queue storage will ignore the zero // position and the file will be removed soon in cleanOldLogs. 
- if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) || + if (batch.isEndOfFile() || !batch.getLastWalId().equals(currentWalId) || batch.getLastWalPosition() != currentPosition) { source.getSourceManager().logPositionAndCleanOldLogs(source, batch); updated = true; @@ -278,10 +278,10 @@ public class ReplicationSourceShipper extends Thread { // the only exception is for recovered queue, if we reach the end of the queue, then there will // no more files so here the currentPath may be null. if (batch.isEndOfFile()) { - currentPath = entryReader.getCurrentPath(); + currentWalId = entryReader.getCurrentWalId(); currentPosition = 0L; } else { - currentPath = batch.getLastWalPath(); + currentWalId = batch.getLastWalId(); currentPosition = batch.getLastWalPosition(); } return updated; @@ -293,8 +293,8 @@ public class ReplicationSourceShipper extends Thread { name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), handler); } - Path getCurrentPath() { - return entryReader.getCurrentPath(); + WALIdentity getCurrentWALIdentity() { + return entryReader.getCurrentWalId(); } long getCurrentPosition() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java index 27b25c45a7..6c1c0b89c7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java @@ -19,11 +19,11 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import 
org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.yetus.audience.InterfaceAudience; @@ -46,13 +46,13 @@ class ReplicationSourceWALActionListener implements WALActionsListener { } @Override - public void preLogRoll(Path oldPath, Path newPath) throws IOException { - manager.preLogRoll(newPath); + public void preLogRoll(WALIdentity oldWalId, WALIdentity newWalId) throws IOException { + manager.preLogRoll(newWalId); } @Override - public void postLogRoll(Path oldPath, Path newPath) throws IOException { - manager.postLogRoll(newPath); + public void postLogRoll(WALIdentity oldWalId, WALIdentity newWalId) throws IOException { + manager.postLogRoll(newWalId); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index b3bdb02940..a0b2ecd10f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -28,15 +28,16 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.FSWALIdentity; import org.apache.hadoop.hbase.wal.WAL.Entry; import 
org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -55,7 +56,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescript class ReplicationSourceWALReader extends Thread { private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class); - private final PriorityBlockingQueue logQueue; + private final PriorityBlockingQueue logQueue; private final FileSystem fs; private final Configuration conf; private final WALEntryFilter filter; @@ -89,7 +90,7 @@ class ReplicationSourceWALReader extends Thread { * @param source replication source */ public ReplicationSourceWALReader(FileSystem fs, Configuration conf, - PriorityBlockingQueue logQueue, long startPosition, WALEntryFilter filter, + PriorityBlockingQueue logQueue, long startPosition, WALEntryFilter filter, ReplicationSource source) { this.logQueue = logQueue; this.currentPosition = startPosition; @@ -181,29 +182,29 @@ class ReplicationSourceWALReader extends Thread { batch.getNbEntries() >= replicationBatchCountCapacity; } - protected static final boolean switched(WALEntryStream entryStream, Path path) { - Path newPath = entryStream.getCurrentPath(); - return newPath == null || !path.getName().equals(newPath.getName()); + protected static final boolean switched(WALEntryStream entryStream, WALIdentity walId) { + WALIdentity newWalId = entryStream.getCurrentWalIdentity(); + return newWalId == null || !walId.equals(newWalId); } protected WALEntryBatch readWALEntries(WALEntryStream entryStream) throws IOException, InterruptedException { - Path currentPath = entryStream.getCurrentPath(); + WALIdentity walId = entryStream.getCurrentWalIdentity(); if (!entryStream.hasNext()) { // check whether we have switched a file - if (currentPath != null && switched(entryStream, currentPath)) { - return 
WALEntryBatch.endOfFile(currentPath); + if (walId != null && switched(entryStream, walId)) { + return WALEntryBatch.endOfFile(walId); } else { return null; } } - if (currentPath != null) { - if (switched(entryStream, currentPath)) { - return WALEntryBatch.endOfFile(currentPath); + if (walId != null) { + if (switched(entryStream, walId)) { + return WALEntryBatch.endOfFile(walId); } } else { // when reading from the entry stream first time we will enter here - currentPath = entryStream.getCurrentPath(); + walId = entryStream.getCurrentWalIdentity(); } WALEntryBatch batch = createBatch(entryStream); for (;;) { @@ -217,7 +218,7 @@ class ReplicationSourceWALReader extends Thread { } boolean hasNext = entryStream.hasNext(); // always return if we have switched to a new file - if (switched(entryStream, currentPath)) { + if (switched(entryStream, walId)) { batch.setEndOfFile(true); break; } @@ -248,7 +249,7 @@ class ReplicationSourceWALReader extends Thread { if ((e instanceof EOFException || e.getCause() instanceof EOFException) && logQueue.size() > 1 && this.eofAutoRecovery) { try { - if (fs.getFileStatus(logQueue.peek()).getLen() == 0) { + if (fs.getFileStatus(((FSWALIdentity)logQueue.peek()).getPath()).getLen() == 0) { LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek()); logQueue.remove(); currentPosition = 0; @@ -259,11 +260,11 @@ class ReplicationSourceWALReader extends Thread { } } - public Path getCurrentPath() { - // if we've read some WAL entries, get the Path we read from + public WALIdentity getCurrentWalId() { + // if we've read some WAL entries, get the walId we read from WALEntryBatch batchQueueHead = entryBatchQueue.peek(); if (batchQueueHead != null) { - return batchQueueHead.getLastWalPath(); + return batchQueueHead.getLastWalId(); } // otherwise, we must be currently reading from the head of the log queue return logQueue.peek(); @@ -280,7 +281,7 @@ class ReplicationSourceWALReader extends Thread { } protected final WALEntryBatch 
createBatch(WALEntryStream entryStream) { - return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()); + return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentWalIdentity()); } protected final Entry filterEntry(Entry entry) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java index 10d6cd59d4..bd0d83accf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java @@ -17,14 +17,14 @@ */ package org.apache.hadoop.hbase.replication.regionserver; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public final class ReplicationStatus { private final String peerId; private final String walGroup; - private final Path currentPath; + private final WALIdentity currentWalId; private final int queueSize; private final long ageOfLastShippedOp; private final long replicationDelay; @@ -34,7 +34,7 @@ public final class ReplicationStatus { private ReplicationStatus(ReplicationStatusBuilder builder) { this.peerId = builder.peerId; this.walGroup = builder.walGroup; - this.currentPath = builder.currentPath; + this.currentWalId = builder.currentWalId; this.queueSize = builder.queueSize; this.ageOfLastShippedOp = builder.ageOfLastShippedOp; this.replicationDelay = builder.replicationDelay; @@ -70,8 +70,8 @@ public final class ReplicationStatus { return replicationDelay; } - public Path getCurrentPath() { - return currentPath; + public WALIdentity getCurrentWalId() { + return currentWalId; } public static ReplicationStatusBuilder newBuilder() { @@ -81,7 +81,7 @@ public final class ReplicationStatus { public static 
class ReplicationStatusBuilder { private String peerId = "UNKNOWN"; private String walGroup = "UNKNOWN"; - private Path currentPath = new Path("UNKNOWN"); + private WALIdentity currentWalId = null; private int queueSize = -1; private long ageOfLastShippedOp = -1; private long replicationDelay = -1; @@ -103,8 +103,8 @@ public final class ReplicationStatus { return this; } - public ReplicationStatusBuilder withCurrentPath(Path currentPath) { - this.currentPath = currentPath; + public ReplicationStatusBuilder withCurrentWalId(WALIdentity currentWalId) { + this.currentWalId = currentWalId; return this; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java index 9edcc8a17a..5f33e73d95 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java @@ -21,11 +21,11 @@ import java.io.IOException; import java.util.concurrent.PriorityBlockingQueue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.yetus.audience.InterfaceAudience; /** @@ -44,7 +44,7 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader private final SerialReplicationChecker checker; public SerialReplicationSourceWALReader(FileSystem fs, Configuration conf, - PriorityBlockingQueue logQueue, long startPosition, WALEntryFilter filter, + PriorityBlockingQueue logQueue, long startPosition, 
WALEntryFilter filter, ReplicationSource source) { super(fs, conf, logQueue, startPosition, filter, source); checker = new SerialReplicationChecker(conf, source); @@ -53,22 +53,22 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader @Override protected WALEntryBatch readWALEntries(WALEntryStream entryStream) throws IOException, InterruptedException { - Path currentPath = entryStream.getCurrentPath(); + WALIdentity currentWalId = entryStream.getCurrentWalIdentity(); if (!entryStream.hasNext()) { // check whether we have switched a file - if (currentPath != null && switched(entryStream, currentPath)) { - return WALEntryBatch.endOfFile(currentPath); + if (currentWalId != null && switched(entryStream, currentWalId)) { + return WALEntryBatch.endOfFile(currentWalId); } else { return null; } } - if (currentPath != null) { - if (switched(entryStream, currentPath)) { - return WALEntryBatch.endOfFile(currentPath); + if (currentWalId != null) { + if (switched(entryStream, currentWalId)) { + return WALEntryBatch.endOfFile(currentWalId); } } else { // when reading from the entry stream first time we will enter here - currentPath = entryStream.getCurrentPath(); + currentWalId = entryStream.getCurrentWalIdentity(); } long positionBefore = entryStream.getPosition(); WALEntryBatch batch = createBatch(entryStream); @@ -115,7 +115,7 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader } boolean hasNext = entryStream.hasNext(); // always return if we have switched to a new file. 
- if (switched(entryStream, currentPath)) { + if (switched(entryStream, currentWalId)) { batch.setEndOfFile(true); break; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java index 22b2de7817..b651a9b17d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java @@ -21,8 +21,8 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.yetus.audience.InterfaceAudience; /** @@ -36,7 +36,7 @@ class WALEntryBatch { private List walEntries; // last WAL that was read - private Path lastWalPath; + private WALIdentity lastWalId; // position in WAL of last entry in this batch private long lastWalPosition = 0; // number of distinct row keys in this batch @@ -51,16 +51,16 @@ class WALEntryBatch { private boolean endOfFile; /** - * @param lastWalPath Path of the WAL the last entry in this batch was read from + * @param lastWalId of the WAL the last entry in this batch was read from */ - WALEntryBatch(int maxNbEntries, Path lastWalPath) { + WALEntryBatch(int maxNbEntries, WALIdentity lastWalId) { this.walEntries = new ArrayList<>(maxNbEntries); - this.lastWalPath = lastWalPath; + this.lastWalId = lastWalId; } - static WALEntryBatch endOfFile(Path lastWalPath) { - WALEntryBatch batch = new WALEntryBatch(0, lastWalPath); + static WALEntryBatch endOfFile(WALIdentity lastWalId) { + WALEntryBatch batch = new WALEntryBatch(0, lastWalId); batch.setLastWalPosition(-1L); batch.setEndOfFile(true); return batch; @@ -78,10 +78,10 @@ class WALEntryBatch { } /** - * @return the path of the last WAL that 
was read. + * @return the Id of the last WAL that was read. */ - public Path getLastWalPath() { - return lastWalPath; + public WALIdentity getLastWalId() { + return lastWalId; } /** @@ -160,7 +160,7 @@ class WALEntryBatch { @Override public String toString() { - return "WALEntryBatch [walEntries=" + walEntries + ", lastWalPath=" + lastWalPath + + return "WALEntryBatch [walEntries=" + walEntries + ", lastWalId=" + lastWalId + ", lastWalPosition=" + lastWalPosition + ", nbRowKeys=" + nbRowKeys + ", nbHFiles=" + nbHFiles + ", heapSize=" + heapSize + ", lastSeqIds=" + lastSeqIds + ", endOfFile=" + endOfFile + "]"; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java index 0393af4970..c16f251998 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -35,7 +35,9 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Reader; +import org.apache.hadoop.hbase.wal.FSWALIdentity; import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.ipc.RemoteException; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -43,8 +45,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually - * iterates through all the WAL {@link Entry} in the queue. When it's done reading from a Path, it + * Streaming access to WAL entries. 
This class is given a queue of WAL {@link WALIdentity}, and continually + * iterates through all the WAL {@link Entry} in the queue. When it's done reading from a Wal, it * dequeues it and starts reading from the next. */ @InterfaceAudience.Private @@ -53,7 +55,7 @@ class WALEntryStream implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(WALEntryStream.class); private Reader reader; - private Path currentPath; + private WALIdentity currentWAlIdentity; // cache of next entry for hasNext() private Entry currentEntry; // position for the current entry. As now we support peek, which means that the upper layer may @@ -62,7 +64,7 @@ class WALEntryStream implements Closeable { private long currentPositionOfEntry = 0; // position after reading current entry private long currentPositionOfReader = 0; - private final PriorityBlockingQueue logQueue; + private final PriorityBlockingQueue logQueue; private final FileSystem fs; private final Configuration conf; private final WALFileLengthProvider walFileLengthProvider; @@ -72,7 +74,7 @@ class WALEntryStream implements Closeable { /** * Create an entry stream over the given queue at the given start position - * @param logQueue the queue of WAL paths + * @param logQueue the queue of WAL walIds * @param fs {@link FileSystem} to use to create {@link Reader} for this stream * @param conf {@link Configuration} to use to create {@link Reader} for this stream * @param startPosition the position in the first WAL to start reading at @@ -80,7 +82,7 @@ class WALEntryStream implements Closeable { * @param metrics replication metrics * @throws IOException */ - public WALEntryStream(PriorityBlockingQueue logQueue, FileSystem fs, Configuration conf, + public WALEntryStream(PriorityBlockingQueue logQueue, FileSystem fs, Configuration conf, long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName, MetricsSource metrics) throws IOException { this.logQueue = logQueue; @@ -135,16 +137,16 @@ 
class WALEntryStream implements Closeable { } /** - * @return the {@link Path} of the current WAL + * @return the {@link WALIdentity} of the current WAL */ - public Path getCurrentPath() { - return currentPath; + public WALIdentity getCurrentWalIdentity() { + return currentWAlIdentity; } - private String getCurrentPathStat() { + private String getCurrentWalIdStat() { StringBuilder sb = new StringBuilder(); - if (currentPath != null) { - sb.append("currently replicating from: ").append(currentPath).append(" at position: ") + if (currentWAlIdentity != null) { + sb.append("currently replicating from: ").append(currentWAlIdentity).append(" at position: ") .append(currentPositionOfEntry).append("\n"); } else { sb.append("no replication ongoing, waiting for new log"); @@ -157,7 +159,7 @@ class WALEntryStream implements Closeable { * false) */ public void reset() throws IOException { - if (reader != null && currentPath != null) { + if (reader != null && currentWAlIdentity != null) { resetReader(); } } @@ -166,8 +168,8 @@ class WALEntryStream implements Closeable { currentPositionOfEntry = position; } - private void setCurrentPath(Path path) { - this.currentPath = path; + private void setCurrentWalId(WALIdentity walId) { + this.currentWAlIdentity = walId; } private void tryAdvanceEntry() throws IOException { @@ -203,10 +205,10 @@ class WALEntryStream implements Closeable { final long trailerSize = currentTrailerSize(); FileStatus stat = null; try { - stat = fs.getFileStatus(this.currentPath); + stat = fs.getFileStatus(((FSWALIdentity)this.currentWAlIdentity).getPath()); } catch (IOException exception) { LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}", - currentPath, trailerSize < 0 ? "was not" : "was", getCurrentPathStat()); + currentWAlIdentity, trailerSize < 0 ? "was not" : "was", getCurrentWalIdStat()); metrics.incrUnknownFileLengthForClosedWAL(); } // Here we use currentPositionOfReader instead of currentPositionOfEntry. 
@@ -222,7 +224,7 @@ class WALEntryStream implements Closeable { LOG.debug( "Reached the end of WAL file '{}'. It was not closed cleanly," + " so we did not parse {} bytes of data. This is normally ok.", - currentPath, skippedBytes); + currentWAlIdentity, skippedBytes); metrics.incrUncleanlyClosedWALs(); metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes); } @@ -230,7 +232,7 @@ class WALEntryStream implements Closeable { LOG.warn( "Processing end of WAL file '{}'. At position {}, which is too far away from" + " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}", - currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat()); + currentWAlIdentity, currentPositionOfReader, stat.getLen(), getCurrentWalIdStat()); setPosition(0); resetReader(); metrics.incrRestartedWALReading(); @@ -239,7 +241,7 @@ class WALEntryStream implements Closeable { } } if (LOG.isTraceEnabled()) { - LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is " + + LOG.trace("Reached the end of log " + this.currentWAlIdentity + ", and the length of the file is " + (stat == null ? 
"N/A" : stat.getLen())); } metrics.incrCompletedWAL(); @@ -247,7 +249,7 @@ class WALEntryStream implements Closeable { } private void dequeueCurrentLog() throws IOException { - LOG.debug("Reached the end of log {}", currentPath); + LOG.debug("Reached the end of log {}", currentWAlIdentity); closeReader(); logQueue.remove(); setPosition(0); @@ -260,12 +262,12 @@ class WALEntryStream implements Closeable { private boolean readNextEntryAndRecordReaderPosition() throws IOException { Entry readEntry = reader.next(); long readerPos = reader.getPosition(); - OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath); + OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentWAlIdentity); if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) { // see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted // data, so we need to make sure that we do not read beyond the committed file length. if (LOG.isDebugEnabled()) { - LOG.debug("The provider tells us the valid length for " + currentPath + " is " + + LOG.debug("The provider tells us the valid length for " + currentWAlIdentity + " is " + fileLength.getAsLong() + ", but we have advanced to " + readerPos); } resetReader(); @@ -297,16 +299,16 @@ class WALEntryStream implements Closeable { // open a reader on the next log in queue private boolean openNextLog() throws IOException { - Path nextPath = logQueue.peek(); - if (nextPath != null) { - openReader(nextPath); + WALIdentity nextWalId = logQueue.peek(); + if (nextWalId != null) { + openReader((FSWALIdentity)nextWalId); if (reader != null) { return true; } } else { // no more files in queue, this could happen for recovered queue, or for a wal group of a sync // replication peer which has already been transited to DA or S. 
- setCurrentPath(null); + setCurrentWalId(null); } return false; } @@ -336,38 +338,38 @@ class WALEntryStream implements Closeable { return path; } - private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException { + private void handleFileNotFound(FSWALIdentity walId, FileNotFoundException fnfe) throws IOException { // If the log was archived, continue reading from there - Path archivedLog = getArchivedLog(path); - if (!path.equals(archivedLog)) { + FSWALIdentity archivedLog = new FSWALIdentity(getArchivedLog(walId.getPath())); + if (!walId.equals(archivedLog)) { openReader(archivedLog); } else { throw fnfe; } } - private void openReader(Path path) throws IOException { + private void openReader(FSWALIdentity walId) throws IOException { try { // Detect if this is a new file, if so get a new reader else // reset the current reader so that we see the new data - if (reader == null || !getCurrentPath().equals(path)) { + if (reader == null || !getCurrentWalIdentity().equals(walId)) { closeReader(); - reader = WALFactory.createReader(fs, path, conf); + reader = WALFactory.createReader(fs, walId.getPath(), conf); seek(); - setCurrentPath(path); + setCurrentWalId(walId); } else { resetReader(); } } catch (FileNotFoundException fnfe) { - handleFileNotFound(path, fnfe); + handleFileNotFound(walId, fnfe); } catch (RemoteException re) { IOException ioe = re.unwrapRemoteException(FileNotFoundException.class); if (!(ioe instanceof FileNotFoundException)) throw ioe; - handleFileNotFound(path, (FileNotFoundException)ioe); + handleFileNotFound(walId, (FileNotFoundException)ioe); } catch (LeaseNotRecoveredException lnre) { // HBASE-15019 the WAL was not closed due to some hiccup. 
- LOG.warn("Try to recover the WAL lease " + currentPath, lnre); - recoverLease(conf, currentPath); + LOG.warn("Try to recover the WAL lease " + currentWAlIdentity, lnre); + recoverLease(conf, ((FSWALIdentity)currentWAlIdentity).getPath()); reader = null; } catch (NullPointerException npe) { // Workaround for race condition in HDFS-4380 @@ -402,8 +404,9 @@ class WALEntryStream implements Closeable { seek(); } catch (FileNotFoundException fnfe) { // If the log was archived, continue reading from there - Path archivedLog = getArchivedLog(currentPath); - if (!currentPath.equals(archivedLog)) { + FSWALIdentity archivedLog = + new FSWALIdentity(getArchivedLog(((FSWALIdentity) currentWAlIdentity).getPath())); + if (!currentWAlIdentity.equals(archivedLog)) { openReader(archivedLog); } else { throw fnfe; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java index 010fa69005..d0b63cc244 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java @@ -19,7 +19,7 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.util.OptionalLong; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.yetus.audience.InterfaceAudience; /** @@ -30,5 +30,5 @@ import org.apache.yetus.audience.InterfaceAudience; @FunctionalInterface public interface WALFileLengthProvider { - OptionalLong getLogFileSizeIfBeingWritten(Path path); + OptionalLong getLogFileSizeIfBeingWritten(WALIdentity walId); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index 75439fe6c5..ad9f6bda30 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -63,7 +63,7 @@ class DisabledWALProvider implements WALProvider { if (null == providerId) { providerId = "defaultDisabled"; } - disabled = new DisabledWAL(new Path(FSUtils.getWALRootDir(conf), providerId), conf, null); + disabled = new DisabledWAL(new FSWALIdentity(new Path(FSUtils.getWALRootDir(conf), providerId)), conf, null); } @Override @@ -90,14 +90,14 @@ class DisabledWALProvider implements WALProvider { private static class DisabledWAL implements WAL { protected final List listeners = new CopyOnWriteArrayList<>(); - protected final Path path; + protected final FSWALIdentity walId; protected final WALCoprocessorHost coprocessorHost; protected final AtomicBoolean closed = new AtomicBoolean(false); - public DisabledWAL(final Path path, final Configuration conf, + public DisabledWAL(final FSWALIdentity walId, final Configuration conf, final List listeners) { this.coprocessorHost = new WALCoprocessorHost(this, conf); - this.path = path; + this.walId = walId; if (null != listeners) { for(WALActionsListener listener : listeners) { registerWALActionsListener(listener); @@ -123,14 +123,14 @@ class DisabledWALProvider implements WALProvider { } for (WALActionsListener listener : listeners) { try { - listener.preLogRoll(path, path); + listener.preLogRoll(walId, walId); } catch (IOException exception) { LOG.debug("Ignoring exception from listener.", exception); } } for (WALActionsListener listener : listeners) { try { - listener.postLogRoll(path, path); + listener.postLogRoll(walId, walId); } catch (IOException exception) { LOG.debug("Ignoring exception from listener.", exception); } @@ -243,7 +243,7 @@ class DisabledWALProvider implements WALProvider { } @Override - public OptionalLong getLogFileSizeIfBeingWritten(Path path) { + public OptionalLong getLogFileSizeIfBeingWritten(WALIdentity path) { 
return OptionalLong.empty(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java index 7cd39ea788..b02a4d30fd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.wal; import java.io.IOException; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -27,7 +28,6 @@ import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; -import org.apache.hadoop.hbase.util.FSUtils; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java index 1da31dadb0..e82ccc2735 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java @@ -24,13 +24,13 @@ import java.util.Arrays; import java.util.List; import java.util.Optional; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.hbase.wal.WALKey; import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; @@ -148,13 +148,13 @@ public class SampleRegionWALCoprocessor implements WALCoprocessor, RegionCoproce @Override public void preWALRoll(ObserverContext ctx, - Path oldPath, Path newPath) throws IOException { + WALIdentity oldWalId, WALIdentity newWalId) throws IOException { preWALRollCalled = true; } @Override public void postWALRoll(ObserverContext ctx, - Path oldPath, Path newPath) throws IOException { + WALIdentity oldWalId, WALIdentity newWalId) throws IOException { postWALRollCalled = true; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorderMultiBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorderMultiBlocks.java index ad2b2d48aa..cd96586fc2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorderMultiBlocks.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorderMultiBlocks.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; @@ -155,7 +156,8 @@ public class TestBlockReorderMultiBlocks { // listen for successful log rolls final WALActionsListener listener = new WALActionsListener() { @Override - public void postLogRoll(final Path oldPath, final Path newPath) throws IOException { + public void postLogRoll(final WALIdentity oldWalId, final WALIdentity newWalId) + throws IOException { latch.countDown(); } }; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRollPeriod.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRollPeriod.java index 9322c5e5df..8dcca9391a 100644 
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRollPeriod.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRollPeriod.java @@ -22,7 +22,6 @@ import static org.junit.Assert.assertFalse; import java.util.ArrayList; import java.util.List; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Put; @@ -30,6 +29,7 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -130,12 +130,12 @@ public abstract class AbstractTestLogRollPeriod { private void checkMinLogRolls(final WAL log, final int minRolls) throws Exception { - final List paths = new ArrayList<>(); + final List walIds = new ArrayList(); log.registerWALActionsListener(new WALActionsListener() { @Override - public void postLogRoll(Path oldFile, Path newFile) { - LOG.debug("postLogRoll: oldFile="+oldFile+" newFile="+newFile); - paths.add(newFile); + public void postLogRoll(WALIdentity oldWalId, WALIdentity newWalId) { + LOG.debug("postLogRoll: oldWalId="+oldWalId+" newWalId="+newWalId); + walIds.add(newWalId); } }); @@ -144,13 +144,13 @@ public abstract class AbstractTestLogRollPeriod { Thread.sleep((minRolls + 1) * LOG_ROLL_PERIOD); // Do some extra sleep in case the machine is slow, // and the log-roll is not triggered exactly on LOG_ROLL_PERIOD. 
- final int NUM_RETRIES = 1 + 8 * (minRolls - paths.size()); - for (int retry = 0; paths.size() < minRolls && retry < NUM_RETRIES; ++retry) { + final int NUM_RETRIES = 1 + 8 * (minRolls - walIds.size()); + for (int retry = 0; walIds.size() < minRolls && retry < NUM_RETRIES; ++retry) { Thread.sleep(LOG_ROLL_PERIOD / 4); } wtime = System.currentTimeMillis() - wtime; LOG.info(String.format("got %d rolls after %dms (%dms each) - expected at least %d rolls", - paths.size(), wtime, wtime / paths.size(), minRolls)); - assertFalse(paths.size() < minRolls); + walIds.size(), wtime, wtime / walIds.size(), minRolls)); + assertFalse(walIds.size() < minRolls); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java index e19361e200..8e7cacf0b9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java @@ -52,8 +52,10 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.FSWALIdentity; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.junit.BeforeClass; @@ -251,21 +253,21 @@ public class TestLogRolling extends AbstractTestLogRolling { server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName()); RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo(); final WAL log = server.getWAL(region); - final List paths = new ArrayList<>(1); + final List walIds = new ArrayList<>(1); final List 
preLogRolledCalled = new ArrayList<>(); - paths.add(AbstractFSWALProvider.getCurrentFileName(log)); + walIds.add(new FSWALIdentity(AbstractFSWALProvider.getCurrentFileName(log))); log.registerWALActionsListener(new WALActionsListener() { @Override - public void preLogRoll(Path oldFile, Path newFile) { + public void preLogRoll(WALIdentity oldFile, WALIdentity newFile) { LOG.debug("preLogRoll: oldFile=" + oldFile + " newFile=" + newFile); preLogRolledCalled.add(new Integer(1)); } @Override - public void postLogRoll(Path oldFile, Path newFile) { - paths.add(newFile); + public void postLogRoll(WALIdentity oldFile, WALIdentity newFile) { + walIds.add(newFile); } }); @@ -315,7 +317,8 @@ public class TestLogRolling extends AbstractTestLogRolling { // read back the data written Set loggedRows = new HashSet<>(); FSUtils fsUtils = FSUtils.getInstance(fs, TEST_UTIL.getConfiguration()); - for (Path p : paths) { + for (WALIdentity walId : walIds) { + Path p = ((FSWALIdentity) walId).getPath(); LOG.debug("recovering lease for " + p); fsUtils.recoverFileLease(((HFileSystem) fs).getBackingFs(), p, TEST_UTIL.getConfiguration(), null); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java index 0967a756f4..880dea7891 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.junit.After; import org.junit.Before; @@ -142,12 +143,12 @@ public class TestWALActionsListener { public int 
closedCount = 0; @Override - public void preLogRoll(Path oldFile, Path newFile) { + public void preLogRoll(WALIdentity oldWalId, WALIdentity newWalId) { preLogRollCounter++; } @Override - public void postLogRoll(Path oldFile, Path newFile) { + public void postLogRoll(WALIdentity oldWalId, WALIdentity newWalId) { postLogRollCounter++; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index 67f793d628..ed71e6e483 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALIdentity; /** * Source that does nothing at all, helpful to test ReplicationSourceManager @@ -42,7 +44,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { private ReplicationSourceManager manager; private ReplicationPeer replicationPeer; private String peerClusterId; - private Path currentPath; + private WALIdentity currentWalId; private MetricsSource metrics; private WALFileLengthProvider walFileLengthProvider; private AtomicBoolean startup = new AtomicBoolean(false); @@ -60,14 +62,14 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { } @Override - public void enqueueLog(Path log) { - this.currentPath 
= log; + public void enqueueLog(WALIdentity log) { + this.currentWalId = log; metrics.incrSizeOfLogQueue(); } @Override - public Path getCurrentPath() { - return this.currentPath; + public WALIdentity getCurrentWALIdentity() { + return this.currentWalId; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java index 37ca7dc830..c04fe0e27e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java @@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.HFileTestUtil; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; @@ -695,7 +696,8 @@ public class TestMasterReplication { // listen for successful log rolls final WALActionsListener listener = new WALActionsListener() { @Override - public void postLogRoll(final Path oldPath, final Path newPath) throws IOException { + public void postLogRoll(final WALIdentity oldWalId, final WALIdentity newWalId) + throws IOException { latch.countDown(); } }; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java index 225ca7f692..6247714e94 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java @@ -26,7 +26,6 @@ import java.io.IOException; import 
java.util.Arrays; import java.util.concurrent.CountDownLatch; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.client.Admin; @@ -42,6 +41,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.junit.BeforeClass; @@ -219,7 +219,8 @@ public class TestMultiSlaveReplication { // listen for successful log rolls final WALActionsListener listener = new WALActionsListener() { @Override - public void postLogRoll(final Path oldPath, final Path newPath) throws IOException { + public void postLogRoll(final WALIdentity oldWalId, final WALIdentity newWalId) + throws IOException { latch.countDown(); } }; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java index 4effe4149c..cb508209e1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java @@ -32,7 +32,11 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterfa import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.FSWALIdentity; import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALFactory; 
+import org.apache.hadoop.hbase.wal.WALIdentity; +import org.apache.hadoop.hbase.wal.WALProvider; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; @@ -64,12 +68,13 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase { utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); WAL wal = hrs.getWAL(regionInfo); Path currentFile = ((AbstractFSWAL) wal).getCurrentFileName(); + WALIdentity walId = new FSWALIdentity(currentFile); Replication replicationService = (Replication) utility1.getHBaseCluster() .getRegionServer(i).getReplicationSourceService(); for (ReplicationSourceInterface rsi : replicationService.getReplicationManager() .getSources()) { ReplicationSource source = (ReplicationSource) rsi; - if (!currentFile.equals(source.getCurrentPath())) { + if (!walId.equals(source.getCurrentWALIdentity())) { return false; } } @@ -97,6 +102,7 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase { emptyWalPaths.add(emptyWalPath); } + WALFactory factory = new WALFactory(conf1, "empty-wal-recovery"); // inject our empty wal into the replication queue, and then roll the original wal, which // enqueues a new wal behind our empty wal. 
We must roll the wal here as now we use the WAL to // determine if the file being replicated currently is still opened for write, so just inject a @@ -104,8 +110,10 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase { for (int i = 0; i < numRs; i++) { HRegionServer hrs = utility1.getHBaseCluster().getRegionServer(i); Replication replicationService = (Replication) hrs.getReplicationSourceService(); - replicationService.getReplicationManager().preLogRoll(emptyWalPaths.get(i)); - replicationService.getReplicationManager().postLogRoll(emptyWalPaths.get(i)); + WALIdentity id = new FSWALIdentity( + emptyWalPaths.get(i)); + replicationService.getReplicationManager().preLogRoll(id); + replicationService.getReplicationManager().postLogRoll(id); RegionInfo regionInfo = utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); WAL wal = hrs.getWAL(regionInfo); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java index 8ff4d84dcd..fae1f618b3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.replication; import java.util.Map; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Get; @@ -28,6 +27,7 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.junit.Assert; import org.junit.ClassRule; import 
org.junit.Test; @@ -74,9 +74,9 @@ public class TestReplicationMetricsforUI extends TestReplicationBase { } rs = utility1.getRSForFirstRegionInTable(tableName); metrics = rs.getWalGroupsReplicationStatus(); - Path lastPath = null; + WALIdentity lastPath = null; for (Map.Entry metric : metrics.entrySet()) { - lastPath = metric.getValue().getCurrentPath(); + lastPath = metric.getValue().getCurrentWalId(); Assert.assertEquals("peerId", PEER_ID2, metric.getValue().getPeerId()); Assert.assertTrue("age of Last Shipped Op should be > 0 ", metric.getValue().getAgeOfLastShippedOp() > 0); @@ -100,7 +100,7 @@ public class TestReplicationMetricsforUI extends TestReplicationBase { Assert.assertEquals("replication delay", 0, metric.getValue().getReplicationDelay()); Assert.assertTrue("current position should < last position", metric.getValue().getCurrentPosition() < lastPosition); - Assert.assertNotEquals("current path", lastPath, metric.getValue().getCurrentPath()); + Assert.assertNotEquals("current path", lastPath, metric.getValue().getCurrentWalId()); } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java index d01a0ac61a..b49646d54b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 274ccabfbe..23202058c1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -57,9 +57,11 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.FSWALIdentity; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALProvider; import org.junit.AfterClass; @@ -301,8 +303,8 @@ public class TestReplicationSource { String walGroupId = "fake-wal-group-id"; ServerName serverName = ServerName.valueOf("www.example.com", 12006, 1524679704418L); ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L); - PriorityBlockingQueue queue = new PriorityBlockingQueue<>(); - queue.put(new Path("/www/html/test")); + PriorityBlockingQueue queue = new PriorityBlockingQueue<>(); + queue.put(new FSWALIdentity(new Path("/www/html/test"))); RecoveredReplicationSource source = Mockito.mock(RecoveredReplicationSource.class); Server server = Mockito.mock(Server.class); Mockito.when(server.getServerName()).thenReturn(serverName); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 86bbb09d3d..110c61f2d8 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -84,9 +84,11 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.FSWALIdentity; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -336,7 +338,7 @@ public abstract class TestReplicationSourceManager { when(source.isRecovered()).thenReturn(false); when(source.isSyncReplication()).thenReturn(false); manager.logPositionAndCleanOldLogs(source, - new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath())); + new WALEntryBatch(0, manager.getSources().get(0).getCurrentWALIdentity())); wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), @@ -551,7 +553,7 @@ public abstract class TestReplicationSourceManager { .map(HRegionServer::getReplicationSourceService) .map(r -> (Replication)r) .map(Replication::getReplicationManager) - .mapToLong(ReplicationSourceManager::getSizeOfLatestPath) + .mapToLong(ReplicationSourceManager::getSizeOfLatestWalId) .sum(); } @@ -571,7 +573,7 @@ public abstract class TestReplicationSourceManager { assertNotNull(source); final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue(); // Enqueue log and check if metrics updated - source.enqueueLog(new Path("abc")); + source.enqueueLog(new FSWALIdentity((new Path("abc")).toString())); assertEquals(1 + sizeOfSingleLogQueue, 
source.getSourceMetrics().getSizeOfLogQueue()); assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); @@ -618,8 +620,9 @@ public abstract class TestReplicationSourceManager { String walNameNotExists = "remoteWAL-12345-" + slaveId + ".12345" + ReplicationUtils.SYNC_WAL_SUFFIX; Path wal = new Path(logDir, walNameNotExists); - manager.preLogRoll(wal); - manager.postLogRoll(wal); + WALIdentity walId = new FSWALIdentity(wal.toString()); + manager.preLogRoll(walId); + manager.postLogRoll(walId); Path remoteLogDirForPeer = new Path(remoteLogDir, slaveId); fs.mkdirs(remoteLogDirForPeer); @@ -629,8 +632,9 @@ public abstract class TestReplicationSourceManager { new Path(remoteLogDirForPeer, walName).makeQualified(fs.getUri(), fs.getWorkingDirectory()); fs.create(remoteWAL).close(); wal = new Path(logDir, walName); - manager.preLogRoll(wal); - manager.postLogRoll(wal); + walId = new FSWALIdentity(wal.toString()); + manager.preLogRoll(walId); + manager.postLogRoll(walId); ReplicationSourceInterface source = mockReplicationSource(peerId2); manager.cleanOldLogs(walName, true, source); @@ -648,13 +651,13 @@ public abstract class TestReplicationSourceManager { @Test public void testSameWALPrefix() throws IOException { Set latestWalsBefore = - manager.getLastestPath().stream().map(Path::getName).collect(Collectors.toSet()); + manager.getLastestWalIds().stream().map(WALIdentity::getName).collect(Collectors.toSet()); String walName1 = "localhost,8080,12345-45678-Peer.34567"; String walName2 = "localhost,8080,12345.56789"; - manager.preLogRoll(new Path(walName1)); - manager.preLogRoll(new Path(walName2)); + manager.preLogRoll(new FSWALIdentity((new Path(walName1)).toString())); + manager.preLogRoll(new FSWALIdentity((new Path(walName2)).toString())); - Set latestWals = manager.getLastestPath().stream().map(Path::getName) + Set latestWals = manager.getLastestWalIds().stream().map(WALIdentity::getName) .filter(n -> 
!latestWalsBefore.contains(n)).collect(Collectors.toSet()); assertEquals(2, latestWals.size()); assertTrue(latestWals.contains(walName1)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index fac6f7481a..d22f96ab7b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -39,7 +39,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -56,10 +55,12 @@ import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.FSWALIdentity; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.After; @@ -98,7 +99,7 @@ public class TestWALEntryStream { } private WAL log; - PriorityBlockingQueue<Path> walQueue; + PriorityBlockingQueue<WALIdentity> walQueue; private PathWatcher pathWatcher; @Rule @@ -369,7 +370,7 @@ public class TestWALEntryStream { } // start up a reader - Path walPath = walQueue.peek(); + WALIdentity walId = walQueue.peek(); ReplicationSourceWALReader reader = 
createReader(false, CONF); WALEntryBatch entryBatch = reader.take(); @@ -377,7 +378,7 @@ public class TestWALEntryStream { assertNotNull(entryBatch); assertEquals(3, entryBatch.getWalEntries().size()); assertEquals(position, entryBatch.getLastWalPosition()); - assertEquals(walPath, entryBatch.getLastWalPath()); + assertEquals(walId, entryBatch.getLastWalId()); assertEquals(3, entryBatch.getNbRowKeys()); appendToLog("foo"); @@ -389,7 +390,7 @@ public class TestWALEntryStream { @Test public void testReplicationSourceWALReaderRecovered() throws Exception { appendEntriesToLogAndSync(10); - Path walPath = walQueue.peek(); + WALIdentity walId = walQueue.peek(); log.rollWriter(); appendEntriesToLogAndSync(5); log.shutdown(); @@ -400,18 +401,18 @@ public class TestWALEntryStream { ReplicationSourceWALReader reader = createReader(true, conf); WALEntryBatch batch = reader.take(); - assertEquals(walPath, batch.getLastWalPath()); + assertEquals(walId, batch.getLastWalId()); assertEquals(10, batch.getNbEntries()); assertFalse(batch.isEndOfFile()); batch = reader.take(); - assertEquals(walPath, batch.getLastWalPath()); + assertEquals(walId, batch.getLastWalId()); assertEquals(0, batch.getNbEntries()); assertTrue(batch.isEndOfFile()); - walPath = walQueue.peek(); + walId = walQueue.peek(); batch = reader.take(); - assertEquals(walPath, batch.getLastWalPath()); + assertEquals(walId, batch.getLastWalId()); assertEquals(5, batch.getNbEntries()); assertTrue(batch.isEndOfFile()); @@ -422,49 +423,49 @@ public class TestWALEntryStream { @Test public void testReplicationSourceWALReaderWrongPosition() throws Exception { appendEntriesToLogAndSync(1); - Path walPath = walQueue.peek(); + FSWALIdentity walId = (FSWALIdentity)walQueue.peek(); log.rollWriter(); appendEntriesToLogAndSync(20); TEST_UTIL.waitFor(5000, new ExplainingPredicate() { @Override public boolean evaluate() throws Exception { - return fs.getFileStatus(walPath).getLen() > 0; + return 
fs.getFileStatus(walId.getPath()).getLen() > 0; } @Override public String explainFailure() throws Exception { - return walPath + " has not been closed yet"; + return walId + " has not been closed yet"; } }); - long walLength = fs.getFileStatus(walPath).getLen(); + long walLength = fs.getFileStatus(walId.getPath()).getLen(); ReplicationSourceWALReader reader = createReader(false, CONF); WALEntryBatch entryBatch = reader.take(); - assertEquals(walPath, entryBatch.getLastWalPath()); + assertEquals(walId, entryBatch.getLastWalId()); assertTrue("Position " + entryBatch.getLastWalPosition() + " is out of range, file length is " + walLength, entryBatch.getLastWalPosition() <= walLength); assertEquals(1, entryBatch.getNbEntries()); assertTrue(entryBatch.isEndOfFile()); - Path walPath2 = walQueue.peek(); + WALIdentity walId2 = walQueue.peek(); entryBatch = reader.take(); - assertEquals(walPath2, entryBatch.getLastWalPath()); + assertEquals(walId2, entryBatch.getLastWalId()); assertEquals(20, entryBatch.getNbEntries()); assertFalse(entryBatch.isEndOfFile()); log.rollWriter(); appendEntriesToLogAndSync(10); entryBatch = reader.take(); - assertEquals(walPath2, entryBatch.getLastWalPath()); + assertEquals(walId2, entryBatch.getLastWalId()); assertEquals(0, entryBatch.getNbEntries()); assertTrue(entryBatch.isEndOfFile()); - Path walPath3 = walQueue.peek(); + WALIdentity walId3 = walQueue.peek(); entryBatch = reader.take(); - assertEquals(walPath3, entryBatch.getLastWalPath()); + assertEquals(walId3, entryBatch.getLastWalId()); assertEquals(10, entryBatch.getNbEntries()); assertFalse(entryBatch.isEndOfFile()); } @@ -484,7 +485,7 @@ public class TestWALEntryStream { } // start up a reader - Path walPath = walQueue.peek(); + WALIdentity walId = walQueue.peek(); ReplicationSource source = mockReplicationSource(false, CONF); AtomicInteger invokeCount = new AtomicInteger(0); AtomicBoolean enabled = new AtomicBoolean(false); @@ -511,7 +512,7 @@ public class TestWALEntryStream { 
assertNotNull(entryBatch); assertEquals(3, entryBatch.getWalEntries().size()); assertEquals(position, entryBatch.getLastWalPosition()); - assertEquals(walPath, entryBatch.getLastWalPath()); + assertEquals(walId, entryBatch.getLastWalId()); assertEquals(3, entryBatch.getNbRowKeys()); } @@ -577,12 +578,12 @@ public class TestWALEntryStream { class PathWatcher implements WALActionsListener { - Path currentPath; + WALIdentity currentWalId; @Override - public void preLogRoll(Path oldPath, Path newPath) throws IOException { - walQueue.add(newPath); - currentPath = newPath; + public void preLogRoll(WALIdentity oldWalId, WALIdentity newWalId) throws IOException { + walQueue.add(newWalId); + currentWalId = newWalId; } }