getWALs() {
+ initProvider();
return provider.getWALs();
}
@@ -292,113 +307,11 @@ public class WALFactory {
region.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID) {
return getMetaProvider().getWAL(region);
} else {
+ initProvider();
return provider.getWAL(region);
}
}
- public Reader createReader(final FileSystem fs, final Path path) throws IOException {
- return createReader(fs, path, (CancelableProgressable)null);
- }
-
- /**
- * Create a reader for the WAL. If you are reading from a file that's being written to and need
- * to reopen it multiple times, use {@link WAL.Reader#reset()} instead of this method
- * then just seek back to the last known good position.
- * @return A WAL reader. Close when done with it.
- * @throws IOException
- */
- public Reader createReader(final FileSystem fs, final Path path,
- CancelableProgressable reporter) throws IOException {
- return createReader(fs, path, reporter, true);
- }
-
- public Reader createReader(final FileSystem fs, final Path path, CancelableProgressable reporter,
- boolean allowCustom) throws IOException {
- Class<? extends AbstractFSWALProvider.Reader> lrClass =
- allowCustom ? logReaderClass : ProtobufLogReader.class;
- try {
- // A wal file could be under recovery, so it may take several
- // tries to get it open. Instead of claiming it is corrupted, retry
- // to open it up to 5 minutes by default.
- long startWaiting = EnvironmentEdgeManager.currentTime();
- long openTimeout = timeoutMillis + startWaiting;
- int nbAttempt = 0;
- AbstractFSWALProvider.Reader reader = null;
- while (true) {
- try {
- reader = lrClass.getDeclaredConstructor().newInstance();
- reader.init(fs, path, conf, null);
- return reader;
- } catch (IOException e) {
- if (reader != null) {
- try {
- reader.close();
- } catch (IOException exception) {
- LOG.warn("Could not close FSDataInputStream" + exception.getMessage());
- LOG.debug("exception details", exception);
- }
- }
-
- String msg = e.getMessage();
- if (msg != null
- && (msg.contains("Cannot obtain block length")
- || msg.contains("Could not obtain the last block") || msg
- .matches("Blocklist for [^ ]* has changed.*"))) {
- if (++nbAttempt == 1) {
- LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
- }
- if (reporter != null && !reporter.progress()) {
- throw new InterruptedIOException("Operation is cancelled");
- }
- if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
- LOG.error("Can't open after " + nbAttempt + " attempts and "
- + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path);
- } else {
- try {
- Thread.sleep(nbAttempt < 3 ? 500 : 1000);
- continue; // retry
- } catch (InterruptedException ie) {
- InterruptedIOException iioe = new InterruptedIOException();
- iioe.initCause(ie);
- throw iioe;
- }
- }
- throw new LeaseNotRecoveredException(e);
- } else {
- throw e;
- }
- }
- }
- } catch (IOException ie) {
- throw ie;
- } catch (Exception e) {
- throw new IOException("Cannot get log reader", e);
- }
- }
-
- /**
- * Create a writer for the WAL.
- * Uses defaults.
- *
- * Should be package-private. public only for tests and
- * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor}
- * @return A WAL writer. Close when done with it.
- */
- public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException {
- return FSHLogProvider.createWriter(conf, fs, path, false);
- }
-
- /**
- * Should be package-private, visible for recovery testing.
- * Uses defaults.
- * @return an overwritable writer for recovered edits. caller should close.
- */
- @VisibleForTesting
- public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path)
- throws IOException {
- return FSHLogProvider.createWriter(conf, fs, path, true);
- }
-
// These static methods are currently used where it's impractical to
// untangle the reliance on state in the filesystem. They rely on singleton
// WALFactory that just provides Reader / Writers.
@@ -410,7 +323,12 @@ public class WALFactory {
public static WALFactory getInstance(Configuration configuration) {
WALFactory factory = singleton.get();
if (null == factory) {
- WALFactory temp = new WALFactory(configuration);
+ WALFactory temp = null;
+ try {
+ temp = new WALFactory(configuration);
+ } catch (IOException ioe) {
+ throw new IllegalStateException(ioe);
+ }
if (singleton.compareAndSet(null, temp)) {
factory = temp;
} else {
@@ -426,60 +344,6 @@ public class WALFactory {
return factory;
}
- /**
- * Create a reader for the given path, accept custom reader classes from conf.
- * If you already have a WALFactory, you should favor the instance method.
- * @return a WAL Reader, caller must close.
- */
- public static Reader createReader(final FileSystem fs, final Path path,
- final Configuration configuration) throws IOException {
- return getInstance(configuration).createReader(fs, path);
- }
-
- /**
- * Create a reader for the given path, accept custom reader classes from conf.
- * If you already have a WALFactory, you should favor the instance method.
- * @return a WAL Reader, caller must close.
- */
- static Reader createReader(final FileSystem fs, final Path path,
- final Configuration configuration, final CancelableProgressable reporter) throws IOException {
- return getInstance(configuration).createReader(fs, path, reporter);
- }
-
- /**
- * Create a reader for the given path, ignore custom reader classes from conf.
- * If you already have a WALFactory, you should favor the instance method.
- * only public pending move of {@link org.apache.hadoop.hbase.regionserver.wal.Compressor}
- * @return a WAL Reader, caller must close.
- */
- public static Reader createReaderIgnoreCustomClass(final FileSystem fs, final Path path,
- final Configuration configuration) throws IOException {
- return getInstance(configuration).createReader(fs, path, null, false);
- }
-
- /**
- * If you already have a WALFactory, you should favor the instance method.
- * Uses defaults.
- * @return a Writer that will overwrite files. Caller must close.
- */
- static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path,
- final Configuration configuration)
- throws IOException {
- return FSHLogProvider.createWriter(configuration, fs, path, true);
- }
-
- /**
- * If you already have a WALFactory, you should favor the instance method.
- * Uses defaults.
- * @return a writer that won't overwrite files. Caller must close.
- */
- @VisibleForTesting
- public static Writer createWALWriter(final FileSystem fs, final Path path,
- final Configuration configuration)
- throws IOException {
- return FSHLogProvider.createWriter(configuration, fs, path, false);
- }
-
public final WALProvider getWALProvider() {
return this.provider;
}
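Note on migration: with the static createReader helpers gone, every call site goes through the provider. A minimal sketch of the migrated pattern, where the factory id "example" and the surrounding conf/path variables are illustrative rather than taken from this patch:

    // Before this patch:
    //   WAL.Reader reader = WALFactory.createReader(fs, path, conf);
    // After: obtain a provider-scoped reader via a WALIdentity.
    WALFactory factory = new WALFactory(conf, "example");
    WALProvider provider = factory.getWALProvider();
    try (WAL.Reader reader =
        provider.createReader(provider.createWALIdentity(path.toString()), null, true)) {
      WAL.Entry entry;
      while ((entry = reader.next()) != null) {
        // process each entry
      }
    }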
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java
index 281f3c9..9192160 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java
@@ -259,7 +259,9 @@ public class WALPrettyPrinter {
throw new IOException(p + " is not a file");
}
- WAL.Reader log = WALFactory.createReader(fs, p, conf);
+ WALFactory factory = new WALFactory(conf, "pretty-printer");
+ WALProvider provider = factory.getWALProvider();
+ WAL.Reader log = provider.createReader(provider.createWALIdentity(p.toString()), null, true);
if (log instanceof ProtobufLogReader) {
List<String> writerClsNames = ((ProtobufLogReader) log).getWriterClsNames();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
index 244a636..3775000 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
@@ -23,10 +23,17 @@ import java.io.IOException;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.PriorityBlockingQueue;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
+import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -82,6 +89,11 @@ public interface WALProvider {
void sync(boolean forceSync) throws IOException;
void append(WAL.Entry entry) throws IOException;
+ /**
+ * Initialize the writer for the given WAL.
+ * @param path identity of the WAL to write to
+ * @param c configuration
+ * @param overwritable whether an existing file may be overwritten
+ * @param blocksize block size for the underlying storage
+ * @throws IOException if something goes wrong initializing an output stream
+ */
+ void init(WALIdentity path, Configuration c, boolean overwritable, long blocksize)
+ throws IOException;
}
interface AsyncWriter extends WriterBase {
@@ -113,4 +125,61 @@ public interface WALProvider {
return path -> getWALs().stream().map(w -> w.getLogFileSizeIfBeingWritten(path))
.filter(o -> o.isPresent()).findAny().orElse(OptionalLong.empty());
}
+
+ /**
+ * Creates a streaming view over the given set of WALs, consuming the queue in order.
+ * @param logQueue queue of WALs to stream from
+ * @param conf configuration
+ * @param startPosition start position for the first WAL in the queue
+ * @param walFileLengthProvider provides the length of a WAL that is still being written to
+ * @param serverName name of the server
+ * @param metrics metric source
+ * @return WALEntryStream instance
+ * @throws IOException if the stream cannot be created
+ */
+ WALEntryStream getWalStream(PriorityBlockingQueue<WALIdentity> logQueue, Configuration conf,
+ long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
+ MetricsSource metrics) throws IOException;
+
+ /**
+ * Create a reader for the given WAL.
+ * @param walId identity of the WAL
+ * @param reporter progress reporter
+ * @param allowCustom true to accept custom reader classes from conf
+ * @return a WAL Reader, caller must close.
+ */
+ Reader createReader(final WALIdentity walId, CancelableProgressable reporter,
+ boolean allowCustom) throws IOException;
+
+ /**
+ * Hook for performing recovery work needed for the given WAL.
+ * The implementation must prevent further appends to the WAL, identified by id,
+ * after this method returns; otherwise there may be data loss if some region server
+ * continues to write to the WAL.
+ *
+ * @param id identity of the WAL
+ * @param conf Configuration instance
+ * @param reporter progress reporter
+ */
+ void preRecovery(WALIdentity id, Configuration conf, CancelableProgressable reporter)
+ throws IOException;
+
+ /**
+ * Creates a WALIdentity for the given WAL path/name.
+ * The name must uniquely identify a WAL within this WALProvider.
+ *
+ * Note: this method is subject to change / removal upon future WAL refactoring
+ *
+ * @param wal the WAL path/name
+ * @return WALIdentity instance for the WAL
+ */
+ WALIdentity createWALIdentity(String wal);
+
+ /**
+ * Create a writer for the given WAL.
+ * @param conf configuration
+ * @param walId identity of the WAL to write to
+ * @param overwritable true if an existing file may be overwritten
+ * @return a WAL Writer, caller must close.
+ */
+ Writer createWriter(final Configuration conf, final WALIdentity walId,
+ final boolean overwritable) throws IOException;
+
+ /**
+ * @return start time of the underlying WAL
+ */
+ long getWALStartTime(WALIdentity id);
}
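For a filesystem-backed provider, the two identity methods might be implemented along the lines of the sketch below. It assumes FSWALIdentity (imported by tests later in this patch) is a thin wrapper around a Path, and that WAL file names end in the roll timestamp, as in AbstractFSWALProvider's naming scheme; neither assumption comes from this section:

    @Override
    public WALIdentity createWALIdentity(String wal) {
      // assumed: FSWALIdentity wraps an org.apache.hadoop.fs.Path
      return new FSWALIdentity(new Path(wal));
    }

    @Override
    public long getWALStartTime(WALIdentity id) {
      // assumed naming scheme: <prefix>.<roll-timestamp-in-millis>
      String name = id.getName();
      return Long.parseLong(name.substring(name.lastIndexOf('.') + 1));
    }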
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index bc67d98..131948a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -77,6 +77,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
@@ -140,7 +141,7 @@ public class WALSplitter {
protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores = new ConcurrentHashMap<>();
// the file being split currently
- private FileStatus fileBeingSplit;
+ private WALIdentity fileBeingSplit;
// if we limit the number of writers opened for sinking recovered edits
private final boolean splitWriterCreationBounded;
@@ -150,14 +151,18 @@ public class WALSplitter {
@VisibleForTesting
WALSplitter(final WALFactory factory, Configuration conf, Path walDir,
- FileSystem walFS, LastSequenceId idChecker,
+ LastSequenceId idChecker,
SplitLogWorkerCoordination splitLogWorkerCoordination) {
this.conf = HBaseConfiguration.create(conf);
String codecClassName = conf
.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
this.walDir = walDir;
- this.walFS = walFS;
+ try {
+ this.walFS = CommonFSUtils.getWALFileSystem(conf);
+ } catch (IOException ioe) {
+ throw new IllegalArgumentException("unable to get FS for " + walDir, ioe);
+ }
this.sequenceIdChecker = idChecker;
this.splitLogWorkerCoordination = splitLogWorkerCoordination;
@@ -187,11 +192,11 @@ public class WALSplitter {
*
* @return false if it is interrupted by the progress-able.
*/
- public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS,
+ public static boolean splitLogFile(Path walDir, WALIdentity logfile, FileSystem walFS,
Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
SplitLogWorkerCoordination splitLogWorkerCoordination, final WALFactory factory)
throws IOException {
- WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, idChecker,
+ WALSplitter s = new WALSplitter(factory, conf, walDir, idChecker,
splitLogWorkerCoordination);
return s.splitLogFile(logfile, reporter);
}
@@ -206,10 +211,11 @@ public class WALSplitter {
final FileStatus[] logfiles = SplitLogManager.getFileList(conf,
Collections.singletonList(logDir), null);
List<Path> splits = new ArrayList<>();
+ WALProvider provider = factory.getWALProvider();
if (ArrayUtils.isNotEmpty(logfiles)) {
for (FileStatus logfile: logfiles) {
- WALSplitter s = new WALSplitter(factory, conf, rootDir, walFS, null, null);
- if (s.splitLogFile(logfile, null)) {
+ WALSplitter s = new WALSplitter(factory, conf, rootDir, null, null);
+ if (s.splitLogFile(provider.createWALIdentity(logfile.getPath().toString()), null)) {
finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
if (s.outputSink.splits != null) {
splits.addAll(s.outputSink.splits);
@@ -228,27 +234,26 @@ public class WALSplitter {
* @param logfile should be an actual log file.
*/
@VisibleForTesting
- boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException {
+ boolean splitLogFile(WALIdentity logfile, CancelableProgressable reporter) throws IOException {
Preconditions.checkState(status == null);
- Preconditions.checkArgument(logfile.isFile(),
- "passed in file status is for something other than a regular file.");
boolean isCorrupted = false;
boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
SPLIT_SKIP_ERRORS_DEFAULT);
int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
- Path logPath = logfile.getPath();
+ String logPath = logfile.getName();
boolean outputSinkStarted = false;
boolean progress_failed = false;
int editsCount = 0;
int editsSkipped = 0;
status = TaskMonitor.get().createStatus(
- "Splitting log file " + logfile.getPath() + "into a temporary staging area.");
+ "Splitting log file " + logPath + "into a temporary staging area.");
Reader logFileReader = null;
this.fileBeingSplit = logfile;
try {
- long logLength = logfile.getLen();
- LOG.info("Splitting WAL={}, length={}", logPath, logLength);
+ // the file length is not available through a WALIdentity
+ LOG.info("Splitting WAL={}", logPath);
status.setStatus("Opening log file");
if (reporter != null && !reporter.progress()) {
progress_failed = true;
@@ -266,7 +271,7 @@ public class WALSplitter {
outputSinkStarted = true;
Entry entry;
Long lastFlushedSequenceId = -1L;
- while ((entry = getNextLogLine(logFileReader, logPath, skipErrors)) != null) {
+ while ((entry = getNextLogLine(logFileReader, logfile, skipErrors)) != null) {
byte[] region = entry.getKey().getEncodedRegionName();
String encodedRegionNameAsStr = Bytes.toString(region);
lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
@@ -323,10 +328,10 @@ public class WALSplitter {
LOG.warn("Could not parse, corrupted WAL={}", logPath, e);
if (splitLogWorkerCoordination != null) {
// Some tests pass in a csm of null.
- splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), walFS);
+ splitLogWorkerCoordination.markCorrupted(walDir, logPath, walFS);
} else {
// for tests only
- ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), walFS);
+ ZKSplitLog.markCorrupted(walDir, logPath, walFS);
}
isCorrupted = true;
} catch (IOException e) {
@@ -352,7 +357,7 @@ public class WALSplitter {
String msg =
"Processed " + editsCount + " edits across " + outputSink.getNumberOfRecoveredRegions()
+ " regions; edits skipped=" + editsSkipped + "; log file=" + logPath +
- ", length=" + logfile.getLen() + // See if length got updated post lease recovery
+ //", length=" + logfile.getLen() + // See if length got updated post lease recovery
", corrupted=" + isCorrupted + ", progress failed=" + progress_failed;
LOG.info(msg);
status.markComplete(msg);
@@ -715,39 +720,37 @@ public class WALSplitter {
* @throws IOException
* @throws CorruptedLogFileException
*/
- protected Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter)
+ protected Reader getReader(WALIdentity file, boolean skipErrors, CancelableProgressable reporter)
throws IOException, CorruptedLogFileException {
- Path path = file.getPath();
- long length = file.getLen();
Reader in;
// Check for possibly empty file. With appends, currently Hadoop reports a
// zero length even if the file has been sync'd. Revisit if HDFS-376 or
// HDFS-878 is committed.
+ /* long length = file.getLen();
if (length <= 0) {
LOG.warn("File {} might be still open, length is 0", path);
- }
+ } */
try {
- FSUtils.getInstance(walFS, conf).recoverFileLease(walFS, path, conf, reporter);
try {
- in = getReader(path, reporter);
+ in = getReader(file, reporter);
} catch (EOFException e) {
- if (length <= 0) {
+ /* if (length <= 0) {
// TODO should we ignore an empty, not-last log file if skip.errors
// is false? Either way, the caller should decide what to do. E.g.
// ignore if this is the last log in sequence.
// TODO is this scenario still possible if the log has been
// recovered (i.e. closed)
LOG.warn("Could not open {} for reading. File is empty", path, e);
- }
+ } */
// EOFException being ignored
return null;
}
} catch (IOException e) {
if (e instanceof FileNotFoundException) {
// A wal file may not exist anymore. Nothing can be recovered so move on
- LOG.warn("File {} does not exist anymore", path, e);
+ LOG.warn("File {} does not exist anymore", file, e);
return null;
}
if (!skipErrors || e instanceof InterruptedIOException) {
@@ -755,14 +758,14 @@ public class WALSplitter {
}
CorruptedLogFileException t =
new CorruptedLogFileException("skipErrors=true Could not open wal " +
- path + " ignoring");
+ file + " ignoring");
t.initCause(e);
throw t;
}
return in;
}
- static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
+ static private Entry getNextLogLine(Reader in, WALIdentity path, boolean skipErrors)
throws CorruptedLogFileException, IOException {
try {
return in.next();
@@ -796,15 +799,19 @@ public class WALSplitter {
*/
protected Writer createWriter(Path logfile)
throws IOException {
- return walFactory.createRecoveredEditsWriter(walFS, logfile);
+ WALProvider provider = walFactory.getWALProvider();
+ return provider.createWriter(conf, provider.createWALIdentity(logfile.toString()), true);
}
/**
* Create a new {@link Reader} for reading logs to split.
* @return new Reader instance, caller should close
*/
- protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
- return walFactory.createReader(walFS, curLogFile, reporter);
+ protected Reader getReader(WALIdentity curLogFile, CancelableProgressable reporter)
+ throws IOException {
+ WALProvider provider = walFactory.getWALProvider();
+ provider.preRecovery(curLogFile, conf, reporter);
+ return provider.createReader(curLogFile, reporter, true);
}
/**
@@ -1282,7 +1289,9 @@ public class WALSplitter {
private void deleteOneWithFewerEntries(WriterAndPath wap, Path dst)
throws IOException {
long dstMinLogSeqNum = -1L;
- try (WAL.Reader reader = walFactory.createReader(walFS, dst)) {
+ WALProvider provider = walFactory.getWALProvider();
+ try (WAL.Reader reader = provider.createReader(provider.createWALIdentity(dst.toString()),
+ null, true)) {
WAL.Entry entry = reader.next();
if (entry != null) {
dstMinLogSeqNum = entry.getKey().getSequenceId();
@@ -1508,7 +1517,7 @@ public class WALSplitter {
String tmpDirName = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
Path regionedits = getRegionSplitEditsPath(entry,
- fileBeingSplit.getPath().getName(), tmpDirName, conf);
+ fileBeingSplit.getName(), tmpDirName, conf);
if (regionedits == null) {
return null;
}
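Call sites of splitLogFile migrate the same way: wrap the underlying path in a WALIdentity before calling in. A sketch mirroring the test changes below, with variable names illustrative:

    WALProvider provider = walFactory.getWALProvider();
    WALIdentity walId = provider.createWALIdentity(logfileStatus.getPath().toString());
    boolean done = WALSplitter.splitLogFile(walDir, walId, walFS, conf, reporter,
        idChecker, splitLogWorkerCoordination, walFactory);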
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSequenceIdMonotonicallyIncreasing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSequenceIdMonotonicallyIncreasing.java
index e657d9c..46d6dd6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSequenceIdMonotonicallyIncreasing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSequenceIdMonotonicallyIncreasing.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -93,8 +94,8 @@ public class TestSequenceIdMonotonicallyIncreasing {
private long getMaxSeqId(HRegionServer rs, RegionInfo region) throws IOException {
Path walFile = ((AbstractFSWAL<?>) rs.getWAL(null)).getCurrentFileName();
long maxSeqId = -1L;
- try (WAL.Reader reader =
- WALFactory.createReader(UTIL.getTestFileSystem(), walFile, UTIL.getConfiguration())) {
+ try (WAL.Reader reader = WALUtil.createReader(UTIL.getConfiguration(), "seq-increasing",
+ walFile)) {
for (;;) {
WAL.Entry entry = reader.next();
if (entry == null) {
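WALUtil is new in this patch and its source is not part of this section; judging from the call sites here and in the tests below, its reader helpers presumably reduce to something like this hypothetical reconstruction, not the actual WALUtil source:

    public static WAL.Reader createReader(Configuration conf, String factoryId, Path path)
        throws IOException {
      WALFactory factory = new WALFactory(conf, factoryId);
      WALProvider provider = factory.getWALProvider();
      return provider.createReader(provider.createWALIdentity(path.toString()), null, true);
    }

    public static WAL.Reader createReader(WALFactory wals, Path path) throws IOException {
      WALProvider provider = wals.getWALProvider();
      return provider.createReader(provider.createWALIdentity(path.toString()), null, true);
    }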
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java
index 1da31da..1e21db5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java
@@ -24,13 +24,13 @@ import java.util.Arrays;
import java.util.List;
import java.util.Optional;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.wal.WALKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -148,13 +148,13 @@ public class SampleRegionWALCoprocessor implements WALCoprocessor, RegionCoproce
@Override
public void preWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
- Path oldPath, Path newPath) throws IOException {
+ WALIdentity oldPath, WALIdentity newPath) throws IOException {
preWALRollCalled = true;
}
@Override
public void postWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
- Path oldPath, Path newPath) throws IOException {
+ WALIdentity oldPath, WALIdentity newPath) throws IOException {
postWALRollCalled = true;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorderMultiBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorderMultiBlocks.java
index ad2b2d4..3f274b1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorderMultiBlocks.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorderMultiBlocks.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -155,7 +156,8 @@ public class TestBlockReorderMultiBlocks {
// listen for successful log rolls
final WALActionsListener listener = new WALActionsListener() {
@Override
- public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
+ public void postLogRoll(final WALIdentity oldPath, final WALIdentity newPath)
+ throws IOException {
latch.countDown();
}
};
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
index 19d845c..6f9eca4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
@@ -152,7 +152,7 @@ public class TestCacheOnWrite {
this.cacheCompressedData = cacheCompressedData;
this.blockCache = blockCache;
testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress +
- ", cacheCompressedData=" + cacheCompressedData + "]";
+ ", cacheCompressedData=" + cacheCompressedData + ", blockCache=" + blockCache + "]";
LOG.info(testDescription);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
index a11064d..e52aec3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
@@ -80,7 +80,9 @@ import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.After;
@@ -639,7 +641,7 @@ public abstract class AbstractTestDLS {
private int countWAL(Path log, FileSystem fs, Configuration conf) throws IOException {
int count = 0;
- try (WAL.Reader in = WALFactory.createReader(fs, log, conf)) {
+ try (WAL.Reader in = WALUtil.createReader(conf, "count-wal", log)) {
WAL.Entry e;
while ((e = in.next()) != null) {
if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
index 1490653..0f953f3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.junit.After;
import org.junit.Before;
@@ -125,6 +126,11 @@ public class TestFailedAppendAndSync {
final Writer w = super.createWriterInstance(path);
return new Writer() {
@Override
+ public void init(WALIdentity path, Configuration c, boolean overwritable, long blocksize)
+ throws IOException {
+ }
+
+ @Override
public void close() throws IOException {
w.close();
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index b2d9a1b..4de326e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -140,7 +140,6 @@ import org.apache.hadoop.hbase.regionserver.TestHStore.FaultyFileSystem;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
-import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationObserver;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
@@ -158,10 +157,12 @@ import org.apache.hadoop.hbase.wal.FaultyFSLog;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -689,7 +690,7 @@ public class TestHRegion {
for (long i = minSeqId; i <= maxSeqId; i += 10) {
Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
fs.create(recoveredEdits);
- WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
+ WALProvider.Writer writer = WALUtil.createRecoveredEditsWriter(CONF, wals, recoveredEdits);
long time = System.nanoTime();
WALEdit edit = new WALEdit();
@@ -740,7 +741,8 @@ public class TestHRegion {
for (long i = minSeqId; i <= maxSeqId; i += 10) {
Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
fs.create(recoveredEdits);
- WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
+ WALProvider.Writer writer = WALUtil.createRecoveredEditsWriter(CONF,
+ wals, recoveredEdits);
long time = System.nanoTime();
WALEdit edit = new WALEdit();
@@ -826,7 +828,8 @@ public class TestHRegion {
for (long i = minSeqId; i <= maxSeqId; i += 10) {
Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
fs.create(recoveredEdits);
- WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
+ WALProvider.Writer writer = WALUtil.createRecoveredEditsWriter(CONF,
+ wals, recoveredEdits);
long time = System.nanoTime();
WALEdit edit = null;
@@ -931,14 +934,16 @@ public class TestHRegion {
storeFiles, Lists.newArrayList(newFile),
region.getRegionFileSystem().getStoreDir(Bytes.toString(family)));
- WALUtil.writeCompactionMarker(region.getWAL(), this.region.getReplicationScope(),
+ org.apache.hadoop.hbase.regionserver.wal.WALUtil.writeCompactionMarker(
+ region.getWAL(), this.region.getReplicationScope(),
this.region.getRegionInfo(), compactionDescriptor, region.getMVCC());
Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
fs.create(recoveredEdits);
- WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
+ WALProvider.Writer writer = WALUtil.createRecoveredEditsWriter(CONF,
+ wals, recoveredEdits);
long time = System.nanoTime();
@@ -1018,8 +1023,7 @@ public class TestHRegion {
// now verify that the flush markers are written
wal.shutdown();
- WAL.Reader reader = WALFactory.createReader(fs, AbstractFSWALProvider.getCurrentFileName(wal),
- TEST_UTIL.getConfiguration());
+ WAL.Reader reader = WALUtil.createReader(wals, AbstractFSWALProvider.getCurrentFileName(wal));
try {
List<WAL.Entry> flushDescriptors = new ArrayList<>();
long lastFlushSeqId = -1;
@@ -1063,7 +1067,8 @@ public class TestHRegion {
Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000));
fs.create(recoveredEdits);
- WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
+ WALProvider.Writer writer = WALUtil.createRecoveredEditsWriter(CONF,
+ wals, recoveredEdits);
for (WAL.Entry entry : flushDescriptors) {
writer.append(entry);
@@ -1162,6 +1167,11 @@ public class TestHRegion {
}
@Override
+ public void init(WALIdentity path, Configuration c, boolean overwritable, long blocksize)
+ throws IOException {
+ }
+
+ @Override
public void sync(boolean forceSync) throws IOException {
w.sync(forceSync);
}
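The recovered-edits helper these hunks switch to presumably delegates to the provider with overwrite enabled; a sketch under that assumption, not the actual WALUtil source:

    public static WALProvider.Writer createRecoveredEditsWriter(Configuration conf,
        WALFactory wals, Path path) throws IOException {
      WALProvider provider = wals.getWALProvider();
      // recovered-edits writers overwrite any partial file left by a previous attempt
      return provider.createWriter(conf, provider.createWALIdentity(path.toString()), true);
    }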
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
index 308dc03..d85112f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
+import org.apache.hadoop.hbase.wal.WALUtil;
import org.apache.hadoop.util.StringUtils;
import org.junit.After;
import org.junit.Before;
@@ -307,9 +308,7 @@ public class TestHRegionReplayEvents {
}
WAL.Reader createWALReaderForPrimary() throws FileNotFoundException, IOException {
- return WALFactory.createReader(TEST_UTIL.getTestFileSystem(),
- AbstractFSWALProvider.getCurrentFileName(walPrimary),
- TEST_UTIL.getConfiguration());
+ return WALUtil.createReader(wals, AbstractFSWALProvider.getCurrentFileName(walPrimary));
}
@Test
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
index 34f6ca1..034026c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
@@ -45,9 +45,9 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALUtil;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
@@ -172,7 +172,7 @@ public class TestRecoveredEdits {
// Based on HRegion#replayRecoveredEdits
WAL.Reader reader = null;
try {
- reader = WALFactory.createReader(fs, edits, conf);
+ reader = WALUtil.createReader(conf, "replay", edits);
WAL.Entry entry;
while ((entry = reader.next()) != null) {
WALKey key = entry.getKey();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java
index 7aeff84..dd5e5a0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.wal.WALUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -155,8 +156,7 @@ public class TestRecoveredEditsReplayAndAbort {
String.format("%019d", i));
LOG.info("Begin to write recovered.edits : " + recoveredEdits);
fs.create(recoveredEdits);
- WALProvider.Writer writer = wals
- .createRecoveredEditsWriter(fs, recoveredEdits);
+ WALProvider.Writer writer = WALUtil.createRecoveredEditsWriter(CONF, wals, recoveredEdits);
for (long j = i; j < i + 100; j++) {
long time = System.nanoTime();
WALEdit edit = new WALEdit();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
index 0c38ee3..5926102 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -260,7 +261,8 @@ public class TestSplitLogWorker {
CreateMode.PERSISTENT);
SplitLogWorker slw =
- new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
+ new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS,
+ new WALFactory(TEST_UTIL.getConfiguration(), "acquire-task"), neverEndingTask);
slw.start();
try {
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
@@ -295,10 +297,13 @@ public class TestSplitLogWorker {
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
RegionServerServices mockedRS1 = getRegionServer(SVR1);
RegionServerServices mockedRS2 = getRegionServer(SVR2);
+ WALFactory factory = new WALFactory(TEST_UTIL.getConfiguration(), "race-for-task");
SplitLogWorker slw1 =
- new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS1, neverEndingTask);
+ new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS1,
+ factory, neverEndingTask);
SplitLogWorker slw2 =
- new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS2, neverEndingTask);
+ new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS2,
+ factory, neverEndingTask);
slw1.start();
slw2.start();
try {
@@ -325,7 +330,8 @@ public class TestSplitLogWorker {
final String PATH = ZKSplitLog.getEncodedNodeName(zkw, "tpt_task");
RegionServerServices mockedRS = getRegionServer(SRV);
SplitLogWorker slw =
- new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
+ new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS,
+ new WALFactory(TEST_UTIL.getConfiguration(), "preempt-task"), neverEndingTask);
slw.start();
try {
Thread.yield(); // let the worker start
@@ -358,7 +364,8 @@ public class TestSplitLogWorker {
final String PATH1 = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task");
RegionServerServices mockedRS = getRegionServer(SRV);
SplitLogWorker slw =
- new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
+ new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS,
+ new WALFactory(TEST_UTIL.getConfiguration(), "multiple-tasks"), neverEndingTask);
slw.start();
try {
Thread.yield(); // let the worker start
@@ -400,7 +407,8 @@ public class TestSplitLogWorker {
SplitLogCounters.resetCounters();
final ServerName SRV = ServerName.valueOf("svr,1,1");
RegionServerServices mockedRS = getRegionServer(SRV);
- slw = new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
+ slw = new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), mockedRS,
+ new WALFactory(TEST_UTIL.getConfiguration(), "rescan"), neverEndingTask);
slw.start();
Thread.yield(); // let the worker start
Thread.sleep(100);
@@ -463,7 +471,8 @@ public class TestSplitLogWorker {
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
- SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask);
+ SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS,
+ new WALFactory(TEST_UTIL.getConfiguration(), "acquire-multi-task"), neverEndingTask);
slw.start();
try {
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, maxTasks, WAIT_TIME);
@@ -506,7 +515,8 @@ public class TestSplitLogWorker {
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
- SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask);
+ SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS,
+ new WALFactory(TEST_UTIL.getConfiguration(), "avg-tasks"), neverEndingTask);
slw.start();
try {
int acquiredTasks = 0;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
index 84b8d6c..9054e30 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
@@ -170,6 +171,10 @@ public class TestWALLockup {
public void close() throws IOException {
w.close();
}
+ @Override
+ public void init(WALIdentity path, Configuration c, boolean overwritable, long blocksize)
+ throws IOException {
+ }
@Override
public void sync(boolean forceSync) throws IOException {
@@ -361,6 +366,10 @@ public class TestWALLockup {
public void close() throws IOException {
w.close();
}
+ @Override
+ public void init(WALIdentity path, Configuration c, boolean overwritable, long blocksize)
+ throws IOException {
+ }
@Override
public void sync(boolean forceSync) throws IOException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java
index 599260b..ced0547 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java
@@ -25,7 +25,6 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -47,6 +46,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -85,7 +85,6 @@ public class TestWALMonotonicallyIncreasingSeqId {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static Path testDir = TEST_UTIL.getDataTestDir("TestWALMonotonicallyIncreasingSeqId");
private WALFactory wals;
- private FileSystem fileSystem;
private Configuration walConf;
private HRegion region;
@@ -117,7 +116,6 @@ public class TestWALMonotonicallyIncreasingSeqId {
RegionInfo info = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(startKey)
.setEndKey(stopKey).setReplicaId(replicaId).setRegionId(0).build();
- fileSystem = tableDir.getFileSystem(conf);
final Configuration walConf = new Configuration(conf);
FSUtils.setRootDir(walConf, tableDir);
this.walConf = walConf;
@@ -203,9 +201,9 @@ public class TestWALMonotonicallyIncreasingSeqId {
private WAL.Reader createReader(Path logPath, Path oldWalsDir) throws IOException {
try {
- return wals.createReader(fileSystem, logPath);
+ return WALUtil.createReader(wals, logPath);
} catch (IOException e) {
- return wals.createReader(fileSystem, new Path(oldWalsDir, logPath.getName()));
+ return WALUtil.createReader(wals, new Path(oldWalsDir, logPath.getName()));
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRollPeriod.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRollPeriod.java
index 9322c5e..fd9254d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRollPeriod.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRollPeriod.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertFalse;
import java.util.ArrayList;
import java.util.List;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
@@ -30,6 +29,7 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -130,10 +130,10 @@ public abstract class AbstractTestLogRollPeriod {
private void checkMinLogRolls(final WAL log, final int minRolls)
throws Exception {
- final List<Path> paths = new ArrayList<>();
+ final List<WALIdentity> paths = new ArrayList<>();
log.registerWALActionsListener(new WALActionsListener() {
@Override
- public void postLogRoll(Path oldFile, Path newFile) {
+ public void postLogRoll(WALIdentity oldFile, WALIdentity newFile) {
LOG.debug("postLogRoll: oldFile="+oldFile+" newFile="+newFile);
paths.add(newFile);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
index 5098609..100c7b8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.hadoop.hbase.wal.WALUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -118,7 +119,7 @@ public abstract class AbstractTestProtobufLog {
try (WALProvider.Writer writer = createWriter(path)) {
ProtobufLogTestHelper.doWrite(writer, withTrailer, tableName, columnCount, recordCount, row,
timestamp);
- try (ProtobufLogReader reader = (ProtobufLogReader) wals.createReader(fs, path)) {
+ try (ProtobufLogReader reader = (ProtobufLogReader) WALUtil.createReader(wals, path)) {
ProtobufLogTestHelper.doRead(reader, withTrailer, tableName, columnCount, recordCount, row,
timestamp);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index 3f9040b..3a1c170 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -899,8 +899,8 @@ public abstract class AbstractTestWALReplay {
FileStatus[] listStatus = wal.getFiles();
assertNotNull(listStatus);
assertTrue(listStatus.length > 0);
- WALSplitter.splitLogFile(hbaseRootDir, listStatus[0],
- this.fs, this.conf, null, null, null, wals);
+ WALSplitter.splitLogFile(hbaseRootDir, wals.getWALProvider().createWALIdentity(
+ listStatus[0].getPath().toString()), this.fs, this.conf, null, null, null, wals);
FileStatus[] listStatus1 = this.fs.listStatus(
new Path(FSUtils.getWALTableDir(conf, tableName), new Path(hri.getEncodedName(),
"recovered.edits")), new PathFilter() {
@@ -1053,8 +1053,10 @@ public abstract class AbstractTestWALReplay {
first = fs.getFileStatus(smallFile);
second = fs.getFileStatus(largeFile);
}
- WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null, wals);
- WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null, wals);
+ WALSplitter.splitLogFile(hbaseRootDir, wals.getWALProvider().createWALIdentity(
+ first.getPath().toString()), fs, conf, null, null, null, wals);
+ WALSplitter.splitLogFile(hbaseRootDir, wals.getWALProvider().createWALIdentity(
+ second.getPath().toString()), fs, conf, null, null, null, wals);
WAL wal = createWAL(this.conf, hbaseRootDir, logName);
region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal);
assertTrue(region.getOpenSeqNum() > mvcc.getWritePoint());
@@ -1229,7 +1231,8 @@ public abstract class AbstractTestWALReplay {
StreamLacksCapabilityException {
fs.mkdirs(file.getParent());
ProtobufLogWriter writer = new ProtobufLogWriter();
- writer.init(fs, file, conf, true, WALUtil.getWALBlockSize(conf, fs, file));
+ writer.init(wals.getWALProvider().createWALIdentity(file.toString()), conf, true,
+ WALUtil.getWALBlockSize(conf, fs, file));
for (FSWALEntry entry : entries) {
writer.append(entry);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
index f73b4f1..a45eabe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider.AsyncWriter;
import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -112,11 +113,11 @@ public class TestCombinedAsyncWriter {
CombinedAsyncWriter writer = CombinedAsyncWriter.create(writer1, writer2)) {
ProtobufLogTestHelper.doWrite(new WriterOverAsyncWriter(writer), withTrailer, tableName,
columnCount, recordCount, row, timestamp);
- try (ProtobufLogReader reader = (ProtobufLogReader) WALS.createReader(fs, path1)) {
+ try (ProtobufLogReader reader = (ProtobufLogReader) WALUtil.createReader(WALS, path1)) {
ProtobufLogTestHelper.doRead(reader, withTrailer, tableName, columnCount, recordCount, row,
timestamp);
}
- try (ProtobufLogReader reader = (ProtobufLogReader) WALS.createReader(fs, path2)) {
+ try (ProtobufLogReader reader = (ProtobufLogReader) WALUtil.createReader(WALS, path2)) {
ProtobufLogTestHelper.doRead(reader, withTrailer, tableName, columnCount, recordCount, row,
timestamp);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
index 4effa6d..c7af0fc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.AfterClass;
@@ -277,7 +278,7 @@ public class TestDurability {
private void verifyWALCount(WALFactory wals, WAL log, int expected) throws Exception {
Path walPath = AbstractFSWALProvider.getCurrentFileName(log);
- WAL.Reader reader = wals.createReader(FS, walPath);
+ WAL.Reader reader = WALUtil.createReader(wals, walPath);
int count = 0;
WAL.Entry entry = new WAL.Entry();
while (reader.next(entry) != null) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
index e19361e..31c6773 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
@@ -30,7 +30,6 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
@@ -52,8 +51,11 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.FSWALIdentity;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALIdentity;
+import org.apache.hadoop.hbase.wal.WALUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.BeforeClass;
@@ -251,20 +253,20 @@ public class TestLogRolling extends AbstractTestLogRolling {
server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
final WAL log = server.getWAL(region);
- final List<Path> paths = new ArrayList<>(1);
+ final List<WALIdentity> paths = new ArrayList<>(1);
final List<Integer> preLogRolledCalled = new ArrayList<>();
- paths.add(AbstractFSWALProvider.getCurrentFileName(log));
+ paths.add(new FSWALIdentity(AbstractFSWALProvider.getCurrentFileName(log)));
log.registerWALActionsListener(new WALActionsListener() {
@Override
- public void preLogRoll(Path oldFile, Path newFile) {
+ public void preLogRoll(WALIdentity oldFile, WALIdentity newFile) {
LOG.debug("preLogRoll: oldFile=" + oldFile + " newFile=" + newFile);
preLogRolledCalled.add(new Integer(1));
}
@Override
- public void postLogRoll(Path oldFile, Path newFile) {
+ public void postLogRoll(WALIdentity oldFile, WALIdentity newFile) {
paths.add(newFile);
}
});
@@ -315,15 +317,16 @@ public class TestLogRolling extends AbstractTestLogRolling {
// read back the data written
Set<String> loggedRows = new HashSet<>();
FSUtils fsUtils = FSUtils.getInstance(fs, TEST_UTIL.getConfiguration());
- for (Path p : paths) {
+ for (WALIdentity wi : paths) {
+ FSWALIdentity p = (FSWALIdentity)wi;
LOG.debug("recovering lease for " + p);
- fsUtils.recoverFileLease(((HFileSystem) fs).getBackingFs(), p, TEST_UTIL.getConfiguration(),
- null);
+ fsUtils.recoverFileLease(((HFileSystem) fs).getBackingFs(), p.getPath(),
+ TEST_UTIL.getConfiguration(), null);
- LOG.debug("Reading WAL " + FSUtils.getPath(p));
+ LOG.debug("Reading WAL " + FSUtils.getPath(p.getPath()));
WAL.Reader reader = null;
try {
- reader = WALFactory.createReader(fs, p, TEST_UTIL.getConfiguration());
+ reader = WALUtil.createReader(TEST_UTIL.getConfiguration(), "rolling", p.getPath());
WAL.Entry entry;
while ((entry = reader.next()) != null) {
LOG.debug("#" + entry.getKey().getSequenceId() + ": " + entry.getEdit().getCells());
@@ -333,7 +336,7 @@ public class TestLogRolling extends AbstractTestLogRolling {
}
}
} catch (EOFException e) {
- LOG.debug("EOF reading file " + FSUtils.getPath(p));
+ LOG.debug("EOF reading file " + FSUtils.getPath(p.getPath()));
} finally {
if (reader != null) reader.close();
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
index 819df67..d6aba71 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
@@ -114,7 +114,7 @@ public class TestLogRollingNoCluster {
wals.close();
}
for (int i = 0; i < numThreads; i++) {
- assertFalse(appenders[i].isException());
+ assertFalse("got " + appenders[i].getException(), appenders[i].isException());
}
TEST_UTIL.shutdownMiniDFSCluster();
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java
index d429a01..a3bb6d2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java
@@ -22,8 +22,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
+import org.apache.hadoop.hbase.wal.WALUtil;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
@@ -36,6 +36,6 @@ public class TestProtobufLog extends AbstractTestProtobufLog<WALProvider.Writer> {
@Override
protected Writer createWriter(Path path) throws IOException {
- return FSHLogProvider.createWriter(TEST_UTIL.getConfiguration(), fs, path, false);
+ return WALUtil.createWriter(TEST_UTIL.getConfiguration(), "protobuf-log", path);
}
}
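TestProtobufLog, like many tests below, now routes reader and writer creation through WALUtil instead of the static WALFactory/FSHLogProvider entry points. Judging from the call sites in this patch, the helpers come in a factory-reusing flavor and a factory-id flavor; a plausible sketch of the reader side, assuming it delegates to the provider the same way TestWALFactory does further down (provider.createReader(walId, null, true)) — parameter names and bodies are guesses, not the patch's actual WALUtil:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public final class WALUtil {
  private WALUtil() {
  }

  // Reuse a caller-supplied factory, minting the identity from the Path.
  public static WAL.Reader createReader(WALFactory factory, Path path) throws IOException {
    WALProvider provider = factory.getWALProvider();
    return provider.createReader(provider.createWALIdentity(path.toString()), null, true);
  }

  // Spin up a throwaway factory from a Configuration plus factory id.
  public static WAL.Reader createReader(Configuration conf, String factoryId, Path path)
      throws IOException {
    return createReader(new WALFactory(conf, factoryId), path);
  }
}

The createWriter and createRecoveredEditsWriter call sites below follow the same two-flavor pattern.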
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
index 0967a75..d140a5c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.After;
import org.junit.Before;
@@ -142,12 +143,12 @@ public class TestWALActionsListener {
public int closedCount = 0;
@Override
- public void preLogRoll(Path oldFile, Path newFile) {
+ public void preLogRoll(WALIdentity oldFile, WALIdentity newFile) {
preLogRollCounter++;
}
@Override
- public void postLogRoll(Path oldFile, Path newFile) {
+ public void postLogRoll(WALIdentity oldFile, WALIdentity newFile) {
postLogRollCounter++;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WriterOverAsyncWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WriterOverAsyncWriter.java
index 9d938b0..bf1aa0a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WriterOverAsyncWriter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/WriterOverAsyncWriter.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
@@ -35,6 +37,11 @@ class WriterOverAsyncWriter implements WALProvider.Writer {
}
@Override
+ public void init(WALIdentity path, Configuration c, boolean overwritable, long blocksize)
+ throws IOException {
+ }
+
+ @Override
public void close() throws IOException {
asyncWriter.close();
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index 67f793d..5c80297 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -22,7 +22,6 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
@@ -32,7 +31,9 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALIdentity;
/**
* Source that does nothing at all, helpful to test ReplicationSourceManager
@@ -42,31 +43,33 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
private ReplicationSourceManager manager;
private ReplicationPeer replicationPeer;
private String peerClusterId;
- private Path currentPath;
+ private WALIdentity currentPath;
private MetricsSource metrics;
private WALFileLengthProvider walFileLengthProvider;
private AtomicBoolean startup = new AtomicBoolean(false);
+ private WALProvider walProvider;
@Override
- public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
+ public void init(Configuration conf, ReplicationSourceManager manager,
ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
- UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
- throws IOException {
+ UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics,
+ WALProvider walProvider) throws IOException {
this.manager = manager;
this.peerClusterId = peerClusterId;
this.metrics = metrics;
this.walFileLengthProvider = walFileLengthProvider;
this.replicationPeer = rp;
+ this.walProvider = walProvider;
}
@Override
- public void enqueueLog(Path log) {
+ public void enqueueLog(WALIdentity log) {
this.currentPath = log;
metrics.incrSizeOfLogQueue();
}
@Override
- public Path getCurrentPath() {
+ public WALIdentity getCurrentWALIdentity() {
return this.currentPath;
}
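The dummy above tracks the interface change rippling through replication: ReplicationSourceInterface.init drops its FileSystem parameter, gains a trailing WALProvider, and getCurrentPath becomes getCurrentWALIdentity. For reference, the new init contract exactly as both dummies in this patch implement it:

// Contract as implemented by ReplicationSourceDummy above and by
// FailInitializeDummyReplicationSource further down.
void init(Configuration conf, ReplicationSourceManager manager, ReplicationQueueStorage rq,
    ReplicationPeer rp, Server server, String peerClusterId, UUID clusterId,
    WALFileLengthProvider walFileLengthProvider, MetricsSource metrics,
    WALProvider walProvider) throws IOException;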
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
index 1b98518..05fea44 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.hadoop.hbase.wal.WALUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -69,6 +70,7 @@ public class SerialReplicationTestBase {
public final TestName name = new TestName();
protected Path logPath;
+ protected static WALFactory factory;
public static final class LocalReplicationEndpoint extends BaseReplicationEndpoint {
@@ -126,6 +128,7 @@ public class SerialReplicationTestBase {
LOG_DIR = UTIL.getDataTestDirOnTestFS("replicated");
FS = UTIL.getTestFileSystem();
FS.mkdirs(LOG_DIR);
+ factory = new WALFactory(UTIL.getConfiguration(), "repl-base");
}
@AfterClass
@@ -184,7 +187,7 @@ public class SerialReplicationTestBase {
protected final void setupWALWriter() throws IOException {
logPath = new Path(LOG_DIR, name.getMethodName());
- WRITER = WALFactory.createWALWriter(FS, logPath, UTIL.getConfiguration());
+ WRITER = WALUtil.createWriter(UTIL.getConfiguration(), factory, logPath);
}
protected final void waitUntilReplicationDone(int expectedEntries) throws Exception {
@@ -192,7 +195,7 @@ public class SerialReplicationTestBase {
@Override
public boolean evaluate() throws Exception {
- try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) {
+ try (WAL.Reader reader = WALUtil.createReader(factory, logPath)) {
int count = 0;
while (reader.next() != null) {
count++;
@@ -224,8 +227,7 @@ public class SerialReplicationTestBase {
}
protected final void checkOrder(int expectedEntries) throws IOException {
- try (WAL.Reader reader =
- WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
+ try (WAL.Reader reader = WALUtil.createReader(factory, logPath)) {
long seqId = -1L;
int count = 0;
for (Entry entry;;) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index 37ca7dc..400f981 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@@ -695,7 +696,8 @@ public class TestMasterReplication {
// listen for successful log rolls
final WALActionsListener listener = new WALActionsListener() {
@Override
- public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
+ public void postLogRoll(final WALIdentity oldPath, final WALIdentity newPath)
+ throws IOException {
latch.countDown();
}
};
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
index 225ca7f..5c2a763 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
@@ -26,7 +26,6 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.Admin;
@@ -42,6 +41,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.BeforeClass;
@@ -219,7 +219,8 @@ public class TestMultiSlaveReplication {
// listen for successful log rolls
final WALActionsListener listener = new WALActionsListener() {
@Override
- public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
+ public void postLogRoll(final WALIdentity oldPath, final WALIdentity newPath)
+ throws IOException {
latch.countDown();
}
};
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
index 4effe41..4e1616a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
@@ -33,6 +33,9 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALIdentity;
+import org.apache.hadoop.hbase.wal.WALProvider;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
@@ -58,18 +61,21 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
Waiter.waitFor(conf1, 10000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
+ WALFactory factory = new WALFactory(conf1, "empty-wal-recovery");
+ WALProvider provider = factory.getWALProvider();
for (int i = 0; i < numRs; i++) {
HRegionServer hrs = utility1.getHBaseCluster().getRegionServer(i);
RegionInfo regionInfo =
utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
WAL wal = hrs.getWAL(regionInfo);
Path currentFile = ((AbstractFSWAL<?>) wal).getCurrentFileName();
+ WALIdentity walId = provider.createWALIdentity(currentFile.toString());
Replication replicationService = (Replication) utility1.getHBaseCluster()
.getRegionServer(i).getReplicationSourceService();
for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
.getSources()) {
ReplicationSource source = (ReplicationSource) rsi;
- if (!currentFile.equals(source.getCurrentPath())) {
+ if (!walId.equals(source.getCurrentWALIdentity())) {
return false;
}
}
@@ -97,6 +103,8 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
emptyWalPaths.add(emptyWalPath);
}
+ WALFactory factory = new WALFactory(conf1, "empty-wal-recovery");
+ WALProvider provider = factory.getWALProvider();
// inject our empty wal into the replication queue, and then roll the original wal, which
// enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to
// determine if the file being replicated currently is still opened for write, so just inject a
@@ -104,8 +112,10 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
for (int i = 0; i < numRs; i++) {
HRegionServer hrs = utility1.getHBaseCluster().getRegionServer(i);
Replication replicationService = (Replication) hrs.getReplicationSourceService();
- replicationService.getReplicationManager().preLogRoll(emptyWalPaths.get(i));
- replicationService.getReplicationManager().postLogRoll(emptyWalPaths.get(i));
+ WALIdentity id = provider.createWALIdentity(
+ emptyWalPaths.get(i).toString());
+ replicationService.getReplicationManager().preLogRoll(id);
+ replicationService.getReplicationManager().postLogRoll(id);
RegionInfo regionInfo =
utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
WAL wal = hrs.getWAL(regionInfo);
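The dance in this test — stringify an existing Path, then mint a provider-scoped WALIdentity from it — recurs in most of the remaining hunks. Condensed into one helper for reference (a sketch: the factory id "example" is arbitrary, and the WALFactory constructor is assumed to still throw IOException):

// Sketch of the Path -> String -> WALIdentity round-trip used above.
static WALIdentity identityFor(Configuration conf, Path walPath) throws IOException {
  WALFactory factory = new WALFactory(conf, "example");
  WALIdentity id = factory.getWALProvider().createWALIdentity(walPath.toString());
  // Identity equality now replaces Path.equals() when checking which WAL a
  // replication source is currently reading.
  return id;
}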
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java
index 8ff4d84..8d229aa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.replication;
import java.util.Map;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Get;
@@ -28,6 +27,7 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
@@ -74,7 +74,7 @@ public class TestReplicationMetricsforUI extends TestReplicationBase {
}
rs = utility1.getRSForFirstRegionInTable(tableName);
metrics = rs.getWalGroupsReplicationStatus();
- Path lastPath = null;
+ WALIdentity lastPath = null;
for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) {
lastPath = metric.getValue().getCurrentPath();
Assert.assertEquals("peerId", PEER_ID2, metric.getValue().getPeerId());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
index 07e626b..a5e36cb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALUtil;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
@@ -107,8 +108,8 @@ public class TestSerialReplication extends SerialReplicationTestBase {
Map<String, Long> regionsToSeqId = new HashMap<>();
regionsToSeqId.put(region.getEncodedName(), -1L);
regions.stream().map(RegionInfo::getEncodedName).forEach(n -> regionsToSeqId.put(n, -1L));
- try (WAL.Reader reader =
- WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
+ try (WAL.Reader reader = WALUtil.createReader(UTIL.getConfiguration(), "region-split",
+ logPath)) {
int count = 0;
for (Entry entry;;) {
entry = reader.next();
@@ -168,8 +169,8 @@ public class TestSerialReplication extends SerialReplicationTestBase {
RegionInfo region = regionsAfterMerge.get(0);
regionsToSeqId.put(region.getEncodedName(), -1L);
regions.stream().map(RegionInfo::getEncodedName).forEach(n -> regionsToSeqId.put(n, -1L));
- try (WAL.Reader reader =
- WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
+ try (WAL.Reader reader = WALUtil.createReader(UTIL.getConfiguration(), "region-merge",
+ logPath)) {
int count = 0;
for (Entry entry;;) {
entry = reader.next();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
index 42adab6..b6d973a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
@@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
-import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALUtil;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
@@ -124,7 +124,8 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase {
Assert.assertTrue(files.length > 0);
for (FileStatus file : files) {
try (
- Reader reader = WALFactory.createReader(fs2, file.getPath(), utility.getConfiguration())) {
+ Reader reader = WALUtil.createReader(utility.getConfiguration(), "sync-repl-active",
+ file.getPath())) {
Entry entry = reader.next();
Assert.assertTrue(entry != null);
while (entry != null) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
index d01a0ac..17c8224 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java
@@ -52,7 +52,9 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.hbase.wal.WALProvider;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -157,10 +159,13 @@ public class TestRecoverStandbyProcedure {
if (!fs.exists(peerRemoteWALDir)) {
fs.mkdirs(peerRemoteWALDir);
}
+ WALFactory factory = new WALFactory(conf, "standby");
+ WALProvider provider = factory.getWALProvider();
for (int i = 0; i < WAL_NUMBER; i++) {
try (ProtobufLogWriter writer = new ProtobufLogWriter()) {
Path wal = new Path(peerRemoteWALDir, "srv1,8888." + i + ".syncrep");
- writer.init(fs, wal, conf, true, WALUtil.getWALBlockSize(conf, fs, peerRemoteWALDir));
+ writer.init(provider.createWALIdentity(wal.toString()), conf, true,
+ WALUtil.getWALBlockSize(conf, fs, peerRemoteWALDir));
List<Entry> entries = setupWALEntries(i * ROW_COUNT, (i + 1) * ROW_COUNT);
for (Entry entry : entries) {
writer.append(entry);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java
index bd800a8..05c2c34 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.hadoop.hbase.wal.WALUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -74,6 +75,8 @@ public class TestRaceWhenCreatingReplicationSource {
private static FileSystem FS;
+ private static WALFactory walFactory;
+
private static Path LOG_PATH;
private static WALProvider.Writer WRITER;
@@ -138,7 +141,8 @@ public class TestRaceWhenCreatingReplicationSource {
Path dir = UTIL.getDataTestDirOnTestFS();
FS = UTIL.getTestFileSystem();
LOG_PATH = new Path(dir, "replicated");
- WRITER = WALFactory.createWALWriter(FS, LOG_PATH, UTIL.getConfiguration());
+ walFactory = new WALFactory(UTIL.getConfiguration(), "race");
+ WRITER = WALUtil.createWriter(UTIL.getConfiguration(), walFactory, LOG_PATH);
UTIL.getAdmin().addReplicationPeer(PEER_ID,
ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
.setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build(),
@@ -184,7 +188,7 @@ public class TestRaceWhenCreatingReplicationSource {
@Override
public boolean evaluate() throws Exception {
- try (WAL.Reader reader = WALFactory.createReader(FS, LOG_PATH, UTIL.getConfiguration())) {
+ try (WAL.Reader reader = WALUtil.createReader(walFactory, LOG_PATH)) {
return reader.next() != null;
} catch (IOException e) {
return false;
@@ -196,7 +200,7 @@ public class TestRaceWhenCreatingReplicationSource {
return "Replication has not catched up";
}
});
- try (WAL.Reader reader = WALFactory.createReader(FS, LOG_PATH, UTIL.getConfiguration())) {
+ try (WAL.Reader reader = WALUtil.createReader(walFactory, LOG_PATH)) {
Cell cell = reader.next().getEdit().getCells().get(0);
assertEquals(1, Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
assertArrayEquals(CF, CellUtil.cloneFamily(cell));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 274ccab..be58eac 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -57,11 +57,14 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.FSWALIdentity;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.hadoop.hbase.wal.WALUtil;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -118,8 +121,8 @@ public class TestReplicationSource {
Path logPath = new Path(logDir, "log");
if (!FS.exists(logDir)) FS.mkdirs(logDir);
if (!FS.exists(oldLogDir)) FS.mkdirs(oldLogDir);
- WALProvider.Writer writer = WALFactory.createWALWriter(FS, logPath,
- TEST_UTIL.getConfiguration());
+ WALProvider.Writer writer = WALUtil.createWriter(TEST_UTIL.getConfiguration(), "moving",
+ logPath);
for(int i = 0; i < 3; i++) {
byte[] b = Bytes.toBytes(Integer.toString(i));
KeyValue kv = new KeyValue(b,b,b);
@@ -132,7 +135,7 @@ public class TestReplicationSource {
}
writer.close();
- WAL.Reader reader = WALFactory.createReader(FS, logPath, TEST_UTIL.getConfiguration());
+ WAL.Reader reader = WALUtil.createReader(TEST_UTIL.getConfiguration(), "moving", logPath);
WAL.Entry entry = reader.next();
assertNotNull(entry);
@@ -174,8 +177,8 @@ public class TestReplicationSource {
testConf.setInt("replication.source.maxretriesmultiplier", 1);
ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
- source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null,
- p -> OptionalLong.empty(), null);
+ source.init(testConf, manager, null, mockPeer, null, "testPeer", null,
+ p -> OptionalLong.empty(), null, null);
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> future = executor.submit(new Runnable() {
@@ -301,8 +304,8 @@ public class TestReplicationSource {
String walGroupId = "fake-wal-group-id";
ServerName serverName = ServerName.valueOf("www.example.com", 12006, 1524679704418L);
ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L);
- PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
- queue.put(new Path("/www/html/test"));
+ PriorityBlockingQueue<WALIdentity> queue = new PriorityBlockingQueue<>();
+ queue.put(new FSWALIdentity(new Path("/www/html/test")));
RecoveredReplicationSource source = Mockito.mock(RecoveredReplicationSource.class);
Server server = Mockito.mock(Server.class);
Mockito.when(server.getServerName()).thenReturn(serverName);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 0872ea7..e9e037e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -87,7 +87,9 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -337,7 +339,7 @@ public abstract class TestReplicationSourceManager {
when(source.isRecovered()).thenReturn(false);
when(source.isSyncReplication()).thenReturn(false);
manager.logPositionAndCleanOldLogs(source,
- new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath()));
+ new WALEntryBatch(0, manager.getSources().get(0).getCurrentWALIdentity()));
wal.append(hri,
new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
@@ -572,7 +574,8 @@ public abstract class TestReplicationSourceManager {
assertNotNull(source);
final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
// Enqueue log and check if metrics updated
- source.enqueueLog(new Path("abc"));
+ WALFactory factory = new WALFactory(utility.getConfiguration(), "remove-peer");
+ source.enqueueLog(factory.getWALProvider().createWALIdentity((new Path("abc")).toString()));
assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
globalSource.getSizeOfLogQueue());
@@ -614,13 +617,17 @@ public abstract class TestReplicationSourceManager {
ReplicationPeerConfig.newBuilder()
.setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase").build(),
true);
+ WALFactory wals =
+ new WALFactory(utility.getConfiguration(), URLEncoder.encode("regionserver:60020", "UTF8"));
+ WALProvider provider = wals.getWALProvider();
try {
// make sure that we can deal with files which does not exist
String walNameNotExists =
"remoteWAL-12345-" + slaveId + ".12345" + ReplicationUtils.SYNC_WAL_SUFFIX;
Path wal = new Path(logDir, walNameNotExists);
- manager.preLogRoll(wal);
- manager.postLogRoll(wal);
+ WALIdentity walId = provider.createWALIdentity(wal.toString());
+ manager.preLogRoll(walId);
+ manager.postLogRoll(walId);
Path remoteLogDirForPeer = new Path(remoteLogDir, slaveId);
fs.mkdirs(remoteLogDirForPeer);
@@ -630,8 +637,8 @@ public abstract class TestReplicationSourceManager {
new Path(remoteLogDirForPeer, walName).makeQualified(fs.getUri(), fs.getWorkingDirectory());
fs.create(remoteWAL).close();
wal = new Path(logDir, walName);
- manager.preLogRoll(wal);
- manager.postLogRoll(wal);
+ manager.preLogRoll(walId);
+ manager.postLogRoll(walId);
ReplicationSourceInterface source = mockReplicationSource(peerId2);
manager.cleanOldLogs(walName, true, source);
@@ -649,13 +656,16 @@ public abstract class TestReplicationSourceManager {
@Test
public void testSameWALPrefix() throws IOException {
Set<String> latestWalsBefore =
- manager.getLastestPath().stream().map(Path::getName).collect(Collectors.toSet());
+ manager.getLastestPath().stream().map(WALIdentity::getName).collect(Collectors.toSet());
+ WALFactory wals =
+ new WALFactory(utility.getConfiguration(), URLEncoder.encode("regionserver:60020", "UTF8"));
+ WALProvider provider = wals.getWALProvider();
String walName1 = "localhost,8080,12345-45678-Peer.34567";
String walName2 = "localhost,8080,12345.56789";
- manager.preLogRoll(new Path(walName1));
- manager.preLogRoll(new Path(walName2));
+ manager.preLogRoll(provider.createWALIdentity((new Path(walName1)).toString()));
+ manager.preLogRoll(provider.createWALIdentity((new Path(walName2)).toString()));
- Set<String> latestWals = manager.getLastestPath().stream().map(Path::getName)
+ Set<String> latestWals = manager.getLastestPath().stream().map(WALIdentity::getName)
.filter(n -> !latestWalsBefore.contains(n)).collect(Collectors.toSet());
assertEquals(2, latestWals.size());
assertTrue(latestWals.contains(walName1));
@@ -819,10 +829,10 @@ public abstract class TestReplicationSourceManager {
static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy {
@Override
- public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
+ public void init(Configuration conf, ReplicationSourceManager manager,
ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
- UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
- throws IOException {
+ UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics,
+ WALProvider provider) throws IOException {
throw new IOException("Failing deliberately");
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index fac6f74..34b084b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -39,7 +39,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -56,10 +55,12 @@ import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.FSWALIdentity;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
@@ -98,8 +99,9 @@ public class TestWALEntryStream {
}
private WAL log;
- PriorityBlockingQueue<Path> walQueue;
+ PriorityBlockingQueue<WALIdentity> walQueue;
private PathWatcher pathWatcher;
+ private WALFactory wals;
@Rule
public TestName tn = new TestName();
@@ -124,7 +126,7 @@ public class TestWALEntryStream {
public void setUp() throws Exception {
walQueue = new PriorityBlockingQueue<>();
pathWatcher = new PathWatcher();
- final WALFactory wals = new WALFactory(CONF, tn.getMethodName());
+ wals = new WALFactory(CONF, tn.getMethodName());
wals.getWALProvider().addWALActionsListener(pathWatcher);
log = wals.getWAL(info);
}
@@ -156,7 +158,8 @@ public class TestWALEntryStream {
log.rollWriter();
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
+ new FSWALEntryStream(fs, walQueue, CONF, 0, log, null, new MetricsSource("1"),
+ wals.getWALProvider())) {
int i = 0;
while (entryStream.hasNext()) {
assertNotNull(entryStream.next());
@@ -183,7 +186,8 @@ public class TestWALEntryStream {
appendToLogAndSync();
long oldPos;
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
+ new FSWALEntryStream(fs, walQueue, CONF, 0, log, null, new MetricsSource("1"),
+ wals.getWALProvider())) {
// There's one edit in the log, read it. Reading past it needs to throw exception
assertTrue(entryStream.hasNext());
WAL.Entry entry = entryStream.peek();
@@ -197,8 +201,8 @@ public class TestWALEntryStream {
appendToLogAndSync();
- try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, oldPos,
- log, null, new MetricsSource("1"))) {
+ try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, oldPos,
+ log, null, new MetricsSource("1"), wals.getWALProvider())) {
// Read the newly added entry, make sure we made progress
WAL.Entry entry = entryStream.next();
assertNotEquals(oldPos, entryStream.getPosition());
@@ -211,8 +215,8 @@ public class TestWALEntryStream {
log.rollWriter();
appendToLogAndSync();
- try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, oldPos,
- log, null, new MetricsSource("1"))) {
+ try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, oldPos,
+ log, null, new MetricsSource("1"), wals.getWALProvider())) {
WAL.Entry entry = entryStream.next();
assertNotEquals(oldPos, entryStream.getPosition());
assertNotNull(entry);
@@ -237,7 +241,8 @@ public class TestWALEntryStream {
appendToLog("1");
appendToLog("2");// 2
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
+ new FSWALEntryStream(fs, walQueue, CONF, 0, log, null, new MetricsSource("1"),
+ wals.getWALProvider())) {
assertEquals("1", getRow(entryStream.next()));
appendToLog("3"); // 3 - comes in after reader opened
@@ -262,7 +267,8 @@ public class TestWALEntryStream {
public void testNewEntriesWhileStreaming() throws Exception {
appendToLog("1");
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
+ new FSWALEntryStream(fs, walQueue, CONF, 0, log, null, new MetricsSource("1"),
+ wals.getWALProvider())) {
entryStream.next(); // we've hit the end of the stream at this point
// some new entries come in while we're streaming
@@ -285,7 +291,8 @@ public class TestWALEntryStream {
long lastPosition = 0;
appendToLog("1");
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
+ new FSWALEntryStream(fs, walQueue, CONF, 0, log, null, new MetricsSource("1"),
+ wals.getWALProvider())) {
entryStream.next(); // we've hit the end of the stream at this point
appendToLog("2");
appendToLog("3");
@@ -293,7 +300,8 @@ public class TestWALEntryStream {
}
// next stream should picks up where we left off
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, CONF, lastPosition, log, null, new MetricsSource("1"))) {
+ new FSWALEntryStream(fs, walQueue, CONF, lastPosition, log, null, new MetricsSource("1"),
+ wals.getWALProvider())) {
assertEquals("2", getRow(entryStream.next()));
assertEquals("3", getRow(entryStream.next()));
assertFalse(entryStream.hasNext()); // done
@@ -310,14 +318,15 @@ public class TestWALEntryStream {
long lastPosition = 0;
appendEntriesToLogAndSync(3);
// read only one element
- try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, lastPosition,
- log, null, new MetricsSource("1"))) {
+ try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, lastPosition,
+ log, null, new MetricsSource("1"), wals.getWALProvider())) {
entryStream.next();
lastPosition = entryStream.getPosition();
}
// there should still be two more entries from where we left off
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, CONF, lastPosition, log, null, new MetricsSource("1"))) {
+ new FSWALEntryStream(fs, walQueue, CONF, lastPosition, log, null, new MetricsSource("1"),
+ wals.getWALProvider())) {
assertNotNull(entryStream.next());
assertNotNull(entryStream.next());
assertFalse(entryStream.hasNext());
@@ -328,7 +337,8 @@ public class TestWALEntryStream {
@Test
public void testEmptyStream() throws Exception {
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
+ new FSWALEntryStream(fs, walQueue, CONF, 0, log, null, new MetricsSource("1"),
+ wals.getWALProvider())) {
assertFalse(entryStream.hasNext());
}
}
@@ -361,7 +371,8 @@ public class TestWALEntryStream {
// get ending position
long position;
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
+ new FSWALEntryStream(fs, walQueue, CONF, 0, log, null, new MetricsSource("1"),
+ wals.getWALProvider())) {
entryStream.next();
entryStream.next();
entryStream.next();
@@ -369,7 +380,7 @@ public class TestWALEntryStream {
}
// start up a reader
- Path walPath = walQueue.peek();
+ WALIdentity walPath = walQueue.peek();
ReplicationSourceWALReader reader = createReader(false, CONF);
WALEntryBatch entryBatch = reader.take();
@@ -389,7 +400,7 @@ public class TestWALEntryStream {
@Test
public void testReplicationSourceWALReaderRecovered() throws Exception {
appendEntriesToLogAndSync(10);
- Path walPath = walQueue.peek();
+ WALIdentity walPath = walQueue.peek();
log.rollWriter();
appendEntriesToLogAndSync(5);
log.shutdown();
@@ -422,14 +433,14 @@ public class TestWALEntryStream {
@Test
public void testReplicationSourceWALReaderWrongPosition() throws Exception {
appendEntriesToLogAndSync(1);
- Path walPath = walQueue.peek();
+ FSWALIdentity walPath = (FSWALIdentity)walQueue.peek();
log.rollWriter();
appendEntriesToLogAndSync(20);
TEST_UTIL.waitFor(5000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
- return fs.getFileStatus(walPath).getLen() > 0;
+ return fs.getFileStatus(walPath.getPath()).getLen() > 0;
}
@Override
@@ -438,7 +449,7 @@ public class TestWALEntryStream {
}
});
- long walLength = fs.getFileStatus(walPath).getLen();
+ long walLength = fs.getFileStatus(walPath.getPath()).getLen();
ReplicationSourceWALReader reader = createReader(false, CONF);
@@ -449,7 +460,7 @@ public class TestWALEntryStream {
assertEquals(1, entryBatch.getNbEntries());
assertTrue(entryBatch.isEndOfFile());
- Path walPath2 = walQueue.peek();
+ WALIdentity walPath2 = walQueue.peek();
entryBatch = reader.take();
assertEquals(walPath2, entryBatch.getLastWalPath());
assertEquals(20, entryBatch.getNbEntries());
@@ -462,7 +473,7 @@ public class TestWALEntryStream {
assertEquals(0, entryBatch.getNbEntries());
assertTrue(entryBatch.isEndOfFile());
- Path walPath3 = walQueue.peek();
+ WALIdentity walPath3 = walQueue.peek();
entryBatch = reader.take();
assertEquals(walPath3, entryBatch.getLastWalPath());
assertEquals(10, entryBatch.getNbEntries());
@@ -476,7 +487,8 @@ public class TestWALEntryStream {
// get ending position
long position;
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
+ new FSWALEntryStream(fs, walQueue, CONF, 0, log, null, new MetricsSource("1"),
+ wals.getWALProvider())) {
entryStream.next();
entryStream.next();
entryStream.next();
@@ -484,7 +496,7 @@ public class TestWALEntryStream {
}
// start up a reader
- Path walPath = walQueue.peek();
+ WALIdentity walPath = walQueue.peek();
ReplicationSource source = mockReplicationSource(false, CONF);
AtomicInteger invokeCount = new AtomicInteger(0);
AtomicBoolean enabled = new AtomicBoolean(false);
@@ -577,10 +589,10 @@ public class TestWALEntryStream {
class PathWatcher implements WALActionsListener {
- Path currentPath;
+ WALIdentity currentPath;
@Override
- public void preLogRoll(Path oldPath, Path newPath) throws IOException {
+ public void preLogRoll(WALIdentity oldPath, WALIdentity newPath) throws IOException {
walQueue.add(newPath);
currentPath = newPath;
}
@@ -592,8 +604,9 @@ public class TestWALEntryStream {
appendToLog("2");
long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong();
AtomicLong fileLength = new AtomicLong(size - 1);
- try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, 0,
- p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"))) {
+ try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, 0,
+ p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"),
+ wals.getWALProvider())) {
assertTrue(entryStream.hasNext());
assertNotNull(entryStream.next());
// can not get log 2
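Every WALEntryStream construction in this test changes the same way: the concrete class is now FSWALEntryStream, which takes the FileSystem first and a WALProvider last, and the queue holds WALIdentity elements. A condensed fragment mirroring those call sites (log, fs, CONF, and wals are the test fixture's fields; log doubles as the WALFileLengthProvider argument, as in the original calls):

PriorityBlockingQueue<WALIdentity> queue = new PriorityBlockingQueue<>();
queue.add(new FSWALIdentity(AbstractFSWALProvider.getCurrentFileName(log)));
try (WALEntryStream stream = new FSWALEntryStream(fs, queue, CONF, 0, log, null,
    new MetricsSource("1"), wals.getWALProvider())) {
  while (stream.hasNext()) {
    // consume entries exactly as the tests above do
    WAL.Entry entry = stream.next();
  }
}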
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
index d062c77..d337d03 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
@@ -18,10 +18,6 @@
*/
package org.apache.hadoop.hbase.wal;
-import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.DEFAULT_PROVIDER_ID;
-import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID;
-import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -34,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
// imports for things that haven't moved from regionserver.wal yet.
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
@@ -70,7 +67,7 @@ import org.slf4j.LoggerFactory;
* management over time, because the data set size may result in additional HDFS block allocations.
*/
@InterfaceAudience.Private
-public class IOTestProvider implements WALProvider {
+public class IOTestProvider extends AbstractFSWALProvider {
private static final Logger LOG = LoggerFactory.getLogger(IOTestProvider.class);
private static final String ALLOWED_OPERATIONS = "hbase.wal.iotestprovider.operations";
@@ -108,12 +105,15 @@ public class IOTestProvider implements WALProvider {
this.providerId = providerId != null ? providerId : DEFAULT_PROVIDER_ID;
}
+ protected void doInit(Configuration conf) throws IOException {
+ }
+
@Override
public List<WAL> getWALs() {
return Collections.singletonList(log);
}
- private FSHLog createWAL() throws IOException {
+ protected FSHLog createWAL() throws IOException {
String logPrefix = factory.factoryId + WAL_FILE_NAME_DELIMITER + providerId;
return new IOTestWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
AbstractFSWALProvider.getWALDirectoryName(factory.factoryId),
@@ -122,7 +122,7 @@ public class IOTestProvider implements WALProvider {
}
@Override
- public WAL getWAL(RegionInfo region) throws IOException {
+ public AbstractFSWAL<?> getWAL(RegionInfo region) throws IOException {
FSHLog log = this.log;
if (log != null) {
return log;
@@ -210,7 +210,9 @@ public class IOTestProvider implements WALProvider {
LOG.info("creating new writer instance.");
final ProtobufLogWriter writer = new IOTestWriter();
try {
- writer.init(fs, path, conf, false, this.blocksize);
+ WALFactory factory = new WALFactory(conf, "io-test-wal");
+ writer.init(factory.getWALProvider().createWALIdentity(path.toString()), conf, false,
+ this.blocksize);
} catch (CommonFSUtils.StreamLacksCapabilityException exception) {
throw new IOException("Can't create writer instance because underlying FileSystem " +
"doesn't support needed stream capabilities.", exception);
@@ -237,7 +239,7 @@ public class IOTestProvider implements WALProvider {
private boolean doSyncs;
@Override
- public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable,
+ public void init(WALIdentity path, Configuration conf, boolean overwritable,
long blocksize) throws IOException, CommonFSUtils.StreamLacksCapabilityException {
Collection<String> operations = conf.getStringCollection(ALLOWED_OPERATIONS);
if (operations.isEmpty() || operations.contains(AllowedOperations.all.name())) {
@@ -250,7 +252,7 @@ public class IOTestProvider implements WALProvider {
}
LOG.info("IOTestWriter initialized with appends " + (doAppends ? "enabled" : "disabled") +
" and syncs " + (doSyncs ? "enabled" : "disabled"));
- super.init(fs, path, conf, overwritable, blocksize);
+ super.init(path, conf, overwritable, blocksize);
}
@Override
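IOTestProvider now extends AbstractFSWALProvider rather than implementing WALProvider from scratch, so it only overrides the hooks exercised above. Approximately, and ignoring any generics the real base class may carry, the hook contract is:

// Approximation of the template hooks IOTestProvider overrides above;
// not the base class's full surface.
public abstract class AbstractFSWALProvider implements WALProvider {
  // provider-specific setup, invoked once from init()
  protected abstract void doInit(Configuration conf) throws IOException;

  // lazily builds the single WAL instance the provider manages
  protected abstract AbstractFSWAL<?> createWAL() throws IOException;
}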
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
index 8193806..8224b59 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java
@@ -145,7 +145,7 @@ public class TestSecureWAL {
assertFalse("Cells appear to be plaintext", Bytes.contains(fileData, value));
// Confirm the WAL can be read back
- WAL.Reader reader = wals.createReader(TEST_UTIL.getTestFileSystem(), walPath);
+ WAL.Reader reader = WALUtil.createReader(wals, walPath);
int count = 0;
WAL.Entry entry = new WAL.Entry();
while (reader.next(entry) != null) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
index 8189cef..f3f6c3c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
@@ -115,12 +115,12 @@ public class TestSyncReplicationWALProvider {
Path localFile = wal.getCurrentFileName();
Path remoteFile = new Path(REMOTE_WAL_DIR + "/" + PEER_ID, localFile.getName());
try (ProtobufLogReader reader =
- (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), localFile)) {
+ (ProtobufLogReader) WALUtil.createReader(FACTORY, localFile)) {
ProtobufLogTestHelper.doRead(reader, false, REGION, TABLE, columnCount, recordCount, row,
timestamp);
}
try (ProtobufLogReader reader =
- (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), remoteFile)) {
+ (ProtobufLogReader) WALUtil.createReader(FACTORY, remoteFile)) {
ProtobufLogTestHelper.doRead(reader, false, REGION, TABLE, columnCount, recordCount, row,
timestamp);
}
@@ -146,12 +146,12 @@ public class TestSyncReplicationWALProvider {
}
});
try (ProtobufLogReader reader =
- (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), localFile)) {
+ (ProtobufLogReader) WALUtil.createReader(FACTORY, localFile)) {
ProtobufLogTestHelper.doRead(reader, true, REGION, TABLE, columnCount, recordCount, row,
timestamp);
}
try (ProtobufLogReader reader =
- (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), remoteFile)) {
+ (ProtobufLogReader) WALUtil.createReader(FACTORY, remoteFile)) {
ProtobufLogTestHelper.doRead(reader, true, REGION, TABLE, columnCount, recordCount, row,
timestamp);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index 8fbe09d..2cbd533 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -277,8 +277,10 @@ public class TestWALFactory {
// gives you EOFE.
wal.sync();
// Open a Reader.
+ WALProvider provider = wals.getWALProvider();
Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
- reader = wals.createReader(fs, walPath);
+ WALIdentity walId = provider.createWALIdentity(walPath.toString());
+ reader = provider.createReader(walId, null, true);
int count = 0;
WAL.Entry entry = new WAL.Entry();
while ((entry = reader.next(entry)) != null) count++;
@@ -293,14 +295,14 @@ public class TestWALFactory {
System.currentTimeMillis(), mvcc, scopes), kvs, true);
}
wal.sync();
- reader = wals.createReader(fs, walPath);
+ reader = provider.createReader(walId, null, true);
count = 0;
while((entry = reader.next(entry)) != null) count++;
assertTrue(count >= total);
reader.close();
// If I sync, should see double the edits.
wal.sync();
- reader = wals.createReader(fs, walPath);
+ reader = provider.createReader(walId, null, true);
count = 0;
while((entry = reader.next(entry)) != null) count++;
assertEquals(total * 2, count);
@@ -316,14 +318,14 @@ public class TestWALFactory {
}
// Now I should have written out lots of blocks. Sync then read.
wal.sync();
- reader = wals.createReader(fs, walPath);
+ reader = provider.createReader(walId, null, true);
count = 0;
while((entry = reader.next(entry)) != null) count++;
assertEquals(total * 3, count);
reader.close();
// shutdown and ensure that Reader gets right length also.
wal.shutdown();
- reader = wals.createReader(fs, walPath);
+ reader = provider.createReader(walId, null, true);
count = 0;
while((entry = reader.next(entry)) != null) count++;
assertEquals(total * 3, count);
@@ -338,7 +340,7 @@ public class TestWALFactory {
assertEquals(howmany * howmany, splits.size());
for (int i = 0; i < splits.size(); i++) {
LOG.info("Verifying=" + splits.get(i));
- WAL.Reader reader = wals.createReader(fs, splits.get(i));
+ WAL.Reader reader = WALUtil.createReader(wals, splits.get(i));
try {
int count = 0;
String previousRegion = null;
@@ -476,7 +478,7 @@ public class TestWALFactory {
throw t.exception;
// Make sure you can read all the content
- WAL.Reader reader = wals.createReader(fs, walPath);
+ WAL.Reader reader = WALUtil.createReader(wals, walPath);
int count = 0;
WAL.Entry entry = new WAL.Entry();
while (reader.next(entry) != null) {
@@ -532,7 +534,7 @@ public class TestWALFactory {
log.shutdown();
Path filename = AbstractFSWALProvider.getCurrentFileName(log);
// Now open a reader on the log and assert append worked.
- reader = wals.createReader(fs, filename);
+ reader = WALUtil.createReader(wals, filename);
// Above we added all columns on a single row so we only read one
// entry in the below... thats why we have '1'.
for (int i = 0; i < 1; i++) {
@@ -590,7 +592,7 @@ public class TestWALFactory {
log.shutdown();
Path filename = AbstractFSWALProvider.getCurrentFileName(log);
// Now open a reader on the log and assert append worked.
- reader = wals.createReader(fs, filename);
+ reader = WALUtil.createReader(wals, filename);
WAL.Entry entry = reader.next();
assertEquals(colCount, entry.getEdit().size());
int idx = 0;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALOpenAfterDNRollingStart.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALOpenAfterDNRollingStart.java
index 7d7896c..f698318 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALOpenAfterDNRollingStart.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALOpenAfterDNRollingStart.java
@@ -122,8 +122,8 @@ public class TestWALOpenAfterDNRollingStart {
currentFile = new Path(oldLogDir, currentFile.getName());
}
// if the log is not rolled, then we can never open this wal forever.
- try (WAL.Reader reader = WALFactory.createReader(TEST_UTIL.getTestFileSystem(), currentFile,
- TEST_UTIL.getConfiguration())) {
+ try (WAL.Reader reader = WALUtil.createReader(TEST_UTIL.getConfiguration(), "test",
+ currentFile)) {
reader.next();
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
index bc21a65..91b2cb4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java
@@ -163,9 +163,10 @@ public class TestWALReaderOnSecureWAL {
in.close();
assertFalse("Cells appear to be plaintext", Bytes.contains(fileData, value));
+ WALProvider provider = wals.getWALProvider();
// Confirm the WAL cannot be read back by ProtobufLogReader
try {
- wals.createReader(TEST_UTIL.getTestFileSystem(), walPath);
+ provider.createReader(provider.createWALIdentity(walPath.toString()), null, true);
assertFalse(true);
} catch (IOException ioe) {
// expected IOE
@@ -174,8 +175,8 @@ public class TestWALReaderOnSecureWAL {
FileStatus[] listStatus = fs.listStatus(walPath.getParent());
Path rootdir = FSUtils.getRootDir(conf);
try {
- WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, null, null);
- s.splitLogFile(listStatus[0], null);
+ WALSplitter s = new WALSplitter(wals, conf, rootdir, null, null);
+ s.splitLogFile(provider.createWALIdentity(listStatus[0].getPath().toString()), null);
Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
"corrupt");
assertTrue(fs.exists(file));
@@ -209,7 +210,7 @@ public class TestWALReaderOnSecureWAL {
// Confirm the WAL can be read back by SecureProtobufLogReader
try {
- WAL.Reader reader = wals.createReader(TEST_UTIL.getTestFileSystem(), walPath);
+ WAL.Reader reader = WALUtil.createReader(wals, walPath);
reader.close();
} catch (IOException ioe) {
assertFalse(true);
@@ -218,8 +219,9 @@ public class TestWALReaderOnSecureWAL {
FileStatus[] listStatus = fs.listStatus(walPath.getParent());
Path rootdir = FSUtils.getRootDir(conf);
try {
- WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, null, null);
- s.splitLogFile(listStatus[0], null);
+ WALSplitter s = new WALSplitter(wals, conf, rootdir, null, null);
+ s.splitLogFile(wals.getWALProvider().createWALIdentity(listStatus[0].getPath().toString()),
+ null);
Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
"corrupt");
assertTrue(!fs.exists(file));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
index e6644f0..60750d5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
@@ -423,7 +423,8 @@ public class TestWALSplit {
FILENAME_BEING_SPLIT, TMPDIRNAME, conf);
String parentOfParent = p.getParent().getParent().getName();
assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
- WALFactory.createRecoveredEditsWriter(fs, p, conf).close();
+ WALUtil.createRecoveredEditsWriter(conf, wals, p).close();
}
private void useDifferentDFSClient() throws IOException {
@@ -687,7 +688,7 @@ public class TestWALSplit {
assertEquals(1, splitLog.length);
int actualCount = 0;
- Reader in = wals.createReader(fs, splitLog[0]);
+ Reader in = WALUtil.createReader(wals, splitLog[0]);
@SuppressWarnings("unused")
Entry entry;
while ((entry = in.next()) != null) ++actualCount;
@@ -810,7 +811,7 @@ public class TestWALSplit {
}
assertTrue("There should be some log greater than size 0.", 0 < largestSize);
// Set up a splitter that will throw an IOE on the output side
- WALSplitter logSplitter = new WALSplitter(wals, conf, HBASEDIR, fs, null, null) {
+ WALSplitter logSplitter = new WALSplitter(wals, conf, HBASEDIR, null, null) {
@Override
protected Writer createWriter(Path logfile) throws IOException {
Writer mockWriter = Mockito.mock(Writer.class);
@@ -843,7 +844,8 @@ public class TestWALSplit {
t.setDaemon(true);
t.start();
try {
- logSplitter.splitLogFile(logfiles[largestLogFile], null);
+ logSplitter.splitLogFile(wals.getWALProvider().createWALIdentity(
+ logfiles[largestLogFile].getPath().toString()), null);
fail("Didn't throw!");
} catch (IOException ioe) {
assertTrue(ioe.toString().contains("Injected"));
@@ -940,7 +942,8 @@ public class TestWALSplit {
try {
conf.setInt("hbase.splitlog.report.period", 1000);
boolean ret = WALSplitter.splitLogFile(
- HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, wals);
+ HBASEDIR, wals.getWALProvider().createWALIdentity(logfile.getPath().toString()), spiedFs,
+ conf, localReporter, null, null, wals);
assertFalse("Log splitting should failed", ret);
assertTrue(count.get() > 0);
} catch (IOException e) {
@@ -998,7 +1001,7 @@ public class TestWALSplit {
makeRegionDirs(regions);
// Create a splitter that reads and writes the data without touching disk
- WALSplitter logSplitter = new WALSplitter(wals, localConf, HBASEDIR, fs, null, null) {
+ WALSplitter logSplitter = new WALSplitter(wals, localConf, HBASEDIR, null, null) {
/* Produce a mock writer that doesn't write anywhere */
@Override
@@ -1034,7 +1037,7 @@ public class TestWALSplit {
/* Produce a mock reader that generates fake entries */
@Override
- protected Reader getReader(Path curLogFile, CancelableProgressable reporter)
+ protected Reader getReader(WALIdentity curLogFile, CancelableProgressable reporter)
throws IOException {
Reader mockReader = Mockito.mock(Reader.class);
Mockito.doAnswer(new Answer<Entry>() {
@@ -1059,7 +1062,7 @@ public class TestWALSplit {
}
};
- logSplitter.splitLogFile(fs.getFileStatus(logPath), null);
+ logSplitter.splitLogFile(wals.getWALProvider().createWALIdentity(logPath.toString()), null);
// Verify number of written edits per region
Map<byte[], Long> outputCounts = logSplitter.outputSink.getOutputCounts();
@@ -1147,11 +1150,11 @@ public class TestWALSplit {
assertTrue("There should be some log file",
logfiles != null && logfiles.length > 0);
- WALSplitter logSplitter = new WALSplitter(wals, conf, HBASEDIR, fs, null, null) {
+ WALSplitter logSplitter = new WALSplitter(wals, conf, HBASEDIR, null, null) {
@Override
protected Writer createWriter(Path logfile)
throws IOException {
- Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile);
+ Writer writer = WALUtil.createRecoveredEditsWriter(conf, wals, logfile);
// After creating writer, simulate region's
// replayRecoveredEditsIfAny() which gets SplitEditFiles of this
// region and delete them, excluding files with '.temp' suffix.
@@ -1169,7 +1172,8 @@ public class TestWALSplit {
}
};
try{
- logSplitter.splitLogFile(logfiles[0], null);
+ logSplitter.splitLogFile(wals.getWALProvider().createWALIdentity(
+ logfiles[0].getPath().toString()), null);
} catch (IOException e) {
LOG.info(e.toString(), e);
fail("Throws IOException when spliting "
@@ -1210,7 +1214,7 @@ public class TestWALSplit {
int seq = 0;
int numRegionEventsAdded = 0;
for (int i = 0; i < writers; i++) {
- ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i));
+ ws[i] = WALUtil.createWriter(conf, wals, new Path(WALDIR, WAL_FILE_PREFIX + i));
for (int j = 0; j < entries; j++) {
int prefix = 0;
for (String region : REGIONS) {
@@ -1339,7 +1343,7 @@ public class TestWALSplit {
private int countWAL(Path log) throws IOException {
int count = 0;
- Reader in = wals.createReader(fs, log);
+ Reader in = WALUtil.createReader(wals, log);
while (in.next() != null) {
count++;
}
@@ -1409,8 +1413,7 @@ public class TestWALSplit {
}
private void injectEmptyFile(String suffix, boolean closeFile) throws IOException {
- Writer writer =
- WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf);
+ Writer writer = WALUtil.createWriter(conf, wals, new Path(WALDIR, WAL_FILE_PREFIX + suffix));
if (closeFile) {
writer.close();
}
@@ -1418,8 +1421,8 @@ public class TestWALSplit {
private boolean logsAreEqual(Path p1, Path p2) throws IOException {
Reader in1, in2;
- in1 = wals.createReader(fs, p1);
- in2 = wals.createReader(fs, p2);
+ in1 = WALUtil.createReader(wals, p1);
+ in2 = WALUtil.createReader(wals, p2);
Entry entry1;
Entry entry2;
while ((entry1 = in1.next()) != null) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
index 861b289..9cb1ee2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java
@@ -415,7 +415,7 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
*/
private long verify(final WALFactory wals, final Path wal, final boolean verbose)
throws IOException {
- WAL.Reader reader = wals.createReader(wal.getFileSystem(getConf()), wal);
+ WAL.Reader reader = WALUtil.createReader(wals, wal);
long count = 0;
Map sequenceIds = new HashMap<>();
try {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSWALIdentity.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSWALIdentity.java
new file mode 100644
index 0000000..70fd66d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSWALIdentity.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A distributed-filesystem-oriented implementation of {@link WALIdentity}, backed by a
+ * {@link Path}.
+ */
+@InterfaceAudience.Private
+public class FSWALIdentity implements WALIdentity {
+ private String name;
+ private Path path;
+
+ public FSWALIdentity(String name) {
+ this.path = new Path(name);
+ this.name = path.getName();
+ }
+
+ public FSWALIdentity(Path path) {
+ this.path = path;
+ if (path != null) {
+ this.name = path.getName();
+ }
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * @return {@link Path} object of the name encapsulated in WALIdentity
+ */
+ public Path getPath() {
+ return path;
+ }
+
+ @Override
+ public int compareTo(WALIdentity o) {
+ FSWALIdentity that = (FSWALIdentity)o;
+ return this.path.compareTo(that.getPath());
+ }
+
+ @Override
+ public String toString() {
+ return this.path.toString();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof FSWALIdentity)) {
+ return false;
+ }
+ FSWALIdentity that = (FSWALIdentity) obj;
+ return this.path.equals(that.getPath());
+ }
+
+ @Override
+ public int hashCode() {
+ return this.path.hashCode();
+ }
+}
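For reference, a short sketch of the identity semantics this class provides; the WAL path below is made up for illustration, and the assertions simply restate the equals/hashCode/compareTo contract implemented above.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.wal.FSWALIdentity;

public class FSWALIdentityExample {
  public static void main(String[] args) {
    // Both constructors normalize to a Path; getName() is the last path component.
    FSWALIdentity byString = new FSWALIdentity("/hbase/WALs/rs1/rs1%2C16020.1234");
    FSWALIdentity byPath = new FSWALIdentity(new Path("/hbase/WALs/rs1/rs1%2C16020.1234"));

    assert byString.equals(byPath);                  // equality delegates to Path.equals
    assert byString.hashCode() == byPath.hashCode(); // consistent with equals
    assert "rs1%2C16020.1234".equals(byString.getName());
    assert byString.compareTo(byPath) == 0;          // ordering delegates to Path.compareTo
  }
}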
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALIdentity.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALIdentity.java
new file mode 100644
index 0000000..fa7d2fa
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALIdentity.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * This interface defines the identity of a WAL for both stream-based and distributed
+ * FileSystem-based environments.
+ * See {@link #getName()} method.
+ */
+@InterfaceAudience.Private
+public interface WALIdentity extends Comparable<WALIdentity> {
+
+ /**
+ * A WALIdentity uniquely identifies a WAL stored in a WALProvider.
+ * The name can be thought of as a human-readable, serialized form of the WALIdentity.
+ *
+ * The same value should be returned across calls to this method.
+ *
+ * @return name of the wal
+ */
+ String getName();
+}
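To make the provider-agnostic intent concrete, here is a hypothetical stream-based implementation; StreamWALIdentity and its naming scheme are invented for illustration and are not part of this patch. It assumes the same org.apache.hadoop.hbase.wal package as the interface.

import org.apache.yetus.audience.InterfaceAudience;

/** Hypothetical identity for a stream-backed WAL; the stream name itself is the identity. */
@InterfaceAudience.Private
public class StreamWALIdentity implements WALIdentity {
  private final String streamName;

  public StreamWALIdentity(String streamName) {
    this.streamName = streamName;
  }

  @Override
  public String getName() {
    return streamName; // stable across calls, as the contract requires
  }

  @Override
  public int compareTo(WALIdentity o) {
    return streamName.compareTo(o.getName());
  }

  @Override
  public boolean equals(Object obj) {
    return obj instanceof StreamWALIdentity
        && streamName.equals(((StreamWALIdentity) obj).streamName);
  }

  @Override
  public int hashCode() {
    return streamName.hashCode();
  }
}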
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/AbstractWALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/AbstractWALEntryStream.java
new file mode 100644
index 0000000..6707b8e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/AbstractWALEntryStream.java
@@ -0,0 +1,300 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.OptionalLong;
+import java.util.concurrent.PriorityBlockingQueue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WAL.Reader;
+import org.apache.hadoop.hbase.wal.WALIdentity;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Streaming access to WAL entries. This class is given a queue of {@link WALIdentity}s and
+ * continually iterates through all the WAL {@link Entry}s in the queue. When it is done reading
+ * from a WAL, it dequeues that WAL and starts reading from the next one.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class AbstractWALEntryStream implements WALEntryStream {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractWALEntryStream.class);
+
+ protected Reader reader;
+ protected WALIdentity currentPath;
+ // cache of next entry for hasNext()
+ protected Entry currentEntry;
+ // position for the current entry. As now we support peek, which means that the upper layer may
+ // choose to return before reading the current entry, so it is not safe to return the value below
+ // in getPosition.
+ protected long currentPositionOfEntry = 0;
+ // position after reading current entry
+ protected long currentPositionOfReader = 0;
+ protected final PriorityBlockingQueue<WALIdentity> logQueue;
+ protected final Configuration conf;
+ protected final WALFileLengthProvider walFileLengthProvider;
+ // which region server the WALs belong to
+ protected final ServerName serverName;
+ protected final MetricsSource metrics;
+
+ protected boolean eofAutoRecovery;
+ private WALProvider provider;
+
+ /**
+ * Create an entry stream over the given queue at the given start position.
+ * @param logQueue the queue of WAL identities
+ * @param conf {@link Configuration} to use to create {@link Reader} for this stream
+ * @param startPosition the position in the first WAL to start reading at
+ * @param walFileLengthProvider provides the current length of a WAL that is still being written
+ * @param serverName the server name which all WALs belong to
+ * @param metrics replication metrics
+ * @param provider the {@link WALProvider} used to create readers
+ * @throws IOException if the stream cannot be created
+ */
+ public AbstractWALEntryStream(PriorityBlockingQueue<WALIdentity> logQueue, Configuration conf,
+ long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
+ MetricsSource metrics, WALProvider provider) throws IOException {
+ this.logQueue = logQueue;
+ this.conf = conf;
+ this.currentPositionOfEntry = startPosition;
+ this.walFileLengthProvider = walFileLengthProvider;
+ this.serverName = serverName;
+ this.metrics = metrics;
+ this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
+ this.provider = provider;
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
+ if (currentEntry == null) {
+ try {
+ tryAdvanceEntry();
+ } catch (IOException e) {
+ handleIOException(logQueue.peek(), e);
+ }
+ }
+ return currentEntry != null;
+ }
+
+ @Override
+ public Entry peek() throws IOException {
+ return hasNext() ? currentEntry : null;
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ reader.seek(pos);
+ }
+
+ @Override
+ public Entry next(Entry reuse) throws IOException {
+ return next();
+ }
+
+ @Override
+ public Entry next() throws IOException {
+ Entry save = peek();
+ currentPositionOfEntry = currentPositionOfReader;
+ currentEntry = null;
+ return save;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void close() throws IOException {
+ closeReader();
+ }
+
+ @Override
+ public WALIdentity getCurrentWALIdentity() {
+ return currentPath;
+ }
+
+ @Override
+ public long getPosition() {
+ return currentPositionOfEntry;
+ }
+
+ @Override
+ public void reset() throws IOException {
+ if (reader != null && currentPath != null) {
+ resetReader();
+ }
+ }
+
+ protected void setPosition(long position) {
+ currentPositionOfEntry = position;
+ }
+
+ abstract void setCurrentPath(WALIdentity path);
+
+ private void tryAdvanceEntry() throws IOException {
+ if (checkReader()) {
+ boolean beingWritten = readNextEntryAndRecordReaderPosition();
+ if (currentEntry == null && !beingWritten) {
+ // no more entries in this log file, and the file is already closed, i.e, rolled
+ // Before dequeueing, we should always get one more attempt at reading.
+ // This is in case more entries came in after we opened the reader, and the log is rolled
+ // while we were reading. See HBASE-6758
+ resetReader();
+ readNextEntryAndRecordReaderPosition();
+ if (currentEntry == null) {
+ if (checkAllBytesParsed()) { // now we're certain we're done with this log file
+ dequeueCurrentLog();
+ if (openNextLog()) {
+ readNextEntryAndRecordReaderPosition();
+ }
+ }
+ }
+ }
+ // if currentEntry != null then just return
+ // if currentEntry == null but the file is still being written, then we should not switch to
+ // the next log either, just return here and try next time to see if there are more entries in
+ // the current file
+ }
+ // do nothing if we don't have a WAL Reader (e.g. if there's no logs in queue)
+ }
+
+ private void dequeueCurrentLog() throws IOException {
+ LOG.debug("Reached the end of log {}", currentPath);
+ closeReader();
+ logQueue.remove();
+ setPosition(0);
+ metrics.decrSizeOfLogQueue();
+ }
+
+ /**
+ * Returns whether the file is opened for writing.
+ */
+ private boolean readNextEntryAndRecordReaderPosition() throws IOException {
+ Entry readEntry = reader.next();
+ long readerPos = reader.getPosition();
+ OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath);
+ if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) {
+ // see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted
+ // data, so we need to make sure that we do not read beyond the committed file length.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The provider tells us the valid length for " + currentPath + " is " +
+ fileLength.getAsLong() + ", but we have advanced to " + readerPos);
+ }
+ resetReader();
+ return true;
+ }
+ if (readEntry != null) {
+ metrics.incrLogEditsRead();
+ metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry);
+ }
+ currentEntry = readEntry; // could be null
+ this.currentPositionOfReader = readerPos;
+ return fileLength.isPresent();
+ }
+
+ private void closeReader() throws IOException {
+ if (reader != null) {
+ reader.close();
+ reader = null;
+ }
+ }
+
+ // if we don't have a reader, open a reader on the next log
+ private boolean checkReader() throws IOException {
+ if (reader == null) {
+ return openNextLog();
+ }
+ return true;
+ }
+
+ // open a reader on the next log in queue
+ abstract boolean openNextLog() throws IOException;
+
+ protected void openReader(WALIdentity path) throws IOException {
+ try {
+ // Detect if this is a new file, if so get a new reader else
+ // reset the current reader so that we see the new data
+ if (reader == null || !currentPath.equals(path)) {
+ closeReader();
+ reader = createReader(path, conf);
+ seek();
+ setCurrentPath(path);
+ } else {
+ resetReader();
+ }
+ } catch (RemoteException re) {
+ IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
+ handleIOException(path, ioe);
+ } catch (IOException ioe) {
+ handleIOException(path, ioe);
+ }
+ }
+
+ /**
+ * Creates a reader for a WAL.
+ *
+ * @param walId WAL identity for an FS-based provider, or the stream name for a stream-based
+ * provider
+ * @param conf configuration used to create the reader
+ * @return a reader for the WAL
+ * @throws IOException if the reader cannot be created
+ */
+ protected Reader createReader(WALIdentity walId, Configuration conf)
+ throws IOException {
+ return provider.createReader(walId, null, false);
+ }
+
+ protected void resetReader() throws IOException {
+ try {
+ currentEntry = null;
+ reader.reset();
+ seek();
+ } catch (NullPointerException npe) {
+ throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
+ } catch (IOException e) {
+ handleIOException(currentPath, e);
+ }
+ }
+
+ /**
+ * Handles an IOException raised while reading a WAL; implementations should rethrow the
+ * exception if it cannot be handled.
+ * @param walId identity of the WAL being read when the exception occurred
+ * @param e the IOException to handle
+ * @throws IOException if the exception cannot be handled here
+ */
+ protected abstract void handleIOException(WALIdentity walId, IOException e) throws IOException;
+
+ protected void seek() throws IOException {
+ if (currentPositionOfEntry != 0) {
+ reader.seek(currentPositionOfEntry);
+ }
+ }
+
+ protected boolean checkAllBytesParsed() throws IOException {
+ return true;
+ }
+}
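A consumption sketch for the stream API above, assuming the same org.apache.hadoop.hbase.replication.regionserver package and that WALEntryStream exposes the methods overridden above; ship and saveOffset are hypothetical stand-ins for the replication shipper and offset storage, not methods from this patch.

import java.io.IOException;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALIdentity;

public class WALEntryStreamConsumer {
  void drain(WALEntryStream stream) throws IOException {
    while (stream.hasNext()) {
      WAL.Entry entry = stream.next(); // consumes the cached entry and advances the position
      ship(entry);
    }
    // getPosition() points at the start of the next unread entry, so it is a safe
    // point to persist and resume from after a restart.
    saveOffset(stream.getCurrentWALIdentity(), stream.getPosition());
  }

  private void ship(WAL.Entry entry) { /* hypothetical: push to the peer cluster */ }
  private void saveOffset(WALIdentity walId, long pos) { /* hypothetical: persist the offset */ }
}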
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSRecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSRecoveredReplicationSource.java
new file mode 100644
index 0000000..70b3889
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSRecoveredReplicationSource.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.PriorityBlockingQueue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.FSWALIdentity;
+import org.apache.hadoop.hbase.wal.WALIdentity;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class FSRecoveredReplicationSource extends RecoveredReplicationSource {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FSRecoveredReplicationSource.class);
+ private String logDir;
+
+ @Override
+ public void init(Configuration conf, ReplicationSourceManager manager,
+ ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
+ String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
+ MetricsSource metrics, WALProvider walProvider) throws IOException {
+ super.init(conf, manager, queueStorage, replicationPeer, server, peerClusterZnode, clusterId,
+ walFileLengthProvider, metrics, walProvider);
+ this.logDir = AbstractFSWALProvider.getWALDirectoryName(server.getServerName().toString());
+ }
+
+ @Override
+ public void locateRecoveredWALIdentities(PriorityBlockingQueue<WALIdentity> queue)
+ throws IOException {
+ boolean hasWALIdentityChanged = false;
+ PriorityBlockingQueue<WALIdentity> newWALIdentities =
+ new PriorityBlockingQueue<>(queueSizePerGroup,
+ new LogsComparator(this.walProvider));
+ FileSystem fs = CommonFSUtils.getWALFileSystem(conf);
+ WALIdentityLoop:
+ for (WALIdentity walIdentity : queue) {
+ FSWALIdentity fsWal = (FSWALIdentity) walIdentity;
+ if (fs.exists(fsWal.getPath())) {
+ // still in same location, don't need to do anything
+ newWALIdentities.add(walIdentity);
+ continue;
+ }
+ // WALIdentity changed - try to find the right WALIdentity.
+ hasWALIdentityChanged = true;
+ if (server instanceof ReplicationSyncUp.DummyServer) {
+ // In the case of disaster recovery, HMaster may have been shut down or crashed before
+ // flushing data from .logs to .oldlogs. Loop through the .logs folders and check whether
+ // a match exists.
+ WALIdentity newWALIdentity = getReplSyncUpPath(fs, fsWal.getPath());
+ newWALIdentities.add(newWALIdentity);
+ continue;
+ } else {
+ // See if Path exists in the dead RS folder (there could be a chain of failures
+ // to look at)
+ List<ServerName> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
+ LOG.info("NB dead servers : " + deadRegionServers.size());
+ final Path walDir = FSUtils.getWALRootDir(conf);
+ for (ServerName curDeadServerName : deadRegionServers) {
+ final Path deadRsDirectory =
+ new Path(walDir, AbstractFSWALProvider.getWALDirectoryName(curDeadServerName
+ .getServerName()));
+ Path[] locs = new Path[] { new Path(deadRsDirectory, walIdentity.getName()), new Path(
+ deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), walIdentity.getName()) };
+ for (Path possibleLogLocation : locs) {
+ LOG.info("Possible location " + possibleLogLocation.toUri().toString());
+ if (fs.exists(possibleLogLocation)) {
+ // We found the right new location
+ LOG.info("Log " + walIdentity + " still exists at " + possibleLogLocation);
+ newWALIdentities.add(new FSWALIdentity(possibleLogLocation));
+ continue WALIdentityLoop;
+ }
+ }
+ }
+ // didn't find a new location
+ LOG.error(String.format("WAL Path %s doesn't exist and couldn't find its new location",
+ walIdentity));
+ newWALIdentities.add(walIdentity);
+ }
+ }
+
+ if (hasWALIdentityChanged) {
+ if (newWALIdentities.size() != queue.size()) { // this shouldn't happen
+ LOG.error("Recovery queue size is incorrect");
+ throw new IOException("Recovery queue size error");
+ }
+ // put the correct locations in the queue
+ // since this is a recovered queue with no new incoming logs,
+ // there shouldn't be any concurrency issues
+ queue.clear();
+ for (WALIdentity walId : newWALIdentities) {
+ queue.add(walId);
+ }
+ }
+ }
+
+ // N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of the wal
+ // area rather than to the wal area for a particular region server.
+ private WALIdentity getReplSyncUpPath(FileSystem fs, Path path) throws IOException {
+ FileStatus[] rss = fs.listStatus(manager.getLogDir());
+ for (FileStatus rs : rss) {
+ Path p = rs.getPath();
+ FileStatus[] logs = fs.listStatus(p);
+ for (FileStatus log : logs) {
+ Path logPath = new Path(p, log.getPath().getName());
+ if (logPath.getName().equals(path.getName())) {
+ LOG.info("Log " + logPath.getName() + " found at " + logPath);
+ return this.walProvider.createWALIdentity(logPath.toString());
+ }
+ }
+ }
+ LOG.error("Didn't find path for: " + path.getName());
+ return this.walProvider.createWALIdentity(path.toString());
+ }
+}
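The relocation search above boils down to: keep the WAL if it still exists, otherwise probe each dead server's WAL directory and its -splitting variant. A condensed sketch of the candidate paths for one dead server, using the same helpers as the code above (the method itself is illustrative, not part of the patch):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;

public class RecoveredWALCandidates {
  // Candidate locations for a WAL named walName once its original server has died.
  static Path[] candidates(Configuration conf, ServerName deadServer, String walName)
      throws IOException {
    Path walDir = FSUtils.getWALRootDir(conf);
    Path deadRsDir =
        new Path(walDir, AbstractFSWALProvider.getWALDirectoryName(deadServer.getServerName()));
    return new Path[] { new Path(deadRsDir, walName),
        new Path(deadRsDir.suffix(AbstractFSWALProvider.SPLITTING_EXT), walName) };
  }
}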
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSWALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSWALEntryStream.java
new file mode 100644
index 0000000..4ef009d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSWALEntryStream.java
@@ -0,0 +1,253 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.concurrent.PriorityBlockingQueue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
+import org.apache.hadoop.hbase.wal.FSWALIdentity;
+import org.apache.hadoop.hbase.wal.WALIdentity;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Streaming access to WAL entries. This class is given a queue of {@link WALIdentity}s and
+ * continually iterates through all the WAL {@link Entry}s in the queue. When it is done reading
+ * from a WAL, it dequeues that WAL and starts reading from the next one.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class FSWALEntryStream extends AbstractWALEntryStream {
+ private static final Logger LOG = LoggerFactory.getLogger(FSWALEntryStream.class);
+
+ private FileSystem fs;
+
+ public FSWALEntryStream(FileSystem fs, PriorityBlockingQueue<WALIdentity> logQueue, Configuration conf,
+ long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
+ MetricsSource metrics, WALProvider provider) throws IOException {
+ super(logQueue, conf, startPosition, walFileLengthProvider, serverName, metrics, provider);
+ this.fs = fs;
+ }
+
+ @Override
+ // HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file
+ protected boolean checkAllBytesParsed() throws IOException {
+ // -1 means the wal wasn't closed cleanly.
+ final long trailerSize = currentTrailerSize();
+ FileStatus stat = null;
+ try {
+ stat = fs.getFileStatus(((FSWALIdentity)this.currentPath).getPath());
+ } catch (IOException exception) {
+ LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}",
+ currentPath, trailerSize < 0 ? "was not" : "was", getCurrentPathStat());
+ metrics.incrUnknownFileLengthForClosedWAL();
+ }
+ // Here we use currentPositionOfReader instead of currentPositionOfEntry.
+ // We only call this method when currentEntry is null so usually they are the same, but there
+ // are two exceptions. One is we have nothing in the file but only a header, in this way
+ // the currentPositionOfEntry will always be 0 since we have no chance to update it. The other
+ // is that we reach the end of file, then currentPositionOfEntry will point to the tail of the
+ // last valid entry, and the currentPositionOfReader will usually point to the end of the file.
+ if (stat != null) {
+ if (trailerSize < 0) {
+ if (currentPositionOfReader < stat.getLen()) {
+ final long skippedBytes = stat.getLen() - currentPositionOfReader;
+ LOG.debug(
+ "Reached the end of WAL file '{}'. It was not closed cleanly,"
+ + " so we did not parse {} bytes of data. This is normally ok.",
+ currentPath, skippedBytes);
+ metrics.incrUncleanlyClosedWALs();
+ metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
+ }
+ } else if (currentPositionOfReader + trailerSize < stat.getLen()) {
+ LOG.warn(
+ "Processing end of WAL file '{}'. At position {}, which is too far away from"
+ + " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}",
+ currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat());
+ setPosition(0);
+ resetReader();
+ metrics.incrRestartedWALReading();
+ metrics.incrRepeatedFileBytes(currentPositionOfReader);
+ return false;
+ }
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is "
+ + (stat == null ? "N/A" : stat.getLen()));
+ }
+ metrics.incrCompletedWAL();
+ return true;
+ }
+
+ private Path getArchivedLog(Path path) throws IOException {
+ Path rootDir = FSUtils.getRootDir(conf);
+
+ // Try to find the log in the old log dir
+ Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+ Path archivedLogLocation = new Path(oldLogDir, path.getName());
+ if (fs.exists(archivedLogLocation)) {
+ LOG.info("Log " + path + " was moved to " + archivedLogLocation);
+ return archivedLogLocation;
+ }
+
+ // Try to find the log in the separate per-server old log dir
+ oldLogDir = new Path(rootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
+ .append(Path.SEPARATOR).append(serverName.getServerName()).toString());
+ archivedLogLocation = new Path(oldLogDir, path.getName());
+ if (fs.exists(archivedLogLocation)) {
+ LOG.info("Log " + path + " was moved to " + archivedLogLocation);
+ return archivedLogLocation;
+ }
+
+ LOG.error("Couldn't locate log: " + path);
+ return path;
+ }
+
+ @Override
+ void setCurrentPath(WALIdentity path) {
+ this.currentPath = path;
+ }
+
+ @Override
+ // open a reader on the next log in queue
+ boolean openNextLog() throws IOException {
+ WALIdentity nextPath = logQueue.peek();
+ if (nextPath != null) {
+ openReader(nextPath);
+ if (reader != null) {
+ return true;
+ }
+ } else {
+ // no more files in queue, this could happen for a recovered queue, or for a wal group of a
+ // sync replication peer which has already been transitioned to DA or S.
+ setCurrentPath(null);
+ }
+ return false;
+ }
+
+ @Override
+ protected void openReader(WALIdentity walId) throws IOException {
+ try {
+ super.openReader(walId);
+ } catch (NullPointerException npe) {
+ // Workaround for race condition in HDFS-4380
+ // which throws a NPE if we open a file before any data node has the most recent block
+ // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
+ LOG.warn("Got NPE opening reader, will retry.");
+ reader = null;
+ }
+ }
+
+ private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
+ // If the log was archived, continue reading from there
+ Path archivedLog = getArchivedLog(path);
+ if (!path.equals(archivedLog)) {
+ openReader(new FSWALIdentity(archivedLog));
+ } else {
+ throw fnfe;
+ }
+ }
+
+ // For HBASE-15019
+ private void recoverLease(final Configuration conf, final Path path) {
+ try {
+ final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
+ FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
+ fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
+ @Override
+ public boolean progress() {
+ LOG.debug("recover WAL lease: " + path);
+ return true;
+ }
+ });
+ } catch (IOException e) {
+ LOG.warn("unable to recover lease for WAL: " + path, e);
+ }
+ }
+
+ @Override
+ protected void handleIOException(WALIdentity path, IOException e) throws IOException {
+ try {
+ throw e;
+ } catch (FileNotFoundException fnfe) {
+ handleFileNotFound(((FSWALIdentity)path).getPath(), fnfe);
+ } catch (EOFException eo) {
+ handleEofException(eo);
+ } catch (LeaseNotRecoveredException lnre) {
+ // HBASE-15019 the WAL was not closed due to some hiccup.
+ LOG.warn("Try to recover the WAL lease " + currentPath, lnre);
+ recoverLease(conf, ((FSWALIdentity)currentPath).getPath());
+ reader = null;
+ }
+ }
+
+ private long currentTrailerSize() {
+ long size = -1L;
+ if (reader instanceof ProtobufLogReader) {
+ final ProtobufLogReader pblr = (ProtobufLogReader) reader;
+ size = pblr.trailerSize();
+ }
+ return size;
+ }
+
+ // if we get an EOF due to a zero-length log, and there are other logs in queue
+ // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
+ // enabled, then dump the log
+ private void handleEofException(IOException e) {
+ if ((e instanceof EOFException || e.getCause() instanceof EOFException) && logQueue.size() > 1
+ && this.eofAutoRecovery) {
+ try {
+ if (fs.getFileStatus(((FSWALIdentity)logQueue.peek()).getPath()).getLen() == 0) {
+ LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
+ logQueue.remove();
+ setPosition(0);
+ }
+ } catch (IOException ioe) {
+ LOG.warn("Couldn't get file length information about log " + logQueue.peek());
+ }
+ }
+ }
+
+ private String getCurrentPathStat() {
+ StringBuilder sb = new StringBuilder();
+ if (currentPath != null) {
+ sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
+ .append(currentPositionOfEntry).append("\n");
+ } else {
+ sb.append("no replication ongoing, waiting for new log");
+ }
+ return sb.toString();
+ }
+}
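A construction sketch, assuming the same package and that the caller already holds the queue, length provider, metrics, and provider; the replication.source.eof.autorecovery flag read by the base class is what arms handleEofException above.

import java.io.IOException;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.wal.WALProvider;

public class FSWALEntryStreamSetup {
  static FSWALEntryStream open(FileSystem fs, PriorityBlockingQueue<WALIdentity> queue,
      Configuration conf, WALFileLengthProvider lengthProvider, ServerName server,
      MetricsSource metrics, WALProvider provider) throws IOException {
    // Opt in to dropping zero-length logs when an EOF is hit and other logs are queued.
    conf.setBoolean("replication.source.eof.autorecovery", true);
    // Start at position 0 of the first WAL in the queue.
    return new FSWALEntryStream(fs, queue, conf, 0L, lengthProvider, server, metrics, provider);
  }
}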
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALUtil.java
new file mode 100644
index 0000000..7a54640
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALUtil.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.wal;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * This class provides helper methods related to WALProvider.
+ */
+public final class WALUtil {
+
+ private WALUtil() {
+ // static helper methods only; no instances
+ }
+
+ public static WAL.Reader createReader(WALFactory walFactory, Path path) throws IOException {
+ WALProvider provider = walFactory.getWALProvider();
+ return provider.createReader(provider.createWALIdentity(path.toString()), null, true);
+ }
+
+ public static WAL.Reader createReader(Configuration conf, String factoryId, Path path)
+ throws IOException {
+ WALFactory factory = new WALFactory(conf, factoryId);
+ WALProvider provider = factory.getWALProvider();
+ return provider.createReader(provider.createWALIdentity(path.toString()), null, true);
+ }
+
+ public static WALProvider.Writer createRecoveredEditsWriter(Configuration conf,
+ WALFactory walFactory, Path path) throws IOException {
+ WALProvider provider = walFactory.getWALProvider();
+ return provider.createWriter(conf,
+ provider.createWALIdentity(path.toString()), true);
+ }
+
+ public static WALProvider.Writer createWriter(Configuration conf,
+ WALFactory walFactory, Path path) throws IOException {
+ WALProvider provider = walFactory.getWALProvider();
+ return provider.createWriter(conf,
+ provider.createWALIdentity(path.toString()), false);
+ }
+
+ public static WALProvider.Writer createWriter(Configuration conf,
+ String factoryId, Path path) throws IOException {
+ WALFactory walFactory = new WALFactory(conf, factoryId);
+ WALProvider provider = walFactory.getWALProvider();
+ return provider.createWriter(conf,
+ provider.createWALIdentity(path.toString()), false);
+ }
+}
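Taken together, the test migrations in this patch all follow the same shape; a sketch of the pattern, under the assumption that the caller already holds a WALFactory (wals) and a Path, as the tests do:

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALUtil;

public class WALUtilMigrationExample {
  // Old style: WAL.Reader reader = wals.createReader(fs, path);
  // New style: WALUtil resolves the provider and builds a WALIdentity from the path.
  static long countEntries(WALFactory wals, Path path) throws IOException {
    long count = 0;
    try (WAL.Reader reader = WALUtil.createReader(wals, path)) {
      while (reader.next() != null) {
        count++;
      }
    }
    return count;
  }
}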