diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java index 899c633..f29529b 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java @@ -391,6 +391,12 @@ public abstract class CommonFSUtils { return p.makeQualified(fs.getUri(), fs.getWorkingDirectory()); } + public static boolean hasWALRootDir(final Configuration c) { + if (c.get(HBASE_WAL_DIR, c.get(HConstants.HBASE_DIR)) != null) { + return true; + } + return false; + } @VisibleForTesting public static void setWALRootDir(final Configuration c, final Path root) throws IOException { c.set(HBASE_WAL_DIR, root.toString()); @@ -1050,7 +1056,7 @@ public abstract class CommonFSUtils { * Helper exception for those cases where the place where we need to check a stream capability * is not where we have the needed context to explain the impact and mitigation for a lack. */ - public static class StreamLacksCapabilityException extends Exception { + public static class StreamLacksCapabilityException extends IOException { public StreamLacksCapabilityException(String message, Throwable cause) { super(message, cause); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java index b2fa7ca..0b3217d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -96,7 +97,7 @@ public interface WALObserver { * @param newPath the path of the wal we are going to create */ default void preWALRoll(ObserverContext ctx, - Path oldPath, Path newPath) throws IOException {} + WALIdentity oldPath, WALIdentity newPath) throws IOException {} /** * Called after rolling the current WAL @@ -104,6 +105,6 @@ public interface WALObserver { * @param newPath the path of the wal we have created and now is the current */ default void postWALRoll(ObserverContext ctx, - Path oldPath, Path newPath) throws IOException {} + WALIdentity oldPath, WALIdentity newPath) throws IOException {} } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index c10a824..7e5c8c7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -174,6 +174,7 @@ import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay; import org.apache.hadoop.io.MultipleIOException; @@ -327,6 +328,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi final LongAdder flushesQueued = new LongAdder(); private final WAL wal; + private final WALProvider walProvider; private final HRegionFileSystem fs; protected final Configuration conf; private final Configuration baseConf; @@ -778,6 +780,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.rsServices = rsServices; setHTableSpecificConf(); this.scannerReadPoints = new ConcurrentHashMap<>(); + try { + WALFactory factory = new WALFactory(conf, getRegionInfo().getEncodedName()); + this.walProvider = factory.getWALProvider(); + } catch (IOException ioe) { + throw new IllegalArgumentException("cannot construct WALFactory for " + + getRegionInfo().getRegionNameAsString(), ioe); + } this.busyWaitDuration = conf.getLong( "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION); @@ -4699,7 +4708,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi status.setStatus("Opening recovered edits"); WAL.Reader reader = null; try { - reader = WALFactory.createReader(fs, edits, conf); + reader = walProvider.createReader(walProvider.createWALIdentity(edits.toString()), null, + true); long currentEditSeqId = -1; long currentReplaySeqId = -1; long firstSeqIdInLog = -1; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java index 4a9712c..7761ec0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java @@ -77,17 +77,19 @@ public class SplitLogWorker implements Runnable { // thread pool which executes recovery work private SplitLogWorkerCoordination coordination; private RegionServerServices server; + private WALFactory factory; public SplitLogWorker(Server hserver, Configuration conf, RegionServerServices server, - TaskExecutor splitTaskExecutor) { + WALFactory factory, TaskExecutor splitTaskExecutor) { this.server = server; + this.factory = factory; this.coordination = hserver.getCoordinatedStateManager().getSplitLogWorkerCoordination(); coordination.init(server, conf, splitTaskExecutor, this); } public SplitLogWorker(Configuration conf, RegionServerServices server, LastSequenceId sequenceIdChecker, WALFactory factory) { - this(server, conf, server, (f, p) -> splitLog(f, p, conf, server, sequenceIdChecker, factory)); + this(server, conf, server, factory, (f, p) -> splitLog(f, p, conf, server, sequenceIdChecker, factory)); } // returns whether we need to continue the split work @@ -175,7 +177,8 @@ public class SplitLogWorker implements Runnable { // interrupted or has encountered a transient error and when it has // encountered a bad non-retry-able persistent error. 
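The HRegion hunk above and the SplitLogWorker hunk that follows both move from static WALFactory.createReader(fs, path, conf) calls to a provider-mediated lookup. A minimal sketch of that read path, using only calls that appear in this patch; the factory id "example", the method name dumpEdits, and the local variable names are illustrative assumptions, not part of the change:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.wal.WALProvider;

public class WALReadSketch {
  static void dumpEdits(Configuration conf, Path walFile) throws IOException {
    WALFactory factory = new WALFactory(conf, "example"); // factory id is arbitrary here
    WALProvider provider = factory.getWALProvider();
    WALIdentity walId = provider.createWALIdentity(walFile.toString());
    // Same (identity, reporter, allowCustom) shape as the HRegion recovered-edits hunk
    try (WAL.Reader reader = provider.createReader(walId, null, true)) {
      for (WAL.Entry entry = reader.next(); entry != null; entry = reader.next()) {
        System.out.println(entry.getKey()); // each entry carries its WALKey
      }
    }
  }
}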
try { - if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, name)), fs, conf, + if (!WALSplitter.splitLogFile(walDir, factory.getWALProvider().createWALIdentity( + new Path(walDir, name).toString()), fs, conf, p, sequenceIdChecker, server.getCoordinatedStateManager().getSplitLogWorkerCoordination(), factory)) { return Status.PREEMPTED; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 7915ac3..7999fc5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -65,9 +65,11 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.FSWALIdentity; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALPrettyPrinter; import org.apache.hadoop.hbase.wal.WALProvider.WriterBase; @@ -542,7 +544,7 @@ public abstract class AbstractFSWAL implements WAL { /** * Tell listeners about pre log roll. */ - private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath) + private void tellListenersAboutPreLogRoll(final WALIdentity oldPath, final WALIdentity newPath) throws IOException { coprocessorHost.preWALRoll(oldPath, newPath); @@ -556,7 +558,7 @@ public abstract class AbstractFSWAL implements WAL { /** * Tell listeners about post log roll. */ - private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath) + private void tellListenersAboutPostLogRoll(final WALIdentity oldPath, final WALIdentity newPath) throws IOException { if (!this.listeners.isEmpty()) { for (WALActionsListener i : this.listeners) { @@ -650,7 +652,7 @@ public abstract class AbstractFSWAL implements WAL { // Tell our listeners that a log is going to be archived. if (!this.listeners.isEmpty()) { for (WALActionsListener i : this.listeners) { - i.preLogArchive(p, newPath); + i.preLogArchive(new FSWALIdentity(p), new FSWALIdentity(newPath)); } } LOG.info("Archiving " + p + " to " + newPath); @@ -660,7 +662,7 @@ public abstract class AbstractFSWAL implements WAL { // Tell our listeners that a log has been archived. if (!this.listeners.isEmpty()) { for (WALActionsListener i : this.listeners) { - i.postLogArchive(p, newPath); + i.postLogArchive(new FSWALIdentity(p), new FSWALIdentity(newPath)); } } } @@ -760,10 +762,11 @@ public abstract class AbstractFSWAL implements WAL { Path newPath = getNewPath(); // Any exception from here on is catastrophic, non-recoverable so we currently abort. W nextWriter = this.createWriterInstance(newPath); - tellListenersAboutPreLogRoll(oldPath, newPath); + tellListenersAboutPreLogRoll(new FSWALIdentity(oldPath), new FSWALIdentity(newPath)); // NewPath could be equal to oldPath if replaceWriter fails. 
newPath = replaceWriter(oldPath, newPath, nextWriter); - tellListenersAboutPostLogRoll(oldPath, newPath); + tellListenersAboutPostLogRoll(new FSWALIdentity(oldPath), + new FSWALIdentity(newPath)); if (LOG.isDebugEnabled()) { LOG.debug("Create new " + implClassName + " writer with pipeline: " + Arrays.toString(getPipeline())); @@ -836,7 +839,7 @@ public abstract class AbstractFSWAL implements WAL { // Tell our listeners that a log is going to be archived. if (!this.listeners.isEmpty()) { for (WALActionsListener i : this.listeners) { - i.preLogArchive(file.getPath(), p); + i.preLogArchive(new FSWALIdentity(file.getPath()), new FSWALIdentity(p)); } } @@ -846,7 +849,7 @@ public abstract class AbstractFSWAL implements WAL { // Tell our listeners that a log was archived. if (!this.listeners.isEmpty()) { for (WALActionsListener i : this.listeners) { - i.postLogArchive(file.getPath(), p); + i.postLogArchive(new FSWALIdentity(file.getPath()), new FSWALIdentity(p)); } } } @@ -994,7 +997,7 @@ public abstract class AbstractFSWAL implements WAL { * https://issues.apache.org/jira/browse/HBASE-14004 for more details. */ @Override - public OptionalLong getLogFileSizeIfBeingWritten(Path path) { + public OptionalLong getLogFileSizeIfBeingWritten(WALIdentity path) { rollWriterLock.lock(); try { Path currentPath = getOldPath(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java index ae084a4..49a8791 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java @@ -46,9 +46,12 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; import org.apache.hadoop.hbase.util.EncryptionTest; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.FSWALIdentity; +import org.apache.hadoop.hbase.wal.WALIdentity; /** * Base class for Protobuf log writer. 
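The init() hunk that follows drops the FileSystem/Path pair from the writer contract, so callers obtain writers through the provider instead, as the Compressor and FSHLog hunks later in this patch do. A hedged sketch of that call shape; copyEntry, dest, and the other names are invented for illustration:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;

public class WALWriteSketch {
  static void copyEntry(Configuration conf, Path dest, WAL.Entry entry) throws IOException {
    WALFactory factory = new WALFactory(conf, "example");
    WALProvider provider = factory.getWALProvider();
    // createWriter(conf, walId, overwritable): the same shape the Compressor hunk uses
    WALProvider.Writer out =
        provider.createWriter(conf, provider.createWALIdentity(dest.toString()), false);
    try {
      out.append(entry); // Writer.append(Entry), as in the Compressor copy loop
    } finally {
      out.close();
    }
  }
}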
@@ -153,8 +156,10 @@ public abstract class AbstractProtobufLogWriter { return doCompress; } - public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable, - long blocksize) throws IOException, StreamLacksCapabilityException { + public void init(WALIdentity walId, Configuration conf, boolean overwritable, + long blocksize) throws IOException { + Path path = ((FSWALIdentity)walId).getPath(); + FileSystem fs = CommonFSUtils.getWALFileSystem(conf); this.conf = conf; boolean doCompress = initializeCompressionContext(conf, path); this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java index 13f5d6e..6e41126f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java @@ -69,10 +69,11 @@ public class Compressor { throws IOException { Configuration conf = HBaseConfiguration.create(); - FileSystem inFS = input.getFileSystem(conf); - FileSystem outFS = output.getFileSystem(conf); + WALFactory factory = new WALFactory(conf, "compressor"); + WALProvider provider = factory.getWALProvider(); - WAL.Reader in = WALFactory.createReaderIgnoreCustomClass(inFS, input, conf); + WAL.Reader in = provider.createReader(provider.createWALIdentity(input.toString()), null, + false); WALProvider.Writer out = null; try { @@ -82,7 +83,8 @@ public class Compressor { } boolean compress = ((ReaderBase)in).hasCompression(); conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, !compress); - out = WALFactory.createWALWriter(outFS, output, conf); + + out = provider.createWriter(conf, provider.createWALIdentity(output.toString()), false); WAL.Entry e = null; while ((e = in.next()) != null) out.append(e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index baa87a4..2224756 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -50,7 +50,9 @@ import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.FSHLogProvider; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.apache.hadoop.hdfs.DFSOutputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; @@ -271,7 +273,9 @@ public class FSHLog extends AbstractFSWAL { */ @Override protected Writer createWriterInstance(final Path path) throws IOException { - Writer writer = FSHLogProvider.createWriter(conf, fs, path, false, this.blocksize); + WALFactory factory = new WALFactory(conf, "FSHLog"); + WALProvider provider = factory.getWALProvider(); + Writer writer = provider.createWriter(conf, provider.createWALIdentity(path.toString()), false); if (writer instanceof ProtobufLogWriter) { preemptiveSync((ProtobufLogWriter) writer); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java index 13ffac7..4615434 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.yetus.audience.InterfaceAudience; @@ -38,7 +39,7 @@ public interface WALActionsListener { * @param oldPath the path to the old wal * @param newPath the path to the new wal */ - default void preLogRoll(Path oldPath, Path newPath) throws IOException {} + default void preLogRoll(WALIdentity oldPath, WALIdentity newPath) throws IOException {} /** * The WAL has been rolled. The oldPath can be null if this is @@ -46,21 +47,21 @@ public interface WALActionsListener { * @param oldPath the path to the old wal * @param newPath the path to the new wal */ - default void postLogRoll(Path oldPath, Path newPath) throws IOException {} + default void postLogRoll(WALIdentity oldPath, WALIdentity newPath) throws IOException {} /** * The WAL is going to be archived. * @param oldPath the path to the old wal * @param newPath the path to the new wal */ - default void preLogArchive(Path oldPath, Path newPath) throws IOException {} + default void preLogArchive(WALIdentity oldPath, WALIdentity newPath) throws IOException {} /** * The WAL has been archived. * @param oldPath the path to the old wal * @param newPath the path to the new wal */ - default void postLogArchive(Path oldPath, Path newPath) throws IOException {} + default void postLogArchive(WALIdentity oldPath, WALIdentity newPath) throws IOException {} /** * A request was made that the WAL be rolled. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java index 40d6d0f..c8ae6f4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.coprocessor.WALObserver; import org.apache.hadoop.hbase.metrics.MetricRegistry; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -173,7 +174,7 @@ public class WALCoprocessorHost * @param oldPath the path of the current wal that we are replacing * @param newPath the path of the wal we are going to create */ - public void preWALRoll(Path oldPath, Path newPath) throws IOException { + public void preWALRoll(WALIdentity oldPath, WALIdentity newPath) throws IOException { execOperation(coprocEnvironments.isEmpty() ? 
null : new WALObserverOperation() { @Override protected void call(WALObserver observer) throws IOException { @@ -187,7 +188,7 @@ public class WALCoprocessorHost * @param oldPath the path of the wal that we replaced * @param newPath the path of the wal we have created and now is the current */ - public void postWALRoll(Path oldPath, Path newPath) throws IOException { + public void postWALRoll(WALIdentity oldPath, WALIdentity newPath) throws IOException { execOperation(coprocEnvironments.isEmpty() ? null : new WALObserverOperation() { @Override protected void call(WALObserver observer) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index f1bb538..b5eaaf7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -18,19 +18,17 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; -import java.util.List; import java.util.UUID; import java.util.concurrent.PriorityBlockingQueue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.WALIdentity; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,89 +38,36 @@ import org.slf4j.LoggerFactory; * another dead region server. This will be closed when all logs are pushed to peer cluster. 
*/ @InterfaceAudience.Private -public class RecoveredReplicationSource extends ReplicationSource { +public abstract class RecoveredReplicationSource extends ReplicationSource { private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSource.class); private String actualPeerId; @Override - public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, + public void init(Configuration conf, ReplicationSourceManager manager, ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider, - MetricsSource metrics) throws IOException { - super.init(conf, fs, manager, queueStorage, replicationPeer, server, peerClusterZnode, - clusterId, walFileLengthProvider, metrics); + MetricsSource metrics, WALProvider walProvider) throws IOException { + super.init(conf, manager, queueStorage, replicationPeer, server, peerClusterZnode, + clusterId, walFileLengthProvider, metrics, walProvider); this.actualPeerId = this.replicationQueueInfo.getPeerId(); } @Override protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId, - PriorityBlockingQueue queue) { + PriorityBlockingQueue queue) { return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, queueStorage); } - public void locateRecoveredPaths(PriorityBlockingQueue queue) throws IOException { - boolean hasPathChanged = false; - PriorityBlockingQueue newPaths = - new PriorityBlockingQueue(queueSizePerGroup, new LogsComparator()); - pathsLoop: for (Path path : queue) { - if (fs.exists(path)) { // still in same location, don't need to do anything - newPaths.add(path); - continue; - } - // Path changed - try to find the right path. - hasPathChanged = true; - if (server instanceof ReplicationSyncUp.DummyServer) { - // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data - // from .logs to .oldlogs. 
Loop into .logs folders and check whether a match exists - Path newPath = getReplSyncUpPath(path); - newPaths.add(newPath); - continue; - } else { - // See if Path exists in the dead RS folder (there could be a chain of failures - // to look at) - List deadRegionServers = this.replicationQueueInfo.getDeadRegionServers(); - LOG.info("NB dead servers : " + deadRegionServers.size()); - final Path walDir = FSUtils.getWALRootDir(conf); - for (ServerName curDeadServerName : deadRegionServers) { - final Path deadRsDirectory = - new Path(walDir, AbstractFSWALProvider.getWALDirectoryName(curDeadServerName - .getServerName())); - Path[] locs = new Path[] { new Path(deadRsDirectory, path.getName()), new Path( - deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), path.getName()) }; - for (Path possibleLogLocation : locs) { - LOG.info("Possible location " + possibleLogLocation.toUri().toString()); - if (manager.getFs().exists(possibleLogLocation)) { - // We found the right new location - LOG.info("Log " + path + " still exists at " + possibleLogLocation); - newPaths.add(possibleLogLocation); - continue pathsLoop; - } - } - } - // didn't find a new location - LOG.error( - String.format("WAL Path %s doesn't exist and couldn't find its new location", path)); - newPaths.add(path); - } - } - - if (hasPathChanged) { - if (newPaths.size() != queue.size()) { // this shouldn't happen - LOG.error("Recovery queue size is incorrect"); - throw new IOException("Recovery queue size error"); - } - // put the correct locations in the queue - // since this is a recovered queue with no new incoming logs, - // there shouldn't be any concurrency issues - queue.clear(); - for (Path path : newPaths) { - queue.add(path); - } - } - } - + /** + * Get the updated queue of the wals if the wals are moved to another location. + * @param queue Updated queue with the new WALIdentity(paths or stream) if wals are archived + * @throws IOException + */ + public abstract void locateRecoveredWALIdentities(PriorityBlockingQueue queue) + throws IOException; + // N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of the wal // area rather than to the wal area for a particular region server. 
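With RecoveredReplicationSource now abstract, each storage backend supplies its own relocation logic for recovered queues. A skeletal, hypothetical subclass is sketched below, assuming no other abstract members need implementing; the concrete implementation this series actually ships is the FSRecoveredReplicationSource referenced later in the patch:

import java.io.IOException;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.hbase.wal.WALIdentity;

// Toy backend in which WALs never move; a filesystem-backed subclass would probe
// the dead-RS and archive locations here and swap in relocated identities.
public class InMemoryRecoveredSource extends RecoveredReplicationSource {
  @Override
  public void locateRecoveredWALIdentities(PriorityBlockingQueue<WALIdentity> queue)
      throws IOException {
    // The queue already holds valid identities for this hypothetical backend.
  }
}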
private Path getReplSyncUpPath(Path path) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java index b0d4db0..9d0bbd3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java @@ -20,10 +20,10 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import java.util.concurrent.PriorityBlockingQueue; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +40,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper private final ReplicationQueueStorage replicationQueues; public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId, - PriorityBlockingQueue queue, RecoveredReplicationSource source, + PriorityBlockingQueue queue, RecoveredReplicationSource source, ReplicationQueueStorage queueStorage) { super(conf, walGroupId, queue, source); this.source = source; @@ -58,7 +58,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper int numRetries = 0; while (numRetries <= maxRetriesMultiplier) { try { - source.locateRecoveredPaths(queue); + source.locateRecoveredWALIdentities(queue); break; } catch (IOException e) { LOG.error("Error while locating recovered queue paths, attempt #" + numRetries); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java index 24963f1..75d1eff 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Reader; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -139,9 +140,11 @@ public class ReplaySyncReplicationWALCallable implements RSProcedureCallable { private Reader getReader(String wal) throws IOException { Path path = new Path(rs.getWALRootDir(), wal); long length = rs.getWALFileSystem().getFileStatus(path).getLen(); + WALFactory factory = new WALFactory(conf, "replay-sync-replication"); + WALProvider walProvider = factory.getWALProvider(); try { FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf); - return WALFactory.createReader(rs.getWALFileSystem(), path, rs.getConfiguration()); + return walProvider.createReader(walProvider.createWALIdentity(path.toString()), null, true); } catch (EOFException e) { if (length <= 0) 
{ LOG.warn("File is empty. Could not open {} for reading because {}", path, e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 799d975..8faa9a9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -127,7 +127,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId, walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(), - mapping); + mapping, walProvider); this.syncReplicationPeerInfoProvider = new SyncReplicationPeerInfoProviderImpl(replicationPeers, mapping); PeerActionListener peerActionListener = PeerActionListener.DUMMY; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 10fa50f..814c1f0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -17,9 +17,6 @@ */ package org.apache.hadoop.hbase.replication.regionserver; -import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath; - -import java.io.FileNotFoundException; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; @@ -35,6 +32,8 @@ import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Pattern; + import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -62,6 +61,8 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALIdentity; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,7 +87,7 @@ public class ReplicationSource implements ReplicationSourceInterface { private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class); // Queues of logs to process, entry in format of walGroupId->queue, // each presents a queue for one wal group - private Map> queues = new HashMap<>(); + private Map> queues = new HashMap<>(); // per group queue size, keep no more than this number of logs in each wal group protected int queueSizePerGroup; protected ReplicationQueueStorage queueStorage; @@ -137,6 +138,7 @@ public class ReplicationSource implements ReplicationSourceInterface { private int waitOnEndpointSeconds = -1; private Thread initThread; + protected WALProvider walProvider; /** * Instantiation method used by region servers @@ -149,10 +151,10 @@ public class ReplicationSource implements ReplicationSourceInterface { * @param metrics metrics for replication source */ @Override - public 
void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, + public void init(Configuration conf, ReplicationSourceManager manager, ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, - MetricsSource metrics) throws IOException { + MetricsSource metrics, WALProvider walProvider) throws IOException { this.server = server; this.conf = HBaseConfiguration.create(conf); this.waitOnEndpointSeconds = @@ -191,11 +193,11 @@ public class ReplicationSource implements ReplicationSourceInterface { } @Override - public void enqueueLog(Path log) { + public void enqueueLog(WALIdentity log) { String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log.getName()); - PriorityBlockingQueue queue = queues.get(logPrefix); + PriorityBlockingQueue queue = queues.get(logPrefix); if (queue == null) { - queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator()); + queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator(walProvider)); // make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise // the shipper may quit immediately queue.put(log); @@ -300,7 +302,7 @@ public class ReplicationSource implements ReplicationSourceInterface { this.walEntryFilter = new ChainWALEntryFilter(filters); } - private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue queue) { + private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue queue) { ReplicationSourceShipper worker = createNewShipper(walGroupId, queue); ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker); if (extant != null) { @@ -328,13 +330,8 @@ public class ReplicationSource implements ReplicationSourceInterface { int queueSize = queues.get(walGroupId).size(); replicationDelay = ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize); - Path currentPath = shipper.getCurrentPath(); - try { - fileSize = getFileSize(currentPath); - } catch (IOException e) { - LOG.warn("Ignore the exception as the file size of HLog only affects the web ui", e); - fileSize = -1; - } + WALIdentity currentPath = shipper.getCurrentWALIdentity(); + fileSize = -1; ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder(); statusBuilder.withPeerId(this.getPeerId()) .withQueueSize(queueSize) @@ -349,24 +346,13 @@ public class ReplicationSource implements ReplicationSourceInterface { return sourceReplicationStatus; } - private long getFileSize(Path currentPath) throws IOException { - long fileSize; - try { - fileSize = fs.getContentSummary(currentPath).getLength(); - } catch (FileNotFoundException e) { - currentPath = getArchivedLogPath(currentPath, conf); - fileSize = fs.getContentSummary(currentPath).getLength(); - } - return fileSize; - } - protected ReplicationSourceShipper createNewShipper(String walGroupId, - PriorityBlockingQueue queue) { + PriorityBlockingQueue queue) { return new ReplicationSourceShipper(conf, walGroupId, queue, this); } private ReplicationSourceWALReader createNewWALReader(String walGroupId, - PriorityBlockingQueue queue, long startPosition) { + PriorityBlockingQueue queue, long startPosition) { return replicationPeer.getPeerConfig().isSerial() ? 
new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this) : new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this); @@ -374,7 +360,8 @@ public class ReplicationSource implements ReplicationSourceInterface { protected final void uncaughtException(Thread t, Throwable e) { RSRpcServices.exitIfOOME(e); - LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentPath(), e); + LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentWALIdentity(), + e); server.abort("Unexpected exception in " + t.getName(), e); } @@ -497,9 +484,9 @@ public class ReplicationSource implements ReplicationSourceInterface { initializeWALEntryFilter(peerClusterId); // start workers - for (Map.Entry> entry : queues.entrySet()) { + for (Map.Entry> entry : queues.entrySet()) { String walGroupId = entry.getKey(); - PriorityBlockingQueue queue = entry.getValue(); + PriorityBlockingQueue queue = entry.getValue(); tryStartNewShipper(walGroupId, queue); } } @@ -593,11 +580,11 @@ public class ReplicationSource implements ReplicationSourceInterface { } @Override - public Path getCurrentPath() { + public WALIdentity getCurrentWALIdentity() { // only for testing for (ReplicationSourceShipper worker : workerThreads.values()) { - if (worker.getCurrentPath() != null) { - return worker.getCurrentPath(); + if (worker.getCurrentWALIdentity() != null) { + return worker.getCurrentWALIdentity(); } } return null; @@ -611,25 +598,16 @@ public class ReplicationSource implements ReplicationSourceInterface { /** * Comparator used to compare logs together based on their start time */ - public static class LogsComparator implements Comparator { - - @Override - public int compare(Path o1, Path o2) { - return Long.compare(getTS(o1), getTS(o2)); + public static class LogsComparator implements Comparator { + private static Pattern WAL_FILE_NAME_PATTERN = + Pattern.compile("(.+)\\.(\\d+)(\\.[0-9A-Za-z]+)?"); + private WALProvider walProvider; + public LogsComparator(WALProvider walProvider) { + this.walProvider = walProvider; } - - /** - *
<p>
- * Split a path to get the start time
- * </p>
- * <p>
- * For example: 10.20.20.171%3A60020.1277499063250
- * </p>
- * @param p path to split - * @return start time - */ - private static long getTS(Path p) { - return AbstractFSWALProvider.getWALStartTimeFromWALName(p.getName()); + @Override + public int compare(WALIdentity o1, WALIdentity o2) { + return Long.compare(walProvider.getWALStartTime(o1), walProvider.getWALStartTime(o2)); } } @@ -642,7 +620,7 @@ public class ReplicationSource implements ReplicationSourceInterface { String walGroupId = entry.getKey(); ReplicationSourceShipper worker = entry.getValue(); long position = worker.getCurrentPosition(); - Path currentPath = worker.getCurrentPath(); + WALIdentity currentPath = worker.getCurrentWALIdentity(); sb.append("walGroup [").append(walGroupId).append("]: "); if (currentPath != null) { sb.append("currently replicating from: ").append(currentPath).append(" at position: ") @@ -695,4 +673,7 @@ public class ReplicationSource implements ReplicationSourceInterface { void removeWorker(ReplicationSourceShipper worker) { workerThreads.remove(worker.walGroupId, worker); } + public WALProvider getWalProvider() { + return walProvider; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java index d613049..65241e1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java @@ -47,7 +47,7 @@ public class ReplicationSourceFactory { LOG.warn("Passed replication source implementation throws errors, " + "defaulting to ReplicationSource", e); - src = isQueueRecovered ? new RecoveredReplicationSource() : new ReplicationSource(); + src = isQueueRecovered ? 
new FSRecoveredReplicationSource() : new ReplicationSource(); } return src; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index df7a8cc..fb09c31 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -25,7 +25,6 @@ import java.util.Map; import java.util.UUID; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; @@ -36,6 +35,8 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALIdentity; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.yetus.audience.InterfaceAudience; /** @@ -51,16 +52,16 @@ public interface ReplicationSourceInterface { * @param manager the manager to use * @param server the server for this region server */ - void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, + void init(Configuration conf, ReplicationSourceManager manager, ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, - MetricsSource metrics) throws IOException; + MetricsSource metrics, WALProvider walProvider) throws IOException; /** * Add a log to the list of logs to replicate * @param log path to the log to replicate */ - void enqueueLog(Path log); + void enqueueLog(WALIdentity log); /** * Add hfile names to the queue to be replicated. 
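Under the revised ReplicationSourceInterface, callers mint WALIdentity values through the provider and hand them to the source, mirroring the ReplicationSourceManager hunks below. A small illustrative fragment; handOff and walName are invented names:

import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.wal.WALProvider;

public class EnqueueSketch {
  static void handOff(ReplicationSourceInterface source, WALProvider walProvider,
      String walName) {
    WALIdentity id = walProvider.createWALIdentity(walName); // provider mints the identity
    source.enqueueLog(id);                                   // was enqueueLog(Path)
    WALIdentity current = source.getCurrentWALIdentity();    // was getCurrentPath()
    if (current != null) {
      System.out.println("replicating from " + current.getName());
    }
  }
}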
@@ -95,7 +96,7 @@ public interface ReplicationSourceInterface { * Get the current log that's replicated * @return the current log */ - Path getCurrentPath(); + WALIdentity getCurrentWALIdentity(); /** * Get the queue id that the source is replicating to diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 20c1215..fb2bc15 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -64,6 +64,8 @@ import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider; +import org.apache.hadoop.hbase.wal.WALIdentity; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; @@ -148,7 +150,7 @@ public class ReplicationSourceManager implements ReplicationListener { private final Configuration conf; private final FileSystem fs; // The paths to the latest log of each wal group, for new coming peers - private final Map latestPaths; + private final Map latestPaths; // Path to the wals directories private final Path logDir; // Path to the wal archive @@ -169,6 +171,8 @@ public class ReplicationSourceManager implements ReplicationListener { // Maximum number of retries before taking bold actions when deleting remote wal files for sync // replication peer. 
private final int maxRetriesMultiplier; + private final WALProvider walProvider; + private final ServerName serverName; /** * Creates a replication manager and sets the watch on all the other registered region servers @@ -186,7 +190,8 @@ public class ReplicationSourceManager implements ReplicationListener { ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf, Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId, WALFileLengthProvider walFileLengthProvider, - SyncReplicationPeerMappingManager syncReplicationPeerMappingManager) throws IOException { + SyncReplicationPeerMappingManager syncReplicationPeerMappingManager, + WALProvider walProvider) throws IOException { this.sources = new ConcurrentHashMap<>(); this.queueStorage = queueStorage; this.replicationPeers = replicationPeers; @@ -196,6 +201,7 @@ public class ReplicationSourceManager implements ReplicationListener { this.walsByIdRecoveredQueues = new ConcurrentHashMap<>(); this.oldsources = new ArrayList<>(); this.conf = conf; + this.walProvider = walProvider; this.fs = fs; this.logDir = logDir; this.oldLogDir = oldLogDir; @@ -212,6 +218,7 @@ public class ReplicationSourceManager implements ReplicationListener { // even if we fail, other region servers can take care of it this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); + this.serverName = this.server.getServerName(); ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); tfb.setNameFormat("ReplicationExecutor-%d"); tfb.setDaemon(true); @@ -349,8 +356,8 @@ public class ReplicationSourceManager implements ReplicationListener { MetricsSource metrics = new MetricsSource(queueId); // init replication source - src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId, - walFileLengthProvider, metrics); + src.init(conf, this, queueStorage, replicationPeer, server, queueId, clusterId, + walFileLengthProvider, metrics, walProvider); return src; } @@ -372,8 +379,8 @@ public class ReplicationSourceManager implements ReplicationListener { this.walsById.put(peerId, walsByGroup); // Add the latest wal to that source's queue if (!latestPaths.isEmpty()) { - for (Map.Entry walPrefixAndPath : latestPaths.entrySet()) { - Path walPath = walPrefixAndPath.getValue(); + for (Map.Entry walPrefixAndPath : latestPaths.entrySet()) { + WALIdentity walPath = walPrefixAndPath.getValue(); NavigableSet wals = new TreeSet<>(); wals.add(walPath.getName()); walsByGroup.put(walPrefixAndPath.getKey(), wals); @@ -484,7 +491,8 @@ public class ReplicationSourceManager implements ReplicationListener { toRemove.terminate(terminateMessage); } for (NavigableSet walsByGroup : walsById.get(peerId).values()) { - walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal))); + walsByGroup.forEach(wal -> src.enqueueLog( + ((SyncReplicationWALProvider)this.walProvider).getFullPath(serverName, wal))); } } LOG.info("Startup replication source for " + src.getPeerId()); @@ -505,7 +513,7 @@ public class ReplicationSourceManager implements ReplicationListener { ReplicationSourceInterface replicationSource = createSource(queueId, peer); this.oldsources.add(replicationSource); for (SortedSet walsByGroup : walsByIdRecoveredQueues.get(queueId).values()) { - walsByGroup.forEach(wal -> src.enqueueLog(new Path(wal))); + walsByGroup.forEach(wal -> src.enqueueLog(this.walProvider.createWALIdentity(wal))); } toStartup.add(replicationSource); } @@ -735,7 +743,7 @@ public class 
ReplicationSourceManager implements ReplicationListener { // public because of we call it in TestReplicationEmptyWALRecovery @VisibleForTesting - public void preLogRoll(Path newLog) throws IOException { + public void preLogRoll(WALIdentity newLog) throws IOException { String logName = newLog.getName(); String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName); // synchronized on latestPaths to avoid the new open source miss the new log @@ -785,7 +793,7 @@ public class ReplicationSourceManager implements ReplicationListener { // public because of we call it in TestReplicationEmptyWALRecovery @VisibleForTesting - public void postLogRoll(Path newLog) throws IOException { + public void postLogRoll(WALIdentity newLog) throws IOException { // This only updates the sources we own, not the recovered ones for (ReplicationSourceInterface source : this.sources.values()) { source.enqueueLog(newLog); @@ -961,7 +969,11 @@ public class ReplicationSourceManager implements ReplicationListener { } oldsources.add(src); for (String wal : walsSet) { - src.enqueueLog(new Path(oldLogDir, wal)); + WALIdentity archivedWal = ((SyncReplicationWALProvider)walProvider) + .getWalFromArchivePath(wal); + if (archivedWal != null) { + src.enqueueLog(archivedWal); + } } src.startup(); } @@ -1045,7 +1057,7 @@ public class ReplicationSourceManager implements ReplicationListener { } @VisibleForTesting - Set getLastestPath() { + Set getLastestPath() { synchronized (latestPaths) { return Sets.newHashSet(latestPaths.values()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 5d6198e..4b517ea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.PriorityBlockingQueue; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.TableName; @@ -32,6 +31,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,14 +56,14 @@ public class ReplicationSourceShipper extends Thread { private final Configuration conf; protected final String walGroupId; - protected final PriorityBlockingQueue queue; + protected final PriorityBlockingQueue queue; private final ReplicationSource source; // Last position in the log that we sent to ZooKeeper // It will be accessed by the stats thread so make it volatile private volatile long currentPosition = -1; // Path of the current log - private Path currentPath; + private WALIdentity currentPath; // Current state of the worker thread private volatile WorkerState state; protected ReplicationSourceWALReader entryReader; @@ -76,7 +76,7 @@ public class ReplicationSourceShipper extends Thread { private final int getEntriesTimeout; public ReplicationSourceShipper(Configuration conf, String walGroupId, - 
PriorityBlockingQueue queue, ReplicationSource source) { + PriorityBlockingQueue queue, ReplicationSource source) { this.conf = conf; this.walGroupId = walGroupId; this.queue = queue; @@ -293,7 +293,7 @@ public class ReplicationSourceShipper extends Thread { name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), handler); } - Path getCurrentPath() { + WALIdentity getCurrentWALIdentity() { return entryReader.getCurrentPath(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java index 27b25c4..c00d8df 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java @@ -19,11 +19,11 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.yetus.audience.InterfaceAudience; @@ -46,12 +46,12 @@ class ReplicationSourceWALActionListener implements WALActionsListener { } @Override - public void preLogRoll(Path oldPath, Path newPath) throws IOException { + public void preLogRoll(WALIdentity oldPath, WALIdentity newPath) throws IOException { manager.preLogRoll(newPath); } @Override - public void postLogRoll(Path oldPath, Path newPath) throws IOException { + public void postLogRoll(WALIdentity oldPath, WALIdentity newPath) throws IOException { manager.postLogRoll(newPath); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index b3bdb02..193e108 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -18,7 +18,6 @@ */ package org.apache.hadoop.hbase.replication.regionserver; -import java.io.EOFException; import java.io.IOException; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -28,7 +27,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; @@ -37,7 +35,9 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import 
org.slf4j.Logger; @@ -55,7 +55,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescript class ReplicationSourceWALReader extends Thread { private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class); - private final PriorityBlockingQueue logQueue; + private final PriorityBlockingQueue logQueue; private final FileSystem fs; private final Configuration conf; private final WALEntryFilter filter; @@ -89,7 +89,7 @@ class ReplicationSourceWALReader extends Thread { * @param source replication source */ public ReplicationSourceWALReader(FileSystem fs, Configuration conf, - PriorityBlockingQueue logQueue, long startPosition, WALEntryFilter filter, + PriorityBlockingQueue logQueue, long startPosition, WALEntryFilter filter, ReplicationSource source) { this.logQueue = logQueue; this.currentPosition = startPosition; @@ -123,9 +123,10 @@ class ReplicationSourceWALReader extends Thread { public void run() { int sleepMultiplier = 1; while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream + WALProvider provider = this.source.getWalProvider(); try (WALEntryStream entryStream = - new WALEntryStream(logQueue, fs, conf, currentPosition, - source.getWALFileLengthProvider(), source.getServerWALsBelongTo(), + provider.getWalStream(logQueue, conf, + currentPosition, source.getWALFileLengthProvider(), source.getServerWALsBelongTo(), source.getSourceMetrics())) { while (isReaderRunning()) { // loop here to keep reusing stream while we can if (!source.isPeerEnabled()) { @@ -152,9 +153,6 @@ class ReplicationSourceWALReader extends Thread { if (sleepMultiplier < maxRetriesMultiplier) { LOG.debug("Failed to read stream of replication entries: " + e); sleepMultiplier++; - } else { - LOG.error("Failed to read stream of replication entries", e); - handleEofException(e); } Threads.sleep(sleepForRetries * sleepMultiplier); } catch (InterruptedException e) { @@ -181,14 +179,14 @@ class ReplicationSourceWALReader extends Thread { batch.getNbEntries() >= replicationBatchCountCapacity; } - protected static final boolean switched(WALEntryStream entryStream, Path path) { - Path newPath = entryStream.getCurrentPath(); + protected static final boolean switched(WALEntryStream entryStream, WALIdentity path) { + WALIdentity newPath = entryStream.getCurrentWALIdentity(); return newPath == null || !path.getName().equals(newPath.getName()); } protected WALEntryBatch readWALEntries(WALEntryStream entryStream) throws IOException, InterruptedException { - Path currentPath = entryStream.getCurrentPath(); + WALIdentity currentPath = entryStream.getCurrentWALIdentity(); if (!entryStream.hasNext()) { // check whether we have switched a file if (currentPath != null && switched(entryStream, currentPath)) { @@ -203,7 +201,7 @@ class ReplicationSourceWALReader extends Thread { } } else { // when reading from the entry stream first time we will enter here - currentPath = entryStream.getCurrentPath(); + currentPath = entryStream.getCurrentWALIdentity(); } WALEntryBatch batch = createBatch(entryStream); for (;;) { @@ -241,25 +239,7 @@ class ReplicationSourceWALReader extends Thread { } } - // if we get an EOF due to a zero-length log, and there are other logs in queue - // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is - // enabled, then dump the log - private void handleEofException(IOException e) { - if ((e instanceof EOFException || e.getCause() instanceof EOFException) && - logQueue.size() > 1 && 
this.eofAutoRecovery) { - try { - if (fs.getFileStatus(logQueue.peek()).getLen() == 0) { - LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek()); - logQueue.remove(); - currentPosition = 0; - } - } catch (IOException ioe) { - LOG.warn("Couldn't get file length information about log " + logQueue.peek()); - } - } - } - - public Path getCurrentPath() { + public WALIdentity getCurrentPath() { // if we've read some WAL entries, get the Path we read from WALEntryBatch batchQueueHead = entryBatchQueue.peek(); if (batchQueueHead != null) { @@ -280,7 +260,7 @@ class ReplicationSourceWALReader extends Thread { } protected final WALEntryBatch createBatch(WALEntryStream entryStream) { - return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()); + return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentWALIdentity()); } protected final Entry filterEntry(Entry entry) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java index 10d6cd5..94393cc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java @@ -17,14 +17,14 @@ */ package org.apache.hadoop.hbase.replication.regionserver; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public final class ReplicationStatus { private final String peerId; private final String walGroup; - private final Path currentPath; + private final WALIdentity currentPath; private final int queueSize; private final long ageOfLastShippedOp; private final long replicationDelay; @@ -70,7 +70,7 @@ public final class ReplicationStatus { return replicationDelay; } - public Path getCurrentPath() { + public WALIdentity getCurrentPath() { return currentPath; } @@ -81,7 +81,7 @@ public final class ReplicationStatus { public static class ReplicationStatusBuilder { private String peerId = "UNKNOWN"; private String walGroup = "UNKNOWN"; - private Path currentPath = new Path("UNKNOWN"); + private WALIdentity currentPath = null; private int queueSize = -1; private long ageOfLastShippedOp = -1; private long replicationDelay = -1; @@ -103,7 +103,7 @@ public final class ReplicationStatus { return this; } - public ReplicationStatusBuilder withCurrentPath(Path currentPath) { + public ReplicationStatusBuilder withCurrentPath(WALIdentity currentPath) { this.currentPath = currentPath; return this; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java index 9edcc8a..7a4d496 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java @@ -21,11 +21,11 @@ import java.io.IOException; import java.util.concurrent.PriorityBlockingQueue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import 
org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.yetus.audience.InterfaceAudience; /** @@ -44,7 +44,7 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader private final SerialReplicationChecker checker; public SerialReplicationSourceWALReader(FileSystem fs, Configuration conf, - PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter, + PriorityBlockingQueue<WALIdentity> logQueue, long startPosition, WALEntryFilter filter, ReplicationSource source) { super(fs, conf, logQueue, startPosition, filter, source); checker = new SerialReplicationChecker(conf, source); @@ -53,7 +53,7 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader @Override protected WALEntryBatch readWALEntries(WALEntryStream entryStream) throws IOException, InterruptedException { - Path currentPath = entryStream.getCurrentPath(); + WALIdentity currentPath = entryStream.getCurrentWALIdentity(); if (!entryStream.hasNext()) { // check whether we have switched a file if (currentPath != null && switched(entryStream, currentPath)) { @@ -68,7 +68,7 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader } } else { // when reading from the entry stream first time we will enter here - currentPath = entryStream.getCurrentPath(); + currentPath = entryStream.getCurrentWALIdentity(); } long positionBefore = entryStream.getPosition(); WALEntryBatch batch = createBatch(entryStream); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java index 22b2de7..b3b0978 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.yetus.audience.InterfaceAudience; /** @@ -36,7 +37,7 @@ class WALEntryBatch { private List<Entry> walEntries; // last WAL that was read - private Path lastWalPath; + private WALIdentity lastWalPath; // position in WAL of last entry in this batch private long lastWalPosition = 0; // number of distinct row keys in this batch @@ -53,13 +54,13 @@ class WALEntryBatch { /** * @param lastWalPath Path of the WAL the last entry in this batch was read from */ - WALEntryBatch(int maxNbEntries, Path lastWalPath) { + WALEntryBatch(int maxNbEntries, WALIdentity lastWalPath) { this.walEntries = new ArrayList<>(maxNbEntries); this.lastWalPath = lastWalPath; } - static WALEntryBatch endOfFile(Path lastWalPath) { + static WALEntryBatch endOfFile(WALIdentity lastWalPath) { WALEntryBatch batch = new WALEntryBatch(0, lastWalPath); batch.setLastWalPosition(-1L); batch.setEndOfFile(true); @@ -80,7 +81,7 @@ class WALEntryBatch { /** * @return the path of the last WAL that was read. 
*/ - public Path getLastWalPath() { + public WALIdentity getLastWalPath() { return lastWalPath; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java index 0393af4..074aca2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -18,29 +18,14 @@ */ package org.apache.hadoop.hbase.replication.regionserver; -import java.io.Closeable; -import java.io.FileNotFoundException; +import org.apache.hadoop.hbase.wal.WALIdentity; + import java.io.IOException; -import java.util.OptionalLong; -import java.util.concurrent.PriorityBlockingQueue; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; -import org.apache.hadoop.hbase.util.CancelableProgressable; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; + +import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hadoop.hbase.wal.WAL.Reader; -import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.ipc.RemoteException; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually @@ -49,382 +34,19 @@ import org.slf4j.LoggerFactory; */ @InterfaceAudience.Private @InterfaceStability.Evolving -class WALEntryStream implements Closeable { - private static final Logger LOG = LoggerFactory.getLogger(WALEntryStream.class); - - private Reader reader; - private Path currentPath; - // cache of next entry for hasNext() - private Entry currentEntry; - // position for the current entry. As now we support peek, which means that the upper layer may - // choose to return before reading the current entry, so it is not safe to return the value below - // in getPosition. 
- private long currentPositionOfEntry = 0; - // position after reading current entry - private long currentPositionOfReader = 0; - private final PriorityBlockingQueue<Path> logQueue; - private final FileSystem fs; - private final Configuration conf; - private final WALFileLengthProvider walFileLengthProvider; - // which region server the WALs belong to - private final ServerName serverName; - private final MetricsSource metrics; - +public interface WALEntryStream extends WAL.Reader { /** - * Create an entry stream over the given queue at the given start position - * @param logQueue the queue of WAL paths - * @param fs {@link FileSystem} to use to create {@link Reader} for this stream - * @param conf {@link Configuration} to use to create {@link Reader} for this stream - * @param startPosition the position in the first WAL to start reading at - * @param serverName the server name which all WALs belong to - * @param metrics replication metrics - * @throws IOException + * @return the {@link WALIdentity} of the current WAL */ - public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf, - long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName, - MetricsSource metrics) throws IOException { - this.logQueue = logQueue; - this.fs = fs; - this.conf = conf; - this.currentPositionOfEntry = startPosition; - this.walFileLengthProvider = walFileLengthProvider; - this.serverName = serverName; - this.metrics = metrics; - } + public WALIdentity getCurrentWALIdentity(); /** * @return true if there is another WAL {@link Entry} */ - public boolean hasNext() throws IOException { - if (currentEntry == null) { - tryAdvanceEntry(); - } - return currentEntry != null; - } + public boolean hasNext() throws IOException; /** * Returns the next WAL entry in this stream but does not advance. */ - public Entry peek() throws IOException { - return hasNext() ? currentEntry: null; - }
used again after hasNext() has returned - * false) - */ - public void reset() throws IOException { - if (reader != null && currentPath != null) { - resetReader(); - } - } - - private void setPosition(long position) { - currentPositionOfEntry = position; - } - - private void setCurrentPath(Path path) { - this.currentPath = path; - } - - private void tryAdvanceEntry() throws IOException { - if (checkReader()) { - boolean beingWritten = readNextEntryAndRecordReaderPosition(); - if (currentEntry == null && !beingWritten) { - // no more entries in this log file, and the file is already closed, i.e, rolled - // Before dequeueing, we should always get one more attempt at reading. - // This is in case more entries came in after we opened the reader, and the log is rolled - // while we were reading. See HBASE-6758 - resetReader(); - readNextEntryAndRecordReaderPosition(); - if (currentEntry == null) { - if (checkAllBytesParsed()) { // now we're certain we're done with this log file - dequeueCurrentLog(); - if (openNextLog()) { - readNextEntryAndRecordReaderPosition(); - } - } - } - } - // if currentEntry != null then just return - // if currentEntry == null but the file is still being written, then we should not switch to - // the next log either, just return here and try next time to see if there are more entries in - // the current file - } - // do nothing if we don't have a WAL Reader (e.g. if there's no logs in queue) - } - - // HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file - private boolean checkAllBytesParsed() throws IOException { - // -1 means the wal wasn't closed cleanly. - final long trailerSize = currentTrailerSize(); - FileStatus stat = null; - try { - stat = fs.getFileStatus(this.currentPath); - } catch (IOException exception) { - LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}", - currentPath, trailerSize < 0 ? "was not" : "was", getCurrentPathStat()); - metrics.incrUnknownFileLengthForClosedWAL(); - } - // Here we use currentPositionOfReader instead of currentPositionOfEntry. - // We only call this method when currentEntry is null so usually they are the same, but there - // are two exceptions. One is we have nothing in the file but only a header, in this way - // the currentPositionOfEntry will always be 0 since we have no change to update it. The other - // is that we reach the end of file, then currentPositionOfEntry will point to the tail of the - // last valid entry, and the currentPositionOfReader will usually point to the end of the file. - if (stat != null) { - if (trailerSize < 0) { - if (currentPositionOfReader < stat.getLen()) { - final long skippedBytes = stat.getLen() - currentPositionOfReader; - LOG.debug( - "Reached the end of WAL file '{}'. It was not closed cleanly," + - " so we did not parse {} bytes of data. This is normally ok.", - currentPath, skippedBytes); - metrics.incrUncleanlyClosedWALs(); - metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes); - } - } else if (currentPositionOfReader + trailerSize < stat.getLen()) { - LOG.warn( - "Processing end of WAL file '{}'. At position {}, which is too far away from" + - " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). 
{}", - currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat()); - setPosition(0); - resetReader(); - metrics.incrRestartedWALReading(); - metrics.incrRepeatedFileBytes(currentPositionOfReader); - return false; - } - } - if (LOG.isTraceEnabled()) { - LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is " + - (stat == null ? "N/A" : stat.getLen())); - } - metrics.incrCompletedWAL(); - return true; - } - - private void dequeueCurrentLog() throws IOException { - LOG.debug("Reached the end of log {}", currentPath); - closeReader(); - logQueue.remove(); - setPosition(0); - metrics.decrSizeOfLogQueue(); - } - - /** - * Returns whether the file is opened for writing. - */ - private boolean readNextEntryAndRecordReaderPosition() throws IOException { - Entry readEntry = reader.next(); - long readerPos = reader.getPosition(); - OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath); - if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) { - // see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted - // data, so we need to make sure that we do not read beyond the committed file length. - if (LOG.isDebugEnabled()) { - LOG.debug("The provider tells us the valid length for " + currentPath + " is " + - fileLength.getAsLong() + ", but we have advanced to " + readerPos); - } - resetReader(); - return true; - } - if (readEntry != null) { - metrics.incrLogEditsRead(); - metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry); - } - currentEntry = readEntry; // could be null - this.currentPositionOfReader = readerPos; - return fileLength.isPresent(); - } - - private void closeReader() throws IOException { - if (reader != null) { - reader.close(); - reader = null; - } - } - - // if we don't have a reader, open a reader on the next log - private boolean checkReader() throws IOException { - if (reader == null) { - return openNextLog(); - } - return true; - } - - // open a reader on the next log in queue - private boolean openNextLog() throws IOException { - Path nextPath = logQueue.peek(); - if (nextPath != null) { - openReader(nextPath); - if (reader != null) { - return true; - } - } else { - // no more files in queue, this could happen for recovered queue, or for a wal group of a sync - // replication peer which has already been transited to DA or S. 
- setCurrentPath(null); - } - return false; - } - - private Path getArchivedLog(Path path) throws IOException { - Path rootDir = FSUtils.getRootDir(conf); - - // Try found the log in old dir - Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); - Path archivedLogLocation = new Path(oldLogDir, path.getName()); - if (fs.exists(archivedLogLocation)) { - LOG.info("Log " + path + " was moved to " + archivedLogLocation); - return archivedLogLocation; - } - - // Try found the log in the seperate old log dir - oldLogDir = - new Path(rootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME) - .append(Path.SEPARATOR).append(serverName.getServerName()).toString()); - archivedLogLocation = new Path(oldLogDir, path.getName()); - if (fs.exists(archivedLogLocation)) { - LOG.info("Log " + path + " was moved to " + archivedLogLocation); - return archivedLogLocation; - } - - LOG.error("Couldn't locate log: " + path); - return path; - } - - private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException { - // If the log was archived, continue reading from there - Path archivedLog = getArchivedLog(path); - if (!path.equals(archivedLog)) { - openReader(archivedLog); - } else { - throw fnfe; - } - } - - private void openReader(Path path) throws IOException { - try { - // Detect if this is a new file, if so get a new reader else - // reset the current reader so that we see the new data - if (reader == null || !getCurrentPath().equals(path)) { - closeReader(); - reader = WALFactory.createReader(fs, path, conf); - seek(); - setCurrentPath(path); - } else { - resetReader(); - } - } catch (FileNotFoundException fnfe) { - handleFileNotFound(path, fnfe); - } catch (RemoteException re) { - IOException ioe = re.unwrapRemoteException(FileNotFoundException.class); - if (!(ioe instanceof FileNotFoundException)) throw ioe; - handleFileNotFound(path, (FileNotFoundException)ioe); - } catch (LeaseNotRecoveredException lnre) { - // HBASE-15019 the WAL was not closed due to some hiccup. - LOG.warn("Try to recover the WAL lease " + currentPath, lnre); - recoverLease(conf, currentPath); - reader = null; - } catch (NullPointerException npe) { - // Workaround for race condition in HDFS-4380 - // which throws a NPE if we open a file before any data node has the most recent block - // Just sleep and retry. Will require re-reading compressed WALs for compressionContext. 
- LOG.warn("Got NPE opening reader, will retry."); - reader = null; - } - } - - // For HBASE-15019 - private void recoverLease(final Configuration conf, final Path path) { - try { - final FileSystem dfs = FSUtils.getCurrentFileSystem(conf); - FSUtils fsUtils = FSUtils.getInstance(dfs, conf); - fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() { - @Override - public boolean progress() { - LOG.debug("recover WAL lease: " + path); - return true; - } - }); - } catch (IOException e) { - LOG.warn("unable to recover lease for WAL: " + path, e); - } - } - - private void resetReader() throws IOException { - try { - currentEntry = null; - reader.reset(); - seek(); - } catch (FileNotFoundException fnfe) { - // If the log was archived, continue reading from there - Path archivedLog = getArchivedLog(currentPath); - if (!currentPath.equals(archivedLog)) { - openReader(archivedLog); - } else { - throw fnfe; - } - } catch (NullPointerException npe) { - throw new IOException("NPE resetting reader, likely HDFS-4380", npe); - } - } - - private void seek() throws IOException { - if (currentPositionOfEntry != 0) { - reader.seek(currentPositionOfEntry); - } - } - - private long currentTrailerSize() { - long size = -1L; - if (reader instanceof ProtobufLogReader) { - final ProtobufLogReader pblr = (ProtobufLogReader) reader; - size = pblr.trailerSize(); - } - return size; - } + public Entry peek() throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java index 010fa69..40772ff 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java @@ -19,7 +19,7 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.util.OptionalLong; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.yetus.audience.InterfaceAudience; /** @@ -30,5 +30,5 @@ import org.apache.yetus.audience.InterfaceAudience; @FunctionalInterface public interface WALFileLengthProvider { - OptionalLong getLogFileSizeIfBeingWritten(Path path); + OptionalLong getLogFileSizeIfBeingWritten(WALIdentity path); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index ccdc95f..acbc8af 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -19,10 +19,12 @@ package org.apache.hadoop.hbase.wal; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -39,8 +41,17 @@ import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; +import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; +import 
org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.regionserver.wal.WALUtil; +import org.apache.hadoop.hbase.replication.regionserver.FSWALEntryStream; +import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; +import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream; +import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; @@ -84,7 +95,11 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen protected AtomicBoolean initialized = new AtomicBoolean(false); // for default wal provider, logPrefix won't change protected String logPrefix; - + private Class<? extends AbstractFSWALProvider.Reader> logReaderClass; + /** + * How long to attempt opening in-recovery wals + */ + private int timeoutMillis; /** * we synchronized on walCreateLock to prevent wal recreation in different threads */ @@ -101,6 +116,9 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen if (!initialized.compareAndSet(false, true)) { throw new IllegalStateException("WALProvider.init should only be called once."); } + timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); + logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, + AbstractFSWALProvider.Reader.class); this.factory = factory; this.conf = conf; this.providerId = providerId; @@ -448,19 +466,20 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen * @throws IOException */ public static org.apache.hadoop.hbase.wal.WAL.Reader openReader(Path path, Configuration conf) - throws IOException - - { + throws IOException { long retryInterval = 2000; // 2 sec int maxAttempts = 30; int attempt = 0; Exception ee = null; org.apache.hadoop.hbase.wal.WAL.Reader reader = null; + WALFactory factory = new WALFactory(conf, "pretty-printer"); + WALProvider provider = factory.getWALProvider(); + while (reader == null && attempt++ < maxAttempts) { try { // Detect if this is a new file, if so get a new reader else // reset the current reader so that we see the new data - reader = WALFactory.createReader(path.getFileSystem(conf), path, conf); + reader = provider.createReader(provider.createWALIdentity(path.toString()), null, true); return reader; } catch (FileNotFoundException fnfe) { // If the log was archived, continue reading from there @@ -537,7 +556,139 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen return getWALNameGroupFromWALName(name, 1); } + @Override + public long getWALStartTime(WALIdentity id) { + return getWALStartTimeFromWALName(id.getName()); + } + + /** + * Public because of FSHLog. Should be package-private + * @param overwritable if the created writer can overwrite. For recovered edits, it is true and + * for WAL it is false. Thus we can distinguish WAL and recovered edits by this. + */ + public Writer createWriter(final Configuration conf, final WALIdentity walId, + final boolean overwritable) throws IOException { + Path path = ((FSWALIdentity)walId).getPath(); + FileSystem fs = CommonFSUtils.getWALFileSystem(conf); + // Configuration already does caching for the Class lookup. 
+ Class<? extends Writer> logWriterClass = + conf.getClass("hbase.regionserver.hlog.writer.impl", ProtobufLogWriter.class, + Writer.class); + Writer writer = null; + try { + writer = logWriterClass.getDeclaredConstructor().newInstance(); + long blocksize = WALUtil.getWALBlockSize(conf, fs, path, overwritable); + writer.init(walId, conf, overwritable, blocksize); + return writer; + } catch (Exception e) { + if (e instanceof CommonFSUtils.StreamLacksCapabilityException) { + LOG.error("The RegionServer write ahead log provider for FileSystem implementations " + + "relies on the ability to call " + e.getMessage() + " for proper operation during " + + "component failures, but the current FileSystem does not support doing so. Please " + + "check the config value of '" + CommonFSUtils.HBASE_WAL_DIR + "' and ensure " + + "it points to a FileSystem mount that has suitable capabilities for output streams."); + } else { + LOG.debug("Error instantiating log writer.", e); + } + if (writer != null) { + try{ + writer.close(); + } catch(IOException ee){ + LOG.error("cannot close log writer", ee); + } + } + throw new IOException("cannot get log writer", e); + } + } + public static long getWALStartTimeFromWALName(String name) { return Long.parseLong(getWALNameGroupFromWALName(name, 2)); } + + @Override + public WALEntryStream getWalStream(PriorityBlockingQueue<WALIdentity> logQueue, Configuration conf, + long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName, + MetricsSource metrics) throws IOException { + return new FSWALEntryStream(CommonFSUtils.getWALFileSystem(conf), logQueue, conf, startPosition, + walFileLengthProvider, serverName, metrics, this); + } + @Override + public void preRecovery(WALIdentity walId, Configuration conf, CancelableProgressable reporter) + throws IOException { + FileSystem walFS = CommonFSUtils.getWALFileSystem(conf); + FSUtils.getInstance(walFS, conf).recoverFileLease(walFS, + ((FSWALIdentity)walId).getPath(), conf, reporter); + } + + @Override + public Reader createReader(final WALIdentity walId, CancelableProgressable reporter, + boolean allowCustom) throws IOException { + Class<? extends AbstractFSWALProvider.Reader> lrClass = + allowCustom ? logReaderClass : ProtobufLogReader.class; + try { + // A wal file could be under recovery, so it may take several + // tries to get it open. Instead of claiming it is corrupted, retry + // to open it up to 5 minutes by default. + long startWaiting = EnvironmentEdgeManager.currentTime(); + long openTimeout = timeoutMillis + startWaiting; + int nbAttempt = 0; + Path path = ((FSWALIdentity)walId).getPath(); + FileSystem fs = CommonFSUtils.getWALFileSystem(conf); + AbstractFSWALProvider.Reader reader = null; + while (true) { + try { + reader = lrClass.getDeclaredConstructor().newInstance(); + reader.init(fs, path, conf, null); + return reader; + } catch (IOException e) { + if (reader != null) { + try { + reader.close(); + } catch (IOException exception) { + LOG.warn("Could not close FSDataInputStream " + exception.getMessage()); + LOG.debug("exception details", exception); + } + } + + String msg = e.getMessage(); + if (msg != null + && (msg.contains("Cannot obtain block length") + || msg.contains("Could not obtain the last block") || msg + .matches("Blocklist for [^ ]* has changed.*"))) { + if (++nbAttempt == 1) { + LOG.warn("Lease should have recovered. This is not expected. 
Will retry", e); + } + if (reporter != null && !reporter.progress()) { + throw new InterruptedIOException("Operation is cancelled"); + } + if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) { + LOG.error("Can't open after " + nbAttempt + " attempts and " + + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path); + } else { + try { + Thread.sleep(nbAttempt < 3 ? 500 : 1000); + continue; // retry + } catch (InterruptedException ie) { + InterruptedIOException iioe = new InterruptedIOException(); + iioe.initCause(ie); + throw iioe; + } + } + throw new LeaseNotRecoveredException(e); + } else { + throw e; + } + } + } + } catch (IOException ie) { + throw ie; + } catch (Exception e) { + throw new IOException("Cannot get log reader", e); + } + } + + @Override + public WALIdentity createWALIdentity(String wal) { + return new FSWALIdentity(wal); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java index 062b368..5e821d3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL; import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.util.CommonFSUtils; -import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -52,11 +51,9 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider { public interface AsyncWriter extends WALProvider.AsyncWriter { /** * @throws IOException if something goes wrong initializing an output stream - * @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that - * meet the needs of the given Writer implementation. */ - void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long blocksize) - throws IOException, CommonFSUtils.StreamLacksCapabilityException; + void init(WALIdentity walId, Configuration c, boolean overwritable, long blocksize) + throws IOException; } private EventLoopGroup eventLoopGroup; @@ -99,10 +96,12 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider { // Configuration already does caching for the Class lookup. 
Class logWriterClass = conf.getClass( "hbase.regionserver.hlog.async.writer.impl", AsyncProtobufLogWriter.class, AsyncWriter.class); + WALFactory factory = new WALFactory(conf, "AsyncFSWAL"); + WALProvider provider = factory.getWALProvider(); try { AsyncWriter writer = logWriterClass.getConstructor(EventLoopGroup.class, Class.class) .newInstance(eventLoopGroup, channelClass); - writer.init(fs, path, conf, overwritable, blocksize); + writer.init(provider.createWALIdentity(path.toString()), conf, overwritable, blocksize); return writer; } catch (Exception e) { if (e instanceof CommonFSUtils.StreamLacksCapabilityException) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index 75439fe..b22715c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; @@ -31,11 +32,19 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.PrivateCellUtil; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; +import org.apache.hadoop.hbase.replication.regionserver.FSWALEntryStream; +import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; +import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream; +import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; +import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.WAL.Reader; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -121,16 +130,17 @@ class DisabledWALProvider implements WALProvider { for (WALActionsListener listener : listeners) { listener.logRollRequested(false); } + WALIdentity walId = new FSWALIdentity(path); for (WALActionsListener listener : listeners) { try { - listener.preLogRoll(path, path); + listener.preLogRoll(walId, walId); } catch (IOException exception) { LOG.debug("Ignoring exception from listener.", exception); } } for (WALActionsListener listener : listeners) { try { - listener.postLogRoll(path, path); + listener.postLogRoll(walId, walId); } catch (IOException exception) { LOG.debug("Ignoring exception from listener.", exception); } @@ -243,7 +253,7 @@ class DisabledWALProvider implements WALProvider { } @Override - public OptionalLong getLogFileSizeIfBeingWritten(Path path) { + public OptionalLong getLogFileSizeIfBeingWritten(WALIdentity path) { return OptionalLong.empty(); } } @@ -262,4 +272,38 @@ class DisabledWALProvider implements WALProvider { public void addWALActionsListener(WALActionsListener listener) { disabled.registerWALActionsListener(listener); } + + @Override + public WALEntryStream 
getWalStream(PriorityBlockingQueue<WALIdentity> logQueue, Configuration conf, + long startPosition, WALFileLengthProvider walFileSizeProvider, ServerName serverName, + MetricsSource metrics) throws IOException { + return new FSWALEntryStream(CommonFSUtils.getWALFileSystem(conf), logQueue, conf, startPosition, + walFileSizeProvider, serverName, metrics, this); + } + @Override + public void preRecovery(WALIdentity walId, Configuration conf, CancelableProgressable reporter) + throws IOException { + } + + @Override + public Reader createReader(final WALIdentity path, CancelableProgressable reporter, + boolean allowCustom) throws IOException { + return null; + } + + @Override + public Writer createWriter(final Configuration conf, final WALIdentity walId, + final boolean overwritable) throws IOException { + return null; + } + + @Override + public long getWALStartTime(WALIdentity id) { + return 0; + } + + @Override + public WALIdentity createWALIdentity(String wal) { + return new FSWALIdentity(wal); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java index 7cd39ea..9275798 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java @@ -20,14 +20,8 @@ package org.apache.hadoop.hbase.wal; import java.io.IOException; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.regionserver.wal.FSHLog; -import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter; -import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.util.CommonFSUtils; -import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; -import org.apache.hadoop.hbase.util.FSUtils; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; @@ -42,64 +36,6 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> { private static final Logger LOG = LoggerFactory.getLogger(FSHLogProvider.class); - // Only public so classes back in regionserver.wal can access - public interface Writer extends WALProvider.Writer { - /** - * @throws IOException if something goes wrong initializing an output stream - * @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that - * meet the needs of the given Writer implementation. - */ - void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long blocksize) - throws IOException, CommonFSUtils.StreamLacksCapabilityException; - } - - /** - * Public because of FSHLog. Should be package-private - * @param overwritable if the created writer can overwrite. For recovered edits, it is true and - * for WAL it is false. Thus we can distinguish WAL and recovered edits by this. - */ - public static Writer createWriter(final Configuration conf, final FileSystem fs, final Path path, - final boolean overwritable) throws IOException { - return createWriter(conf, fs, path, overwritable, - WALUtil.getWALBlockSize(conf, fs, path, overwritable)); - } - - /** - * Public because of FSHLog. Should be package-private - */ - public static Writer createWriter(final Configuration conf, final FileSystem fs, final Path path, - final boolean overwritable, long blocksize) throws IOException { - // Configuration already does caching for the Class lookup. 
- Class<? extends Writer> logWriterClass = - conf.getClass("hbase.regionserver.hlog.writer.impl", ProtobufLogWriter.class, - Writer.class); - Writer writer = null; - try { - writer = logWriterClass.getDeclaredConstructor().newInstance(); - FileSystem rootFs = FileSystem.get(path.toUri(), conf); - writer.init(rootFs, path, conf, overwritable, blocksize); - return writer; - } catch (Exception e) { - if (e instanceof CommonFSUtils.StreamLacksCapabilityException) { - LOG.error("The RegionServer write ahead log provider for FileSystem implementations " + - "relies on the ability to call " + e.getMessage() + " for proper operation during " + - "component failures, but the current FileSystem does not support doing so. Please " + - "check the config value of '" + CommonFSUtils.HBASE_WAL_DIR + "' and ensure " + - "it points to a FileSystem mount that has suitable capabilities for output streams."); - } else { - LOG.debug("Error instantiating log writer.", e); - } - if (writer != null) { - try{ - writer.close(); - } catch(IOException ee){ - LOG.error("cannot close log writer", ee); - } - } - throw new IOException("cannot get log writer", e); - } - } - @Override protected FSHLog createWAL() throws IOException { return new FSHLog(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf), diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java index 0b7b8da..e85bc16 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java @@ -26,16 +26,25 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.locks.Lock; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; // imports for classes still in regionserver.wal import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.replication.regionserver.FSWALEntryStream; +import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; +import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream; +import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.KeyLocker; +import org.apache.hadoop.hbase.wal.WAL.Reader; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,6 +97,7 @@ public class RegionGroupingProvider implements WALProvider { } } + private static final String GROUP_ID = "RegionGrouping"; /** * instantiate a strategy from a config property. * requires conf to have already been set (as well as anything the provider might need to read). @@ -285,4 +295,56 @@ public class RegionGroupingProvider implements WALProvider { // extra code actually works, then we will have other big problems. So leave it as is. 
listeners.add(listener); } + + @Override + public WALEntryStream getWalStream(PriorityBlockingQueue<WALIdentity> logQueue, Configuration conf, + long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName, + MetricsSource metrics) throws IOException { + return new FSWALEntryStream(CommonFSUtils.getWALFileSystem(conf), logQueue, conf, startPosition, + walFileLengthProvider, serverName, metrics, this); + } + @Override + public void preRecovery(WALIdentity walId, Configuration conf, CancelableProgressable reporter) + throws IOException { + if (cached.isEmpty()) { + getWAL(GROUP_ID); + } + cached.values().iterator().next().preRecovery(walId, conf, reporter); + } + @Override + public Reader createReader(final WALIdentity path, CancelableProgressable reporter, + boolean allowCustom) throws IOException { + if (cached.isEmpty()) { + getWAL(GROUP_ID); + } + return cached.values().iterator().next().createReader(path, reporter, allowCustom); + } + + private void ensureWALPresent() { + try { + if (cached.isEmpty()) { + getWAL(GROUP_ID); + } + } catch (IOException ioe) { + throw new IllegalStateException(ioe); + } + } + + @Override + public Writer createWriter(final Configuration conf, final WALIdentity walId, + final boolean overwritable) throws IOException { + ensureWALPresent(); + return cached.values().iterator().next().createWriter(conf, walId, overwritable); + } + + @Override + public long getWALStartTime(WALIdentity id) { + ensureWALPresent(); + return cached.values().iterator().next().getWALStartTime(id); + } + + @Override + public WALIdentity createWALIdentity(String wal) { + return new FSWALIdentity(wal); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java index 9859c20..3528406 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.function.BiPredicate; @@ -36,18 +37,28 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; +import org.apache.hadoop.hbase.replication.regionserver.FSWALEntryStream; +import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener; import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider; +import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream; +import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; +import 
org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.KeyLocker; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.WAL.Reader; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,6 +101,8 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen private Class<? extends Channel> channelClass; private AtomicBoolean initialized = new AtomicBoolean(false); + private Path oldLogDir; + private Path walRootDir; // when switching from A to DA, we will put a Optional.empty into this map if there is no WAL for // the peer yet. When getting WAL from this map the caller should know that it should not use @@ -115,6 +128,10 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen provider.init(factory, conf, providerId); this.conf = conf; this.factory = factory; + if (FSUtils.hasWALRootDir(conf)) { + walRootDir = FSUtils.getWALRootDir(conf); + this.oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); + } Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass = NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf); eventLoopGroup = eventLoopGroupAndChannelClass.getFirst(); @@ -349,4 +366,47 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen return provider; } + @Override + public WALEntryStream getWalStream(PriorityBlockingQueue<WALIdentity> logQueue, Configuration conf, + long startPosition, WALFileLengthProvider walFileLengthProvider, + ServerName serverName, MetricsSource metrics) + throws IOException { + return new FSWALEntryStream(CommonFSUtils.getWALFileSystem(conf), logQueue, conf, startPosition, + walFileLengthProvider, serverName, metrics, this); + } + + @Override + public long getWALStartTime(WALIdentity id) { + return provider.getWALStartTime(id); + } + + @Override + public WALIdentity createWALIdentity(String wal) { + return provider.createWALIdentity(wal); + } + + @Override + public Writer createWriter(final Configuration conf, final WALIdentity walId, + final boolean overwritable) throws IOException { + return provider.createWriter(conf, walId, overwritable); + } + + @Override + public void preRecovery(WALIdentity walId, Configuration conf, CancelableProgressable reporter) + throws IOException { + provider.preRecovery(walId, conf, reporter); + } + + @Override + public Reader createReader(final WALIdentity path, CancelableProgressable reporter, + boolean allowCustom) throws IOException { + return provider.createReader(path, reporter, allowCustom); + } + public WALIdentity getWalFromArchivePath(String wal) { + return new FSWALIdentity(new Path(oldLogDir, wal)); + } + public WALIdentity getFullPath(ServerName serverName, String wal) { + Path walWithServerName = new Path(getWALDirectoryName(serverName.toString()), wal); + return new FSWALIdentity(new Path(walRootDir, walWithServerName)); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java index 8bde6d2..f26d783 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java @@ -18,18 +18,13 @@ package org.apache.hadoop.hbase.wal; import java.io.IOException; -import java.io.InterruptedIOException; import 
java.util.List; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; import org.apache.hadoop.hbase.util.CancelableProgressable; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; import org.apache.hadoop.hbase.wal.WAL.Reader; import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.apache.yetus.audience.InterfaceAudience; @@ -93,7 +88,7 @@ public class WALFactory { /** * Configuration-specified WAL Reader used when a custom reader is requested */ - private final Class<? extends AbstractFSWALProvider.Reader> logReaderClass; + private final Class<? extends WAL.Reader> logReaderClass; /** * How long to attempt opening in-recovery wals @@ -103,20 +98,30 @@ public class WALFactory { private final Configuration conf; // Used for the singleton WALFactory, see below. - private WALFactory(Configuration conf) { + private WALFactory(Configuration conf) throws IOException { // this code is duplicated here so we can keep our members final. // until we've moved reader/writer construction down into providers, this initialization must // happen prior to provider initialization, in case they need to instantiate a reader/writer. timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); /* TODO Both of these are probably specific to the fs wal provider */ logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, - AbstractFSWALProvider.Reader.class); + WAL.Reader.class); this.conf = conf; // end required early initialization // this instance can't create wals, just reader/writers. - provider = null; factoryId = SINGLETON_ID; + if (conf.getBoolean("hbase.regionserver.hlog.enabled", true)) { + WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER)); + provider.init(this, conf, null); + provider.addWALActionsListener(new MetricsWAL()); + this.provider = provider; + } else { + // special handling of existing configuration behavior. + LOG.warn("Running with WAL disabled."); + provider = new DisabledWALProvider(); + provider.init(this, conf, factoryId); + } } @VisibleForTesting @@ -191,7 +196,7 @@ public class WALFactory { timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); /* TODO Both of these are probably specific to the fs wal provider */ logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, - AbstractFSWALProvider.Reader.class); + WAL.Reader.class); this.conf = conf; this.factoryId = factoryId; // end required early initialization @@ -296,109 +301,6 @@ public class WALFactory { } } - public Reader createReader(final FileSystem fs, final Path path) throws IOException { - return createReader(fs, path, (CancelableProgressable)null); - } - - /** - * Create a reader for the WAL. If you are reading from a file that's being written to and need - * to reopen it multiple times, use {@link WAL.Reader#reset()} instead of this method - * then just seek back to the last known good position. - * @return A WAL reader. Close when done with it. 
-   * @throws IOException
-   */
-  public Reader createReader(final FileSystem fs, final Path path,
-      CancelableProgressable reporter) throws IOException {
-    return createReader(fs, path, reporter, true);
-  }
-
-  public Reader createReader(final FileSystem fs, final Path path, CancelableProgressable reporter,
-      boolean allowCustom) throws IOException {
-    Class<? extends AbstractFSWALProvider.Reader> lrClass =
-        allowCustom ? logReaderClass : ProtobufLogReader.class;
-    try {
-      // A wal file could be under recovery, so it may take several
-      // tries to get it open. Instead of claiming it is corrupted, retry
-      // to open it up to 5 minutes by default.
-      long startWaiting = EnvironmentEdgeManager.currentTime();
-      long openTimeout = timeoutMillis + startWaiting;
-      int nbAttempt = 0;
-      AbstractFSWALProvider.Reader reader = null;
-      while (true) {
-        try {
-          reader = lrClass.getDeclaredConstructor().newInstance();
-          reader.init(fs, path, conf, null);
-          return reader;
-        } catch (IOException e) {
-          if (reader != null) {
-            try {
-              reader.close();
-            } catch (IOException exception) {
-              LOG.warn("Could not close FSDataInputStream" + exception.getMessage());
-              LOG.debug("exception details", exception);
-            }
-          }
-
-          String msg = e.getMessage();
-          if (msg != null
-              && (msg.contains("Cannot obtain block length")
-                  || msg.contains("Could not obtain the last block") || msg
-                      .matches("Blocklist for [^ ]* has changed.*"))) {
-            if (++nbAttempt == 1) {
-              LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
-            }
-            if (reporter != null && !reporter.progress()) {
-              throw new InterruptedIOException("Operation is cancelled");
-            }
-            if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
-              LOG.error("Can't open after " + nbAttempt + " attempts and "
-                  + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path);
-            } else {
-              try {
-                Thread.sleep(nbAttempt < 3 ? 500 : 1000);
-                continue; // retry
-              } catch (InterruptedException ie) {
-                InterruptedIOException iioe = new InterruptedIOException();
-                iioe.initCause(ie);
-                throw iioe;
-              }
-            }
-            throw new LeaseNotRecoveredException(e);
-          } else {
-            throw e;
-          }
-        }
-      }
-    } catch (IOException ie) {
-      throw ie;
-    } catch (Exception e) {
-      throw new IOException("Cannot get log reader", e);
-    }
-  }
-
-  /**
-   * Create a writer for the WAL.
-   * Uses defaults.
-   *
-   * Should be package-private. public only for tests and
-   * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor}
-   * @return A WAL writer. Close when done with it.
-   */
-  public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException {
-    return FSHLogProvider.createWriter(conf, fs, path, false);
-  }
-
-  /**
-   * Should be package-private, visible for recovery testing.
-   * Uses defaults.
-   * @return an overwritable writer for recovered edits. caller should close.
-   */
-  @VisibleForTesting
-  public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path)
-      throws IOException {
-    return FSHLogProvider.createWriter(conf, fs, path, true);
-  }
-
   // These static methods are currently used where it's impractical to
   // untangle the reliance on state in the filesystem. They rely on singleton
   // WALFactory that just provides Reader / Writers.
@@ -410,7 +312,12 @@ public class WALFactory {
   public static WALFactory getInstance(Configuration configuration) {
     WALFactory factory = singleton.get();
     if (null == factory) {
-      WALFactory temp = new WALFactory(configuration);
+      WALFactory temp = null;
+      try {
+        temp = new WALFactory(configuration);
+      } catch (IOException ioe) {
+        throw new IllegalStateException(ioe);
+      }
       if (singleton.compareAndSet(null, temp)) {
         factory = temp;
       } else {
@@ -426,60 +333,6 @@ public class WALFactory {
     return factory;
   }
 
-  /**
-   * Create a reader for the given path, accept custom reader classes from conf.
-   * If you already have a WALFactory, you should favor the instance method.
-   * @return a WAL Reader, caller must close.
-   */
-  public static Reader createReader(final FileSystem fs, final Path path,
-      final Configuration configuration) throws IOException {
-    return getInstance(configuration).createReader(fs, path);
-  }
-
-  /**
-   * Create a reader for the given path, accept custom reader classes from conf.
-   * If you already have a WALFactory, you should favor the instance method.
-   * @return a WAL Reader, caller must close.
-   */
-  static Reader createReader(final FileSystem fs, final Path path,
-      final Configuration configuration, final CancelableProgressable reporter) throws IOException {
-    return getInstance(configuration).createReader(fs, path, reporter);
-  }
-
-  /**
-   * Create a reader for the given path, ignore custom reader classes from conf.
-   * If you already have a WALFactory, you should favor the instance method.
-   * only public pending move of {@link org.apache.hadoop.hbase.regionserver.wal.Compressor}
-   * @return a WAL Reader, caller must close.
-   */
-  public static Reader createReaderIgnoreCustomClass(final FileSystem fs, final Path path,
-      final Configuration configuration) throws IOException {
-    return getInstance(configuration).createReader(fs, path, null, false);
-  }
-
-  /**
-   * If you already have a WALFactory, you should favor the instance method.
-   * Uses defaults.
-   * @return a Writer that will overwrite files. Caller must close.
-   */
-  static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path,
-      final Configuration configuration)
-      throws IOException {
-    return FSHLogProvider.createWriter(configuration, fs, path, true);
-  }
-
-  /**
-   * If you already have a WALFactory, you should favor the instance method.
-   * Uses defaults.
-   * @return a writer that won't overwrite files. Caller must close.
-   */
-  @VisibleForTesting
-  public static Writer createWALWriter(final FileSystem fs, final Path path,
-      final Configuration configuration)
-      throws IOException {
-    return FSHLogProvider.createWriter(configuration, fs, path, false);
-  }
-
   public final WALProvider getWALProvider() {
     return this.provider;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java
index 281f3c9..9192160 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java
@@ -259,7 +259,9 @@ public class WALPrettyPrinter {
       throw new IOException(p + " is not a file");
     }
 
-    WAL.Reader log = WALFactory.createReader(fs, p, conf);
+    WALFactory factory = new WALFactory(conf, "pretty-printer");
+    WALProvider provider = factory.getWALProvider();
+    WAL.Reader log = provider.createReader(provider.createWALIdentity(p.toString()), null, true);
 
     if (log instanceof ProtobufLogReader) {
       List<String> writerClsNames = ((ProtobufLogReader) log).getWriterClsNames();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
index 244a636..3775000 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
@@ -23,10 +23,17 @@ import java.io.IOException;
 import java.util.List;
 import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.PriorityBlockingQueue;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
+import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream;
 import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.wal.WAL.Reader;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -82,6 +89,11 @@ public interface WALProvider {
     void sync(boolean forceSync) throws IOException;
     void append(WAL.Entry entry) throws IOException;
+
+    /**
+     * @throws IOException if something goes wrong initializing an output stream
+     */
+    void init(WALIdentity path, Configuration c, boolean overwritable, long blocksize)
+        throws IOException;
   }
 
   interface AsyncWriter extends WriterBase {
@@ -113,4 +125,61 @@ public interface WALProvider {
     return path -> getWALs().stream().map(w -> w.getLogFileSizeIfBeingWritten(path))
         .filter(o -> o.isPresent()).findAny().orElse(OptionalLong.empty());
   }
+
+  /**
+   * Streaming implementation to retrieve WAL entries from the given queue of WALs.
+   * @param logQueue queue of WALs to stream from
+   * @param conf configuration
+   * @param startPosition start position in the first WAL of the queue
+   * @param walFileLengthProvider provides the length of a WAL that is still being written to
+   * @param serverName name of the server
+   * @param metrics metric source
+   * @return WALEntryStream instance
+   */
+  WALEntryStream getWalStream(PriorityBlockingQueue<WALIdentity> logQueue, Configuration conf,
+      long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
+      MetricsSource metrics) throws IOException;
+
+  /**
+   * Create a reader for the given WAL.
+   * @param walId identity of the WAL
+   * @param reporter progress reporter
+   * @param allowCustom true if accepting custom reader classes from conf
+   * @return a WAL Reader, caller must close.
+   */
+  Reader createReader(final WALIdentity walId, CancelableProgressable reporter,
+      boolean allowCustom) throws IOException;
+
+  /**
+   * Hook for performing recovery work needed for the given WAL.
+   * The implementation must prevent further appends to the WAL, identified by id,
+   * after this method returns. Otherwise there may be data loss if some region server
+   * continues to write to the WAL.
+   * @param id identity of the WAL
+   * @param conf Configuration instance
+   * @param reporter progress reporter
+   */
+  void preRecovery(WALIdentity id, Configuration conf, CancelableProgressable reporter)
+      throws IOException;
+
+  /**
+   * Creates a WALIdentity for a WAL path/name.
+   * The name should uniquely identify a WAL in this WALProvider.
+   * <p>
+   * Note: this method is subject to change / removal upon future WAL refactoring.
+   * @param wal the WAL
+   * @return WALIdentity instance for the WAL
+   */
+  WALIdentity createWALIdentity(String wal);
+
+  public Writer createWriter(final Configuration conf, final WALIdentity walId,
+      final boolean overwritable) throws IOException;
+
+  /**
+   * @return start time of the underlying WAL
+   */
+  long getWALStartTime(WALIdentity id);
 }
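
Note: for illustration only (not part of the patch), a minimal sketch of how a caller replaces the
deleted WALFactory statics with the provider API above. The factory id string and the WAL location
passed on the command line are placeholders.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.wal.WAL;
    import org.apache.hadoop.hbase.wal.WALFactory;
    import org.apache.hadoop.hbase.wal.WALIdentity;
    import org.apache.hadoop.hbase.wal.WALProvider;

    public class WalReadExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        WALFactory factory = new WALFactory(conf, "wal-read-example");
        WALProvider provider = factory.getWALProvider();
        // A WALIdentity now stands in for the FileSystem/Path pair.
        WALIdentity walId = provider.createWALIdentity(args[0]);
        try (WAL.Reader reader = provider.createReader(walId, null, true)) {
          for (WAL.Entry entry; (entry = reader.next()) != null;) {
            System.out.println(entry.getKey());
          }
        }
      }
    }
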
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index bc67d98..131948a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -77,6 +77,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
@@ -140,7 +141,7 @@ public class WALSplitter {
   protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores = new ConcurrentHashMap<>();
 
   // the file being split currently
-  private FileStatus fileBeingSplit;
+  private WALIdentity fileBeingSplit;
 
   // if we limit the number of writers opened for sinking recovered edits
   private final boolean splitWriterCreationBounded;
@@ -150,14 +151,18 @@ public class WALSplitter {
 
   @VisibleForTesting
   WALSplitter(final WALFactory factory, Configuration conf, Path walDir,
-      FileSystem walFS, LastSequenceId idChecker,
+      LastSequenceId idChecker,
       SplitLogWorkerCoordination splitLogWorkerCoordination) {
     this.conf = HBaseConfiguration.create(conf);
     String codecClassName = conf
         .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
     this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
     this.walDir = walDir;
-    this.walFS = walFS;
+    try {
+      this.walFS = CommonFSUtils.getWALFileSystem(conf);
+    } catch (IOException ioe) {
+      throw new IllegalArgumentException("unable to get FS for " + walDir, ioe);
+    }
     this.sequenceIdChecker = idChecker;
     this.splitLogWorkerCoordination = splitLogWorkerCoordination;
@@ -187,11 +192,11 @@ public class WALSplitter {
    *
    * @return false if it is interrupted by the progress-able.
    */
-  public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS,
+  public static boolean splitLogFile(Path walDir, WALIdentity logfile, FileSystem walFS,
       Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
       SplitLogWorkerCoordination splitLogWorkerCoordination, final WALFactory factory)
       throws IOException {
-    WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, idChecker,
+    WALSplitter s = new WALSplitter(factory, conf, walDir, idChecker,
         splitLogWorkerCoordination);
     return s.splitLogFile(logfile, reporter);
   }
@@ -206,10 +211,11 @@ public class WALSplitter {
     final FileStatus[] logfiles =
         SplitLogManager.getFileList(conf, Collections.singletonList(logDir), null);
     List<Path> splits = new ArrayList<>();
+    WALProvider provider = factory.getWALProvider();
     if (ArrayUtils.isNotEmpty(logfiles)) {
       for (FileStatus logfile: logfiles) {
-        WALSplitter s = new WALSplitter(factory, conf, rootDir, walFS, null, null);
-        if (s.splitLogFile(logfile, null)) {
+        WALSplitter s = new WALSplitter(factory, conf, rootDir, null, null);
+        if (s.splitLogFile(provider.createWALIdentity(logfile.getPath().toString()), null)) {
           finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
           if (s.outputSink.splits != null) {
             splits.addAll(s.outputSink.splits);
@@ -228,27 +234,26 @@ public class WALSplitter {
    * @param logfile should be an actual log file.
    */
   @VisibleForTesting
-  boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException {
+  boolean splitLogFile(WALIdentity logfile, CancelableProgressable reporter) throws IOException {
     Preconditions.checkState(status == null);
-    Preconditions.checkArgument(logfile.isFile(),
-        "passed in file status is for something other than a regular file.");
     boolean isCorrupted = false;
     boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
       SPLIT_SKIP_ERRORS_DEFAULT);
     int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
-    Path logPath = logfile.getPath();
+    String logPath = logfile.getName();
     boolean outputSinkStarted = false;
     boolean progress_failed = false;
     int editsCount = 0;
     int editsSkipped = 0;
     status = TaskMonitor.get().createStatus(
-        "Splitting log file " + logfile.getPath() + "into a temporary staging area.");
+        "Splitting log file " + logPath + " into a temporary staging area.");
     Reader logFileReader = null;
     this.fileBeingSplit = logfile;
     try {
-      long logLength = logfile.getLen();
-      LOG.info("Splitting WAL={}, length={}", logPath, logLength);
+      // A WALIdentity does not expose the file length, so it is no longer logged here.
+      LOG.info("Splitting WAL={}", logPath);
       status.setStatus("Opening log file");
       if (reporter != null && !reporter.progress()) {
         progress_failed = true;
@@ -266,7 +271,7 @@ public class WALSplitter {
       outputSinkStarted = true;
       Entry entry;
       Long lastFlushedSequenceId = -1L;
-      while ((entry = getNextLogLine(logFileReader, logPath, skipErrors)) != null) {
+      while ((entry = getNextLogLine(logFileReader, logfile, skipErrors)) != null) {
         byte[] region = entry.getKey().getEncodedRegionName();
         String encodedRegionNameAsStr = Bytes.toString(region);
         lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
@@ -323,10 +328,10 @@ public class WALSplitter {
       LOG.warn("Could not parse, corrupted WAL={}", logPath, e);
       if (splitLogWorkerCoordination != null) {
         // Some tests pass in a csm of null.
-        splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), walFS);
+        splitLogWorkerCoordination.markCorrupted(walDir, logPath, walFS);
       } else {
         // for tests only
-        ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), walFS);
+        ZKSplitLog.markCorrupted(walDir, logPath, walFS);
       }
       isCorrupted = true;
     } catch (IOException e) {
@@ -352,7 +357,7 @@ public class WALSplitter {
       String msg = "Processed " + editsCount + " edits across " +
           outputSink.getNumberOfRecoveredRegions() + " regions; edits skipped=" + editsSkipped +
           "; log file=" + logPath +
-          ", length=" + logfile.getLen() + // See if length got updated post lease recovery
+          // length omitted: a WALIdentity does not expose the file length
           ", corrupted=" + isCorrupted + ", progress failed=" + progress_failed;
       LOG.info(msg);
       status.markComplete(msg);
@@ -715,39 +720,37 @@ public class WALSplitter {
    * @throws IOException
    * @throws CorruptedLogFileException
    */
-  protected Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter)
+  protected Reader getReader(WALIdentity file, boolean skipErrors, CancelableProgressable reporter)
       throws IOException, CorruptedLogFileException {
-    Path path = file.getPath();
-    long length = file.getLen();
     Reader in;
 
-    // Check for possibly empty file. With appends, currently Hadoop reports a
-    // zero length even if the file has been sync'd. Revisit if HDFS-376 or
-    // HDFS-878 is committed.
-    if (length <= 0) {
-      LOG.warn("File {} might be still open, length is 0", path);
-    }
-
     try {
-      FSUtils.getInstance(walFS, conf).recoverFileLease(walFS, path, conf, reporter);
       try {
-        in = getReader(path, reporter);
+        in = getReader(file, reporter);
       } catch (EOFException e) {
-        if (length <= 0) {
-          // TODO should we ignore an empty, not-last log file if skip.errors
-          // is false? Either way, the caller should decide what to do. E.g.
-          // ignore if this is the last log in sequence.
-          // TODO is this scenario still possible if the log has been
-          // recovered (i.e. closed)
-          LOG.warn("Could not open {} for reading. File is empty", path, e);
-        }
         // EOFException being ignored
         return null;
       }
     } catch (IOException e) {
       if (e instanceof FileNotFoundException) {
         // A wal file may not exist anymore. Nothing can be recovered so move on
-        LOG.warn("File {} does not exist anymore", path, e);
+        LOG.warn("File {} does not exist anymore", file, e);
         return null;
       }
       if (!skipErrors || e instanceof InterruptedIOException) {
         throw e;
       }
       CorruptedLogFileException t =
         new CorruptedLogFileException("skipErrors=true Could not open wal " +
-          path + " ignoring");
+          file + " ignoring");
       t.initCause(e);
       throw t;
     }
     return in;
   }
 
-  static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
+  static private Entry getNextLogLine(Reader in, WALIdentity path, boolean skipErrors)
       throws CorruptedLogFileException, IOException {
     try {
       return in.next();
@@ -796,15 +799,19 @@ public class WALSplitter {
    */
   protected Writer createWriter(Path logfile) throws IOException {
-    return walFactory.createRecoveredEditsWriter(walFS, logfile);
+    WALProvider provider = walFactory.getWALProvider();
+    return provider.createWriter(conf, provider.createWALIdentity(logfile.toString()), true);
   }
 
   /**
    * Create a new {@link Reader} for reading logs to split.
    * @return new Reader instance, caller should close
    */
-  protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
-    return walFactory.createReader(walFS, curLogFile, reporter);
+  protected Reader getReader(WALIdentity curLogFile, CancelableProgressable reporter)
+      throws IOException {
+    WALProvider provider = walFactory.getWALProvider();
+    provider.preRecovery(curLogFile, conf, reporter);
+    return provider.createReader(curLogFile, reporter, true);
   }
 
@@ -1282,7 +1289,9 @@ public class WALSplitter {
     private void deleteOneWithFewerEntries(WriterAndPath wap, Path dst)
         throws IOException {
       long dstMinLogSeqNum = -1L;
-      try (WAL.Reader reader = walFactory.createReader(walFS, dst)) {
+      WALProvider provider = walFactory.getWALProvider();
+      try (WAL.Reader reader = provider.createReader(provider.createWALIdentity(dst.toString()),
+          null, true)) {
         WAL.Entry entry = reader.next();
         if (entry != null) {
           dstMinLogSeqNum = entry.getKey().getSequenceId();
@@ -1508,7 +1517,7 @@ public class WALSplitter {
       String tmpDirName = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
         HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
       Path regionedits = getRegionSplitEditsPath(entry,
-        fileBeingSplit.getPath().getName(), tmpDirName, conf);
+        fileBeingSplit.getName(), tmpDirName, conf);
       if (regionedits == null) {
         return null;
       }
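
Note: a hypothetical sketch (not in this patch) of where the lease recovery removed from
WALSplitter#getReader above could live in an HDFS-backed provider's preRecovery.
FSWALIdentity#getPath() is assumed from its use in TestLogRolling later in this diff.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.util.CancelableProgressable;
    import org.apache.hadoop.hbase.util.FSUtils;
    import org.apache.hadoop.hbase.wal.FSWALIdentity;
    import org.apache.hadoop.hbase.wal.WALIdentity;

    public class PreRecoverySketch {
      public void preRecovery(WALIdentity id, Configuration conf, CancelableProgressable reporter)
          throws IOException {
        Path path = ((FSWALIdentity) id).getPath();
        FileSystem fs = path.getFileSystem(conf);
        // Fence the previous writer: once the lease is recovered, a region server
        // that still holds the file open can no longer append to it.
        FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter);
      }
    }
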
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSequenceIdMonotonicallyIncreasing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSequenceIdMonotonicallyIncreasing.java
index e657d9c..46d6dd6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSequenceIdMonotonicallyIncreasing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSequenceIdMonotonicallyIncreasing.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALUtil;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -93,8 +94,8 @@ public class TestSequenceIdMonotonicallyIncreasing {
   private long getMaxSeqId(HRegionServer rs, RegionInfo region) throws IOException {
     Path walFile = ((AbstractFSWAL<?>) rs.getWAL(null)).getCurrentFileName();
     long maxSeqId = -1L;
-    try (WAL.Reader reader =
-        WALFactory.createReader(UTIL.getTestFileSystem(), walFile, UTIL.getConfiguration())) {
+    try (WAL.Reader reader = WALUtil.createReader(UTIL.getConfiguration(), "seq-increasing",
+        walFile)) {
       for (;;) {
         WAL.Entry entry = reader.next();
         if (entry == null) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java
index 1da31da..1e21db5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java
@@ -24,13 +24,13 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALIdentity;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -148,13 +148,13 @@ public class SampleRegionWALCoprocessor implements WALCoprocessor, RegionCoprocessor,
 
   @Override
   public void preWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
-      Path oldPath, Path newPath) throws IOException {
+      WALIdentity oldPath, WALIdentity newPath) throws IOException {
     preWALRollCalled = true;
   }
 
   @Override
   public void postWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
-      Path oldPath, Path newPath) throws IOException {
+      WALIdentity oldPath, WALIdentity newPath) throws IOException {
     postWALRollCalled = true;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorderMultiBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorderMultiBlocks.java
index ad2b2d4..3f274b1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorderMultiBlocks.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorderMultiBlocks.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WALIdentity;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -155,7 +156,8 @@ public class TestBlockReorderMultiBlocks {
     // listen for successful log rolls
     final WALActionsListener listener = new WALActionsListener() {
       @Override
-      public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
+      public void postLogRoll(final WALIdentity oldPath, final WALIdentity newPath)
+          throws IOException {
         latch.countDown();
       }
     };
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
index a11064d..e52aec3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
@@ -80,7 +80,9 @@ import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.junit.After;
@@ -639,7 +641,7 @@ public abstract class AbstractTestDLS {
   private int countWAL(Path log, FileSystem fs, Configuration conf) throws IOException {
     int count = 0;
-    try (WAL.Reader in = WALFactory.createReader(fs, log, conf)) {
+    try (WAL.Reader in = WALUtil.createReader(conf, "count-wal", log)) {
       WAL.Entry e;
       while ((e = in.next()) != null) {
         if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) {
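
Note: the test changes above and below rely on a new org.apache.hadoop.hbase.wal.WALUtil helper
(distinct from the existing org.apache.hadoop.hbase.regionserver.wal.WALUtil) that is not itself
shown in this diff. A plausible shape of its reader helpers, inferred only from the call sites:

    package org.apache.hadoop.hbase.wal;

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    // Sketch; names and signatures are assumptions based on usage in this patch.
    public final class WALUtil {
      private WALUtil() {}

      // WALUtil.createReader(conf, "count-wal", log): build a throwaway factory.
      public static WAL.Reader createReader(Configuration conf, String factoryId, Path path)
          throws IOException {
        return createReader(new WALFactory(conf, factoryId), path);
      }

      // WALUtil.createReader(wals, walPath): reuse an existing factory's provider.
      public static WAL.Reader createReader(WALFactory factory, Path path) throws IOException {
        WALProvider provider = factory.getWALProvider();
        return provider.createReader(provider.createWALIdentity(path.toString()), null, true);
      }
    }
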
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
index 1490653..0f953f3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALIdentity;
 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
 import org.junit.After;
 import org.junit.Before;
@@ -125,6 +126,11 @@ public class TestFailedAppendAndSync {
         final Writer w = super.createWriterInstance(path);
         return new Writer() {
           @Override
+          public void init(WALIdentity path, Configuration c, boolean overwritable, long blocksize)
+              throws IOException {
+          }
+
+          @Override
           public void close() throws IOException {
             w.close();
           }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index b2d9a1b..4de326e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -140,7 +140,6 @@ import org.apache.hadoop.hbase.regionserver.TestHStore.FaultyFileSystem;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
-import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.test.MetricsAssertHelper;
@@ -158,10 +157,12 @@ import org.apache.hadoop.hbase.wal.FaultyFSLog;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALIdentity;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
 import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALUtil;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -689,7 +690,7 @@ public class TestHRegion {
     for (long i = minSeqId; i <= maxSeqId; i += 10) {
       Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
       fs.create(recoveredEdits);
-      WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
+      WALProvider.Writer writer = WALUtil.createRecoveredEditsWriter(CONF, wals, recoveredEdits);
 
       long time = System.nanoTime();
       WALEdit edit = new WALEdit();
@@ -740,7 +741,8 @@ public class TestHRegion {
     for (long i = minSeqId; i <= maxSeqId; i += 10) {
       Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
       fs.create(recoveredEdits);
-      WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
+      WALProvider.Writer writer = WALUtil.createRecoveredEditsWriter(CONF,
+          wals, recoveredEdits);
 
       long time = System.nanoTime();
       WALEdit edit = new WALEdit();
@@ -826,7 +828,8 @@ public class TestHRegion {
     for (long i = minSeqId; i <= maxSeqId; i += 10) {
       Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
       fs.create(recoveredEdits);
-      WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
+      WALProvider.Writer writer = WALUtil.createRecoveredEditsWriter(CONF,
+          wals, recoveredEdits);
 
       long time = System.nanoTime();
       WALEdit edit = null;
@@ -931,14 +934,16 @@ public class TestHRegion {
         storeFiles, Lists.newArrayList(newFile),
         region.getRegionFileSystem().getStoreDir(Bytes.toString(family)));
 
-    WALUtil.writeCompactionMarker(region.getWAL(), this.region.getReplicationScope(),
+    org.apache.hadoop.hbase.regionserver.wal.WALUtil.writeCompactionMarker(
+        region.getWAL(), this.region.getReplicationScope(),
         this.region.getRegionInfo(), compactionDescriptor, region.getMVCC());
 
     Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
 
     Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
     fs.create(recoveredEdits);
-    WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
+    WALProvider.Writer writer = WALUtil.createRecoveredEditsWriter(CONF,
+        wals, recoveredEdits);
 
     long time = System.nanoTime();
 
@@ -1018,8 +1023,7 @@ public class TestHRegion {
     // now verify that the flush markers are written
     wal.shutdown();
-    WAL.Reader reader = WALFactory.createReader(fs, AbstractFSWALProvider.getCurrentFileName(wal),
-        TEST_UTIL.getConfiguration());
+    WAL.Reader reader = WALUtil.createReader(wals, AbstractFSWALProvider.getCurrentFileName(wal));
     try {
       List<WAL.Entry> flushDescriptors = new ArrayList<>();
       long lastFlushSeqId = -1;
@@ -1063,7 +1067,8 @@ public class TestHRegion {
 
     Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
     fs.create(recoveredEdits);
-    WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
+    WALProvider.Writer writer = WALUtil.createRecoveredEditsWriter(CONF,
+        wals, recoveredEdits);
 
     for (WAL.Entry entry : flushDescriptors) {
       writer.append(entry);
@@ -1162,6 +1167,11 @@ public class TestHRegion {
       }
 
       @Override
+      public void init(WALIdentity path, Configuration c, boolean overwritable, long blocksize)
+          throws IOException {
+      }
+
+      @Override
       public void sync(boolean forceSync) throws IOException {
         w.sync(forceSync);
       }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
index 308dc03..d85112f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
+import org.apache.hadoop.hbase.wal.WALUtil;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -307,9 +308,7 @@ public class TestHRegionReplayEvents {
   }
 
   WAL.Reader createWALReaderForPrimary() throws FileNotFoundException, IOException {
-    return WALFactory.createReader(TEST_UTIL.getTestFileSystem(),
-      AbstractFSWALProvider.getCurrentFileName(walPrimary),
-      TEST_UTIL.getConfiguration());
+    return WALUtil.createReader(wals, AbstractFSWALProvider.getCurrentFileName(walPrimary));
   }
 
   @Test
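
Note: continuing the assumed WALUtil sketch, the writer-side helpers seen in the test changes
above. Signatures are inferred from call sites only; overwritable=true mirrors the rewritten
WALSplitter#createWriter, since recovered-edits files may be re-created on retry.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.wal.WALFactory;
    import org.apache.hadoop.hbase.wal.WALProvider;

    public final class WALUtilWriterSketch {
      // WALUtil.createRecoveredEditsWriter(CONF, wals, recoveredEdits)
      public static WALProvider.Writer createRecoveredEditsWriter(Configuration conf,
          WALFactory factory, Path path) throws IOException {
        WALProvider provider = factory.getWALProvider();
        return provider.createWriter(conf, provider.createWALIdentity(path.toString()), true);
      }

      // WALUtil.createWriter(conf, factory, path): non-overwriting WAL writer.
      public static WALProvider.Writer createWriter(Configuration conf, WALFactory factory,
          Path path) throws IOException {
        WALProvider provider = factory.getWALProvider();
        return provider.createWriter(conf, provider.createWALIdentity(path.toString()), false);
      }
    }
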
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
index 34f6ca1..034026c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
@@ -45,9 +45,9 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALUtil;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
@@ -172,7 +172,7 @@ public class TestRecoveredEdits {
     // Based on HRegion#replayRecoveredEdits
     WAL.Reader reader = null;
     try {
-      reader = WALFactory.createReader(fs, edits, conf);
+      reader = WALUtil.createReader(conf, "replay", edits);
       WAL.Entry entry;
       while ((entry = reader.next()) != null) {
         WALKey key = entry.getKey();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java
index 7aeff84..dd5e5a0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALUtil;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -155,8 +156,7 @@ public class TestRecoveredEditsReplayAndAbort {
           String.format("%019d", i));
       LOG.info("Begin to write recovered.edits : " + recoveredEdits);
       fs.create(recoveredEdits);
-      WALProvider.Writer writer = wals
-          .createRecoveredEditsWriter(fs, recoveredEdits);
+      WALProvider.Writer writer = WALUtil.createRecoveredEditsWriter(CONF, wals, recoveredEdits);
       for (long j = i; j < i + 100; j++) {
         long time = System.nanoTime();
         WALEdit edit = new WALEdit();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
index 0c38ee3..5926102 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.executor.ExecutorType;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -260,7 +261,8 @@ public class TestSplitLogWorker {
       CreateMode.PERSISTENT);
 
     SplitLogWorker slw =
-        new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
+        new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS,
+            new WALFactory(TEST_UTIL.getConfiguration(), "acquire-task"), neverEndingTask);
     slw.start();
     try {
       waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
@@ -295,10 +297,13 @@ public class TestSplitLogWorker {
       Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     RegionServerServices mockedRS1 = getRegionServer(SVR1);
     RegionServerServices mockedRS2 = getRegionServer(SVR2);
+    WALFactory factory = new WALFactory(TEST_UTIL.getConfiguration(), "race-for-task");
     SplitLogWorker slw1 =
-        new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS1, neverEndingTask);
+        new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS1,
+            factory, neverEndingTask);
     SplitLogWorker slw2 =
-        new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS2, neverEndingTask);
+        new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS2,
+            factory, neverEndingTask);
     slw1.start();
     slw2.start();
     try {
@@ -325,7 +330,8 @@ public class TestSplitLogWorker {
     final String PATH = ZKSplitLog.getEncodedNodeName(zkw, "tpt_task");
     RegionServerServices mockedRS = getRegionServer(SRV);
     SplitLogWorker slw =
-        new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
+        new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS,
+            new WALFactory(TEST_UTIL.getConfiguration(), "preempt-task"), neverEndingTask);
     slw.start();
     try {
       Thread.yield(); // let the worker start
@@ -358,7 +364,8 @@ public class TestSplitLogWorker {
     final String PATH1 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task");
     RegionServerServices mockedRS = getRegionServer(SRV);
     SplitLogWorker slw =
-        new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
+        new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS,
+            new WALFactory(TEST_UTIL.getConfiguration(), "multiple-tasks"), neverEndingTask);
     slw.start();
     try {
       Thread.yield(); // let the worker start
@@ -400,7 +407,8 @@ public class TestSplitLogWorker {
     SplitLogCounters.resetCounters();
     final ServerName SRV = ServerName.valueOf("svr,1,1");
     RegionServerServices mockedRS = getRegionServer(SRV);
-    slw = new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
+    slw = new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS,
+        new WALFactory(TEST_UTIL.getConfiguration(), "rescan"), neverEndingTask);
     slw.start();
     Thread.yield(); // let the worker start
     Thread.sleep(100);
@@ -463,7 +471,8 @@ public class TestSplitLogWorker {
         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     }
 
-    SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask);
+    SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS,
+        new WALFactory(TEST_UTIL.getConfiguration(), "acquire-multi-task"), neverEndingTask);
     slw.start();
     try {
       waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, maxTasks, WAIT_TIME);
@@ -506,7 +515,8 @@ public class TestSplitLogWorker {
         Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     }
 
-    SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask);
+    SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS,
+        new WALFactory(TEST_UTIL.getConfiguration(), "avg-tasks"), neverEndingTask);
     slw.start();
     try {
       int acquiredTasks = 0;
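
Note: the delegating-Writer pattern used by TestFailedAppendAndSync and TestHRegion above, and by
TestWALLockup below, now needs an init override as well. A minimal sketch of the full shape; since
the wrapped writer was already initialized by its provider, a no-op body is sufficient here.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.wal.WAL;
    import org.apache.hadoop.hbase.wal.WALIdentity;
    import org.apache.hadoop.hbase.wal.WALProvider;

    class DelegatingWriter implements WALProvider.Writer {
      private final WALProvider.Writer w;

      DelegatingWriter(WALProvider.Writer w) {
        this.w = w;
      }

      @Override
      public void init(WALIdentity path, Configuration c, boolean overwritable, long blocksize)
          throws IOException {
        // No-op: the wrapped writer is already initialized.
      }

      @Override
      public void append(WAL.Entry entry) throws IOException {
        w.append(entry);
      }

      @Override
      public void sync(boolean forceSync) throws IOException {
        w.sync(forceSync);
      }

      @Override
      public long getLength() {
        return w.getLength();
      }

      @Override
      public void close() throws IOException {
        w.close();
      }
    }
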
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
index 84b8d6c..9054e30 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALIdentity;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
@@ -170,6 +171,10 @@ public class TestWALLockup {
       public void close() throws IOException {
         w.close();
       }
+      @Override
+      public void init(WALIdentity path, Configuration c, boolean overwritable, long blocksize)
+          throws IOException {
+      }
 
       @Override
       public void sync(boolean forceSync) throws IOException {
@@ -361,6 +366,10 @@ public class TestWALLockup {
       public void close() throws IOException {
         w.close();
       }
+      @Override
+      public void init(WALIdentity path, Configuration c, boolean overwritable, long blocksize)
+          throws IOException {
+      }
 
       @Override
       public void sync(boolean forceSync) throws IOException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java
index 599260b..ced0547 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java
@@ -25,7 +25,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -47,6 +46,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALUtil;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -85,7 +85,6 @@ public class TestWALMonotonicallyIncreasingSeqId {
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private static Path testDir = TEST_UTIL.getDataTestDir("TestWALMonotonicallyIncreasingSeqId");
   private WALFactory wals;
-  private FileSystem fileSystem;
   private Configuration walConf;
   private HRegion region;
 
@@ -117,7 +116,6 @@ public class TestWALMonotonicallyIncreasingSeqId {
     RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKey)
         .setEndKey(stopKey).setReplicaId(replicaId).setRegionId(0).build();
-    fileSystem = tableDir.getFileSystem(conf);
     final Configuration walConf = new Configuration(conf);
     FSUtils.setRootDir(walConf, tableDir);
     this.walConf = walConf;
@@ -203,9 +201,9 @@ public class TestWALMonotonicallyIncreasingSeqId {
 
   private WAL.Reader createReader(Path logPath, Path oldWalsDir) throws IOException {
     try {
-      return wals.createReader(fileSystem, logPath);
+      return WALUtil.createReader(wals, logPath);
     } catch (IOException e) {
-      return wals.createReader(fileSystem, new Path(oldWalsDir, logPath.getName()));
+      return WALUtil.createReader(wals, new Path(oldWalsDir, logPath.getName()));
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRollPeriod.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRollPeriod.java
index 9322c5e..fd9254d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRollPeriod.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRollPeriod.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertFalse;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Put;
@@ -30,6 +29,7 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALIdentity;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -130,10 +130,10 @@ public abstract class AbstractTestLogRollPeriod {
 
   private void checkMinLogRolls(final WAL log, final int minRolls)
       throws Exception {
-    final List<Path> paths = new ArrayList<>();
+    final List<WALIdentity> paths = new ArrayList<>();
     log.registerWALActionsListener(new WALActionsListener() {
       @Override
-      public void postLogRoll(Path oldFile, Path newFile) {
+      public void postLogRoll(WALIdentity oldFile, WALIdentity newFile) {
         LOG.debug("postLogRoll: oldFile=" + oldFile + " newFile=" + newFile);
         paths.add(newFile);
       }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
index 5098609..100c7b8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.hadoop.hbase.wal.WALUtil;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -118,7 +119,7 @@ public abstract class AbstractTestProtobufLog {
     try (WALProvider.Writer writer = createWriter(path)) {
       ProtobufLogTestHelper.doWrite(writer, withTrailer, tableName, columnCount, recordCount, row,
         timestamp);
-      try (ProtobufLogReader reader = (ProtobufLogReader) wals.createReader(fs, path)) {
+      try (ProtobufLogReader reader = (ProtobufLogReader) WALUtil.createReader(wals, path)) {
         ProtobufLogTestHelper.doRead(reader, withTrailer, tableName, columnCount, recordCount, row,
           timestamp);
       }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index 3f9040b..3a1c170 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -899,8 +899,8 @@ public abstract class AbstractTestWALReplay {
     FileStatus[] listStatus = wal.getFiles();
     assertNotNull(listStatus);
     assertTrue(listStatus.length > 0);
-    WALSplitter.splitLogFile(hbaseRootDir, listStatus[0],
-        this.fs, this.conf, null, null, null, wals);
+    WALSplitter.splitLogFile(hbaseRootDir, wals.getWALProvider().createWALIdentity(
+        listStatus[0].getPath().toString()), this.fs, this.conf, null, null, null, wals);
     FileStatus[] listStatus1 = this.fs.listStatus(
       new Path(FSUtils.getWALTableDir(conf, tableName), new Path(hri.getEncodedName(),
           "recovered.edits")), new PathFilter() {
@@ -1053,8 +1053,10 @@ public abstract class AbstractTestWALReplay {
       first = fs.getFileStatus(smallFile);
       second = fs.getFileStatus(largeFile);
     }
-    WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null, wals);
-    WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null, wals);
+    WALSplitter.splitLogFile(hbaseRootDir, wals.getWALProvider().createWALIdentity(
+        first.getPath().toString()), fs, conf, null, null, null, wals);
+    WALSplitter.splitLogFile(hbaseRootDir, wals.getWALProvider().createWALIdentity(
+        second.getPath().toString()), fs, conf, null, null, null, wals);
     WAL wal = createWAL(this.conf, hbaseRootDir, logName);
     region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal);
     assertTrue(region.getOpenSeqNum() > mvcc.getWritePoint());
@@ -1229,7 +1231,8 @@ public abstract class AbstractTestWALReplay {
       StreamLacksCapabilityException {
     fs.mkdirs(file.getParent());
     ProtobufLogWriter writer = new ProtobufLogWriter();
-    writer.init(fs, file, conf, true, WALUtil.getWALBlockSize(conf, fs, file));
+    writer.init(wals.getWALProvider().createWALIdentity(file.toString()), conf, true,
+        WALUtil.getWALBlockSize(conf, fs, file));
     for (FSWALEntry entry : entries) {
       writer.append(entry);
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
index f73b4f1..a45eabe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
 import org.apache.hadoop.hbase.wal.AsyncFSWALProvider.AsyncWriter;
 import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -112,11 +113,11 @@ public class TestCombinedAsyncWriter {
       CombinedAsyncWriter writer = CombinedAsyncWriter.create(writer1, writer2)) {
       ProtobufLogTestHelper.doWrite(new WriterOverAsyncWriter(writer), withTrailer, tableName,
         columnCount, recordCount, row, timestamp);
-      try (ProtobufLogReader reader = (ProtobufLogReader) WALS.createReader(fs, path1)) {
+      try (ProtobufLogReader reader = (ProtobufLogReader) WALUtil.createReader(WALS, path1)) {
         ProtobufLogTestHelper.doRead(reader, withTrailer, tableName, columnCount, recordCount,
           row, timestamp);
       }
-      try (ProtobufLogReader reader = (ProtobufLogReader) WALS.createReader(fs, path2)) {
+      try (ProtobufLogReader reader = (ProtobufLogReader) WALUtil.createReader(WALS, path2)) {
        ProtobufLogTestHelper.doRead(reader, withTrailer, tableName, columnCount, recordCount,
          row, timestamp);
       }
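
Note: a sketch of the new Writer#init lifecycle shown in AbstractTestWALReplay above: a concrete
writer is instantiated directly and then initialized against a WALIdentity rather than a
FileSystem/Path pair. Passing 0 as an "use the provider default" block size is an assumption here.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
    import org.apache.hadoop.hbase.wal.WALFactory;
    import org.apache.hadoop.hbase.wal.WALIdentity;
    import org.apache.hadoop.hbase.wal.WALProvider;

    public class WriterInitSketch {
      static WALProvider.Writer openWriter(WALFactory wals, Configuration conf, String wal)
          throws IOException {
        WALProvider provider = wals.getWALProvider();
        WALIdentity id = provider.createWALIdentity(wal);
        ProtobufLogWriter writer = new ProtobufLogWriter();
        writer.init(id, conf, true /* overwritable */, 0 /* assumed default block size */);
        return writer;
      }
    }
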
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
index 4effa6d..c7af0fc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -277,7 +278,7 @@ public class TestDurability {
 
   private void verifyWALCount(WALFactory wals, WAL log, int expected) throws Exception {
     Path walPath = AbstractFSWALProvider.getCurrentFileName(log);
-    WAL.Reader reader = wals.createReader(FS, walPath);
+    WAL.Reader reader = WALUtil.createReader(wals, walPath);
     int count = 0;
     WAL.Entry entry = new WAL.Entry();
     while (reader.next(entry) != null) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
index e19361e..31c6773 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
@@ -30,7 +30,6 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HConstants;
@@ -52,8 +51,11 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.FSWALIdentity;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALIdentity;
+import org.apache.hadoop.hbase.wal.WALUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.junit.BeforeClass;
@@ -251,20 +253,20 @@ public class TestLogRolling extends AbstractTestLogRolling {
     server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
     RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
     final WAL log = server.getWAL(region);
-    final List<Path> paths = new ArrayList<>(1);
+    final List<WALIdentity> paths = new ArrayList<>(1);
     final List<Integer> preLogRolledCalled = new ArrayList<>();
 
-    paths.add(AbstractFSWALProvider.getCurrentFileName(log));
+    paths.add(new FSWALIdentity(AbstractFSWALProvider.getCurrentFileName(log)));
     log.registerWALActionsListener(new WALActionsListener() {
 
       @Override
-      public void preLogRoll(Path oldFile, Path newFile) {
+      public void preLogRoll(WALIdentity oldFile, WALIdentity newFile) {
         LOG.debug("preLogRoll: oldFile=" + oldFile + " newFile=" + newFile);
         preLogRolledCalled.add(new Integer(1));
       }
 
       @Override
-      public void postLogRoll(Path oldFile, Path newFile) {
+      public void postLogRoll(WALIdentity oldFile, WALIdentity newFile) {
         paths.add(newFile);
       }
     });
@@ -315,15 +317,16 @@ public class TestLogRolling extends AbstractTestLogRolling {
     // read back the data written
     Set<String> loggedRows = new HashSet<>();
     FSUtils fsUtils = FSUtils.getInstance(fs, TEST_UTIL.getConfiguration());
-    for (Path p : paths) {
+    for (WALIdentity wi : paths) {
+      FSWALIdentity p = (FSWALIdentity) wi;
       LOG.debug("recovering lease for " + p);
-      fsUtils.recoverFileLease(((HFileSystem) fs).getBackingFs(), p, TEST_UTIL.getConfiguration(),
-        null);
+      fsUtils.recoverFileLease(((HFileSystem) fs).getBackingFs(), p.getPath(),
+        TEST_UTIL.getConfiguration(), null);
 
-      LOG.debug("Reading WAL " + FSUtils.getPath(p));
+      LOG.debug("Reading WAL " + FSUtils.getPath(p.getPath()));
       WAL.Reader reader = null;
       try {
-        reader = WALFactory.createReader(fs, p, TEST_UTIL.getConfiguration());
+        reader = WALUtil.createReader(TEST_UTIL.getConfiguration(), "rolling", p.getPath());
         WAL.Entry entry;
         while ((entry = reader.next()) != null) {
           LOG.debug("#" + entry.getKey().getSequenceId() + ": " + entry.getEdit().getCells());
@@ -333,7 +336,7 @@ public class TestLogRolling extends AbstractTestLogRolling {
         }
       } catch (EOFException e) {
-        LOG.debug("EOF reading file " + FSUtils.getPath(p));
+        LOG.debug("EOF reading file " + FSUtils.getPath(p.getPath()));
       } finally {
         if (reader != null) reader.close();
       }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java
index d429a01..a3bb6d2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java
@@ -22,8 +22,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.wal.FSHLogProvider;
 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
+import org.apache.hadoop.hbase.wal.WALUtil;
 import org.junit.ClassRule;
 import org.junit.experimental.categories.Category;
 
@@ -36,6 +36,6 @@ public class TestProtobufLog extends AbstractTestProtobufLog {
 
   @Override
   protected Writer createWriter(Path path) throws IOException {
-    return FSHLogProvider.createWriter(TEST_UTIL.getConfiguration(), fs, path, false);
+    return WALUtil.createWriter(TEST_UTIL.getConfiguration(), "protobuf-log", path);
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
index 0967a75..d140a5c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALIdentity;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.junit.After;
 import org.junit.Before;
@@ -142,12 +143,12 @@ public class TestWALActionsListener {
     public int closedCount = 0;
 
     @Override
-    public void preLogRoll(Path oldFile, Path newFile) {
+    public void preLogRoll(WALIdentity oldFile, WALIdentity newFile) {
       preLogRollCounter++;
     }
 
     @Override
-    public void postLogRoll(Path oldFile, Path newFile) {
+    public void postLogRoll(WALIdentity oldFile, WALIdentity newFile) {
       postLogRollCounter++;
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WriterOverAsyncWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WriterOverAsyncWriter.java
index 9d938b0..bf1aa0a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WriterOverAsyncWriter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WriterOverAsyncWriter.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.concurrent.ExecutionException;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.wal.WALIdentity;
 import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
 
@@ -35,6 +37,11 @@ class WriterOverAsyncWriter implements WALProvider.Writer {
   }
 
   @Override
+  public void init(WALIdentity path, Configuration c, boolean overwritable, long blocksize)
+      throws IOException {
+  }
+
+  @Override
   public void close() throws IOException {
     asyncWriter.close();
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index 67f793d..5c80297 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -22,7 +22,6 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
@@ -32,7 +31,9 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
 import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALIdentity;
 
 /**
  * Source that does nothing at all, helpful to test ReplicationSourceManager
@@ -42,31 +43,33 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
 
   private ReplicationSourceManager manager;
   private ReplicationPeer replicationPeer;
   private String peerClusterId;
-  private Path currentPath;
+  private WALIdentity currentPath;
   private MetricsSource metrics;
   private WALFileLengthProvider walFileLengthProvider;
   private AtomicBoolean startup = new AtomicBoolean(false);
+  private WALProvider walProvider;
 
   @Override
-  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
+  public void init(Configuration conf, ReplicationSourceManager manager,
       ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
-      UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
-      throws IOException {
+      UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics,
+      WALProvider walProvider) throws IOException {
     this.manager = manager;
     this.peerClusterId = peerClusterId;
     this.metrics = metrics;
     this.walFileLengthProvider = walFileLengthProvider;
     this.replicationPeer = rp;
+    this.walProvider = walProvider;
   }
 
   @Override
-  public void enqueueLog(Path log) {
+  public void enqueueLog(WALIdentity log) {
     this.currentPath = log;
     metrics.incrSizeOfLogQueue();
   }
 
   @Override
-  public Path getCurrentPath() {
+  public WALIdentity getCurrentWALIdentity() {
     return this.currentPath;
   }
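
Note: a minimal sketch of how a caller feeds a replication source after the
ReplicationSourceInterface change shown in ReplicationSourceDummy above; the source now tracks a
WALIdentity rather than a Path. The helper name is illustrative only.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
    import org.apache.hadoop.hbase.wal.WALIdentity;
    import org.apache.hadoop.hbase.wal.WALProvider;

    public class EnqueueSketch {
      static void enqueue(ReplicationSourceInterface source, WALProvider provider, Path wal) {
        WALIdentity id = provider.createWALIdentity(wal.toString());
        source.enqueueLog(id);
        // The source reports its position in WALIdentity terms as well.
        assert id.equals(source.getCurrentWALIdentity());
      }
    }
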
org.apache.hadoop.hbase.wal.WALUtil; import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -69,6 +70,7 @@ public class SerialReplicationTestBase { public final TestName name = new TestName(); protected Path logPath; + protected static WALFactory factory; public static final class LocalReplicationEndpoint extends BaseReplicationEndpoint { @@ -126,6 +128,7 @@ public class SerialReplicationTestBase { LOG_DIR = UTIL.getDataTestDirOnTestFS("replicated"); FS = UTIL.getTestFileSystem(); FS.mkdirs(LOG_DIR); + factory = new WALFactory(UTIL.getConfiguration(), "repl-base"); } @AfterClass @@ -184,7 +187,7 @@ public class SerialReplicationTestBase { protected final void setupWALWriter() throws IOException { logPath = new Path(LOG_DIR, name.getMethodName()); - WRITER = WALFactory.createWALWriter(FS, logPath, UTIL.getConfiguration()); + WRITER = WALUtil.createWriter(UTIL.getConfiguration(), factory, logPath); } protected final void waitUntilReplicationDone(int expectedEntries) throws Exception { @@ -192,7 +195,7 @@ public class SerialReplicationTestBase { @Override public boolean evaluate() throws Exception { - try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) { + try (WAL.Reader reader = WALUtil.createReader(factory, logPath)) { int count = 0; while (reader.next() != null) { count++; @@ -224,8 +227,7 @@ public class SerialReplicationTestBase { } protected final void checkOrder(int expectedEntries) throws IOException { - try (WAL.Reader reader = - WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) { + try (WAL.Reader reader = WALUtil.createReader(factory, logPath)) { long seqId = -1L; int count = 0; for (Entry entry;;) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java index 37ca7dc..400f981 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java @@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.HFileTestUtil; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; @@ -695,7 +696,8 @@ public class TestMasterReplication { // listen for successful log rolls final WALActionsListener listener = new WALActionsListener() { @Override - public void postLogRoll(final Path oldPath, final Path newPath) throws IOException { + public void postLogRoll(final WALIdentity oldPath, final WALIdentity newPath) + throws IOException { latch.countDown(); } }; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java index 225ca7f..5c2a763 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java @@ -26,7 +26,6 @@ import java.io.IOException; import java.util.Arrays; import java.util.concurrent.CountDownLatch; import 
org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.client.Admin; @@ -42,6 +41,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.junit.BeforeClass; @@ -219,7 +219,8 @@ public class TestMultiSlaveReplication { // listen for successful log rolls final WALActionsListener listener = new WALActionsListener() { @Override - public void postLogRoll(final Path oldPath, final Path newPath) throws IOException { + public void postLogRoll(final WALIdentity oldPath, final WALIdentity newPath) + throws IOException { latch.countDown(); } }; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java index 4effe41..4e1616a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java @@ -33,6 +33,9 @@ import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALIdentity; +import org.apache.hadoop.hbase.wal.WALProvider; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; @@ -58,18 +61,21 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase { Waiter.waitFor(conf1, 10000, new Waiter.Predicate<Exception>() { @Override public boolean evaluate() throws Exception { + WALFactory factory = new WALFactory(conf1, "empty-wal-recovery"); + WALProvider provider = factory.getWALProvider(); for (int i = 0; i < numRs; i++) { HRegionServer hrs = utility1.getHBaseCluster().getRegionServer(i); RegionInfo regionInfo = utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); WAL wal = hrs.getWAL(regionInfo); Path currentFile = ((AbstractFSWAL<?>) wal).getCurrentFileName(); + WALIdentity walId = provider.createWALIdentity(currentFile.toString()); Replication replicationService = (Replication) utility1.getHBaseCluster() .getRegionServer(i).getReplicationSourceService(); for (ReplicationSourceInterface rsi : replicationService.getReplicationManager() .getSources()) { ReplicationSource source = (ReplicationSource) rsi; - if (!currentFile.equals(source.getCurrentPath())) { + if (!walId.equals(source.getCurrentWALIdentity())) { return false; } } @@ -97,6 +103,8 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase { emptyWalPaths.add(emptyWalPath); } + WALFactory factory = new WALFactory(conf1, "empty-wal-recovery"); + WALProvider provider = factory.getWALProvider(); // inject our empty wal into the replication queue, and then roll the original wal, which // enqueues a new wal behind our empty wal. 
We must roll the wal here as now we use the WAL to // determine if the file being replicated currently is still opened for write, so just inject a @@ -104,8 +112,10 @@ for (int i = 0; i < numRs; i++) { HRegionServer hrs = utility1.getHBaseCluster().getRegionServer(i); Replication replicationService = (Replication) hrs.getReplicationSourceService(); - replicationService.getReplicationManager().preLogRoll(emptyWalPaths.get(i)); - replicationService.getReplicationManager().postLogRoll(emptyWalPaths.get(i)); + WALIdentity id = provider.createWALIdentity( + emptyWalPaths.get(i).toString()); + replicationService.getReplicationManager().preLogRoll(id); + replicationService.getReplicationManager().postLogRoll(id); RegionInfo regionInfo = utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); WAL wal = hrs.getWAL(regionInfo); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java index 8ff4d84..8d229aa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.replication; import java.util.Map; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Get; @@ -28,6 +27,7 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.junit.Assert; import org.junit.ClassRule; import org.junit.Test; @@ -74,7 +74,7 @@ public class TestReplicationMetricsforUI extends TestReplicationBase { } rs = utility1.getRSForFirstRegionInTable(tableName); metrics = rs.getWalGroupsReplicationStatus(); - Path lastPath = null; + WALIdentity lastPath = null; for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) { lastPath = metric.getValue().getCurrentPath(); Assert.assertEquals("peerId", PEER_ID2, metric.getValue().getPeerId()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java index 07e626b..a5e36cb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALUtil; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; @@ -107,8 +108,8 @@ public class TestSerialReplication extends SerialReplicationTestBase { Map<String, Long> regionsToSeqId = new HashMap<>(); regionsToSeqId.put(region.getEncodedName(), -1L); regions.stream().map(RegionInfo::getEncodedName).forEach(n -> regionsToSeqId.put(n, -1L)); - try (WAL.Reader reader = - WALFactory.createReader(UTIL.getTestFileSystem(), 
logPath, UTIL.getConfiguration())) { + try (WAL.Reader reader = WALUtil.createReader(UTIL.getConfiguration(), "region-split", + logPath)) { int count = 0; for (Entry entry;;) { entry = reader.next(); @@ -168,8 +169,8 @@ public class TestSerialReplication extends SerialReplicationTestBase { RegionInfo region = regionsAfterMerge.get(0); regionsToSeqId.put(region.getEncodedName(), -1L); regions.stream().map(RegionInfo::getEncodedName).forEach(n -> regionsToSeqId.put(n, -1L)); - try (WAL.Reader reader = - WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) { + try (WAL.Reader reader = WALUtil.createReader(UTIL.getConfiguration(), "region-merge", + logPath)) { int count = 0; for (Entry entry;;) { entry = reader.next(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java index 42adab6..b6d973a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java @@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Reader; -import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALUtil; import org.junit.Assert; import org.junit.ClassRule; import org.junit.Test; @@ -124,7 +124,8 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase { Assert.assertTrue(files.length > 0); for (FileStatus file : files) { try ( - Reader reader = WALFactory.createReader(fs2, file.getPath(), utility.getConfiguration())) { + Reader reader = WALUtil.createReader(utility.getConfiguration(), "sync-repl-active", + file.getPath())) { Entry entry = reader.next(); Assert.assertTrue(entry != null); while (entry != null) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java index d01a0ac..17c8224 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java @@ -52,7 +52,9 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.apache.hadoop.hbase.wal.WALProvider; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -157,10 +159,13 @@ public class TestRecoverStandbyProcedure { if (!fs.exists(peerRemoteWALDir)) { fs.mkdirs(peerRemoteWALDir); } + WALFactory factory = new WALFactory(conf, "standby"); + WALProvider provider = factory.getWALProvider(); for (int i = 0; i < WAL_NUMBER; i++) { try (ProtobufLogWriter writer = new ProtobufLogWriter()) { Path wal = new Path(peerRemoteWALDir, "srv1,8888." 
+ i + ".syncrep"); - writer.init(fs, wal, conf, true, WALUtil.getWALBlockSize(conf, fs, peerRemoteWALDir)); + writer.init(provider.createWALIdentity(wal.toString()), conf, true, + WALUtil.getWALBlockSize(conf, fs, peerRemoteWALDir)); List entries = setupWALEntries(i * ROW_COUNT, (i + 1) * ROW_COUNT); for (Entry entry : entries) { writer.append(entry); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java index bd800a8..05c2c34 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALProvider; +import org.apache.hadoop.hbase.wal.WALUtil; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -74,6 +75,8 @@ public class TestRaceWhenCreatingReplicationSource { private static FileSystem FS; + private static WALFactory walFactory; + private static Path LOG_PATH; private static WALProvider.Writer WRITER; @@ -138,7 +141,8 @@ public class TestRaceWhenCreatingReplicationSource { Path dir = UTIL.getDataTestDirOnTestFS(); FS = UTIL.getTestFileSystem(); LOG_PATH = new Path(dir, "replicated"); - WRITER = WALFactory.createWALWriter(FS, LOG_PATH, UTIL.getConfiguration()); + walFactory = new WALFactory(UTIL.getConfiguration(), "race"); + WRITER = WALUtil.createWriter(UTIL.getConfiguration(), walFactory, LOG_PATH); UTIL.getAdmin().addReplicationPeer(PEER_ID, ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase") .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build(), @@ -184,7 +188,7 @@ public class TestRaceWhenCreatingReplicationSource { @Override public boolean evaluate() throws Exception { - try (WAL.Reader reader = WALFactory.createReader(FS, LOG_PATH, UTIL.getConfiguration())) { + try (WAL.Reader reader = WALUtil.createReader(walFactory, LOG_PATH)) { return reader.next() != null; } catch (IOException e) { return false; @@ -196,7 +200,7 @@ public class TestRaceWhenCreatingReplicationSource { return "Replication has not catched up"; } }); - try (WAL.Reader reader = WALFactory.createReader(FS, LOG_PATH, UTIL.getConfiguration())) { + try (WAL.Reader reader = WALUtil.createReader(walFactory, LOG_PATH)) { Cell cell = reader.next().getEdit().getCells().get(0); assertEquals(1, Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())); assertArrayEquals(CF, CellUtil.cloneFamily(cell)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 274ccab..be58eac 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -57,11 +57,14 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager import org.apache.hadoop.hbase.testclassification.MediumTests; import 
org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.FSWALIdentity; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALProvider; +import org.apache.hadoop.hbase.wal.WALUtil; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -118,8 +121,8 @@ public class TestReplicationSource { Path logPath = new Path(logDir, "log"); if (!FS.exists(logDir)) FS.mkdirs(logDir); if (!FS.exists(oldLogDir)) FS.mkdirs(oldLogDir); - WALProvider.Writer writer = WALFactory.createWALWriter(FS, logPath, - TEST_UTIL.getConfiguration()); + WALProvider.Writer writer = WALUtil.createWriter(TEST_UTIL.getConfiguration(), "moving", + logPath); for(int i = 0; i < 3; i++) { byte[] b = Bytes.toBytes(Integer.toString(i)); KeyValue kv = new KeyValue(b,b,b); @@ -132,7 +135,7 @@ } writer.close(); - WAL.Reader reader = WALFactory.createReader(FS, logPath, TEST_UTIL.getConfiguration()); + WAL.Reader reader = WALUtil.createReader(TEST_UTIL.getConfiguration(), "moving", logPath); WAL.Entry entry = reader.next(); assertNotNull(entry); @@ -174,8 +177,8 @@ testConf.setInt("replication.source.maxretriesmultiplier", 1); ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); - source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null, - p -> OptionalLong.empty(), null); + source.init(testConf, manager, null, mockPeer, null, "testPeer", null, + p -> OptionalLong.empty(), null, null); ExecutorService executor = Executors.newSingleThreadExecutor(); Future future = executor.submit(new Runnable() { @@ -301,8 +304,8 @@ String walGroupId = "fake-wal-group-id"; ServerName serverName = ServerName.valueOf("www.example.com", 12006, 1524679704418L); ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L); - PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>(); - queue.put(new Path("/www/html/test")); + PriorityBlockingQueue<WALIdentity> queue = new PriorityBlockingQueue<>(); + queue.put(new FSWALIdentity(new Path("/www/html/test"))); RecoveredReplicationSource source = Mockito.mock(RecoveredReplicationSource.class); Server server = Mockito.mock(Server.class); Mockito.when(server.getServerName()).thenReturn(serverName); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 0872ea7..e9e037e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -87,7 +87,9 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.apache.hadoop.hbase.wal.WALProvider; import 
org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -337,7 +339,7 @@ public abstract class TestReplicationSourceManager { when(source.isRecovered()).thenReturn(false); when(source.isSyncReplication()).thenReturn(false); manager.logPositionAndCleanOldLogs(source, - new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath())); + new WALEntryBatch(0, manager.getSources().get(0).getCurrentWALIdentity())); wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), @@ -572,7 +574,8 @@ assertNotNull(source); final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue(); // Enqueue log and check if metrics updated - source.enqueueLog(new Path("abc")); + WALFactory factory = new WALFactory(utility.getConfiguration(), "remove-peer"); + source.enqueueLog(factory.getWALProvider().createWALIdentity((new Path("abc")).toString())); assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue()); assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); @@ -614,13 +617,17 @@ ReplicationPeerConfig.newBuilder() .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase").build(), true); + WALFactory wals = + new WALFactory(utility.getConfiguration(), URLEncoder.encode("regionserver:60020", "UTF8")); + WALProvider provider = wals.getWALProvider(); try { // make sure that we can deal with files that do not exist String walNameNotExists = "remoteWAL-12345-" + slaveId + ".12345" + ReplicationUtils.SYNC_WAL_SUFFIX; Path wal = new Path(logDir, walNameNotExists); - manager.preLogRoll(wal); - manager.postLogRoll(wal); + WALIdentity walId = provider.createWALIdentity(wal.toString()); + manager.preLogRoll(walId); + manager.postLogRoll(walId); Path remoteLogDirForPeer = new Path(remoteLogDir, slaveId); fs.mkdirs(remoteLogDirForPeer); @@ -630,8 +637,8 @@ new Path(remoteLogDirForPeer, walName).makeQualified(fs.getUri(), fs.getWorkingDirectory()); fs.create(remoteWAL).close(); wal = new Path(logDir, walName); - manager.preLogRoll(wal); - manager.postLogRoll(wal); + manager.preLogRoll(provider.createWALIdentity(wal.toString())); + manager.postLogRoll(provider.createWALIdentity(wal.toString())); ReplicationSourceInterface source = mockReplicationSource(peerId2); manager.cleanOldLogs(walName, true, source); @@ -649,13 +656,16 @@ @Test public void testSameWALPrefix() throws IOException { Set<String> latestWalsBefore = - manager.getLastestPath().stream().map(Path::getName).collect(Collectors.toSet()); + manager.getLastestPath().stream().map(WALIdentity::getName).collect(Collectors.toSet()); + WALFactory wals = + new WALFactory(utility.getConfiguration(), URLEncoder.encode("regionserver:60020", "UTF8")); + WALProvider provider = wals.getWALProvider(); String walName1 = "localhost,8080,12345-45678-Peer.34567"; String walName2 = "localhost,8080,12345.56789"; - manager.preLogRoll(new Path(walName1)); - manager.preLogRoll(new Path(walName2)); + manager.preLogRoll(provider.createWALIdentity((new Path(walName1)).toString())); + manager.preLogRoll(provider.createWALIdentity((new Path(walName2)).toString())); - Set<String> latestWals = manager.getLastestPath().stream().map(Path::getName) + Set<String> latestWals = 
manager.getLastestPath().stream().map(WALIdentity::getName) .filter(n -> !latestWalsBefore.contains(n)).collect(Collectors.toSet()); assertEquals(2, latestWals.size()); assertTrue(latestWals.contains(walName1)); @@ -819,10 +829,10 @@ public abstract class TestReplicationSourceManager { static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy { @Override - public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, + public void init(Configuration conf, ReplicationSourceManager manager, ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId, - UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) - throws IOException { + UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics, + WALProvider provider) throws IOException { throw new IOException("Failing deliberately"); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index fac6f74..34b084b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -39,7 +39,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -56,10 +55,12 @@ import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.FSWALIdentity; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.After; @@ -98,8 +99,9 @@ public class TestWALEntryStream { } private WAL log; - PriorityBlockingQueue<Path> walQueue; + PriorityBlockingQueue<WALIdentity> walQueue; private PathWatcher pathWatcher; + private WALFactory wals; @Rule public TestName tn = new TestName(); @@ -124,7 +126,7 @@ public void setUp() throws Exception { walQueue = new PriorityBlockingQueue<>(); pathWatcher = new PathWatcher(); - final WALFactory wals = new WALFactory(CONF, tn.getMethodName()); + wals = new WALFactory(CONF, tn.getMethodName()); wals.getWALProvider().addWALActionsListener(pathWatcher); log = wals.getWAL(info); } @@ -156,7 +158,8 @@ log.rollWriter(); try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { + new FSWALEntryStream(fs, walQueue, CONF, 0, log, null, new MetricsSource("1"), + wals.getWALProvider())) { int i = 0; while (entryStream.hasNext()) { assertNotNull(entryStream.next()); @@ -183,7 +186,8 @@ appendToLogAndSync(); long oldPos; try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, CONF, 0, 
log, null, new MetricsSource("1"))) { + new FSWALEntryStream(fs, walQueue, CONF, 0, log, null, new MetricsSource("1"), + wals.getWALProvider())) { // There's one edit in the log, read it. Reading past it needs to throw exception assertTrue(entryStream.hasNext()); WAL.Entry entry = entryStream.peek(); @@ -197,8 +201,8 @@ appendToLogAndSync(); - try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, oldPos, - log, null, new MetricsSource("1"))) { + try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, oldPos, + log, null, new MetricsSource("1"), wals.getWALProvider())) { // Read the newly added entry, make sure we made progress WAL.Entry entry = entryStream.next(); assertNotEquals(oldPos, entryStream.getPosition()); @@ -211,8 +215,8 @@ log.rollWriter(); appendToLogAndSync(); - try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, oldPos, - log, null, new MetricsSource("1"))) { + try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, oldPos, + log, null, new MetricsSource("1"), wals.getWALProvider())) { WAL.Entry entry = entryStream.next(); assertNotEquals(oldPos, entryStream.getPosition()); assertNotNull(entry); @@ -237,7 +241,8 @@ appendToLog("1"); appendToLog("2");// 2 try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { + new FSWALEntryStream(fs, walQueue, CONF, 0, log, null, new MetricsSource("1"), + wals.getWALProvider())) { assertEquals("1", getRow(entryStream.next())); appendToLog("3"); // 3 - comes in after reader opened @@ -262,7 +267,8 @@ public void testNewEntriesWhileStreaming() throws Exception { appendToLog("1"); try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { + new FSWALEntryStream(fs, walQueue, CONF, 0, log, null, new MetricsSource("1"), + wals.getWALProvider())) { entryStream.next(); // we've hit the end of the stream at this point // some new entries come in while we're streaming @@ -285,7 +291,8 @@ long lastPosition = 0; appendToLog("1"); try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { + new FSWALEntryStream(fs, walQueue, CONF, 0, log, null, new MetricsSource("1"), + wals.getWALProvider())) { entryStream.next(); // we've hit the end of the stream at this point appendToLog("2"); appendToLog("3"); } // next stream should pick up where we left off try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, CONF, lastPosition, log, null, new MetricsSource("1"))) { + new FSWALEntryStream(fs, walQueue, CONF, lastPosition, log, null, new MetricsSource("1"), + wals.getWALProvider())) { assertEquals("2", getRow(entryStream.next())); assertEquals("3", getRow(entryStream.next())); assertFalse(entryStream.hasNext()); // done @@ -310,14 +318,15 @@ long lastPosition = 0; appendEntriesToLogAndSync(3); // read only one element - try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, lastPosition, - log, null, new MetricsSource("1"))) { + try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, lastPosition, + log, null, new MetricsSource("1"), wals.getWALProvider())) { entryStream.next(); lastPosition = 
entryStream.getPosition(); } // there should still be two more entries from where we left off try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, CONF, lastPosition, log, null, new MetricsSource("1"))) { + new FSWALEntryStream(fs, walQueue, CONF, lastPosition, log, null, new MetricsSource("1"), + wals.getWALProvider())) { assertNotNull(entryStream.next()); assertNotNull(entryStream.next()); assertFalse(entryStream.hasNext()); @@ -328,7 +337,8 @@ @Test public void testEmptyStream() throws Exception { try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { + new FSWALEntryStream(fs, walQueue, CONF, 0, log, null, new MetricsSource("1"), + wals.getWALProvider())) { assertFalse(entryStream.hasNext()); } } @@ -361,7 +371,8 @@ // get ending position long position; try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { + new FSWALEntryStream(fs, walQueue, CONF, 0, log, null, new MetricsSource("1"), + wals.getWALProvider())) { entryStream.next(); entryStream.next(); entryStream.next(); @@ -369,7 +380,7 @@ } // start up a reader - Path walPath = walQueue.peek(); + WALIdentity walPath = walQueue.peek(); ReplicationSourceWALReader reader = createReader(false, CONF); WALEntryBatch entryBatch = reader.take(); @@ -389,7 +400,7 @@ @Test public void testReplicationSourceWALReaderRecovered() throws Exception { appendEntriesToLogAndSync(10); - Path walPath = walQueue.peek(); + WALIdentity walPath = walQueue.peek(); log.rollWriter(); appendEntriesToLogAndSync(5); log.shutdown(); @@ -422,14 +433,14 @@ @Test public void testReplicationSourceWALReaderWrongPosition() throws Exception { appendEntriesToLogAndSync(1); - Path walPath = walQueue.peek(); + FSWALIdentity walPath = (FSWALIdentity) walQueue.peek(); log.rollWriter(); appendEntriesToLogAndSync(20); TEST_UTIL.waitFor(5000, new ExplainingPredicate<Exception>() { @Override public boolean evaluate() throws Exception { - return fs.getFileStatus(walPath).getLen() > 0; + return fs.getFileStatus(walPath.getPath()).getLen() > 0; } @Override @@ -438,7 +449,7 @@ } }); - long walLength = fs.getFileStatus(walPath).getLen(); + long walLength = fs.getFileStatus(walPath.getPath()).getLen(); ReplicationSourceWALReader reader = createReader(false, CONF); @@ -449,7 +460,7 @@ assertEquals(1, entryBatch.getNbEntries()); assertTrue(entryBatch.isEndOfFile()); - Path walPath2 = walQueue.peek(); + WALIdentity walPath2 = walQueue.peek(); entryBatch = reader.take(); assertEquals(walPath2, entryBatch.getLastWalPath()); assertEquals(20, entryBatch.getNbEntries()); @@ -462,7 +473,7 @@ assertEquals(0, entryBatch.getNbEntries()); assertTrue(entryBatch.isEndOfFile()); - Path walPath3 = walQueue.peek(); + WALIdentity walPath3 = walQueue.peek(); entryBatch = reader.take(); assertEquals(walPath3, entryBatch.getLastWalPath()); assertEquals(10, entryBatch.getNbEntries()); @@ -476,7 +487,8 @@ // get ending position long position; try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { + new FSWALEntryStream(fs, walQueue, CONF, 0, log, null, new MetricsSource("1"), + wals.getWALProvider())) { entryStream.next(); 
entryStream.next(); entryStream.next(); @@ -484,7 +496,7 @@ } // start up a reader - Path walPath = walQueue.peek(); + WALIdentity walPath = walQueue.peek(); ReplicationSource source = mockReplicationSource(false, CONF); AtomicInteger invokeCount = new AtomicInteger(0); AtomicBoolean enabled = new AtomicBoolean(false); @@ -577,10 +589,10 @@ class PathWatcher implements WALActionsListener { - Path currentPath; + WALIdentity currentPath; @Override - public void preLogRoll(Path oldPath, Path newPath) throws IOException { + public void preLogRoll(WALIdentity oldPath, WALIdentity newPath) throws IOException { walQueue.add(newPath); currentPath = newPath; } @@ -592,8 +604,9 @@ appendToLog("2"); long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong(); AtomicLong fileLength = new AtomicLong(size - 1); - try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, 0, - p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"))) { + try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, 0, + p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"), + wals.getWALProvider())) { assertTrue(entryStream.hasNext()); assertNotNull(entryStream.next()); // can not get log 2 diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java index d062c77..d337d03 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java @@ -18,10 +18,6 @@ */ package org.apache.hadoop.hbase.wal; -import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.DEFAULT_PROVIDER_ID; -import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID; -import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; - import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -34,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; // imports for things that haven't moved from regionserver.wal yet. import org.apache.hadoop.hbase.regionserver.wal.FSHLog; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter; @@ -70,7 +67,7 @@ * management over time, because the data set size may result in additional HDFS block allocations. */ @InterfaceAudience.Private -public class IOTestProvider implements WALProvider { +public class IOTestProvider extends AbstractFSWALProvider { private static final Logger LOG = LoggerFactory.getLogger(IOTestProvider.class); private static final String ALLOWED_OPERATIONS = "hbase.wal.iotestprovider.operations"; @@ -108,12 +105,15 @@ public class IOTestProvider { this.providerId = providerId != null ? 
providerId : DEFAULT_PROVIDER_ID; } + protected void doInit(Configuration conf) throws IOException { + } + + @Override public List<WAL> getWALs() { return Collections.singletonList(log); } - private FSHLog createWAL() throws IOException { + protected FSHLog createWAL() throws IOException { String logPrefix = factory.factoryId + WAL_FILE_NAME_DELIMITER + providerId; return new IOTestWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf), AbstractFSWALProvider.getWALDirectoryName(factory.factoryId), @@ -122,7 +122,7 @@ } @Override - public WAL getWAL(RegionInfo region) throws IOException { + public AbstractFSWAL getWAL(RegionInfo region) throws IOException { FSHLog log = this.log; if (log != null) { return log; } @@ -210,7 +210,9 @@ LOG.info("creating new writer instance."); final ProtobufLogWriter writer = new IOTestWriter(); try { - writer.init(fs, path, conf, false, this.blocksize); + WALFactory factory = new WALFactory(conf, "io-test-wal"); + writer.init(factory.getWALProvider().createWALIdentity(path.toString()), conf, false, + this.blocksize); } catch (CommonFSUtils.StreamLacksCapabilityException exception) { throw new IOException("Can't create writer instance because underlying FileSystem " + "doesn't support needed stream capabilities.", exception); @@ -237,7 +239,7 @@ private boolean doSyncs; @Override - public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable, + public void init(WALIdentity path, Configuration conf, boolean overwritable, long blocksize) throws IOException, CommonFSUtils.StreamLacksCapabilityException { Collection<String> operations = conf.getStringCollection(ALLOWED_OPERATIONS); if (operations.isEmpty() || operations.contains(AllowedOperations.all.name())) { @@ -250,7 +252,7 @@ } LOG.info("IOTestWriter initialized with appends " + (doAppends ? "enabled" : "disabled") + " and syncs " + (doSyncs ? 
"enabled" : "disabled")); - super.init(fs, path, conf, overwritable, blocksize); + super.init(path, conf, overwritable, blocksize); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java index 8193806..8224b59 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java @@ -145,7 +145,7 @@ public class TestSecureWAL { assertFalse("Cells appear to be plaintext", Bytes.contains(fileData, value)); // Confirm the WAL can be read back - WAL.Reader reader = wals.createReader(TEST_UTIL.getTestFileSystem(), walPath); + WAL.Reader reader = WALUtil.createReader(wals, walPath); int count = 0; WAL.Entry entry = new WAL.Entry(); while (reader.next(entry) != null) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java index 8189cef..f3f6c3c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java @@ -115,12 +115,12 @@ public class TestSyncReplicationWALProvider { Path localFile = wal.getCurrentFileName(); Path remoteFile = new Path(REMOTE_WAL_DIR + "/" + PEER_ID, localFile.getName()); try (ProtobufLogReader reader = - (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), localFile)) { + (ProtobufLogReader) WALUtil.createReader(FACTORY, localFile)) { ProtobufLogTestHelper.doRead(reader, false, REGION, TABLE, columnCount, recordCount, row, timestamp); } try (ProtobufLogReader reader = - (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), remoteFile)) { + (ProtobufLogReader) WALUtil.createReader(FACTORY, remoteFile)) { ProtobufLogTestHelper.doRead(reader, false, REGION, TABLE, columnCount, recordCount, row, timestamp); } @@ -146,12 +146,12 @@ public class TestSyncReplicationWALProvider { } }); try (ProtobufLogReader reader = - (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), localFile)) { + (ProtobufLogReader) WALUtil.createReader(FACTORY, localFile)) { ProtobufLogTestHelper.doRead(reader, true, REGION, TABLE, columnCount, recordCount, row, timestamp); } try (ProtobufLogReader reader = - (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), remoteFile)) { + (ProtobufLogReader) WALUtil.createReader(FACTORY, remoteFile)) { ProtobufLogTestHelper.doRead(reader, true, REGION, TABLE, columnCount, recordCount, row, timestamp); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java index 8fbe09d..2cbd533 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java @@ -277,8 +277,10 @@ public class TestWALFactory { // gives you EOFE. wal.sync(); // Open a Reader. 
+ WALProvider provider = wals.getWALProvider(); Path walPath = AbstractFSWALProvider.getCurrentFileName(wal); - reader = wals.createReader(fs, walPath); + WALIdentity walId = provider.createWALIdentity(walPath.toString()); + reader = provider.createReader(walId, null, true); int count = 0; WAL.Entry entry = new WAL.Entry(); while ((entry = reader.next(entry)) != null) count++; @@ -293,14 +295,14 @@ System.currentTimeMillis(), mvcc, scopes), kvs, true); } wal.sync(); - reader = wals.createReader(fs, walPath); + reader = provider.createReader(walId, null, true); count = 0; while((entry = reader.next(entry)) != null) count++; assertTrue(count >= total); reader.close(); // If I sync, should see double the edits. wal.sync(); - reader = wals.createReader(fs, walPath); + reader = provider.createReader(walId, null, true); count = 0; while((entry = reader.next(entry)) != null) count++; assertEquals(total * 2, count); @@ -316,14 +318,14 @@ } // Now I should have written out lots of blocks. Sync then read. wal.sync(); - reader = wals.createReader(fs, walPath); + reader = provider.createReader(walId, null, true); count = 0; while((entry = reader.next(entry)) != null) count++; assertEquals(total * 3, count); reader.close(); // shutdown and ensure that Reader gets right length also. wal.shutdown(); - reader = wals.createReader(fs, walPath); + reader = provider.createReader(walId, null, true); count = 0; while((entry = reader.next(entry)) != null) count++; assertEquals(total * 3, count); @@ -338,7 +340,7 @@ assertEquals(howmany * howmany, splits.size()); for (int i = 0; i < splits.size(); i++) { LOG.info("Verifying=" + splits.get(i)); - WAL.Reader reader = wals.createReader(fs, splits.get(i)); + WAL.Reader reader = WALUtil.createReader(wals, splits.get(i)); try { int count = 0; String previousRegion = null; @@ -476,7 +478,7 @@ throw t.exception; // Make sure you can read all the content - WAL.Reader reader = wals.createReader(fs, walPath); + WAL.Reader reader = WALUtil.createReader(wals, walPath); int count = 0; WAL.Entry entry = new WAL.Entry(); while (reader.next(entry) != null) { @@ -532,7 +534,7 @@ log.shutdown(); Path filename = AbstractFSWALProvider.getCurrentFileName(log); // Now open a reader on the log and assert append worked. - reader = wals.createReader(fs, filename); + reader = WALUtil.createReader(wals, filename); // Above we added all columns on a single row so we only read one // entry in the below... that's why we have '1'. for (int i = 0; i < 1; i++) { @@ -590,7 +592,7 @@ log.shutdown(); Path filename = AbstractFSWALProvider.getCurrentFileName(log); // Now open a reader on the log and assert append worked. 
- reader = wals.createReader(fs, filename); + reader = WALUtil.createReader(wals, filename); WAL.Entry entry = reader.next(); assertEquals(colCount, entry.getEdit().size()); int idx = 0; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALOpenAfterDNRollingStart.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALOpenAfterDNRollingStart.java index 7d7896c..f698318 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALOpenAfterDNRollingStart.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALOpenAfterDNRollingStart.java @@ -122,8 +122,8 @@ public class TestWALOpenAfterDNRollingStart { currentFile = new Path(oldLogDir, currentFile.getName()); } // if the log is not rolled, then we can never open this wal forever. - try (WAL.Reader reader = WALFactory.createReader(TEST_UTIL.getTestFileSystem(), currentFile, - TEST_UTIL.getConfiguration())) { + try (WAL.Reader reader = WALUtil.createReader(TEST_UTIL.getConfiguration(), "test", + currentFile)) { reader.next(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java index bc21a65..91b2cb4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java @@ -163,9 +163,10 @@ public class TestWALReaderOnSecureWAL { in.close(); assertFalse("Cells appear to be plaintext", Bytes.contains(fileData, value)); + WALProvider provider = wals.getWALProvider(); // Confirm the WAL cannot be read back by ProtobufLogReader try { - wals.createReader(TEST_UTIL.getTestFileSystem(), walPath); + provider.createReader(provider.createWALIdentity(walPath.toString()), null, true); assertFalse(true); } catch (IOException ioe) { // expected IOE @@ -174,8 +175,8 @@ public class TestWALReaderOnSecureWAL { FileStatus[] listStatus = fs.listStatus(walPath.getParent()); Path rootdir = FSUtils.getRootDir(conf); try { - WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, null, null); - s.splitLogFile(listStatus[0], null); + WALSplitter s = new WALSplitter(wals, conf, rootdir, null, null); + s.splitLogFile(provider.createWALIdentity(listStatus[0].getPath().toString()), null); Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()), "corrupt"); assertTrue(fs.exists(file)); @@ -209,7 +210,7 @@ public class TestWALReaderOnSecureWAL { // Confirm the WAL can be read back by SecureProtobufLogReader try { - WAL.Reader reader = wals.createReader(TEST_UTIL.getTestFileSystem(), walPath); + WAL.Reader reader = WALUtil.createReader(wals, walPath); reader.close(); } catch (IOException ioe) { assertFalse(true); @@ -218,8 +219,9 @@ public class TestWALReaderOnSecureWAL { FileStatus[] listStatus = fs.listStatus(walPath.getParent()); Path rootdir = FSUtils.getRootDir(conf); try { - WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, null, null); - s.splitLogFile(listStatus[0], null); + WALSplitter s = new WALSplitter(wals, conf, rootdir, null, null); + s.splitLogFile(wals.getWALProvider().createWALIdentity(listStatus[0].getPath().toString()), + null); Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()), "corrupt"); assertTrue(!fs.exists(file)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java index e6644f0..d34e0c6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java @@ -423,7 +423,7 @@ public class TestWALSplit { FILENAME_BEING_SPLIT, TMPDIRNAME, conf); String parentOfParent = p.getParent().getParent().getName(); assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName()); - WALFactory.createRecoveredEditsWriter(fs, p, conf).close(); + WALUtil.createRecoveredEditsWriter(conf, wals, p).close(); } private void useDifferentDFSClient() throws IOException { @@ -687,7 +688,7 @@ assertEquals(1, splitLog.length); int actualCount = 0; - Reader in = wals.createReader(fs, splitLog[0]); + Reader in = WALUtil.createReader(wals, splitLog[0]); @SuppressWarnings("unused") Entry entry; while ((entry = in.next()) != null) ++actualCount; @@ -810,7 +811,7 @@ } assertTrue("There should be some log greater than size 0.", 0 < largestSize); // Set up a splitter that will throw an IOE on the output side - WALSplitter logSplitter = new WALSplitter(wals, conf, HBASEDIR, fs, null, null) { + WALSplitter logSplitter = new WALSplitter(wals, conf, HBASEDIR, null, null) { @Override protected Writer createWriter(Path logfile) throws IOException { Writer mockWriter = Mockito.mock(Writer.class); @@ -843,7 +844,8 @@ t.setDaemon(true); t.start(); try { - logSplitter.splitLogFile(logfiles[largestLogFile], null); + logSplitter.splitLogFile(wals.getWALProvider().createWALIdentity( + logfiles[largestLogFile].toString()), null); fail("Didn't throw!"); } catch (IOException ioe) { assertTrue(ioe.toString().contains("Injected")); @@ -940,7 +942,8 @@ try { conf.setInt("hbase.splitlog.report.period", 1000); boolean ret = WALSplitter.splitLogFile( - HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, wals); + HBASEDIR, wals.getWALProvider().createWALIdentity(logfile.toString()), spiedFs, conf, + localReporter, null, null, wals); assertFalse("Log splitting should fail", ret); assertTrue(count.get() > 0); } catch (IOException e) { @@ -998,7 +1001,7 @@ makeRegionDirs(regions); // Create a splitter that reads and writes the data without touching disk - WALSplitter logSplitter = new WALSplitter(wals, localConf, HBASEDIR, fs, null, null) { + WALSplitter logSplitter = new WALSplitter(wals, localConf, HBASEDIR, null, null) { /* Produce a mock writer that doesn't write anywhere */ @Override @@ -1034,7 +1037,7 @@ /* Produce a mock reader that generates fake entries */ @Override - protected Reader getReader(Path curLogFile, CancelableProgressable reporter) + protected Reader getReader(WALIdentity curLogFile, CancelableProgressable reporter) throws IOException { Reader mockReader = Mockito.mock(Reader.class); Mockito.doAnswer(new Answer() { @@ -1059,7 +1062,7 @@ } }; - logSplitter.splitLogFile(fs.getFileStatus(logPath), null); + logSplitter.splitLogFile(wals.getWALProvider().createWALIdentity(logPath.toString()), null); // Verify number of written edits per region Map<byte[], Long> outputCounts = logSplitter.outputSink.getOutputCounts(); @@ -1147,11 +1150,11 @@ assertTrue("There should be some log file", logfiles != null && logfiles.length > 0); - 
WALSplitter logSplitter = new WALSplitter(wals, conf, HBASEDIR, fs, null, null) { + WALSplitter logSplitter = new WALSplitter(wals, conf, HBASEDIR, null, null) { @Override protected Writer createWriter(Path logfile) throws IOException { - Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile); + Writer writer = WALUtil.createRecoveredEditsWriter(conf, wals, logfile); // After creating writer, simulate region's // replayRecoveredEditsIfAny() which gets SplitEditFiles of this // region and delete them, excluding files with '.temp' suffix. @@ -1169,7 +1172,8 @@ } }; try { - logSplitter.splitLogFile(logfiles[0], null); + logSplitter.splitLogFile(wals.getWALProvider().createWALIdentity(logfiles[0].toString()), + null); } catch (IOException e) { LOG.info(e.toString(), e); fail("Throws IOException when splitting " @@ -1210,7 +1214,7 @@ int seq = 0; int numRegionEventsAdded = 0; for (int i = 0; i < writers; i++) { - ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i)); + ws[i] = WALUtil.createWriter(conf, wals, new Path(WALDIR, WAL_FILE_PREFIX + i)); for (int j = 0; j < entries; j++) { int prefix = 0; for (String region : REGIONS) { @@ -1339,7 +1343,7 @@ private int countWAL(Path log) throws IOException { int count = 0; - Reader in = wals.createReader(fs, log); + Reader in = WALUtil.createReader(wals, log); while (in.next() != null) { count++; } @@ -1409,8 +1413,7 @@ } private void injectEmptyFile(String suffix, boolean closeFile) throws IOException { - Writer writer = - WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf); + Writer writer = WALUtil.createWriter(conf, wals, new Path(WALDIR, WAL_FILE_PREFIX + suffix)); if (closeFile) { writer.close(); } @@ -1418,8 +1421,8 @@ private boolean logsAreEqual(Path p1, Path p2) throws IOException { Reader in1, in2; - in1 = wals.createReader(fs, p1); - in2 = wals.createReader(fs, p2); + in1 = WALUtil.createReader(wals, p1); + in2 = WALUtil.createReader(wals, p2); Entry entry1; Entry entry2; while ((entry1 = in1.next()) != null) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java index 861b289..9cb1ee2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java @@ -415,7 +415,7 @@ */ private long verify(final WALFactory wals, final Path wal, final boolean verbose) throws IOException { - WAL.Reader reader = wals.createReader(wal.getFileSystem(getConf()), wal); + WAL.Reader reader = WALUtil.createReader(wals, wal); long count = 0; Map sequenceIds = new HashMap<>(); try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSWALIdentity.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSWALIdentity.java new file mode 100644 index 0000000..70fd66d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSWALIdentity.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import org.apache.hadoop.fs.Path; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Distributed-FileSystem-oriented implementation of {@link WALIdentity}, backed by a {@link Path}. + */ +@InterfaceAudience.Private +public class FSWALIdentity implements WALIdentity { + private String name; + private Path path; + + public FSWALIdentity(String name) { + this.path = new Path(name); + this.name = path.getName(); + } + + public FSWALIdentity(Path path) { + this.path = path; + if (path != null) { + this.name = path.getName(); + } + } + + @Override + public String getName() { + return name; + } + + /** + * @return the {@link Path} that this WALIdentity encapsulates + */ + public Path getPath() { + return path; + } + + @Override + public int compareTo(WALIdentity o) { + FSWALIdentity that = (FSWALIdentity) o; + return this.path.compareTo(that.getPath()); + } + + @Override + public String toString() { + return this.path.toString(); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof FSWALIdentity)) { + return false; + } + FSWALIdentity that = (FSWALIdentity) obj; + return this.path.equals(that.getPath()); + } + + @Override + public int hashCode() { + return this.path.hashCode(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALIdentity.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALIdentity.java new file mode 100644 index 0000000..fa7d2fa --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALIdentity.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * This interface defines the identity of a WAL for both stream-based and + * distributed-FileSystem-based environments. + * See {@link #getName()} method. + */ +@InterfaceAudience.Private +public interface WALIdentity extends Comparable<WALIdentity> { + + /** + * A WALIdentity uniquely identifies a WAL stored in its WALProvider. + * The name can be thought of as a human-readable, serialized form of the WALIdentity. + * + * The same value should be returned across calls to this method. 
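+ * + * A minimal usage sketch, assuming an FS-backed provider (createWALIdentity is the provider factory method used elsewhere in this patch): + * + * <pre> + * WALIdentity id = provider.createWALIdentity(path.toString()); + * String name = id.getName(); // for FSWALIdentity, the last component of the path + * </pre>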
+   *
+   * @return name of the wal
+   */
+  String getName();
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/AbstractWALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/AbstractWALEntryStream.java
new file mode 100644
index 0000000..6707b8e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/AbstractWALEntryStream.java
@@ -0,0 +1,300 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.OptionalLong;
+import java.util.concurrent.PriorityBlockingQueue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WAL.Reader;
+import org.apache.hadoop.hbase.wal.WALIdentity;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Streaming access to WAL entries. This class is given a queue of {@link WALIdentity}s and
+ * continually iterates through all the WAL {@link Entry} in the queue. When it is done reading
+ * from one WAL, it dequeues that WAL and starts reading from the next.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class AbstractWALEntryStream implements WALEntryStream {
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractWALEntryStream.class);
+
+  protected Reader reader;
+  protected WALIdentity currentPath;
+  // cache of next entry for hasNext()
+  protected Entry currentEntry;
+  // position of the current entry. Since we support peek, the upper layer may choose to return
+  // before reading the current entry, so it is not safe to return the value below in
+  // getPosition.
+  protected long currentPositionOfEntry = 0;
+  // position after reading current entry
+  protected long currentPositionOfReader = 0;
+  protected final PriorityBlockingQueue<WALIdentity> logQueue;
+  protected final Configuration conf;
+  protected final WALFileLengthProvider walFileLengthProvider;
+  // which region server the WALs belong to
+  protected final ServerName serverName;
+  protected final MetricsSource metrics;
+
+  protected boolean eofAutoRecovery;
+  private WALProvider provider;
+
+  /**
+   * Create an entry stream over the given queue at the given start position
+   * @param logQueue the queue of WAL identities
+   * @param conf {@link Configuration} to use to create {@link Reader} for this stream
+   * @param startPosition the position in the first WAL to start reading at
+   * @param walFileLengthProvider provides the length of a WAL that is still being written to
+   * @param serverName the server name which all WALs belong to
+   * @param metrics replication metrics
+   * @param provider the WALProvider used to create readers
+   * @throws IOException if the stream cannot be created
+   */
+  public AbstractWALEntryStream(PriorityBlockingQueue<WALIdentity> logQueue, Configuration conf,
+      long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
+      MetricsSource metrics, WALProvider provider) throws IOException {
+    this.logQueue = logQueue;
+    this.conf = conf;
+    this.currentPositionOfEntry = startPosition;
+    this.walFileLengthProvider = walFileLengthProvider;
+    this.serverName = serverName;
+    this.metrics = metrics;
+    this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
+    this.provider = provider;
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    if (currentEntry == null) {
+      try {
+        tryAdvanceEntry();
+      } catch (IOException e) {
+        handleIOException(logQueue.peek(), e);
+      }
+    }
+    return currentEntry != null;
+  }
+
+  @Override
+  public Entry peek() throws IOException {
+    return hasNext() ? currentEntry : null;
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    reader.seek(pos);
+  }
+
+  @Override
+  public Entry next(Entry reuse) throws IOException {
+    return next();
+  }
+
+  @Override
+  public Entry next() throws IOException {
+    Entry save = peek();
+    currentPositionOfEntry = currentPositionOfReader;
+    currentEntry = null;
+    return save;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void close() throws IOException {
+    closeReader();
+  }
+
+  @Override
+  public WALIdentity getCurrentWALIdentity() {
+    return currentPath;
+  }
+
+  @Override
+  public long getPosition() {
+    return currentPositionOfEntry;
+  }
+
+  @Override
+  public void reset() throws IOException {
+    if (reader != null && currentPath != null) {
+      resetReader();
+    }
+  }
+
+  protected void setPosition(long position) {
+    currentPositionOfEntry = position;
+  }
+
+  abstract void setCurrentPath(WALIdentity path);
+
+  private void tryAdvanceEntry() throws IOException {
+    if (checkReader()) {
+      boolean beingWritten = readNextEntryAndRecordReaderPosition();
+      if (currentEntry == null && !beingWritten) {
+        // no more entries in this log file, and the file is already closed, i.e., rolled.
+        // Before dequeueing, we should always get one more attempt at reading.
+        // This is in case more entries came in after we opened the reader, and the log is rolled
+        // while we were reading. See HBASE-6758.
+        resetReader();
+        readNextEntryAndRecordReaderPosition();
+        if (currentEntry == null) {
+          if (checkAllBytesParsed()) { // now we're certain we're done with this log file
+            dequeueCurrentLog();
+            if (openNextLog()) {
+              readNextEntryAndRecordReaderPosition();
+            }
+          }
+        }
+      }
+      // if currentEntry != null then just return
+      // if currentEntry == null but the file is still being written, then we should not switch
+      // to the next log either, just return here and try next time to see if there are more
+      // entries in the current file
+    }
+    // do nothing if we don't have a WAL Reader (e.g. if there are no logs in the queue)
+  }
+
+  private void dequeueCurrentLog() throws IOException {
+    LOG.debug("Reached the end of log {}", currentPath);
+    closeReader();
+    logQueue.remove();
+    setPosition(0);
+    metrics.decrSizeOfLogQueue();
+  }
+
+  /**
+   * Returns whether the file is opened for writing.
+   */
+  private boolean readNextEntryAndRecordReaderPosition() throws IOException {
+    Entry readEntry = reader.next();
+    long readerPos = reader.getPosition();
+    OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath);
+    if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) {
+      // see HBASE-14004: for AsyncFSWAL which uses fan-out, it is possible that we read
+      // uncommitted data, so we need to make sure that we do not read beyond the committed file
+      // length.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("The provider tells us the valid length for " + currentPath + " is "
+            + fileLength.getAsLong() + ", but we have advanced to " + readerPos);
+      }
+      resetReader();
+      return true;
+    }
+    if (readEntry != null) {
+      metrics.incrLogEditsRead();
+      metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry);
+    }
+    currentEntry = readEntry; // could be null
+    this.currentPositionOfReader = readerPos;
+    return fileLength.isPresent();
+  }
+
+  private void closeReader() throws IOException {
+    if (reader != null) {
+      reader.close();
+      reader = null;
+    }
+  }
+
+  // if we don't have a reader, open a reader on the next log
+  private boolean checkReader() throws IOException {
+    if (reader == null) {
+      return openNextLog();
+    }
+    return true;
+  }
+
+  // open a reader on the next log in queue
+  abstract boolean openNextLog() throws IOException;
+
+  protected void openReader(WALIdentity path) throws IOException {
+    try {
+      // Detect if this is a new file, if so get a new reader else
+      // reset the current reader so that we see the new data
+      if (reader == null || !currentPath.equals(path)) {
+        closeReader();
+        reader = createReader(path, conf);
+        seek();
+        setCurrentPath(path);
+      } else {
+        resetReader();
+      }
+    } catch (RemoteException re) {
+      IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
+      handleIOException(path, ioe);
+    } catch (IOException ioe) {
+      handleIOException(path, ioe);
+    }
+  }
+
+  /**
+   * Creates a reader for a WAL.
+   *
+   * @param walId WALIdentity for a filesystem-based provider, or the stream name for a
+   *          stream-based WAL provider
+   * @param conf configuration to use when opening the reader
+   * @return a reader for the WAL
+   * @throws IOException if the reader cannot be created
+   */
+  protected Reader createReader(WALIdentity walId, Configuration conf) throws IOException {
+    return provider.createReader(walId, null, false);
+  }
+
+  protected void resetReader() throws IOException {
+    try {
+      currentEntry = null;
+      reader.reset();
+      seek();
+    } catch (NullPointerException npe) {
+      throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
+    } catch (IOException e) {
+      handleIOException(currentPath, e);
+    }
+  }
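+
+  // Note: provider-specific behavior is supplied by subclasses through three hooks:
+  // openNextLog() opens a reader on the next WALIdentity in the queue, setCurrentPath()
+  // records which WAL is currently being read, and handleIOException() decides whether a
+  // read failure is recoverable (e.g. the WAL was moved or archived) or must be rethrown.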
+
+  /**
+   * Handles an IOException thrown while reading; implementations should rethrow the exception
+   * if it does not need to be (or cannot be) handled.
+   * @param walId identity of the WAL that was being read
+   * @param e the IOException to handle
+   * @throws IOException if the exception cannot be handled
+   */
+  protected abstract void handleIOException(WALIdentity walId, IOException e) throws IOException;
+
+  protected void seek() throws IOException {
+    if (currentPositionOfEntry != 0) {
+      reader.seek(currentPositionOfEntry);
+    }
+  }
+
+  protected boolean checkAllBytesParsed() throws IOException {
+    return true;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSRecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSRecoveredReplicationSource.java
new file mode 100644
index 0000000..70b3889
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSRecoveredReplicationSource.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.PriorityBlockingQueue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.FSWALIdentity;
+import org.apache.hadoop.hbase.wal.WALIdentity;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class FSRecoveredReplicationSource extends RecoveredReplicationSource {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FSRecoveredReplicationSource.class);
+  private String logDir;
+
+  @Override
+  public void init(Configuration conf, ReplicationSourceManager manager,
+      ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
+      String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
+      MetricsSource metrics, WALProvider walProvider) throws IOException {
+    super.init(conf, manager, queueStorage, replicationPeer, server, peerClusterZnode, clusterId,
+      walFileLengthProvider, metrics, walProvider);
+    this.logDir = AbstractFSWALProvider.getWALDirectoryName(server.getServerName().toString());
+  }
+
+  @Override
+  public void locateRecoveredWALIdentities(PriorityBlockingQueue<WALIdentity> queue)
+      throws IOException {
+    boolean hasWALIdentityChanged = false;
+    PriorityBlockingQueue<WALIdentity> newWALIdentities =
+        new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator(this.walProvider));
+    FileSystem fs = CommonFSUtils.getWALFileSystem(conf);
+    WALIdentityLoop:
+    for (WALIdentity walIdentity : queue) {
+      FSWALIdentity fsWal = (FSWALIdentity) walIdentity;
+      if (fs.exists(fsWal.getPath())) {
+        // still in same location, don't need to do anything
+        newWALIdentities.add(walIdentity);
+        continue;
+      }
+      // WALIdentity changed - try to find the right WALIdentity.
+      hasWALIdentityChanged = true;
+      if (server instanceof ReplicationSyncUp.DummyServer) {
+        // In the case of disaster/recovery, HMaster may be shutdown/crashed before flushing data
+        // from .logs to .oldlogs. Loop through the .logs folders and check whether a match exists.
+        WALIdentity newWALIdentity = getReplSyncUpPath(fs, fsWal.getPath());
+        newWALIdentities.add(newWALIdentity);
+        continue;
+      } else {
+        // See if Path exists in the dead RS folder (there could be a chain of failures
+        // to look at)
+        List<ServerName> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
+        LOG.info("NB dead servers : " + deadRegionServers.size());
+        final Path walDir = FSUtils.getWALRootDir(conf);
+        for (ServerName curDeadServerName : deadRegionServers) {
+          final Path deadRsDirectory = new Path(walDir,
+              AbstractFSWALProvider.getWALDirectoryName(curDeadServerName.getServerName()));
+          Path[] locs = new Path[] { new Path(deadRsDirectory, walIdentity.getName()),
+              new Path(deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT),
+                  walIdentity.getName()) };
+          for (Path possibleLogLocation : locs) {
+            LOG.info("Possible location " + possibleLogLocation.toUri().toString());
+            if (fs.exists(possibleLogLocation)) {
+              // We found the right new location
+              LOG.info("Log " + walIdentity + " still exists at " + possibleLogLocation);
+              newWALIdentities.add(new FSWALIdentity(possibleLogLocation));
+              continue WALIdentityLoop;
+            }
+          }
+        }
+        // didn't find a new location
+        LOG.error(String.format("WAL Path %s doesn't exist and couldn't find its new location",
+            walIdentity));
+        newWALIdentities.add(walIdentity);
+      }
+    }
+
+    if (hasWALIdentityChanged) {
+      if (newWALIdentities.size() != queue.size()) { // this shouldn't happen
+        LOG.error("Recovery queue size is incorrect");
+        throw new IOException("Recovery queue size error");
+      }
+      // put the correct locations in the queue
+      // since this is a recovered queue with no new incoming logs,
+      // there shouldn't be any concurrency issues
+      queue.clear();
+      for (WALIdentity walIdentity : newWALIdentities) {
+        queue.add(walIdentity);
+      }
+    }
+  }
+
+  // N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of the wal
+  // area rather than to the wal area for a particular region server.
+  private WALIdentity getReplSyncUpPath(FileSystem fs, Path path) throws IOException {
+    FileStatus[] rss = fs.listStatus(manager.getLogDir());
+    for (FileStatus rs : rss) {
+      Path p = rs.getPath();
+      FileStatus[] logs = fs.listStatus(p);
+      for (FileStatus log : logs) {
+        Path logPath = log.getPath();
+        if (logPath.getName().equals(path.getName())) {
+          LOG.info("Log " + logPath.getName() + " found at " + logPath);
+          return this.walProvider.createWALIdentity(logPath.toString());
+        }
+      }
+    }
+    LOG.error("Didn't find path for: " + path.getName());
+    return this.walProvider.createWALIdentity(path.toString());
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSWALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSWALEntryStream.java
new file mode 100644
index 0000000..4ef009d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSWALEntryStream.java
@@ -0,0 +1,253 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.concurrent.PriorityBlockingQueue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
+import org.apache.hadoop.hbase.wal.FSWALIdentity;
+import org.apache.hadoop.hbase.wal.WALIdentity;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Streaming access to WAL entries. This class is given a queue of {@link WALIdentity}s and
+ * continually iterates through all the WAL {@link Entry} in the queue. When it is done reading
+ * from a Path, it dequeues it and starts reading from the next.
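+ * <p>
+ * This filesystem-backed implementation also handles the recovery cases a file-based WAL can
+ * hit: it follows logs that were moved to the archive directory, optionally skips zero-length
+ * logs when EOF auto-recovery is enabled, and recovers dangling leases (HBASE-15019).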
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class FSWALEntryStream extends AbstractWALEntryStream {
+  private static final Logger LOG = LoggerFactory.getLogger(FSWALEntryStream.class);
+
+  private FileSystem fs;
+
+  public FSWALEntryStream(FileSystem fs, PriorityBlockingQueue<WALIdentity> logQueue,
+      Configuration conf, long startPosition, WALFileLengthProvider walFileLengthProvider,
+      ServerName serverName, MetricsSource metrics, WALProvider provider) throws IOException {
+    super(logQueue, conf, startPosition, walFileLengthProvider, serverName, metrics, provider);
+    this.fs = fs;
+  }
+
+  @Override
+  // HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file
+  protected boolean checkAllBytesParsed() throws IOException {
+    // -1 means the wal wasn't closed cleanly.
+    final long trailerSize = currentTrailerSize();
+    FileStatus stat = null;
+    try {
+      stat = fs.getFileStatus(((FSWALIdentity) this.currentPath).getPath());
+    } catch (IOException exception) {
+      LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}",
+          currentPath, trailerSize < 0 ? "was not" : "was", getCurrentPathStat());
+      metrics.incrUnknownFileLengthForClosedWAL();
+    }
+    // Here we use currentPositionOfReader instead of currentPositionOfEntry.
+    // We only call this method when currentEntry is null so usually they are the same, but there
+    // are two exceptions. One is we have nothing in the file but only a header, in this way
+    // the currentPositionOfEntry will always be 0 since we have no chance to update it. The other
+    // is that we reach the end of file, then currentPositionOfEntry will point to the tail of the
+    // last valid entry, and the currentPositionOfReader will usually point to the end of the file.
+    if (stat != null) {
+      if (trailerSize < 0) {
+        if (currentPositionOfReader < stat.getLen()) {
+          final long skippedBytes = stat.getLen() - currentPositionOfReader;
+          LOG.debug(
+            "Reached the end of WAL file '{}'. It was not closed cleanly,"
+                + " so we did not parse {} bytes of data. This is normally ok.",
+            currentPath, skippedBytes);
+          metrics.incrUncleanlyClosedWALs();
+          metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
+        }
+      } else if (currentPositionOfReader + trailerSize < stat.getLen()) {
+        LOG.warn(
+          "Processing end of WAL file '{}'. At position {}, which is too far away from"
+              + " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}",
+          currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat());
+        setPosition(0);
+        resetReader();
+        metrics.incrRestartedWALReading();
+        metrics.incrRepeatedFileBytes(currentPositionOfReader);
+        return false;
+      }
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is "
"N/A" : stat.getLen())); + } + metrics.incrCompletedWAL(); + return true; + } + + private Path getArchivedLog(Path path) throws IOException { + Path rootDir = FSUtils.getRootDir(conf); + + // Try found the log in old dir + Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); + Path archivedLogLocation = new Path(oldLogDir, path.getName()); + if (fs.exists(archivedLogLocation)) { + LOG.info("Log " + path + " was moved to " + archivedLogLocation); + return archivedLogLocation; + } + + // Try found the log in the seperate old log dir + oldLogDir = new Path(rootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME) + .append(Path.SEPARATOR).append(serverName.getServerName()).toString()); + archivedLogLocation = new Path(oldLogDir, path.getName()); + if (fs.exists(archivedLogLocation)) { + LOG.info("Log " + path + " was moved to " + archivedLogLocation); + return archivedLogLocation; + } + + LOG.error("Couldn't locate log: " + path); + return path; + } + + @Override + void setCurrentPath(WALIdentity path) { + this.currentPath = path; + } + + @Override + // open a reader on the next log in queue + boolean openNextLog() throws IOException { + WALIdentity nextPath = logQueue.peek(); + if (nextPath != null) { + openReader(nextPath); + if (reader != null) { + return true; + } + } else { + // no more files in queue, this could happen for recovered queue, or for a wal group of a sync + // replication peer which has already been transited to DA or S. + setCurrentPath(null); + } + return false; + } + + @Override + protected void openReader(WALIdentity walId) throws IOException { + try { + super.openReader(walId); + } catch (NullPointerException npe) { + // Workaround for race condition in HDFS-4380 + // which throws a NPE if we open a file before any data node has the most recent block + // Just sleep and retry. Will require re-reading compressed WALs for compressionContext. + LOG.warn("Got NPE opening reader, will retry."); + reader = null; + } + } + + private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException { + // If the log was archived, continue reading from there + Path archivedLog = getArchivedLog(path); + if (!path.equals(archivedLog)) { + openReader(new FSWALIdentity(archivedLog)); + } else { + throw fnfe; + } + } + + // For HBASE-15019 + private void recoverLease(final Configuration conf, final Path path) { + try { + final FileSystem dfs = FSUtils.getCurrentFileSystem(conf); + FSUtils fsUtils = FSUtils.getInstance(dfs, conf); + fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() { + @Override + public boolean progress() { + LOG.debug("recover WAL lease: " + path); + return true; + } + }); + } catch (IOException e) { + LOG.warn("unable to recover lease for WAL: " + path, e); + } + } + + @Override + protected void handleIOException(WALIdentity path, IOException e) throws IOException { + try { + throw e; + } catch (FileNotFoundException fnfe) { + handleFileNotFound(((FSWALIdentity)path).getPath(), fnfe); + } catch (EOFException eo) { + handleEofException(eo); + } catch (LeaseNotRecoveredException lnre) { + // HBASE-15019 the WAL was not closed due to some hiccup. 
+ LOG.warn("Try to recover the WAL lease " + currentPath, lnre); + recoverLease(conf, ((FSWALIdentity)currentPath).getPath()); + reader = null; + } + } + + private long currentTrailerSize() { + long size = -1L; + if (reader instanceof ProtobufLogReader) { + final ProtobufLogReader pblr = (ProtobufLogReader) reader; + size = pblr.trailerSize(); + } + return size; + } + + // if we get an EOF due to a zero-length log, and there are other logs in queue + // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is + // enabled, then dump the log + private void handleEofException(IOException e) { + if ((e instanceof EOFException || e.getCause() instanceof EOFException) && logQueue.size() > 1 + && this.eofAutoRecovery) { + try { + if (fs.getFileStatus(((FSWALIdentity)logQueue.peek()).getPath()).getLen() == 0) { + LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek()); + logQueue.remove(); + setPosition(0); + } + } catch (IOException ioe) { + LOG.warn("Couldn't get file length information about log " + logQueue.peek()); + } + } + } + + private String getCurrentPathStat() { + StringBuilder sb = new StringBuilder(); + if (currentPath != null) { + sb.append("currently replicating from: ").append(currentPath).append(" at position: ") + .append(currentPositionOfEntry).append("\n"); + } else { + sb.append("no replication ongoing, waiting for new log"); + } + return sb.toString(); + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALUtil.java new file mode 100644 index 0000000..7a54640 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALUtil.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.wal; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class provides helper methods related to WALProvider. 
+ */
+public final class WALUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(WALUtil.class);
+
+  private WALUtil() {
+    // utility class, do not instantiate
+  }
+
+  public static WAL.Reader createReader(WALFactory walFactory, Path path) throws IOException {
+    WALProvider provider = walFactory.getWALProvider();
+    return provider.createReader(provider.createWALIdentity(path.toString()), null, true);
+  }
+
+  public static WAL.Reader createReader(Configuration conf, String factoryId, Path path)
+      throws IOException {
+    WALFactory factory = new WALFactory(conf, factoryId);
+    WALProvider provider = factory.getWALProvider();
+    return provider.createReader(provider.createWALIdentity(path.toString()), null, true);
+  }
+
+  public static WALProvider.Writer createRecoveredEditsWriter(Configuration conf,
+      WALFactory walFactory, Path path) throws IOException {
+    WALProvider provider = walFactory.getWALProvider();
+    return provider.createWriter(conf, provider.createWALIdentity(path.toString()), true);
+  }
+
+  public static WALProvider.Writer createWriter(Configuration conf, WALFactory walFactory,
+      Path path) throws IOException {
+    WALProvider provider = walFactory.getWALProvider();
+    return provider.createWriter(conf, provider.createWALIdentity(path.toString()), false);
+  }
+
+  public static WALProvider.Writer createWriter(Configuration conf, String factoryId, Path path)
+      throws IOException {
+    WALFactory walFactory = new WALFactory(conf, factoryId);
+    WALProvider provider = walFactory.getWALProvider();
+    return provider.createWriter(conf, provider.createWALIdentity(path.toString()), false);
+  }
+}
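
Reviewer note: a minimal usage sketch of the new WALUtil test helper (illustration only, not part
of the patch). The factory id and WAL path below are made-up examples; imports from
org.apache.hadoop.hbase (HBaseConfiguration), org.apache.hadoop.fs (Path), and
org.apache.hadoop.hbase.wal (WAL, WALFactory, WALProvider, WALUtil) are assumed.

  Configuration conf = HBaseConfiguration.create();
  Path walPath = new Path("/hbase/WALs/example-server/example-wal"); // example location
  WALFactory wals = new WALFactory(conf, "walutil-example");
  // Create (and immediately close) a WAL writer through the provider-agnostic helper;
  // the provider resolves the Path to a WALIdentity internally.
  WALProvider.Writer writer = WALUtil.createWriter(conf, wals, walPath);
  writer.close();
  // Read back any entries without touching a FileSystem/Path pair directly.
  try (WAL.Reader reader = WALUtil.createReader(wals, walPath)) {
    for (WAL.Entry entry = reader.next(); entry != null; entry = reader.next()) {
      System.out.println(entry.getKey());
    }
  }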