From 831273ccff30dde99e0b7fb5d0722751ad31b62a Mon Sep 17 00:00:00 2001 From: Ankit Singhal Date: Fri, 18 Jan 2019 13:36:52 -0500 Subject: [PATCH] HBASE-21020 WAL API changes for replication Signed-off-by: Josh Elser --- .../hbase/regionserver/HRegionServer.java | 2 +- .../regionserver/ReplicationService.java | 4 +- .../hbase/regionserver/wal/AbstractFSWAL.java | 7 +- .../hadoop/hbase/regionserver/wal/FSHLog.java | 3 +- .../replication/ReplicationEndpoint.java | 14 +- .../regionserver/FSWALEntryStream.java | 498 ++++++++++++++++++ .../HBaseInterClusterReplicationEndpoint.java | 2 - .../RecoveredReplicationSource.java | 135 ++--- .../replication/regionserver/Replication.java | 8 +- .../regionserver/ReplicationSource.java | 68 +-- .../ReplicationSourceInterface.java | 24 +- .../ReplicationSourceManager.java | 64 +-- .../ReplicationSourceShipper.java | 8 +- .../ReplicationSourceWALReader.java | 41 +- .../regionserver/ReplicationSyncUp.java | 16 +- .../regionserver/ReplicationThrottler.java | 6 +- .../SerialReplicationSourceWALReader.java | 5 +- .../regionserver/WALEntryStream.java | 397 +------------- .../regionserver/WALFileLengthProvider.java | 5 +- .../hbase/wal/AbstractFSWALProvider.java | 112 +++- .../hadoop/hbase/wal/DisabledWALProvider.java | 93 +++- .../hbase/wal/RegionGroupingProvider.java | 31 ++ .../hbase/wal/SyncReplicationWALProvider.java | 25 + .../apache/hadoop/hbase/wal/WALProvider.java | 44 +- .../replication/ReplicationSourceDummy.java | 20 +- .../regionserver/TestReplicationSource.java | 10 +- .../TestReplicationSourceManager.java | 12 +- .../TestReplicationThrottler.java | 4 +- .../regionserver/TestWALEntryStream.java | 65 ++- .../hadoop/hbase/wal/IOTestProvider.java | 43 +- .../hadoop/hbase/wal/TestWALFactory.java | 15 +- 31 files changed, 1018 insertions(+), 763 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSWALEntryStream.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 6242d3670f..7c7b4cc02e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -3006,7 +3006,7 @@ public class HRegionServer extends HasThread implements throw new IOException("Could not find class for " + classname); } T service = ReflectionUtils.newInstance(clazz, conf); - service.initialize(server, walFs, logDir, oldLogDir, walProvider); + service.initialize(server, walProvider); return service; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java index e9bbaea8ae..864736b47c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java @@ -18,8 +18,6 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; import org.apache.hadoop.hbase.wal.WALProvider; @@ -38,7 +36,7 @@ public interface ReplicationService { * @param walProvider can be null if not initialized inside a live region server 
environment, for * example, {@code ReplicationSyncUp}. */ - void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir, WALProvider walProvider) + void initialize(Server rs, WALProvider walProvider) throws IOException; /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index ab58b6700c..5beda79839 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -69,7 +69,6 @@ import org.apache.hadoop.hbase.wal.FSWALIdentity; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALPrettyPrinter; import org.apache.hadoop.hbase.wal.WALProvider.WriterBase; @@ -996,11 +995,11 @@ public abstract class AbstractFSWAL implements WAL { * https://issues.apache.org/jira/browse/HBASE-14004 for more details. */ @Override - public OptionalLong getLogFileSizeIfBeingWritten(WALIdentity walId) { + public OptionalLong getLogFileSizeIfBeingWritten(Path path) { rollWriterLock.lock(); try { - FSWALIdentity currentPath = new FSWALIdentity(getOldPath()); - if (walId.equals(currentPath)) { + Path currentPath = getOldPath(); + if (path.equals(currentPath)) { W writer = this.writer; return writer != null ? OptionalLong.of(writer.getLength()) : OptionalLong.empty(); } else { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index baa87a4c7d..10a07db6b1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -215,7 +215,6 @@ public class FSHLog extends AbstractFSWAL { 5); this.closeErrorsTolerated = conf.getInt("hbase.regionserver.logroll.errors.tolerated", 2); this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC); - // This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is // put on the ring buffer. String hostingThreadName = Thread.currentThread().getName(); @@ -308,7 +307,7 @@ public class FSHLog extends AbstractFSWAL { SyncFuture syncFuture = null; SafePointZigZagLatch zigzagLatch = null; long sequence = -1L; - if (this.ringBufferEventHandler != null) { + if (this.ringBufferEventHandler != null && writer != null) { // Get sequence first to avoid dead lock when ring buffer is full // Considering below sequence // 1. 
replaceWriter is called and zigzagLatch is initialized diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java index f4c37b1ea8..ad326b8006 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java @@ -23,15 +23,13 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; - -import org.apache.hadoop.hbase.Abortable; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.TableDescriptors; -import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.yetus.audience.InterfaceAudience; /** * ReplicationEndpoint is a plugin which implements replication @@ -55,7 +53,6 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener { class Context { private final Configuration localConf; private final Configuration conf; - private final FileSystem fs; private final TableDescriptors tableDescriptors; private final ReplicationPeer replicationPeer; private final String peerId; @@ -67,7 +64,6 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener { public Context( final Configuration localConf, final Configuration conf, - final FileSystem fs, final String peerId, final UUID clusterId, final ReplicationPeer replicationPeer, @@ -76,7 +72,6 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener { final Abortable abortable) { this.localConf = localConf; this.conf = conf; - this.fs = fs; this.clusterId = clusterId; this.peerId = peerId; this.replicationPeer = replicationPeer; @@ -90,9 +85,6 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener { public Configuration getLocalConfiguration() { return localConf; } - public FileSystem getFilesystem() { - return fs; - } public UUID getClusterId() { return clusterId; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSWALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSWALEntryStream.java new file mode 100644 index 0000000000..4fcdc2b2f0 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSWALEntryStream.java @@ -0,0 +1,498 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.OptionalLong;
+import java.util.concurrent.PriorityBlockingQueue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
+import org.apache.hadoop.hbase.wal.FSWALIdentity;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WAL.Reader;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALIdentity;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Streaming access to WAL entries. This class is given a queue of WAL {@link WALIdentity}s and
+ * continually iterates through all the WAL {@link Entry} instances in the queue. When it is done
+ * reading one WAL, it dequeues that WAL and starts reading from the next.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class FSWALEntryStream implements WALEntryStream {
+  private static final Logger LOG = LoggerFactory.getLogger(FSWALEntryStream.class);
+
+  private FileSystem fs;
+
+  private boolean eofAutoRecovery;
+  protected Reader reader;
+  protected WALIdentity currentWAlIdentity;
+  // cache of next entry for hasNext()
+  protected Entry currentEntry;
+  // position of the current entry. Since we support peek, the upper layer may choose to
+  // return before reading the current entry, so it is not safe to return the value below
+  // (currentPositionOfReader) from getPosition(). See the sketch below.
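+  // Illustration (editor's sketch, not part of the original patch) of the peek contract
+  // that makes this distinction matter:
+  //   if (stream.hasNext()) {                    // may read one entry ahead internally
+  //     Entry e = stream.peek();                 // inspect; getPosition() is unchanged
+  //     long checkpoint = stream.getPosition();  // start of the peeked entry, safe resume point
+  //     stream.next();                           // consume; the position now covers the entry
+  //   }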
+ protected long currentPositionOfEntry = 0; + // position after reading current entry + protected long currentPositionOfReader = 0; + protected final PriorityBlockingQueue logQueue; + protected final Configuration conf; + // which region server the WALs belong to + protected final ServerName serverName; + protected final MetricsSource metrics; + + protected final WALProvider walProvider; + + /** + * Create an entry stream over the given queue at the given start position + * @param logQueue the queue of WAL walIds + * @param conf {@link Configuration} to use to create {@link Reader} for this stream + * @param startPosition the position in the first WAL to start reading at + * @param serverName the server name which all WALs belong to + * @param metrics replication metrics + * @param walProvider wal provider + */ + + public FSWALEntryStream(FileSystem fs, PriorityBlockingQueue logQueue, + Configuration conf, long startPosition, ServerName serverName, MetricsSource metrics, + WALProvider walProvider) throws IOException { + this.logQueue = logQueue; + this.conf = conf; + this.currentPositionOfEntry = startPosition; + this.serverName = serverName; + this.metrics = metrics; + this.walProvider = walProvider; + this.fs = fs; + this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false); + } + + /** + * @return true if there is another WAL {@link Entry} + */ + @Override + public boolean hasNext() throws IOException { + if (currentEntry == null) { + try { + tryAdvanceEntry(); + } catch (IOException e) { + handleIOException(logQueue.peek(), e); + } + } + return currentEntry != null; + } + + /** + * Returns the next WAL entry in this stream but does not advance. + */ + @Override + public Entry peek() throws IOException { + return hasNext() ? currentEntry : null; + } + + /** + * Returns the next WAL entry in this stream and advance the stream. + */ + @Override + public Entry next() throws IOException { + Entry save = peek(); + currentPositionOfEntry = currentPositionOfReader; + currentEntry = null; + return save; + } + + /** + * {@inheritDoc} + */ + @Override + public void close() throws IOException { + closeReader(); + } + + /** + * @return the position of the last Entry returned by next() + */ + @Override + public long getPosition() { + return currentPositionOfEntry; + } + + /** + * @return the {@link WALIdentity} of the current WAL + */ + @Override + public WALIdentity getCurrentWalIdentity() { + return currentWAlIdentity; + } + + protected String getCurrentWalIdStat() { + StringBuilder sb = new StringBuilder(); + if (currentWAlIdentity != null) { + sb.append("currently replicating from: ").append(currentWAlIdentity).append(" at position: ") + .append(currentPositionOfEntry).append("\n"); + } else { + sb.append("no replication ongoing, waiting for new log"); + } + return sb.toString(); + } + + /** + * Should be called if the stream is to be reused (i.e. 
used again after hasNext() has returned + * false) + */ + @Override + public void reset() throws IOException { + if (reader != null && currentWAlIdentity != null) { + resetReader(); + } + } + + protected void setPosition(long position) { + currentPositionOfEntry = position; + } + + private void setCurrentWalId(WALIdentity walId) { + this.currentWAlIdentity = walId; + } + + private void tryAdvanceEntry() throws IOException { + if (checkReader()) { + boolean beingWritten = readNextEntryAndRecordReaderPosition(); + if (currentEntry == null && !beingWritten) { + // no more entries in this log file, and the file is already closed, i.e, rolled + // Before dequeueing, we should always get one more attempt at reading. + // This is in case more entries came in after we opened the reader, and the log is rolled + // while we were reading. See HBASE-6758 + resetReader(); + readNextEntryAndRecordReaderPosition(); + if (currentEntry == null) { + if (checkAllBytesParsed()) { // now we're certain we're done with this log file + dequeueCurrentLog(); + if (openNextLog()) { + readNextEntryAndRecordReaderPosition(); + } + } + } + } + // if currentEntry != null then just return + // if currentEntry == null but the file is still being written, then we should not switch to + // the next log either, just return here and try next time to see if there are more entries in + // the current file + } + // do nothing if we don't have a WAL Reader (e.g. if there's no logs in queue) + } + + private void dequeueCurrentLog() throws IOException { + LOG.debug("Reached the end of log {}", currentWAlIdentity); + closeReader(); + logQueue.remove(); + setPosition(0); + metrics.decrSizeOfLogQueue(); + } + + private void closeReader() throws IOException { + if (reader != null) { + reader.close(); + reader = null; + } + } + + // if we don't have a reader, open a reader on the next log + private boolean checkReader() throws IOException { + if (reader == null) { + return openNextLog(); + } + return true; + } + + // open a reader on the next log in queue + private boolean openNextLog() throws IOException { + WALIdentity nextWalId = logQueue.peek(); + if (nextWalId != null) { + openReader(nextWalId); + if (reader != null) { + return true; + } + } else { + // no more files in queue, this could happen for recovered queue, or for a wal group of a sync + // replication peer which has already been transited to DA or S. + setCurrentWalId(null); + } + return false; + } + + private void openReader(WALIdentity walId) throws IOException { + try { + // Detect if this is a new file, if so get a new reader else + // reset the current reader so that we see the new data + if (reader == null || !getCurrentWalIdentity().equals(walId)) { + closeReader(); + reader = createReader(walId, conf); + seek(); + setCurrentWalId(walId); + } else { + resetReader(); + } + } catch (RemoteException re) { + IOException ioe = re.unwrapRemoteException(FileNotFoundException.class); + if (!(ioe instanceof FileNotFoundException)) { + throw ioe; + } + handleIOException(walId, ioe); + } catch (IOException ioe) { + handleIOException(walId, ioe); + } catch (NullPointerException npe) { + // Workaround for race condition in HDFS-4380 + // which throws a NPE if we open a file before any data node has the most recent block + // Just sleep and retry. Will require re-reading compressed WALs for compressionContext. 
+ LOG.warn("Got NPE opening reader, will retry."); + reader = null; + } + } + + private void resetReader() throws IOException { + try { + currentEntry = null; + reader.reset(); + seek(); + } catch (IOException io) { + handleIOException(currentWAlIdentity, io); + } catch (NullPointerException npe) { + throw new IOException("NPE resetting reader, likely HDFS-4380", npe); + } + } + + private void seek() throws IOException { + if (currentPositionOfEntry != 0) { + reader.seek(currentPositionOfEntry); + } + } + + @Override + public Entry next(Entry reuse) throws IOException { + return reader.next(reuse); + } + + @Override + public void seek(long pos) throws IOException { + reader.seek(pos); + } + + // HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file + private boolean checkAllBytesParsed() throws IOException { + // -1 means the wal wasn't closed cleanly. + final long trailerSize = currentTrailerSize(); + FileStatus stat = null; + try { + stat = fs.getFileStatus(((FSWALIdentity) this.currentWAlIdentity).getPath()); + } catch (IOException exception) { + LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}", + currentWAlIdentity, trailerSize < 0 ? "was not" : "was", getCurrentWalIdStat()); + metrics.incrUnknownFileLengthForClosedWAL(); + } + // Here we use currentPositionOfReader instead of currentPositionOfEntry. + // We only call this method when currentEntry is null so usually they are the same, but there + // are two exceptions. One is we have nothing in the file but only a header, in this way + // the currentPositionOfEntry will always be 0 since we have no change to update it. The other + // is that we reach the end of file, then currentPositionOfEntry will point to the tail of the + // last valid entry, and the currentPositionOfReader will usually point to the end of the file. + if (stat != null) { + if (trailerSize < 0) { + if (currentPositionOfReader < stat.getLen()) { + final long skippedBytes = stat.getLen() - currentPositionOfReader; + LOG.debug( + "Reached the end of WAL file '{}'. It was not closed cleanly," + + " so we did not parse {} bytes of data. This is normally ok.", + currentWAlIdentity, skippedBytes); + metrics.incrUncleanlyClosedWALs(); + metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes); + } + } else if (currentPositionOfReader + trailerSize < stat.getLen()) { + LOG.warn("Processing end of WAL file '{}'. At position {}, which is too far away from" + + " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}", + currentWAlIdentity, currentPositionOfReader, stat.getLen(), getCurrentWalIdStat()); + setPosition(0); + resetReader(); + metrics.incrRestartedWALReading(); + metrics.incrRepeatedFileBytes(currentPositionOfReader); + return false; + } + } + if (LOG.isTraceEnabled()) { + LOG.trace("Reached the end of log " + this.currentWAlIdentity + + ", and the length of the file is " + (stat == null ? 
"N/A" : stat.getLen())); + } + metrics.incrCompletedWAL(); + return true; + } + + private Path getArchivedLog(Path path) throws IOException { + Path rootDir = FSUtils.getRootDir(conf); + + // Try found the log in old dir + Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); + Path archivedLogLocation = new Path(oldLogDir, path.getName()); + if (fs.exists(archivedLogLocation)) { + LOG.info("Log " + path + " was moved to " + archivedLogLocation); + return archivedLogLocation; + } + + // Try found the log in the seperate old log dir + oldLogDir = new Path(rootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME) + .append(Path.SEPARATOR).append(serverName.getServerName()).toString()); + archivedLogLocation = new Path(oldLogDir, path.getName()); + if (fs.exists(archivedLogLocation)) { + LOG.info("Log " + path + " was moved to " + archivedLogLocation); + return archivedLogLocation; + } + + LOG.error("Couldn't locate log: " + path); + return path; + } + + private void handleFileNotFound(FSWALIdentity walId, FileNotFoundException fnfe) + throws IOException { + + // If the log was archived, continue reading from there + FSWALIdentity archivedLog = new FSWALIdentity(getArchivedLog(walId.getPath())); + if (!walId.equals(archivedLog)) { + openReader(archivedLog); + } else { + throw fnfe; + } + } + + // For HBASE-15019 + private void recoverLease(final Configuration conf, final Path path) { + try { + final FileSystem dfs = FSUtils.getCurrentFileSystem(conf); + FSUtils fsUtils = FSUtils.getInstance(dfs, conf); + fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() { + @Override + public boolean progress() { + LOG.debug("recover WAL lease: " + path); + return true; + } + }); + } catch (IOException e) { + LOG.warn("unable to recover lease for WAL: " + path, e); + } + } + + private void handleIOException(WALIdentity walId, IOException e) throws IOException { + try { + throw e; + } catch (FileNotFoundException fnfe) { + handleFileNotFound((FSWALIdentity) walId, fnfe); + } catch (EOFException eo) { + handleEofException(eo); + } catch (LeaseNotRecoveredException lnre) { + // HBASE-15019 the WAL was not closed due to some hiccup. 
+ LOG.warn("Try to recover the WAL lease " + currentWAlIdentity, lnre); + recoverLease(conf, ((FSWALIdentity) currentWAlIdentity).getPath()); + reader = null; + } + } + + private long currentTrailerSize() { + long size = -1L; + if (reader instanceof ProtobufLogReader) { + final ProtobufLogReader pblr = (ProtobufLogReader) reader; + size = pblr.trailerSize(); + } + return size; + } + + // if we get an EOF due to a zero-length log, and there are other logs in queue + // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is + // enabled, then dump the log + private void handleEofException(IOException e) { + if ((e instanceof EOFException || e.getCause() instanceof EOFException) && logQueue.size() > 1 + && this.eofAutoRecovery) { + WALIdentity walId = logQueue.peek(); + try { + Path path = ((FSWALIdentity) walId).getPath(); + if (fs.getFileStatus(path).getLen() == 0) { + LOG.warn("Forcing removal of 0 length log in queue: {} ", walId); + logQueue.remove(); + currentPositionOfEntry = 0; + } + } catch (IOException ioe) { + LOG.warn("Couldn't get file length information about log: {} ", walId); + } + } + } + + private Reader createReader(WALIdentity walId, Configuration conf) throws IOException { + Path path = ((FSWALIdentity) walId).getPath(); + return WALFactory.createReader(fs, path, conf); + } + + /** + * Returns whether the file is opened for writing. + */ + protected boolean readNextEntryAndRecordReaderPosition() throws IOException { + Entry readEntry = reader.next(); + long readerPos = reader.getPosition(); + OptionalLong fileLength = getWALFileLengthProvider() + .getLogFileSizeIfBeingWritten(((FSWALIdentity) currentWAlIdentity).getPath()); + if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) { + // see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted + // data, so we need to make sure that we do not read beyond the committed file length. 
+ if (LOG.isDebugEnabled()) { + LOG.debug("The provider tells us the valid length for " + currentWAlIdentity + " is " + + fileLength.getAsLong() + ", but we have advanced to " + readerPos); + } + resetReader(); + return true; + } + if (readEntry != null) { + metrics.incrLogEditsRead(); + metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry); + } + currentEntry = readEntry; // could be null + this.currentPositionOfReader = readerPos; + return fileLength.isPresent(); + } + + @VisibleForTesting + public WALFileLengthProvider getWALFileLengthProvider() { + return path -> this.walProvider.getWALs().stream() + .map(w -> w.getLogFileSizeIfBeingWritten(path)).filter(o -> o.isPresent()).findAny() + .orElse(OptionalLong.empty()); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 7db53aa7c1..d0bb50c1c8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -86,7 +86,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2; private ClusterConnection conn; - private Configuration localConf; private Configuration conf; // How long should we sleep for each retry private long sleepForRetries; @@ -117,7 +116,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi public void init(Context context) throws IOException { super.init(context); this.conf = HBaseConfiguration.create(ctx.getConfiguration()); - this.localConf = HBaseConfiguration.create(ctx.getLocalConfiguration()); decorateConf(); this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier", diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index 4bb1fe3c5d..d954082776 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -18,21 +18,15 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; -import java.util.List; import java.util.UUID; import java.util.concurrent.PriorityBlockingQueue; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; -import org.apache.hadoop.hbase.wal.FSWALIdentity; import org.apache.hadoop.hbase.wal.WALIdentity; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; @@ -44,17 +38,16 @@ import org.slf4j.LoggerFactory; @InterfaceAudience.Private public class RecoveredReplicationSource extends ReplicationSource { - private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSource.class); - private String actualPeerId; + private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSource.class); @Override - public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, + public void init(Configuration conf, ReplicationSourceManager manager, ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, - String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider, - MetricsSource metrics) throws IOException { - super.init(conf, fs, manager, queueStorage, replicationPeer, server, peerClusterZnode, - clusterId, walFileLengthProvider, metrics); + String peerClusterZnode, UUID clusterId, MetricsSource metrics, WALProvider walProvider) + throws IOException { + super.init(conf, manager, queueStorage, replicationPeer, server, peerClusterZnode, clusterId, + metrics, walProvider); this.actualPeerId = this.replicationQueueInfo.getPeerId(); } @@ -64,88 +57,6 @@ public class RecoveredReplicationSource extends ReplicationSource { return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, queueStorage); } - public void locateRecoveredWalIds(PriorityBlockingQueue queue) throws IOException { - boolean hasPathChanged = false; - PriorityBlockingQueue newWalIds = - new PriorityBlockingQueue(queueSizePerGroup, new LogsComparator()); - pathsLoop: for (WALIdentity walId : queue) { - if (fs.exists(((FSWALIdentity) walId).getPath())) { - // still in same location, don't need to - // do anything - newWalIds.add(walId); - continue; - } - // Path changed - try to find the right path. - hasPathChanged = true; - if (server instanceof ReplicationSyncUp.DummyServer) { - // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data - // from .logs to .oldlogs. 
Loop into .logs folders and check whether a match exists - Path newPath = getReplSyncUpPath(((FSWALIdentity)walId).getPath()); - newWalIds.add(new FSWALIdentity(newPath)); - continue; - } else { - // See if Path exists in the dead RS folder (there could be a chain of failures - // to look at) - List deadRegionServers = this.replicationQueueInfo.getDeadRegionServers(); - LOG.info("NB dead servers : " + deadRegionServers.size()); - final Path walDir = FSUtils.getWALRootDir(conf); - for (ServerName curDeadServerName : deadRegionServers) { - final Path deadRsDirectory = - new Path(walDir, AbstractFSWALProvider.getWALDirectoryName(curDeadServerName - .getServerName())); - Path[] locs = new Path[] { new Path(deadRsDirectory, walId.getName()), new Path( - deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), walId.getName()) }; - for (Path possibleLogLocation : locs) { - LOG.info("Possible location " + possibleLogLocation.toUri().toString()); - if (manager.getFs().exists(possibleLogLocation)) { - // We found the right new location - LOG.info("Log " + walId + " still exists at " + possibleLogLocation); - newWalIds.add(new FSWALIdentity(possibleLogLocation)); - continue pathsLoop; - } - } - } - // didn't find a new location - LOG.error( - String.format("WAL Path %s doesn't exist and couldn't find its new location", walId)); - newWalIds.add(walId); - } - } - - if (hasPathChanged) { - if (newWalIds.size() != queue.size()) { // this shouldn't happen - LOG.error("Recovery queue size is incorrect"); - throw new IOException("Recovery queue size error"); - } - // put the correct locations in the queue - // since this is a recovered queue with no new incoming logs, - // there shouldn't be any concurrency issues - queue.clear(); - for (WALIdentity walId : newWalIds) { - queue.add(walId); - } - } - } - - // N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of the wal - // area rather than to the wal area for a particular region server. - private Path getReplSyncUpPath(Path path) throws IOException { - FileStatus[] rss = fs.listStatus(manager.getLogDir()); - for (FileStatus rs : rss) { - Path p = rs.getPath(); - FileStatus[] logs = fs.listStatus(p); - for (FileStatus log : logs) { - p = new Path(p, log.getPath().getName()); - if (p.getName().equals(path.getName())) { - LOG.info("Log " + p.getName() + " found at " + p); - return p; - } - } - } - LOG.error("Didn't find path for: " + path.getName()); - return path; - } - void tryFinish() { if (workerThreads.isEmpty()) { this.getSourceMetrics().clear(); @@ -167,4 +78,36 @@ public class RecoveredReplicationSource extends ReplicationSource { public boolean isRecovered() { return true; } + + /** + * Get the updated queue of the wals if the wals are moved to another location. 
+   * @param queue the recovered queue whose walIds are updated in place to their new locations
+   * @throws IOException if the relocated queue cannot be rebuilt consistently
+   */
+  public void locateRecoveredWalIds(PriorityBlockingQueue<WALIdentity> queue) throws IOException {
+    PriorityBlockingQueue<WALIdentity> newWalIds =
+        new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
+    boolean hasPathChanged = false;
+    for (WALIdentity wal : queue) {
+      WALIdentity updateRecoveredWalIds = this.getWalProvider().locateWalId(wal, server,
+          this.replicationQueueInfo.getDeadRegionServers());
+      if (!updateRecoveredWalIds.equals(wal)) {
+        hasPathChanged = true;
+      }
+      newWalIds.add(updateRecoveredWalIds);
+    }
+    if (hasPathChanged) {
+      if (newWalIds.size() != queue.size()) { // this shouldn't happen
+        LOG.error("Recovery queue size is incorrect");
+        throw new IOException("Recovery queue size error");
+      }
+      // put the correct locations in the queue
+      // since this is a recovered queue with no new incoming logs,
+      // there shouldn't be any concurrency issues
+      queue.clear();
+      for (WALIdentity walId : newWalIds) {
+        queue.add(walId);
+      }
+    }
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 799d9750ed..eef9dddf2b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -20,13 +20,11 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.OptionalLong;
 import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HConstants;
@@ -86,7 +84,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
   }
 
   @Override
-  public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir,
+  public void initialize(Server server,
       WALProvider walProvider) throws IOException {
     this.server = server;
     this.conf = this.server.getConfiguration();
@@ -125,9 +123,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
     }
     SyncReplicationPeerMappingManager mapping = new SyncReplicationPeerMappingManager();
     this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers,
-        replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId,
-        walProvider != null ?
walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(), - mapping); + replicationTracker, conf, this.server, clusterId, mapping, walProvider); this.syncReplicationPeerInfoProvider = new SyncReplicationPeerInfoProviderImpl(replicationPeers, mapping); PeerActionListener peerActionListener = PeerActionListener.DUMMY; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 12c63fd6a9..2db5a9699c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -17,9 +17,6 @@ */ package org.apache.hadoop.hbase.replication.regionserver; -import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath; - -import java.io.FileNotFoundException; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; @@ -37,7 +34,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; @@ -61,9 +57,9 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; -import org.apache.hadoop.hbase.wal.FSWALIdentity; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALIdentity; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,7 +99,6 @@ public class ReplicationSource implements ReplicationSourceInterface { protected Server server; // How long should we sleep for each retry private long sleepForRetries; - protected FileSystem fs; // id of this cluster private UUID clusterId; // total number of edits we replicated @@ -126,7 +121,6 @@ public class ReplicationSource implements ReplicationSourceInterface { private ReplicationThrottler throttler; private long defaultBandwidth; private long currentBandwidth; - private WALFileLengthProvider walFileLengthProvider; @VisibleForTesting protected final ConcurrentHashMap workerThreads = new ConcurrentHashMap<>(); @@ -139,22 +133,13 @@ public class ReplicationSource implements ReplicationSourceInterface { private int waitOnEndpointSeconds = -1; private Thread initThread; + protected WALProvider walProvider; - /** - * Instantiation method used by region servers - * @param conf configuration to use - * @param fs file system to use - * @param manager replication manager to ping to - * @param server the server for this region server - * @param queueId the id of our replication queue - * @param clusterId unique UUID for the cluster - * @param metrics metrics for replication source - */ @Override - public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, + public void init(Configuration conf, ReplicationSourceManager manager, ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, - String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, - MetricsSource 
metrics) throws IOException { + String queueId, UUID clusterId, MetricsSource metrics, WALProvider walProvider) + throws IOException { this.server = server; this.conf = HBaseConfiguration.create(conf); this.waitOnEndpointSeconds = @@ -170,7 +155,6 @@ public class ReplicationSource implements ReplicationSourceInterface { this.manager = manager; this.metrics = metrics; this.clusterId = clusterId; - this.fs = fs; this.queueId = queueId; this.replicationQueueInfo = new ReplicationQueueInfo(queueId); this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2); @@ -179,7 +163,7 @@ public class ReplicationSource implements ReplicationSourceInterface { currentBandwidth = getCurrentBandwidth(); this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0); this.totalBufferUsed = manager.getTotalBufferUsed(); - this.walFileLengthProvider = walFileLengthProvider; + this.walProvider = walProvider; LOG.info("queueId={}, ReplicationSource : {}, currentBandwidth={}", queueId, replicationPeer.getId(), this.currentBandwidth); } @@ -283,7 +267,7 @@ public class ReplicationSource implements ReplicationSourceInterface { tableDescriptors = ((HRegionServer) server).getTableDescriptors(); } replicationEndpoint - .init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), fs, + .init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), replicationPeer.getId(), clusterId, replicationPeer, metrics, tableDescriptors, server)); replicationEndpoint.start(); replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS); @@ -320,7 +304,8 @@ public class ReplicationSource implements ReplicationSourceInterface { @Override public Map getWalGroupStatus() { Map sourceReplicationStatus = new TreeMap<>(); - long lastTimeStamp, ageOfLastShippedOp, replicationDelay, fileSize; + long lastTimeStamp, ageOfLastShippedOp, replicationDelay; + long logSize = -1; for (Map.Entry walGroupShipper : workerThreads.entrySet()) { String walGroupId = walGroupShipper.getKey(); ReplicationSourceShipper shipper = walGroupShipper.getValue(); @@ -330,19 +315,14 @@ public class ReplicationSource implements ReplicationSourceInterface { replicationDelay = ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize); WALIdentity currentPath = shipper.getCurrentWALIdentity(); - try { - fileSize = getFileSize(((FSWALIdentity)currentPath).getPath()); - } catch (IOException e) { - LOG.warn("Ignore the exception as the file size of HLog only affects the web ui", e); - fileSize = -1; - } + //TODO Fix log size ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder(); statusBuilder.withPeerId(this.getPeerId()) .withQueueSize(queueSize) .withWalGroup(walGroupId) .withCurrentWalId(currentPath) .withCurrentPosition(shipper.getCurrentPosition()) - .withFileSize(fileSize) + .withFileSize(logSize) .withAgeOfLastShippedOp(ageOfLastShippedOp) .withReplicationDelay(replicationDelay); sourceReplicationStatus.put(this.getPeerId() + "=>" + walGroupId, statusBuilder.build()); @@ -350,16 +330,6 @@ public class ReplicationSource implements ReplicationSourceInterface { return sourceReplicationStatus; } - private long getFileSize(Path currentPath) throws IOException { - long fileSize; - try { - fileSize = fs.getContentSummary(currentPath).getLength(); - } catch (FileNotFoundException e) { - currentPath = getArchivedLogPath(currentPath, conf); - fileSize = fs.getContentSummary(currentPath).getLength(); - } - return fileSize; - } 
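The hunk above removes the direct FileSystem lookup, and getWalGroupStatus() now reports a placeholder size of -1 behind the "//TODO Fix log size" marker. As an editor's sketch only (not part of this patch), one way the TODO could later be filled from the WALProvider alone is shown below; it assumes the provider can only report a size for the WAL currently being written (via getLogFileSizeIfBeingWritten), so rolled or archived WALs still yield -1, and the helper name getLogSize is hypothetical:

    // Hypothetical helper (editor's sketch): provider-backed size for the status UI.
    private long getLogSize(WALIdentity walId) {
      Path path = ((FSWALIdentity) walId).getPath(); // assumes an FS-backed identity
      return walProvider.getWALs().stream()
          .map(wal -> wal.getLogFileSizeIfBeingWritten(path))
          .filter(OptionalLong::isPresent)
          .map(OptionalLong::getAsLong)
          .findAny()
          .orElse(-1L); // not the live WAL: size unknown without a FileSystem call
    }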
protected ReplicationSourceShipper createNewShipper(String walGroupId, PriorityBlockingQueue queue) { @@ -369,8 +339,8 @@ public class ReplicationSource implements ReplicationSourceInterface { private ReplicationSourceWALReader createNewWALReader(String walGroupId, PriorityBlockingQueue queue, long startPosition) { return replicationPeer.getPeerConfig().isSerial() - ? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this) - : new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this); + ? new SerialReplicationSourceWALReader(conf, queue, startPosition, walEntryFilter, this) + : new ReplicationSourceWALReader(conf, queue, startPosition, walEntryFilter, this); } protected final void uncaughtException(Thread t, Throwable e) { @@ -663,7 +633,7 @@ public class ReplicationSource implements ReplicationSourceInterface { @Override //offsets totalBufferUsed by deducting shipped batchSize. - public void postShipEdits(List entries, int batchSize) { + public void postShipEdits(List entries, long batchSize) { if (throttler.isEnabled()) { throttler.addPushSize(batchSize); } @@ -671,11 +641,6 @@ public class ReplicationSource implements ReplicationSourceInterface { totalBufferUsed.addAndGet(-batchSize); } - @Override - public WALFileLengthProvider getWALFileLengthProvider() { - return walFileLengthProvider; - } - @Override public ServerName getServerWALsBelongTo() { return server.getServerName(); @@ -697,4 +662,9 @@ public class ReplicationSource implements ReplicationSourceInterface { void removeWorker(ReplicationSourceShipper worker) { workerThreads.remove(worker.walGroupId, worker); } + + public WALProvider getWalProvider() { + return walProvider; + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index 3058fcc5a6..d86bb74809 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -23,9 +23,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; - import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; @@ -37,6 +35,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALIdentity; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.yetus.audience.InterfaceAudience; /** @@ -48,14 +47,20 @@ public interface ReplicationSourceInterface { /** * Initializer for the source * @param conf the configuration to use - * @param fs the file system to use * @param manager the manager to use + * @param queueStorage replication queue storage + * @param replicationPeer Replication Peer * @param server the server for this region server + * @param queueId Id of the queue + * @param clusterId id of the cluster + * @param metrics metric source for publishing replication metrics + * @param walProvider wal provider + * @throws IOException IOException */ - void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, + void init(Configuration 
conf, ReplicationSourceManager manager, ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, - String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, - MetricsSource metrics) throws IOException; + String queueId, UUID clusterId, MetricsSource metrics, WALProvider walProvider) + throws IOException; /** * Add a log to the list of logs to replicate @@ -159,11 +164,6 @@ public interface ReplicationSourceInterface { */ ReplicationSourceManager getSourceManager(); - /** - * @return the wal file length provider - */ - WALFileLengthProvider getWALFileLengthProvider(); - /** * Try to throttle when the peer config with a bandwidth * @param batchSize entries size will be pushed @@ -176,7 +176,7 @@ public interface ReplicationSourceInterface { * @param entries pushed * @param batchSize entries size pushed */ - void postShipEdits(List entries, int batchSize); + void postShipEdits(List entries, long batchSize); /** * The queue of WALs only belong to one region server. This will return the server name which all diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index dd31a01e0a..595d278e5b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -63,9 +63,9 @@ import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; -import org.apache.hadoop.hbase.wal.FSWALIdentity; import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider; import org.apache.hadoop.hbase.wal.WALIdentity; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; @@ -149,14 +149,8 @@ public class ReplicationSourceManager implements ReplicationListener { private final SyncReplicationPeerMappingManager syncReplicationPeerMappingManager; private final Configuration conf; - private final FileSystem fs; // The paths to the latest log of each wal group, for new coming peers private final Map latestWalIds; - // Path to the wals directories - private final Path logDir; - // Path to the wal archive - private final Path oldLogDir; - private final WALFileLengthProvider walFileLengthProvider; // The number of ms that we wait before moving znodes, HBASE-3596 private final long sleepBeforeFailover; // Homemade executer service for replication @@ -172,6 +166,7 @@ public class ReplicationSourceManager implements ReplicationListener { // Maximum number of retries before taking bold actions when deleting remote wal files for sync // replication peer. 
private final int maxRetriesMultiplier; + private final WALProvider walProvider; /** * Creates a replication manager and sets the watch on all the other registered region servers @@ -180,16 +175,14 @@ public class ReplicationSourceManager implements ReplicationListener { * @param replicationTracker * @param conf the configuration to use * @param server the server for this region server - * @param fs the file system to use - * @param logDir the directory that contains all wal directories of live RSs - * @param oldLogDir the directory where old logs are archived - * @param clusterId + * @param clusterId id of the cluster + * @param walProvider Wal Provider */ public ReplicationSourceManager(ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf, - Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId, - WALFileLengthProvider walFileLengthProvider, - SyncReplicationPeerMappingManager syncReplicationPeerMappingManager) throws IOException { + Server server, UUID clusterId, + SyncReplicationPeerMappingManager syncReplicationPeerMappingManager, WALProvider walProvider) + throws IOException { this.sources = new ConcurrentHashMap<>(); this.queueStorage = queueStorage; this.replicationPeers = replicationPeers; @@ -199,13 +192,10 @@ public class ReplicationSourceManager implements ReplicationListener { this.walsByIdRecoveredQueues = new ConcurrentHashMap<>(); this.oldsources = new ArrayList<>(); this.conf = conf; - this.fs = fs; - this.logDir = logDir; - this.oldLogDir = oldLogDir; + this.walProvider = walProvider; // 30 seconds this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000); this.clusterId = clusterId; - this.walFileLengthProvider = walFileLengthProvider; this.syncReplicationPeerMappingManager = syncReplicationPeerMappingManager; this.replicationTracker.registerListener(this); // It's preferable to failover 1 RS at a time, but with good zk servers @@ -352,8 +342,8 @@ public class ReplicationSourceManager implements ReplicationListener { MetricsSource metrics = new MetricsSource(queueId); // init replication source - src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId, - walFileLengthProvider, metrics); + src.init(conf, this, queueStorage, replicationPeer, server, queueId, clusterId, + metrics, walProvider); return src; } @@ -487,7 +477,8 @@ public class ReplicationSourceManager implements ReplicationListener { toRemove.terminate(terminateMessage); } for (NavigableSet walsByGroup : walsById.get(peerId).values()) { - walsByGroup.forEach(wal -> src.enqueueLog(new FSWALIdentity(new Path(this.logDir, wal)))); + walsByGroup.forEach( + wal -> src.enqueueLog(walProvider.createWalIdentity(server.getServerName(), wal, false))); } } LOG.info("Startup replication source for " + src.getPeerId()); @@ -508,7 +499,8 @@ public class ReplicationSourceManager implements ReplicationListener { ReplicationSourceInterface replicationSource = createSource(queueId, peer); this.oldsources.add(replicationSource); for (SortedSet walsByGroup : walsByIdRecoveredQueues.get(queueId).values()) { - walsByGroup.forEach(wal -> src.enqueueLog(new FSWALIdentity(wal))); + walsByGroup.forEach(wal -> src + .enqueueLog(walProvider.createWalIdentity(server.getServerName(), wal, false))); } toStartup.add(replicationSource); } @@ -675,6 +667,8 @@ public class ReplicationSourceManager implements ReplicationListener { private void removeRemoteWALs(String peerId, String remoteWALDir, 
      Collection<String> wals) throws IOException {
     Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
+    // Currently sync replication is only supported on FS-based WALProviders.
+    // TODO: Abstract FileSystem once all APIs for sync replication are reconciled for remote
+    // calls.
     FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
     for (String wal : wals) {
       Path walFile = new Path(remoteWALDirForPeer, wal);
@@ -964,7 +958,7 @@ public class ReplicationSourceManager implements ReplicationListener {
       }
       oldsources.add(src);
       for (String wal : walsSet) {
-        src.enqueueLog(new FSWALIdentity(new Path(oldLogDir, wal)));
+        src.enqueueLog(walProvider.createWalIdentity(server.getServerName(), wal, true));
       }
       src.startup();
     }
@@ -1059,30 +1053,6 @@ public class ReplicationSourceManager implements ReplicationListener {
     return totalBufferUsed;
   }
 
-  /**
-   * Get the directory where wals are archived
-   * @return the directory where wals are archived
-   */
-  public Path getOldLogDir() {
-    return this.oldLogDir;
-  }
-
-  /**
-   * Get the directory where wals are stored by their RSs
-   * @return the directory where wals are stored by their RSs
-   */
-  public Path getLogDir() {
-    return this.logDir;
-  }
-
-  /**
-   * Get the handle on the local file system
-   * @return Handle on the local file system
-   */
-  public FileSystem getFs() {
-    return this.fs;
-  }
-
   /**
    * Get the ReplicationPeers used by this ReplicationSourceManager
    * @return the ReplicationPeers used by this ReplicationSourceManager
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 8ecd5bdc8e..23c2088fb6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -148,8 +148,8 @@ public class ReplicationSourceShipper extends Thread {
    * get batchEntry size excludes bulk load file sizes.
    * Uses ReplicationSourceWALReader's static method.
*/ - private int getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch) { - int totalSize = 0; + private long getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch) { + long totalSize = 0; for(Entry entry : entryBatch.getWalEntries()) { totalSize += entryReader.getEntrySizeExcludeBulkLoad(entry); } @@ -172,7 +172,7 @@ public class ReplicationSourceShipper extends Thread { return; } int currentSize = (int) entryBatch.getHeapSize(); - int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch); + long sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch); while (isActive()) { try { try { @@ -290,7 +290,7 @@ public class ReplicationSourceShipper extends Thread { public void startup(UncaughtExceptionHandler handler) { String name = Thread.currentThread().getName(); Threads.setDaemonThreadRunning(this, - name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), handler); + name + ".replicationSource.shipper " + walGroupId + "," + source.getQueueId(), handler); } WALIdentity getCurrentWALIdentity() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index a0b2ecd10f..babeab7219 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -18,7 +18,6 @@ */ package org.apache.hadoop.hbase.replication.regionserver; -import java.io.EOFException; import java.io.IOException; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -27,14 +26,12 @@ import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.wal.FSWALIdentity; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALIdentity; @@ -57,7 +54,6 @@ class ReplicationSourceWALReader extends Thread { private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class); private final PriorityBlockingQueue logQueue; - private final FileSystem fs; private final Configuration conf; private final WALEntryFilter filter; private final ReplicationSource source; @@ -71,7 +67,6 @@ class ReplicationSourceWALReader extends Thread { private long currentPosition; private final long sleepForRetries; private final int maxRetriesMultiplier; - private final boolean eofAutoRecovery; //Indicates whether this particular worker is running private boolean isReaderRunning = true; @@ -82,19 +77,16 @@ class ReplicationSourceWALReader extends Thread { /** * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the * entries, and puts them on a batch queue. 
- * @param fs the files system to use * @param conf configuration to use * @param logQueue The WAL queue to read off of * @param startPosition position in the first WAL to start reading from * @param filter The filter to use while reading * @param source replication source */ - public ReplicationSourceWALReader(FileSystem fs, Configuration conf, - PriorityBlockingQueue logQueue, long startPosition, WALEntryFilter filter, - ReplicationSource source) { + public ReplicationSourceWALReader(Configuration conf, PriorityBlockingQueue logQueue, + long startPosition, WALEntryFilter filter, ReplicationSource source) { this.logQueue = logQueue; this.currentPosition = startPosition; - this.fs = fs; this.conf = conf; this.filter = filter; this.source = source; @@ -111,7 +103,6 @@ class ReplicationSourceWALReader extends Thread { this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per - this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false); this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount); LOG.info("peerClusterZnode=" + source.getQueueId() + ", ReplicationSourceWALReaderThread : " + source.getPeerId() @@ -124,10 +115,9 @@ class ReplicationSourceWALReader extends Thread { public void run() { int sleepMultiplier = 1; while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream - try (WALEntryStream entryStream = - new WALEntryStream(logQueue, fs, conf, currentPosition, - source.getWALFileLengthProvider(), source.getServerWALsBelongTo(), - source.getSourceMetrics())) { + try (WALEntryStream entryStream = this.source.getWalProvider().getWalStream(logQueue, conf, + currentPosition, source.getServerWALsBelongTo(), + source.getSourceMetrics())) { while (isReaderRunning()) { // loop here to keep reusing stream while we can if (!source.isPeerEnabled()) { Threads.sleep(sleepForRetries); @@ -153,9 +143,6 @@ class ReplicationSourceWALReader extends Thread { if (sleepMultiplier < maxRetriesMultiplier) { LOG.debug("Failed to read stream of replication entries: " + e); sleepMultiplier++; - } else { - LOG.error("Failed to read stream of replication entries", e); - handleEofException(e); } Threads.sleep(sleepForRetries * sleepMultiplier); } catch (InterruptedException e) { @@ -242,24 +229,6 @@ class ReplicationSourceWALReader extends Thread { } } - // if we get an EOF due to a zero-length log, and there are other logs in queue - // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is - // enabled, then dump the log - private void handleEofException(IOException e) { - if ((e instanceof EOFException || e.getCause() instanceof EOFException) && - logQueue.size() > 1 && this.eofAutoRecovery) { - try { - if (fs.getFileStatus(((FSWALIdentity)logQueue.peek()).getPath()).getLen() == 0) { - LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek()); - logQueue.remove(); - currentPosition = 0; - } - } catch (IOException ioe) { - LOG.warn("Couldn't get file length information about log " + logQueue.peek()); - } - } - } - public WALIdentity getCurrentWalId() { // if we've read some WAL entries, get the walId we read from WALEntryBatch batchQueueHead = entryBatchQueue.peek(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java index c7bccb3d67..e13feeba2e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java @@ -21,17 +21,15 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -75,14 +73,12 @@ public class ReplicationSyncUp extends Configured implements Tool { Configuration conf = getConf(); try (ZKWatcher zkw = new ZKWatcher(conf, "syncupReplication" + System.currentTimeMillis(), abortable, true)) { - Path walRootDir = FSUtils.getWALRootDir(conf); - FileSystem fs = FSUtils.getWALFileSystem(conf); - Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); - Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME); - System.out.println("Start Replication Server start"); + DummyServer dummyServer = new DummyServer(zkw); + WALFactory factory = + new WALFactory(conf, dummyServer.getServerName().toString()); Replication replication = new Replication(); - replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir, null); + replication.initialize(dummyServer, factory.getWALProvider()); ReplicationSourceManager manager = replication.getReplicationManager(); manager.init().get(); while (manager.activeFailoverTaskCount() > 0) { @@ -99,7 +95,7 @@ public class ReplicationSyncUp extends Configured implements Tool { return 0; } - class DummyServer implements Server { + public class DummyServer implements Server { String hostname; ZKWatcher zkw; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java index 7f73030699..ba0c1078f7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationThrottler.java @@ -101,12 +101,12 @@ public class ReplicationThrottler { /** * Add current size to the current cycle's total push size - * @param size is the current size added to the current cycle's + * @param batchSize is the current size added to the current cycle's * total push size */ - public void addPushSize(final int size) { + public void addPushSize(final long batchSize) { if (this.enabled) { - this.cyclePushSize += size; + this.cyclePushSize += batchSize; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java index 5f33e73d95..d2464994cc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import java.util.concurrent.PriorityBlockingQueue; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.util.Bytes; @@ -43,10 +42,10 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader private final SerialReplicationChecker checker; - public SerialReplicationSourceWALReader(FileSystem fs, Configuration conf, + public SerialReplicationSourceWALReader(Configuration conf, PriorityBlockingQueue logQueue, long startPosition, WALEntryFilter filter, ReplicationSource source) { - super(fs, conf, logQueue, startPosition, filter, source); + super(conf, logQueue, startPosition, filter, source); checker = new SerialReplicationChecker(conf, source); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java index 3d90153080..4d89190b74 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -18,31 +18,12 @@ */ package org.apache.hadoop.hbase.replication.regionserver; -import java.io.Closeable; -import java.io.FileNotFoundException; import java.io.IOException; -import java.util.OptionalLong; -import java.util.concurrent.PriorityBlockingQueue; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; -import org.apache.hadoop.hbase.util.CancelableProgressable; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; -import org.apache.hadoop.hbase.wal.FSWALIdentity; +import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hadoop.hbase.wal.WAL.Reader; -import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALIdentity; -import org.apache.hadoop.ipc.RemoteException; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Streaming access to WAL entries. This class is given a queue of WAL {@link WALIdentity}, and @@ -51,385 +32,19 @@ import org.slf4j.LoggerFactory; */ @InterfaceAudience.Private @InterfaceStability.Evolving -class WALEntryStream implements Closeable { - private static final Logger LOG = LoggerFactory.getLogger(WALEntryStream.class); - - private Reader reader; - private WALIdentity currentWAlIdentity; - // cache of next entry for hasNext() - private Entry currentEntry; - // position for the current entry. 
As now we support peek, which means that the upper layer may - // choose to return before reading the current entry, so it is not safe to return the value below - // in getPosition. - private long currentPositionOfEntry = 0; - // position after reading current entry - private long currentPositionOfReader = 0; - private final PriorityBlockingQueue logQueue; - private final FileSystem fs; - private final Configuration conf; - private final WALFileLengthProvider walFileLengthProvider; - // which region server the WALs belong to - private final ServerName serverName; - private final MetricsSource metrics; - +public interface WALEntryStream extends WAL.Reader { /** - * Create an entry stream over the given queue at the given start position - * @param logQueue the queue of WAL walIds - * @param fs {@link FileSystem} to use to create {@link Reader} for this stream - * @param conf {@link Configuration} to use to create {@link Reader} for this stream - * @param startPosition the position in the first WAL to start reading at - * @param serverName the server name which all WALs belong to - * @param metrics replication metrics - * @throws IOException + * @return the {@link WALIdentity} of the current WAL */ - public WALEntryStream(PriorityBlockingQueue logQueue, FileSystem fs, - Configuration conf, long startPosition, WALFileLengthProvider walFileLengthProvider, - ServerName serverName, MetricsSource metrics) throws IOException { - this.logQueue = logQueue; - this.fs = fs; - this.conf = conf; - this.currentPositionOfEntry = startPosition; - this.walFileLengthProvider = walFileLengthProvider; - this.serverName = serverName; - this.metrics = metrics; - } + public WALIdentity getCurrentWalIdentity(); /** * @return true if there is another WAL {@link Entry} */ - public boolean hasNext() throws IOException { - if (currentEntry == null) { - tryAdvanceEntry(); - } - return currentEntry != null; - } + public boolean hasNext() throws IOException; /** * Returns the next WAL entry in this stream but does not advance. */ - public Entry peek() throws IOException { - return hasNext() ? currentEntry: null; - } - - /** - * Returns the next WAL entry in this stream and advance the stream. - */ - public Entry next() throws IOException { - Entry save = peek(); - currentPositionOfEntry = currentPositionOfReader; - currentEntry = null; - return save; - } - - /** - * {@inheritDoc} - */ - @Override - public void close() throws IOException { - closeReader(); - } - - /** - * @return the position of the last Entry returned by next() - */ - public long getPosition() { - return currentPositionOfEntry; - } - - /** - * @return the {@link WALIdentity} of the current WAL - */ - public WALIdentity getCurrentWalIdentity() { - return currentWAlIdentity; - } - - private String getCurrentWalIdStat() { - StringBuilder sb = new StringBuilder(); - if (currentWAlIdentity != null) { - sb.append("currently replicating from: ").append(currentWAlIdentity).append(" at position: ") - .append(currentPositionOfEntry).append("\n"); - } else { - sb.append("no replication ongoing, waiting for new log"); - } - return sb.toString(); - } - - /** - * Should be called if the stream is to be reused (i.e. 
used again after hasNext() has returned - * false) - */ - public void reset() throws IOException { - if (reader != null && currentWAlIdentity != null) { - resetReader(); - } - } - - private void setPosition(long position) { - currentPositionOfEntry = position; - } - - private void setCurrentWalId(WALIdentity walId) { - this.currentWAlIdentity = walId; - } - - private void tryAdvanceEntry() throws IOException { - if (checkReader()) { - boolean beingWritten = readNextEntryAndRecordReaderPosition(); - if (currentEntry == null && !beingWritten) { - // no more entries in this log file, and the file is already closed, i.e, rolled - // Before dequeueing, we should always get one more attempt at reading. - // This is in case more entries came in after we opened the reader, and the log is rolled - // while we were reading. See HBASE-6758 - resetReader(); - readNextEntryAndRecordReaderPosition(); - if (currentEntry == null) { - if (checkAllBytesParsed()) { // now we're certain we're done with this log file - dequeueCurrentLog(); - if (openNextLog()) { - readNextEntryAndRecordReaderPosition(); - } - } - } - } - // if currentEntry != null then just return - // if currentEntry == null but the file is still being written, then we should not switch to - // the next log either, just return here and try next time to see if there are more entries in - // the current file - } - // do nothing if we don't have a WAL Reader (e.g. if there's no logs in queue) - } - - // HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file - private boolean checkAllBytesParsed() throws IOException { - // -1 means the wal wasn't closed cleanly. - final long trailerSize = currentTrailerSize(); - FileStatus stat = null; - try { - stat = fs.getFileStatus(((FSWALIdentity)this.currentWAlIdentity).getPath()); - } catch (IOException exception) { - LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}", - currentWAlIdentity, trailerSize < 0 ? "was not" : "was", getCurrentWalIdStat()); - metrics.incrUnknownFileLengthForClosedWAL(); - } - // Here we use currentPositionOfReader instead of currentPositionOfEntry. - // We only call this method when currentEntry is null so usually they are the same, but there - // are two exceptions. One is we have nothing in the file but only a header, in this way - // the currentPositionOfEntry will always be 0 since we have no change to update it. The other - // is that we reach the end of file, then currentPositionOfEntry will point to the tail of the - // last valid entry, and the currentPositionOfReader will usually point to the end of the file. - if (stat != null) { - if (trailerSize < 0) { - if (currentPositionOfReader < stat.getLen()) { - final long skippedBytes = stat.getLen() - currentPositionOfReader; - LOG.debug( - "Reached the end of WAL file '{}'. It was not closed cleanly," + - " so we did not parse {} bytes of data. This is normally ok.", - currentWAlIdentity, skippedBytes); - metrics.incrUncleanlyClosedWALs(); - metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes); - } - } else if (currentPositionOfReader + trailerSize < stat.getLen()) { - LOG.warn( - "Processing end of WAL file '{}'. At position {}, which is too far away from" + - " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). 
{}", - currentWAlIdentity, currentPositionOfReader, stat.getLen(), getCurrentWalIdStat()); - setPosition(0); - resetReader(); - metrics.incrRestartedWALReading(); - metrics.incrRepeatedFileBytes(currentPositionOfReader); - return false; - } - } - if (LOG.isTraceEnabled()) { - LOG.trace("Reached the end of log " + this.currentWAlIdentity - + ", and the length of the file is " + (stat == null ? "N/A" : stat.getLen())); - } - metrics.incrCompletedWAL(); - return true; - } - - private void dequeueCurrentLog() throws IOException { - LOG.debug("Reached the end of log {}", currentWAlIdentity); - closeReader(); - logQueue.remove(); - setPosition(0); - metrics.decrSizeOfLogQueue(); - } - - /** - * Returns whether the file is opened for writing. - */ - private boolean readNextEntryAndRecordReaderPosition() throws IOException { - Entry readEntry = reader.next(); - long readerPos = reader.getPosition(); - OptionalLong fileLength = - walFileLengthProvider.getLogFileSizeIfBeingWritten(currentWAlIdentity); - if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) { - // see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted - // data, so we need to make sure that we do not read beyond the committed file length. - if (LOG.isDebugEnabled()) { - LOG.debug("The provider tells us the valid length for " + currentWAlIdentity + " is " + - fileLength.getAsLong() + ", but we have advanced to " + readerPos); - } - resetReader(); - return true; - } - if (readEntry != null) { - metrics.incrLogEditsRead(); - metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry); - } - currentEntry = readEntry; // could be null - this.currentPositionOfReader = readerPos; - return fileLength.isPresent(); - } - - private void closeReader() throws IOException { - if (reader != null) { - reader.close(); - reader = null; - } - } - - // if we don't have a reader, open a reader on the next log - private boolean checkReader() throws IOException { - if (reader == null) { - return openNextLog(); - } - return true; - } - - // open a reader on the next log in queue - private boolean openNextLog() throws IOException { - WALIdentity nextWalId = logQueue.peek(); - if (nextWalId != null) { - openReader((FSWALIdentity)nextWalId); - if (reader != null) { - return true; - } - } else { - // no more files in queue, this could happen for recovered queue, or for a wal group of a sync - // replication peer which has already been transited to DA or S. 
- setCurrentWalId(null); - } - return false; - } - - private Path getArchivedLog(Path path) throws IOException { - Path rootDir = FSUtils.getRootDir(conf); - - // Try found the log in old dir - Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); - Path archivedLogLocation = new Path(oldLogDir, path.getName()); - if (fs.exists(archivedLogLocation)) { - LOG.info("Log " + path + " was moved to " + archivedLogLocation); - return archivedLogLocation; - } - - // Try found the log in the seperate old log dir - oldLogDir = - new Path(rootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME) - .append(Path.SEPARATOR).append(serverName.getServerName()).toString()); - archivedLogLocation = new Path(oldLogDir, path.getName()); - if (fs.exists(archivedLogLocation)) { - LOG.info("Log " + path + " was moved to " + archivedLogLocation); - return archivedLogLocation; - } - - LOG.error("Couldn't locate log: " + path); - return path; - } - - private void handleFileNotFound(FSWALIdentity walId, FileNotFoundException fnfe) - throws IOException { - // If the log was archived, continue reading from there - FSWALIdentity archivedLog = new FSWALIdentity(getArchivedLog(walId.getPath())); - if (!walId.equals(archivedLog)) { - openReader(archivedLog); - } else { - throw fnfe; - } - } - - private void openReader(FSWALIdentity walId) throws IOException { - try { - // Detect if this is a new file, if so get a new reader else - // reset the current reader so that we see the new data - if (reader == null || !getCurrentWalIdentity().equals(walId)) { - closeReader(); - reader = WALFactory.createReader(fs, walId.getPath(), conf); - seek(); - setCurrentWalId(walId); - } else { - resetReader(); - } - } catch (FileNotFoundException fnfe) { - handleFileNotFound(walId, fnfe); - } catch (RemoteException re) { - IOException ioe = re.unwrapRemoteException(FileNotFoundException.class); - if (!(ioe instanceof FileNotFoundException)) throw ioe; - handleFileNotFound(walId, (FileNotFoundException)ioe); - } catch (LeaseNotRecoveredException lnre) { - // HBASE-15019 the WAL was not closed due to some hiccup. - LOG.warn("Try to recover the WAL lease " + currentWAlIdentity, lnre); - recoverLease(conf, ((FSWALIdentity)currentWAlIdentity).getPath()); - reader = null; - } catch (NullPointerException npe) { - // Workaround for race condition in HDFS-4380 - // which throws a NPE if we open a file before any data node has the most recent block - // Just sleep and retry. Will require re-reading compressed WALs for compressionContext. 
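The monolithic stream implementation being removed here is replaced by the WALEntryStream interface introduced above, which keeps the same pull-style contract: hasNext() lazily advances, peek() inspects the head entry without consuming it, and next() (inherited from WAL.Reader) consumes it. A hedged consumer sketch against the new interface; the provider, queue and process() handler are assumptions for illustration, not code from this patch:

    import java.io.IOException;
    import java.util.concurrent.PriorityBlockingQueue;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
    import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream;
    import org.apache.hadoop.hbase.wal.WAL;
    import org.apache.hadoop.hbase.wal.WALIdentity;
    import org.apache.hadoop.hbase.wal.WALProvider;

    // Sketch of a consumer loop over the new interface.
    final class WalQueueDrainer {
      static void drain(WALProvider provider, PriorityBlockingQueue<WALIdentity> logQueue,
          Configuration conf, ServerName serverName) throws IOException {
        try (WALEntryStream stream =
            provider.getWalStream(logQueue, conf, 0L, serverName, new MetricsSource("1"))) {
          while (stream.hasNext()) {
            WAL.Entry head = stream.peek();  // inspect without consuming
            WAL.Entry entry = stream.next(); // consumes the head and advances the position
            assert head == entry;            // per the contract the old implementation kept
            process(entry);                  // hypothetical downstream handler
          }
        }
      }
      private static void process(WAL.Entry entry) { /* stand-in for real shipping logic */ }
    }
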
- LOG.warn("Got NPE opening reader, will retry."); - reader = null; - } - } - - // For HBASE-15019 - private void recoverLease(final Configuration conf, final Path path) { - try { - final FileSystem dfs = FSUtils.getCurrentFileSystem(conf); - FSUtils fsUtils = FSUtils.getInstance(dfs, conf); - fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() { - @Override - public boolean progress() { - LOG.debug("recover WAL lease: " + path); - return true; - } - }); - } catch (IOException e) { - LOG.warn("unable to recover lease for WAL: " + path, e); - } - } - - private void resetReader() throws IOException { - try { - currentEntry = null; - reader.reset(); - seek(); - } catch (FileNotFoundException fnfe) { - // If the log was archived, continue reading from there - FSWALIdentity archivedLog = - new FSWALIdentity(getArchivedLog(((FSWALIdentity) currentWAlIdentity).getPath())); - if (!currentWAlIdentity.equals(archivedLog)) { - openReader(archivedLog); - } else { - throw fnfe; - } - } catch (NullPointerException npe) { - throw new IOException("NPE resetting reader, likely HDFS-4380", npe); - } - } - - private void seek() throws IOException { - if (currentPositionOfEntry != 0) { - reader.seek(currentPositionOfEntry); - } - } - - private long currentTrailerSize() { - long size = -1L; - if (reader instanceof ProtobufLogReader) { - final ProtobufLogReader pblr = (ProtobufLogReader) reader; - size = pblr.trailerSize(); - } - return size; - } + public Entry peek() throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java index d0b63cc244..08bff9b56f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java @@ -18,8 +18,7 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.util.OptionalLong; - -import org.apache.hadoop.hbase.wal.WALIdentity; +import org.apache.hadoop.fs.Path; import org.apache.yetus.audience.InterfaceAudience; /** @@ -30,5 +29,5 @@ import org.apache.yetus.audience.InterfaceAudience; @FunctionalInterface public interface WALFileLengthProvider { - OptionalLong getLogFileSizeIfBeingWritten(WALIdentity walId); + OptionalLong getLogFileSizeIfBeingWritten(Path walId); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index 1f24548cb9..e5303b0fce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -23,28 +23,36 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Matcher; import java.util.regex.Pattern; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; +import 
org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.RegionInfo; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.replication.regionserver.FSWALEntryStream; +import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp; +import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream; import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; @@ -62,6 +70,8 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists; @InterfaceStability.Evolving public abstract class AbstractFSWALProvider> implements WALProvider { + // This provider also resolves wal locations for replication; it keeps the + // wals root dir and the wal archive dir in the rootDir and oldLogDir fields below private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWALProvider.class); /** Separate old log into different dir by regionserver name **/ @@ -94,6 +104,12 @@ public abstract class AbstractFSWALProvider> implemen */ private final ReadWriteLock walCreateLock = new ReentrantReadWriteLock(); + private Path rootDir; + + private Path oldLogDir; + + private FileSystem fs; + /** * @param factory factory that made us, identity used for FS layout. may not be null * @param conf may not be null @@ -118,6 +134,9 @@ public abstract class AbstractFSWALProvider> implemen } } logPrefix = sb.toString(); + rootDir = FSUtils.getRootDir(conf); + oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); + this.fs = CommonFSUtils.getWALFileSystem(conf); doInit(conf); } @@ -554,4 +573,87 @@ public abstract class AbstractFSWALProvider> implemen public static long getWALStartTimeFromWALName(String name) { return Long.parseLong(getWALNameGroupFromWALName(name, 2)); } + + @Override + public WALEntryStream getWalStream(PriorityBlockingQueue logQueue, + Configuration conf, long startPosition, ServerName serverName, MetricsSource metrics) + throws IOException { + return new FSWALEntryStream(fs, logQueue, conf, startPosition, + serverName, metrics, this); + } + + @Override + public WALIdentity createWalIdentity(ServerName serverName, String walName, boolean isArchive) { + Path walPath; + if (isArchive) { + walPath = new Path(oldLogDir, walName); + } else { + Path logDir = + new Path(rootDir, AbstractFSWALProvider.getWALDirectoryName(serverName.toString())); + walPath = new Path(logDir, walName); + } + return new FSWALIdentity(walPath); + } + + @Override + public WALIdentity locateWalId(WALIdentity walId, Server server, + List deadRegionServers) throws IOException { + FSWALIdentity fsWALId = ((FSWALIdentity) walId); + if (fs.exists(fsWALId.getPath())) { + // still in same location, don't need to + // do anything + return fsWALId; + } + // Path changed - try to find the right path.
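createWalIdentity above is, for this FS-backed provider, a pure path computation: archived wal names resolve under the oldWALs directory, live ones under the owning server's directory inside WALs. A hedged illustration of the two resolutions, assuming the standard FS layout and a provider/serverName/walName from the surrounding code; the root dir, server name and wal name shown are invented for the example:

    // With hbase root dir hdfs://ns/hbase and server rs1.example.com,16020,1547830612345:
    //   live    -> hdfs://ns/hbase/WALs/rs1.example.com,16020,1547830612345/<walName>
    //   archive -> hdfs://ns/hbase/oldWALs/<walName>
    WALIdentity live = provider.createWalIdentity(serverName, walName, false);
    WALIdentity archived = provider.createWalIdentity(serverName, walName, true);
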
+ if (server instanceof ReplicationSyncUp.DummyServer) { + // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data + // from .logs to .oldlogs. Loop into .logs folders and check whether a match exists + Path newPath = getReplSyncUpPath(server.getServerName(), ((FSWALIdentity) fsWALId).getPath()); + return new FSWALIdentity(newPath); + } else { + // See if Path exists in the dead RS folder (there could be a chain of failures + // to look at) + LOG.info("NB dead servers : " + deadRegionServers.size()); + + for (ServerName curDeadServerName : deadRegionServers) { + final Path deadRsDirectory = new Path(rootDir, + AbstractFSWALProvider.getWALDirectoryName(curDeadServerName.getServerName())); + Path[] locs = new Path[] { new Path(deadRsDirectory, fsWALId.getName()), new Path( + deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), fsWALId.getName()) }; + for (Path possibleLogLocation : locs) { + LOG.info("Possible location " + possibleLogLocation.toUri().toString()); + if (fs.exists(possibleLogLocation)) { + // We found the right new location + LOG.info("Log " + fsWALId + " still exists at " + possibleLogLocation); + return new FSWALIdentity(possibleLogLocation); + } + } + } + // didn't find a new location + LOG.error( + String.format("WAL Path %s doesn't exist and couldn't find its new location", fsWALId)); + return fsWALId; + } + } + + // N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of the wal + // area rather than to the wal area for a particular region server. + private Path getReplSyncUpPath(ServerName server, Path path) throws IOException { + Path logDir = new Path(rootDir, + AbstractFSWALProvider.getWALDirectoryName(server.getServerName().toString())); + FileStatus[] rss = fs.listStatus(logDir); + for (FileStatus rs : rss) { + Path p = rs.getPath(); + FileStatus[] logs = fs.listStatus(p); + for (FileStatus log : logs) { + p = new Path(p, log.getPath().getName()); + if (p.getName().equals(path.getName())) { + LOG.info("Log " + p.getName() + " found at " + p); + return p; + } + } + } + LOG.error("Didn't find path for: " + path.getName()); + return path; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index 8822f29c36..1099b078ef 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -24,16 +24,22 @@ import java.util.Map; import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.PrivateCellUtil; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; +import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; +import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream; +import org.apache.hadoop.hbase.wal.WAL.Entry; 
import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -267,7 +273,7 @@ class DisabledWALProvider implements WALProvider { } @Override - public OptionalLong getLogFileSizeIfBeingWritten(WALIdentity path) { + public OptionalLong getLogFileSizeIfBeingWritten(Path path) { return OptionalLong.empty(); } } @@ -286,4 +292,87 @@ class DisabledWALProvider implements WALProvider { public void addWALActionsListener(WALActionsListener listener) { disabled.registerWALActionsListener(listener); } + + @Override + public WALEntryStream getWalStream(PriorityBlockingQueue logQueue, + Configuration conf, long startPosition, ServerName serverName, MetricsSource metrics) + throws IOException { + return new WALEntryStream() { + + @Override + public void close() throws IOException { + } + + @Override + public void seek(long pos) throws IOException { + } + + @Override + public void reset() throws IOException { + } + + @Override + public Entry next(Entry reuse) throws IOException { + return null; + } + + @Override + public Entry next() throws IOException { + return null; + } + + @Override + public long getPosition() throws IOException { + return 0; + } + + @Override + public Entry peek() throws IOException { + return null; + } + + @Override + public boolean hasNext() throws IOException { + return false; + } + + @Override + public WALIdentity getCurrentWalIdentity() { + return null; + } + }; + } + + @Override + public WALIdentity createWalIdentity(ServerName serverName, String walName, boolean isArchive) { + return new WALIdentity() { + + @Override + public int compareTo(WALIdentity o) { + return 0; + } + + @Override + public String getName() { + return walName; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof WALIdentity; + } + + @Override + public int hashCode() { + return 0; + } + }; + } + + @Override + public WALIdentity locateWalId(WALIdentity wal, Server server, List deadRegionServers) + throws IOException { + return wal; + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java index 0b7b8dad6b..8cd667af82 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java @@ -26,15 +26,22 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.locks.Lock; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; // imports for classes still in regionserver.wal import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.replication.regionserver.FSWALEntryStream; +import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; +import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.KeyLocker; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; 
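The hunks that follow show RegionGroupingProvider (and, further down, SyncReplicationWALProvider) satisfying the new identity APIs by instantiating a delegate of the configured provider class at init time and forwarding to it, since only a concrete provider knows its storage layout. A minimal sketch of that forwarding shape, assuming any already-initialized WALProvider as the delegate; this is not the patch's class body, just the pattern it applies:

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.hbase.Server;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.wal.WALIdentity;
    import org.apache.hadoop.hbase.wal.WALProvider;

    // Delegation shape used by the grouping and sync-replication providers.
    final class ForwardingIdentitySupport {
      private final WALProvider delegate;

      ForwardingIdentitySupport(WALProvider delegate) {
        this.delegate = delegate;
      }

      WALIdentity createWalIdentity(ServerName serverName, String walName, boolean isArchive) {
        return delegate.createWalIdentity(serverName, walName, isArchive); // pure pass-through
      }

      WALIdentity locateWalId(WALIdentity wal, Server server, List<ServerName> deadRegionServers)
          throws IOException {
        return delegate.locateWalId(wal, server, deadRegionServers);
      }
    }
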
@@ -88,6 +95,7 @@ public class RegionGroupingProvider implements WALProvider { } } + /** * instantiate a strategy from a config property. * requires conf to have already been set (as well as anything the provider might need to read). @@ -137,6 +145,7 @@ public class RegionGroupingProvider implements WALProvider { private List listeners = new ArrayList<>(); private String providerId; private Class providerClass; + private WALProvider delegateProvider; @Override public void init(WALFactory factory, Configuration conf, String providerId) throws IOException { @@ -156,6 +165,8 @@ public class RegionGroupingProvider implements WALProvider { this.providerId = sb.toString(); this.strategy = getStrategy(conf, REGION_GROUPING_STRATEGY, DEFAULT_REGION_GROUPING_STRATEGY); this.providerClass = factory.getProviderClass(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER); + delegateProvider = WALFactory.createProvider(providerClass); + delegateProvider.init(factory, conf, providerId); } private WALProvider createProvider(String group) throws IOException { @@ -285,4 +296,24 @@ public class RegionGroupingProvider implements WALProvider { // extra code actually works, then we will have other big problems. So leave it as is. listeners.add(listener); } + + @Override + public WALEntryStream getWalStream(PriorityBlockingQueue logQueue, + Configuration conf, long startPosition, ServerName serverName, MetricsSource metrics) + throws IOException { + return new FSWALEntryStream(CommonFSUtils.getWALFileSystem(conf), logQueue, conf, startPosition, + serverName, metrics, this); + } + + @Override + public WALIdentity createWalIdentity(ServerName serverName, String walName, boolean isArchive) { + return delegateProvider.createWalIdentity(serverName, walName, isArchive); + } + + @Override + public WALIdentity locateWalId(WALIdentity wal, Server server, List deadRegionServers) + throws IOException { + return delegateProvider.locateWalId(wal, server, deadRegionServers); + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java index 9859c20464..fc287604e1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.function.BiPredicate; @@ -36,14 +37,19 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; +import org.apache.hadoop.hbase.replication.regionserver.FSWALEntryStream; +import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener; import 
org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider; +import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.KeyLocker; @@ -349,4 +355,23 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen return provider; } + @Override + public WALEntryStream getWalStream(PriorityBlockingQueue logQueue, + Configuration conf, long startPosition, ServerName serverName, MetricsSource metrics) + throws IOException { + return new FSWALEntryStream(CommonFSUtils.getWALFileSystem(conf), logQueue, conf, startPosition, + serverName, metrics, this); + } + + @Override + public WALIdentity createWalIdentity(ServerName serverName, String walName, boolean isArchive) { + return provider.createWalIdentity(serverName, walName, isArchive); + } + + @Override + public WALIdentity locateWalId(WALIdentity wal, Server server, List deadRegionServers) + throws IOException { + return provider.locateWalId(wal, server, deadRegionServers); + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java index 244a636226..3ef4efa6da 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java @@ -21,12 +21,15 @@ package org.apache.hadoop.hbase.wal; import java.io.Closeable; import java.io.IOException; import java.util.List; -import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.PriorityBlockingQueue; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; -import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; +import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; +import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream; import org.apache.yetus.audience.InterfaceAudience; /** @@ -109,8 +112,37 @@ public interface WALProvider { */ void addWALActionsListener(WALActionsListener listener); - default WALFileLengthProvider getWALFileLengthProvider() { - return path -> getWALs().stream().map(w -> w.getLogFileSizeIfBeingWritten(path)) - .filter(o -> o.isPresent()).findAny().orElse(OptionalLong.empty()); - } + /** + * Streaming implementation to retrieve WAL entries from given set of Wals. 
The stream consumes the given + * queue of WAL identities in order. + * @param logQueue Queue of wals + * @param conf configuration + * @param startPosition start position for the first wal in the queue + * @param serverName name of the server + * @param metrics metric source + * @return WALEntryStream + * @throws IOException IOException + */ + WALEntryStream getWalStream(PriorityBlockingQueue logQueue, Configuration conf, + long startPosition, ServerName serverName, MetricsSource metrics) throws IOException; + + /** + * Create a wal identity wrapper for the given wal name + * @param serverName regionserver name + * @param walName Name of the wal + * @param isArchive whether the wal has been archived + * @return WALIdentity + */ + WALIdentity createWalIdentity(ServerName serverName, String walName, boolean isArchive); + + /** + * Get the updated walId if the wal was moved after its hosting server died + * @param wal original walId + * @param server the server performing the lookup + * @param deadRegionServers list of dead region servers + * @return updated walId after relocation + * @throws IOException IOException + */ + WALIdentity locateWalId(WALIdentity wal, Server server, + List deadRegionServers) throws IOException; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index ed71e6e483..ec0ed409af 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -21,9 +21,7 @@ import java.io.IOException; import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; @@ -31,10 +29,10 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; -import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALIdentity; +import org.apache.hadoop.hbase.wal.WALProvider; /** * Source that does nothing at all, helpful to test ReplicationSourceManager @@ -46,18 +44,15 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { private String peerClusterId; private WALIdentity currentWalId; private MetricsSource metrics; - private WALFileLengthProvider walFileLengthProvider; private AtomicBoolean startup = new AtomicBoolean(false); @Override - public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, - ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId, - UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) - throws IOException { + public void init(Configuration conf, ReplicationSourceManager manager, ReplicationQueueStorage rq, + ReplicationPeer rp, Server server, String peerClusterId, UUID clusterId, + MetricsSource metrics, WALProvider walProvider) throws IOException { this.manager = manager; this.peerClusterId = peerClusterId; this.metrics = metrics; - this.walFileLengthProvider =
walFileLengthProvider; this.replicationPeer = rp; } @@ -144,12 +139,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { } @Override - public void postShipEdits(List entries, int batchSize) { - } - - @Override - public WALFileLengthProvider getWALFileLengthProvider() { - return walFileLengthProvider; + public void postShipEdits(List entries, long batchSize) { } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 23202058c1..6322f7903b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -22,7 +22,6 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import java.io.IOException; -import java.util.OptionalLong; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -48,12 +47,6 @@ import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; -import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; -import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource; -import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSourceShipper; -import org.apache.hadoop.hbase.replication.regionserver.Replication; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; @@ -176,8 +169,7 @@ public class TestReplicationSource { testConf.setInt("replication.source.maxretriesmultiplier", 1); ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); - source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null, - p -> OptionalLong.empty(), null); + source.init(testConf, manager, null, mockPeer, null, "testPeer", null, null, null); ExecutorService executor = Executors.newSingleThreadExecutor(); Future future = executor.submit(new Runnable() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index def737eb06..f816ea694e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -77,6 +77,7 @@ import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker; +import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp.DummyServer; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; @@ -90,6 +91,7 @@ import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; @@ -196,7 +198,10 @@ public abstract class TestReplicationSourceManager { logDir = utility.getDataTestDir(HConstants.HREGION_LOGDIR_NAME); remoteLogDir = utility.getDataTestDir(ReplicationUtils.REMOTE_WAL_DIR_NAME); replication = new Replication(); - replication.initialize(new DummyServer(), fs, logDir, oldLogDir, null); + DummyServer dummyServer = new DummyServer(); + WALFactory factory = + new WALFactory(conf, dummyServer.getServerName().toString()); + replication.initialize(dummyServer, factory.getWALProvider()); managerOfCluster = getManagerFromCluster(); if (managerOfCluster != null) { // After replication procedure, we need to add peer by hand (other than by receiving @@ -822,10 +827,9 @@ public abstract class TestReplicationSourceManager { static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy { @Override - public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, + public void init(Configuration conf, ReplicationSourceManager manager, ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId, - UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) - throws IOException { + UUID clusterId, MetricsSource metrics, WALProvider provider) throws IOException { throw new IOException("Failing deliberately"); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationThrottler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationThrottler.java index c4d529e23c..2775a37de9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationThrottler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationThrottler.java @@ -58,8 +58,8 @@ public class TestReplicationThrottler { assertEquals(0, ticks1); assertEquals(0, ticks2); - throttler1.addPushSize(1000); - throttler2.addPushSize(1000); + throttler1.addPushSize(1000l); + throttler2.addPushSize(1000l); ticks1 = throttler1.getNextSleepInterval(5); ticks2 = throttler2.getNextSleepInterval(5); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index d22f96ab7b..4f2f163ccd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.apache.hadoop.hbase.wal.WALProvider; import 
org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.After; import org.junit.AfterClass; @@ -106,6 +107,8 @@ public class TestWALEntryStream { public TestName tn = new TestName(); private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); + private WALProvider walProvider; + @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL = new HBaseTestingUtility(); @@ -125,8 +128,9 @@ public class TestWALEntryStream { public void setUp() throws Exception { walQueue = new PriorityBlockingQueue<>(); pathWatcher = new PathWatcher(); - final WALFactory wals = new WALFactory(CONF, tn.getMethodName()); - wals.getWALProvider().addWALActionsListener(pathWatcher); + WALFactory wals = new WALFactory(CONF, tn.getMethodName()); + walProvider = wals.getWALProvider(); + walProvider.addWALActionsListener(pathWatcher); log = wals.getWAL(info); } @@ -156,8 +160,8 @@ public class TestWALEntryStream { log.rollWriter(); - try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { + try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, 0, null, + new MetricsSource("1"), this.walProvider)) { int i = 0; while (entryStream.hasNext()) { assertNotNull(entryStream.next()); @@ -184,7 +188,7 @@ public class TestWALEntryStream { appendToLogAndSync(); long oldPos; try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { + new FSWALEntryStream(fs, walQueue, CONF, 0, null, new MetricsSource("1"), walProvider)) { // There's one edit in the log, read it. Reading past it needs to throw exception assertTrue(entryStream.hasNext()); WAL.Entry entry = entryStream.peek(); @@ -198,8 +202,8 @@ public class TestWALEntryStream { appendToLogAndSync(); - try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, oldPos, - log, null, new MetricsSource("1"))) { + try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, oldPos, null, + new MetricsSource("1"), walProvider)) { // Read the newly added entry, make sure we made progress WAL.Entry entry = entryStream.next(); assertNotEquals(oldPos, entryStream.getPosition()); @@ -212,8 +216,8 @@ public class TestWALEntryStream { log.rollWriter(); appendToLogAndSync(); - try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, oldPos, - log, null, new MetricsSource("1"))) { + try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, oldPos, null, + new MetricsSource("1"), walProvider)) { WAL.Entry entry = entryStream.next(); assertNotEquals(oldPos, entryStream.getPosition()); assertNotNull(entry); @@ -238,7 +242,7 @@ public class TestWALEntryStream { appendToLog("1"); appendToLog("2");// 2 try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { + new FSWALEntryStream(fs, walQueue, CONF, 0, null, new MetricsSource("1"), walProvider)) { assertEquals("1", getRow(entryStream.next())); appendToLog("3"); // 3 - comes in after reader opened @@ -263,7 +267,7 @@ public class TestWALEntryStream { public void testNewEntriesWhileStreaming() throws Exception { appendToLog("1"); try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { + new FSWALEntryStream(fs, walQueue, CONF, 0, null, new MetricsSource("1"), walProvider)) { entryStream.next(); // we've hit the end of the stream at this point // some new entries come in while we're streaming @@ 
-286,15 +290,15 @@ public class TestWALEntryStream { long lastPosition = 0; appendToLog("1"); try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { + new FSWALEntryStream(fs, walQueue, CONF, 0, null, new MetricsSource("1"), walProvider)) { entryStream.next(); // we've hit the end of the stream at this point appendToLog("2"); appendToLog("3"); lastPosition = entryStream.getPosition(); } // next stream should picks up where we left off - try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, CONF, lastPosition, log, null, new MetricsSource("1"))) { + try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, lastPosition, null, + new MetricsSource("1"), walProvider)) { assertEquals("2", getRow(entryStream.next())); assertEquals("3", getRow(entryStream.next())); assertFalse(entryStream.hasNext()); // done @@ -311,14 +315,14 @@ public class TestWALEntryStream { long lastPosition = 0; appendEntriesToLogAndSync(3); // read only one element - try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, lastPosition, - log, null, new MetricsSource("1"))) { + try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, lastPosition, null, + new MetricsSource("1"), walProvider)) { entryStream.next(); lastPosition = entryStream.getPosition(); } // there should still be two more entries from where we left off - try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, CONF, lastPosition, log, null, new MetricsSource("1"))) { + try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, lastPosition, null, + new MetricsSource("1"), walProvider)) { assertNotNull(entryStream.next()); assertNotNull(entryStream.next()); assertFalse(entryStream.hasNext()); @@ -329,7 +333,7 @@ public class TestWALEntryStream { @Test public void testEmptyStream() throws Exception { try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { + new FSWALEntryStream(fs, walQueue, CONF, 0, null, new MetricsSource("1"), walProvider)) { assertFalse(entryStream.hasNext()); } } @@ -341,9 +345,9 @@ public class TestWALEntryStream { ReplicationSource source = Mockito.mock(ReplicationSource.class); when(source.getSourceManager()).thenReturn(mockSourceManager); when(source.getSourceMetrics()).thenReturn(new MetricsSource("1")); - when(source.getWALFileLengthProvider()).thenReturn(log); when(source.getServer()).thenReturn(mockServer); when(source.isRecovered()).thenReturn(recovered); + when(source.getWalProvider()).thenReturn(this.walProvider); return source; } @@ -351,7 +355,7 @@ public class TestWALEntryStream { ReplicationSource source = mockReplicationSource(recovered, conf); when(source.isPeerEnabled()).thenReturn(true); ReplicationSourceWALReader reader = - new ReplicationSourceWALReader(fs, conf, walQueue, 0, getDummyFilter(), source); + new ReplicationSourceWALReader(conf, walQueue, 0, getDummyFilter(), source); reader.start(); return reader; } @@ -362,7 +366,7 @@ public class TestWALEntryStream { // get ending position long position; try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { + new FSWALEntryStream(fs, walQueue, CONF, 0, null, new MetricsSource("1"), walProvider)) { entryStream.next(); entryStream.next(); entryStream.next(); @@ -476,8 +480,8 @@ public class TestWALEntryStream { appendEntriesToLogAndSync(3); // get ending position long position; - try 
@@ -476,8 +480,8 @@
     appendEntriesToLogAndSync(3);
     // get ending position
     long position;
-    try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
+    try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, 0, null,
+        new MetricsSource("1"), this.walProvider)) {
       entryStream.next();
       entryStream.next();
       entryStream.next();
@@ -495,7 +499,7 @@
     });
     ReplicationSourceWALReader reader =
-        new ReplicationSourceWALReader(fs, CONF, walQueue, 0, getDummyFilter(), source);
+        new ReplicationSourceWALReader(CONF, walQueue, 0, getDummyFilter(), source);
     reader.start();
     Future<WALEntryBatch> future = ForkJoinPool.commonPool().submit(() -> {
       return reader.take();
@@ -591,10 +595,15 @@
   public void testReadBeyondCommittedLength() throws IOException, InterruptedException {
     appendToLog("1");
     appendToLog("2");
-    long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong();
+    long size =
+        log.getLogFileSizeIfBeingWritten(((FSWALIdentity) walQueue.peek()).getPath()).getAsLong();
     AtomicLong fileLength = new AtomicLong(size - 1);
-    try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, 0,
-        p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"))) {
+    try (WALEntryStream entryStream =
+        new FSWALEntryStream(fs, walQueue, CONF, 0, null, new MetricsSource("1"), walProvider) {
+          public WALFileLengthProvider getWALFileLengthProvider() {
+            return p -> OptionalLong.of(fileLength.get());
+          }
+        }) {
       assertTrue(entryStream.hasNext());
       assertNotNull(entryStream.next());
       // can not get log 2
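testReadBeyondCommittedLength() keeps its old intent, verifying the stream never reads past the length a live WAL reports as committed, but the fake length now arrives through an override rather than a constructor lambda. A sketch of the pattern, assuming FSWALEntryStream exposes an overridable getWALFileLengthProvider() as the hunk implies, with size and fileLength as in the test:

    AtomicLong fileLength = new AtomicLong(size - 1); // one byte short of entry 2
    try (WALEntryStream entryStream =
        new FSWALEntryStream(fs, walQueue, CONF, 0, null, new MetricsSource("1"), walProvider) {
          @Override
          public WALFileLengthProvider getWALFileLengthProvider() {
            // Pretend the WAL is shorter than it really is; the second entry
            // must stay invisible until the reported length catches up.
            return p -> OptionalLong.of(fileLength.get());
          }
        }) {
      assertTrue(entryStream.hasNext());
      assertNotNull(entryStream.next());  // entry 1 is within the reported length
      assertFalse(entryStream.hasNext()); // entry 2 is beyond it
    }
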
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
index d062c77cb3..e2cf6f5af6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
@@ -27,18 +27,24 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 // imports for things that haven't moved from regionserver.wal yet.
 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.replication.regionserver.FSWALEntryStream;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
+import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -92,6 +98,11 @@ public class IOTestProvider implements WALProvider {
   protected AtomicBoolean initialized = new AtomicBoolean(false);
 
   private List<WALActionsListener> listeners = new ArrayList<>();
+
+  private Path oldLogDir;
+
+  private Path rootDir;
+
   /**
    * @param factory factory that made us, identity used for FS layout. may not be null
    * @param conf may not be null
@@ -106,6 +117,8 @@ public class IOTestProvider implements WALProvider {
     this.factory = factory;
     this.conf = conf;
     this.providerId = providerId != null ? providerId : DEFAULT_PROVIDER_ID;
+    rootDir = FSUtils.getRootDir(conf);
+    oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
   }
 
   @Override
@@ -288,4 +301,32 @@ public class IOTestProvider implements WALProvider {
     // TODO Implement WALProvider.addWALActionLister
 
   }
+
+  @Override
+  public WALEntryStream getWalStream(PriorityBlockingQueue<WALIdentity> logQueue,
+      Configuration conf, long startPosition, ServerName serverName, MetricsSource metrics)
+      throws IOException {
+    return new FSWALEntryStream(CommonFSUtils.getWALFileSystem(conf), logQueue, conf, startPosition,
+        serverName, metrics, this);
+  }
+
+  @Override
+  public WALIdentity createWalIdentity(ServerName serverName, String walName, boolean isArchive) {
+    Path walPath;
+    if (isArchive) {
+      walPath = new Path(oldLogDir, walName);
+    } else {
+      Path logDir =
+          new Path(rootDir, AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
+      walPath = new Path(logDir, walName);
+    }
+    return new FSWALIdentity(walPath);
+  }
+
+  @Override
+  public WALIdentity locateWalId(WALIdentity wal, Server server,
+      List<ServerName> deadRegionServers) throws IOException {
+    return wal;
+  }
 }
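The three overrides above are the whole provider-facing surface this patch asks of a WALProvider: getWalStream() builds an entry stream bound to the provider, createWalIdentity() maps a (server, name, archived?) triple to an opaque WALIdentity, and locateWalId() lets a provider say where a WAL currently lives (IOTestProvider, which never archives, returns the identity unchanged). A hedged sketch of how a replication reader might drive them; provider, serverName, walName, server, deadRegionServers, startPosition and metrics are hypothetical caller-side names, and the queue's WALIdentity element type follows the cast seen in TestWALEntryStream:

    // Name an active (non-archived) WAL on this server...
    WALIdentity walId = provider.createWalIdentity(serverName, walName, false);
    // ...then let the provider redirect the reader if the WAL has moved,
    // e.g. been archived or handed over from a dead region server.
    WALIdentity located = provider.locateWalId(walId, server, deadRegionServers);

    PriorityBlockingQueue<WALIdentity> queue = new PriorityBlockingQueue<>();
    queue.add(located);
    try (WALEntryStream stream =
        provider.getWalStream(queue, conf, startPosition, serverName, metrics)) {
      while (stream.hasNext()) {
        Entry entry = stream.next(); // ship to the peer cluster
      }
    }
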
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index 8fbe09dd30..3e73300077 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -151,10 +151,9 @@ public class TestWALFactory {
     TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
       SampleRegionWALCoprocessor.class.getName());
     TEST_UTIL.startMiniDFSCluster(3);
-
+    hbaseWALDir = TEST_UTIL.createWALRootDir();
     conf = TEST_UTIL.getConfiguration();
     cluster = TEST_UTIL.getDFSCluster();
-    hbaseDir = TEST_UTIL.createRootDir();
     hbaseWALDir = TEST_UTIL.createWALRootDir();
   }
 
@@ -679,7 +678,7 @@
   @Test
   public void testWALProviders() throws IOException {
-    Configuration conf = new Configuration();
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
     // if providers are not set but enable SyncReplicationWALProvider by default for master node
     // with not only system tables
     WALFactory walFactory = new WALFactory(conf, this.currentServername.toString());
@@ -695,7 +694,7 @@
   @Test
   public void testOnlySetWALProvider() throws IOException {
-    Configuration conf = new Configuration();
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
     conf.set(WAL_PROVIDER, WALFactory.Providers.multiwal.name());
     WALFactory walFactory = new WALFactory(conf, this.currentServername.toString());
     WALProvider wrappedWALProvider = ((SyncReplicationWALProvider) walFactory.getWALProvider())
@@ -709,7 +708,7 @@
   @Test
   public void testOnlySetMetaWALProvider() throws IOException {
-    Configuration conf = new Configuration();
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
     conf.set(META_WAL_PROVIDER, WALFactory.Providers.asyncfs.name());
     WALFactory walFactory = new WALFactory(conf, this.currentServername.toString());
     WALProvider wrappedWALProvider = ((SyncReplicationWALProvider) walFactory.getWALProvider())
@@ -722,7 +721,7 @@
   @Test
   public void testDefaultProvider() throws IOException {
-    final Configuration conf = new Configuration();
+    final Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
     // AsyncFSWal is the default, we should be able to request any WAL.
     final WALFactory normalWalFactory = new WALFactory(conf, this.currentServername.toString());
     Class<? extends WALProvider> fshLogProvider = normalWalFactory.getProviderClass(
@@ -745,7 +744,7 @@
   @Test
   public void testCustomProvider() throws IOException {
-    final Configuration config = new Configuration();
+    final Configuration config = new Configuration(TEST_UTIL.getConfiguration());
     config.set(WALFactory.WAL_PROVIDER, IOTestProvider.class.getName());
     final WALFactory walFactory = new WALFactory(config, this.currentServername.toString());
     Class<? extends WALProvider> walProvider = walFactory.getProviderClass(
@@ -757,7 +756,7 @@
   @Test
   public void testCustomMetaProvider() throws IOException {
-    final Configuration config = new Configuration();
+    final Configuration config = new Configuration(TEST_UTIL.getConfiguration());
     config.set(WALFactory.META_WAL_PROVIDER, IOTestProvider.class.getName());
     final WALFactory walFactory = new WALFactory(config, this.currentServername.toString());
     Class<? extends WALProvider> walProvider = walFactory.getProviderClass(
-- 
2.18.0
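One recurring change in the TestWALFactory hunks deserves a closing note: the cases now seed their Configuration from the mini cluster rather than starting from an empty one, so settings established during class setup, such as the WAL root dir, stay visible to the factory under test. A minimal sketch, assuming the TEST_UTIL and currentServername fixtures from the class:

    // new Configuration() sees only the default resources; copying the test
    // cluster's configuration carries over its WAL root dir and filesystem.
    Configuration config = new Configuration(TEST_UTIL.getConfiguration());
    config.set(WALFactory.WAL_PROVIDER, IOTestProvider.class.getName());
    WALFactory walFactory = new WALFactory(config, currentServername.toString());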