diff --git a/conf/hbase-site.xml b/conf/hbase-site.xml
index c516ac7291..10ff930874 100644
--- a/conf/hbase-site.xml
+++ b/conf/hbase-site.xml
@@ -21,4 +21,11 @@
*/
-->
+
+ <property>
+ <name>hbase.zookeeper.property.clientPort</name>
+ <value>65026</value>
+ <description>Property from ZooKeeper's config zoo.cfg.
+ The port at which the clients will connect.
+ </description>
+ </property>
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/wal/FSWALIdentity.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/wal/FSWALIdentity.java
index d12a1cf455..89ab00dc31 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/wal/FSWALIdentity.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/wal/FSWALIdentity.java
@@ -71,6 +71,7 @@ public class FSWALIdentity implements WALIdentity{
FSWALIdentity that = (FSWALIdentity) obj;
return this.path.equals(that.getPath());
}
+
@Override
public int hashCode() {
return this.path.hashCode();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 6242d3670f..7c7b4cc02e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -3006,7 +3006,7 @@ public class HRegionServer extends HasThread implements
throw new IOException("Could not find class for " + classname);
}
T service = ReflectionUtils.newInstance(clazz, conf);
- service.initialize(server, walFs, logDir, oldLogDir, walProvider);
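+ // The service now derives the WAL file system and log directories from the
+ // WALProvider (or its configuration), so only the server and provider are passed.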
+ service.initialize(server, walProvider);
return service;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
index e9bbaea8ae..864736b47c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
@@ -18,8 +18,6 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.wal.WALProvider;
@@ -38,7 +36,7 @@ public interface ReplicationService {
* @param walProvider can be null if not initialized inside a live region server environment, for
* example, {@code ReplicationSyncUp}.
*/
- void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir, WALProvider walProvider)
+ void initialize(Server rs, WALProvider walProvider)
throws IOException;
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index ab58b6700c..5beda79839 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -69,7 +69,6 @@ import org.apache.hadoop.hbase.wal.FSWALIdentity;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;
@@ -996,11 +995,11 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
* https://issues.apache.org/jira/browse/HBASE-14004 for more details.
*/
@Override
- public OptionalLong getLogFileSizeIfBeingWritten(WALIdentity walId) {
+ public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
rollWriterLock.lock();
try {
- FSWALIdentity currentPath = new FSWALIdentity(getOldPath());
- if (walId.equals(currentPath)) {
+ Path currentPath = getOldPath();
+ if (path.equals(currentPath)) {
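+ // Only the WAL file currently being written has a length we can report here;
+ // a closed file's length is read from the file system instead.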
W writer = this.writer;
return writer != null ? OptionalLong.of(writer.getLength()) : OptionalLong.empty();
} else {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index baa87a4c7d..4c028a1501 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -215,7 +215,8 @@ public class FSHLog extends AbstractFSWAL<Writer> {
5);
this.closeErrorsTolerated = conf.getInt("hbase.regionserver.logroll.errors.tolerated", 2);
this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC);
-
+ // rollWriter sets this.hdfs_out if it can.
+ // rollWriter();
// This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is
// put on the ring buffer.
String hostingThreadName = Thread.currentThread().getName();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
index f4c37b1ea8..ad326b8006 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -23,15 +23,13 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableDescriptors;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
/**
* ReplicationEndpoint is a plugin which implements replication
@@ -55,7 +53,6 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
class Context {
private final Configuration localConf;
private final Configuration conf;
- private final FileSystem fs;
private final TableDescriptors tableDescriptors;
private final ReplicationPeer replicationPeer;
private final String peerId;
@@ -67,7 +64,6 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
public Context(
final Configuration localConf,
final Configuration conf,
- final FileSystem fs,
final String peerId,
final UUID clusterId,
final ReplicationPeer replicationPeer,
@@ -76,7 +72,6 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
final Abortable abortable) {
this.localConf = localConf;
this.conf = conf;
- this.fs = fs;
this.clusterId = clusterId;
this.peerId = peerId;
this.replicationPeer = replicationPeer;
@@ -90,9 +85,6 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
public Configuration getLocalConfiguration() {
return localConf;
}
- public FileSystem getFilesystem() {
- return fs;
- }
public UUID getClusterId() {
return clusterId;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/AbstractWALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/AbstractWALEntryStream.java
new file mode 100644
index 0000000000..5b74058e6f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/AbstractWALEntryStream.java
@@ -0,0 +1,325 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.concurrent.PriorityBlockingQueue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WAL.Reader;
+import org.apache.hadoop.hbase.wal.WALIdentity;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Streaming access to WAL entries. This class is given a queue of WAL {@link WALIdentity}s, and
+ * continually iterates through all the WAL {@link Entry} in the queue. When it's done reading
+ * from one WAL, it dequeues it and starts reading from the next.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class AbstractWALEntryStream implements WALEntryStream {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractWALEntryStream.class);
+
+ protected Reader reader;
+ protected WALIdentity currentWalIdentity;
+ // cache of next entry for hasNext()
+ protected Entry currentEntry;
+ // position for the current entry. Since we support peek, the upper layer may choose to return
+ // before reading the current entry, so it is not safe to return the value below in getPosition.
+ protected long currentPositionOfEntry = 0;
+ // position after reading current entry
+ protected long currentPositionOfReader = 0;
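+ // WALs still to be replicated, ordered oldest-first by the queue's comparator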
+ protected final PriorityBlockingQueue<WALIdentity> logQueue;
+ protected final Configuration conf;
+ // which region server the WALs belong to
+ protected final ServerName serverName;
+ protected final MetricsSource metrics;
+
+ protected final WALProvider walProvider;
+
+ /**
+ * Create an entry stream over the given queue at the given start position
+ * @param logQueue the queue of WAL identities
+ * @param conf {@link Configuration} to use to create {@link Reader} for this stream
+ * @param startPosition the position in the first WAL to start reading at
+ * @param serverName the server name which all WALs belong to
+ * @param metrics replication metrics
+ * @param walProvider wal provider
+ */
+ public AbstractWALEntryStream(PriorityBlockingQueue<WALIdentity> logQueue, Configuration conf,
+ long startPosition, ServerName serverName, MetricsSource metrics, WALProvider walProvider) {
+ this.logQueue = logQueue;
+ this.conf = conf;
+ this.currentPositionOfEntry = startPosition;
+ this.serverName = serverName;
+ this.metrics = metrics;
+ this.walProvider = walProvider;
+ }
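+
+ // Typical use (a sketch): poll the stream until it runs dry,
+ //   while (stream.hasNext()) { Entry e = stream.next(); ... ship e ... }
+ // and call reset() before reusing it after hasNext() has returned false.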
+
+ /**
+ * @return true if there is another WAL {@link Entry}
+ */
+ @Override
+ public boolean hasNext() throws IOException {
+ if (currentEntry == null) {
+ try {
+ tryAdvanceEntry();
+ } catch (IOException e) {
+ handleIOException(logQueue.peek(), e);
+ }
+ }
+ return currentEntry != null;
+ }
+
+ /**
+ * Returns the next WAL entry in this stream but does not advance.
+ */
+ @Override
+ public Entry peek() throws IOException {
+ return hasNext() ? currentEntry : null;
+ }
+
+ /**
+ * Returns the next WAL entry in this stream and advances the stream.
+ */
+ @Override
+ public Entry next() throws IOException {
+ Entry save = peek();
+ currentPositionOfEntry = currentPositionOfReader;
+ currentEntry = null;
+ return save;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void close() throws IOException {
+ closeReader();
+ }
+
+ /**
+ * @return the position of the last Entry returned by next()
+ */
+ @Override
+ public long getPosition() {
+ return currentPositionOfEntry;
+ }
+
+ /**
+ * @return the {@link WALIdentity} of the current WAL
+ */
+ @Override
+ public WALIdentity getCurrentWalIdentity() {
+ return currentWalIdentity;
+ }
+
+ protected String getCurrentWalIdStat() {
+ StringBuilder sb = new StringBuilder();
+ if (currentWalIdentity != null) {
+ sb.append("currently replicating from: ").append(currentWalIdentity).append(" at position: ")
+ .append(currentPositionOfEntry).append("\n");
+ } else {
+ sb.append("no replication ongoing, waiting for new log");
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Should be called if the stream is to be reused (i.e. used again after hasNext() has returned
+ * false)
+ */
+ @Override
+ public void reset() throws IOException {
+ if (reader != null && currentWalIdentity != null) {
+ resetReader();
+ }
+ }
+
+ protected void setPosition(long position) {
+ currentPositionOfEntry = position;
+ }
+
+ private void setCurrentWalId(WALIdentity walId) {
+ this.currentWalIdentity = walId;
+ }
+
+ private void tryAdvanceEntry() throws IOException {
+ if (checkReader()) {
+ boolean beingWritten = readNextEntryAndRecordReaderPosition();
+ if (currentEntry == null && !beingWritten) {
+ // no more entries in this log file, and the file is already closed, i.e., rolled
+ // Before dequeueing, we should always get one more attempt at reading.
+ // This is in case more entries came in after we opened the reader, and the log is rolled
+ // while we were reading. See HBASE-6758
+ resetReader();
+ readNextEntryAndRecordReaderPosition();
+ if (currentEntry == null) {
+ if (checkAllBytesParsed()) { // now we're certain we're done with this log file
+ dequeueCurrentLog();
+ if (openNextLog()) {
+ readNextEntryAndRecordReaderPosition();
+ }
+ }
+ }
+ }
+ // if currentEntry != null then just return
+ // if currentEntry == null but the file is still being written, then we should not switch to
+ // the next log either, just return here and try next time to see if there are more entries in
+ // the current file
+ }
+ // do nothing if we don't have a WAL Reader (e.g. if there's no logs in queue)
+ }
+
+ private void dequeueCurrentLog() throws IOException {
+ LOG.debug("Reached the end of log {}", currentWAlIdentity);
+ closeReader();
+ logQueue.remove();
+ setPosition(0);
+ metrics.decrSizeOfLogQueue();
+ }
+
+ private void closeReader() throws IOException {
+ if (reader != null) {
+ reader.close();
+ reader = null;
+ }
+ }
+
+ // if we don't have a reader, open a reader on the next log
+ private boolean checkReader() throws IOException {
+ if (reader == null) {
+ return openNextLog();
+ }
+ return true;
+ }
+
+ // open a reader on the next log in queue
+ private boolean openNextLog() throws IOException {
+ WALIdentity nextWalId = logQueue.peek();
+ if (nextWalId != null) {
+ openReader(nextWalId);
+ if (reader != null) {
+ return true;
+ }
+ } else {
+ // no more files in queue; this could happen for a recovered queue, or for a wal group of a
+ // sync replication peer which has already been transitioned to DA or S.
+ setCurrentWalId(null);
+ }
+ return false;
+ }
+
+ protected void openReader(WALIdentity walId) throws IOException {
+ try {
+ // Detect if this is a new file, if so get a new reader else
+ // reset the current reader so that we see the new data
+ if (reader == null || !getCurrentWalIdentity().equals(walId)) {
+ closeReader();
+ reader = createReader(walId, conf);
+ seek();
+ setCurrentWalId(walId);
+ } else {
+ resetReader();
+ }
+ } catch (RemoteException re) {
+ IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
+ if (!(ioe instanceof FileNotFoundException)) throw ioe;
+ handleIOException(walId, ioe);
+ } catch (IOException ioe) {
+ handleIOException(walId, ioe);
+ } catch (NullPointerException npe) {
+ // Workaround for race condition in HDFS-4380
+ // which throws a NPE if we open a file before any data node has the most recent block
+ // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
+ LOG.warn("Got NPE opening reader, will retry.");
+ reader = null;
+ }
+ }
+
+ protected void resetReader() throws IOException {
+ try {
+ currentEntry = null;
+ reader.reset();
+ seek();
+ } catch (IOException io) {
+ handleIOException(currentWalIdentity, io);
+ } catch (NullPointerException npe) {
+ throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
+ }
+ }
+
+ private void seek() throws IOException {
+ if (currentPositionOfEntry != 0) {
+ reader.seek(currentPositionOfEntry);
+ }
+ }
+
+ @Override
+ public Entry next(Entry reuse) throws IOException {
+ return reader.next(reuse);
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ reader.seek(pos);
+ }
+
+ /**
+ * Creates a reader for a wal
+ * @param walId Wal Identity
+ * @param conf configuration
+ * @return a reader for the WAL
+ * @throws IOException IOException
+ */
+ protected abstract Reader createReader(WALIdentity walId, Configuration conf) throws IOException;
+
+ /**
+ * Handles IO exceptions; rethrows the exception if it cannot be handled.
+ * @param walId identity of the wal
+ * @param e the IOException to handle
+ * @throws IOException the same exception, when it is not handled
+ */
+ protected abstract void handleIOException(WALIdentity walId, IOException e) throws IOException;
+
+ /**
+ * Reads the next entry (if any) and records the reader position.
+ * @return true if the current WAL is still being written to
+ * @throws IOException IOException
+ */
+ protected abstract boolean readNextEntryAndRecordReaderPosition() throws IOException;
+
+ /**
+ * Checks that all bytes in the current WAL were parsed.
+ * @return true if we are done with the current WAL
+ * @throws IOException IOException
+ */
+ protected boolean checkAllBytesParsed() throws IOException {
+ return true;
+ }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSRecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSRecoveredReplicationSource.java
new file mode 100644
index 0000000000..425dbaad19
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSRecoveredReplicationSource.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.PriorityBlockingQueue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.FSWALIdentity;
+import org.apache.hadoop.hbase.wal.WALIdentity;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class FSRecoveredReplicationSource extends RecoveredReplicationSource {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FSRecoveredReplicationSource.class);
+ private Path logDir;
+ private FileSystem fs;
+ private Path walRootDir;
+
+ @Override
+ public void init(Configuration conf, ReplicationSourceManager manager,
+ ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
+ String peerClusterZnode, UUID clusterId, MetricsSource metrics, WALProvider walProvider)
+ throws IOException {
+ super.init(conf, manager, queueStorage, replicationPeer, server, peerClusterZnode, clusterId,
+ metrics, walProvider);
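+ // Derive the FS-specific state that used to be injected through init():
+ // the WAL root dir, this server's WAL directory, and the WAL file system.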
+ this.walRootDir = FSUtils.getWALRootDir(conf);
+ this.logDir = new Path(walRootDir,
+ AbstractFSWALProvider.getWALDirectoryName(server.getServerName().toString()));
+ this.fs = FSUtils.getWALFileSystem(conf);
+ }
+
+ @Override
+ public void locateRecoveredWalIds(PriorityBlockingQueue<WALIdentity> queue) throws IOException {
+ boolean hasPathChanged = false;
+ PriorityBlockingQueue<WALIdentity> newWalIds =
+ new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
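+ // For each queued WAL, check the original location first, then (when run from
+ // ReplicationSyncUp) the WAL root area, then the dirs of dead region servers.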
+ pathsLoop: for (WALIdentity wal : queue) {
+ FSWALIdentity walId = (FSWALIdentity) wal;
+ if (fs.exists(walId.getPath())) {
+ // still in same location, don't need to
+ // do anything
+ newWalIds.add(walId);
+ continue;
+ }
+ // Path changed - try to find the right path.
+ hasPathChanged = true;
+
+ if (server instanceof ReplicationSyncUp.DummyServer) {
+ // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data
+ // from .logs to .oldlogs. Loop into .logs folders and check whether a match exists
+ Path newPath = getReplSyncUpPath(walId.getPath());
+ newWalIds.add(new FSWALIdentity(newPath));
+ continue;
+ } else {
+ // See if Path exists in the dead RS folder (there could be a chain of failures
+ // to look at)
+ List<ServerName> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
+ LOG.info("NB dead servers : " + deadRegionServers.size());
+
+ for (ServerName curDeadServerName : deadRegionServers) {
+ final Path deadRsDirectory = new Path(walRootDir,
+ AbstractFSWALProvider.getWALDirectoryName(curDeadServerName.getServerName()));
+ Path[] locs = new Path[] { new Path(deadRsDirectory, walId.getName()), new Path(
+ deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), walId.getName()) };
+ for (Path possibleLogLocation : locs) {
+ LOG.info("Possible location " + possibleLogLocation.toUri().toString());
+ if (fs.exists(possibleLogLocation)) {
+ // We found the right new location
+ LOG.info("Log " + walId + " still exists at " + possibleLogLocation);
+ newWalIds.add(new FSWALIdentity(possibleLogLocation));
+ continue pathsLoop;
+ }
+ }
+ }
+ // didn't find a new location
+ LOG.error(
+ String.format("WAL Path %s doesn't exist and couldn't find its new location", walId));
+ newWalIds.add(walId);
+ }
+ }
+
+ if (hasPathChanged) {
+ if (newWalIds.size() != queue.size()) { // this shouldn't happen
+ LOG.error("Recovery queue size is incorrect");
+ throw new IOException("Recovery queue size error");
+ }
+ // put the correct locations in the queue
+ // since this is a recovered queue with no new incoming logs,
+ // there shouldn't be any concurrency issues
+ queue.clear();
+ for (WALIdentity walId : newWalIds) {
+ queue.add(walId);
+ }
+ }
+ }
+
+ // N.B. the ReplicationSyncUp tool sets the log dir to the root of the wal
+ // area rather than to the wal area for a particular region server.
+ private Path getReplSyncUpPath(Path path) throws IOException {
+ FileStatus[] rss = fs.listStatus(this.logDir);
+ for (FileStatus rs : rss) {
+ Path p = rs.getPath();
+ FileStatus[] logs = fs.listStatus(p);
+ for (FileStatus log : logs) {
+ p = new Path(p, log.getPath().getName());
+ if (p.getName().equals(path.getName())) {
+ LOG.info("Log " + p.getName() + " found at " + p);
+ return p;
+ }
+ }
+ }
+ LOG.error("Didn't find path for: " + path.getName());
+ return path;
+ }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSWALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSWALEntryStream.java
new file mode 100644
index 0000000000..954530e3a5
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSWALEntryStream.java
@@ -0,0 +1,253 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.OptionalLong;
+import java.util.concurrent.PriorityBlockingQueue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
+import org.apache.hadoop.hbase.wal.FSWALIdentity;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WAL.Reader;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALIdentity;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Streaming access to WAL entries in file-system based WALs. This class is given a queue of WAL
+ * {@link FSWALIdentity}s, and continually iterates through all the WAL {@link Entry} in the
+ * queue. When it's done reading from one WAL, it dequeues it and starts reading from the next.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class FSWALEntryStream extends AbstractWALEntryStream {
+ private static final Logger LOG = LoggerFactory.getLogger(FSWALEntryStream.class);
+
+ private FileSystem fs;
+
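+ // when true, a zero-length WAL that raises EOF is dropped from the queue
+ // (config "replication.source.eof.autorecovery"; see handleEofException)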
+ private boolean eofAutoRecovery;
+
+ public FSWALEntryStream(FileSystem fs, PriorityBlockingQueue<WALIdentity> logQueue,
+ Configuration conf, long startPosition, ServerName serverName, MetricsSource metrics,
+ WALProvider walProvider) throws IOException {
+ super(logQueue, conf, startPosition, serverName, metrics, walProvider);
+ this.fs = fs;
+ this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
+ }
+
+ // HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file
+ @Override
+ protected boolean checkAllBytesParsed() throws IOException {
+ // -1 means the wal wasn't closed cleanly.
+ final long trailerSize = currentTrailerSize();
+ FileStatus stat = null;
+ try {
+ stat = fs.getFileStatus(((FSWALIdentity) this.currentWalIdentity).getPath());
+ } catch (IOException exception) {
+ LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}",
+ currentWAlIdentity, trailerSize < 0 ? "was not" : "was", getCurrentWalIdStat());
+ metrics.incrUnknownFileLengthForClosedWAL();
+ }
+ // Here we use currentPositionOfReader instead of currentPositionOfEntry.
+ // We only call this method when currentEntry is null so usually they are the same, but there
+ // are two exceptions. One is we have nothing in the file but only a header, in this way
+ // the currentPositionOfEntry will always be 0 since we have no change to update it. The other
+ // is that we reach the end of file, then currentPositionOfEntry will point to the tail of the
+ // last valid entry, and the currentPositionOfReader will usually point to the end of the file.
+ if (stat != null) {
+ if (trailerSize < 0) {
+ if (currentPositionOfReader < stat.getLen()) {
+ final long skippedBytes = stat.getLen() - currentPositionOfReader;
+ LOG.debug(
+ "Reached the end of WAL file '{}'. It was not closed cleanly,"
+ + " so we did not parse {} bytes of data. This is normally ok.",
+ currentWalIdentity, skippedBytes);
+ metrics.incrUncleanlyClosedWALs();
+ metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
+ }
+ } else if (currentPositionOfReader + trailerSize < stat.getLen()) {
+ LOG.warn("Processing end of WAL file '{}'. At position {}, which is too far away from"
+ + " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}",
+ currentWalIdentity, currentPositionOfReader, stat.getLen(), getCurrentWalIdStat());
+ setPosition(0);
+ resetReader();
+ metrics.incrRestartedWALReading();
+ metrics.incrRepeatedFileBytes(currentPositionOfReader);
+ return false;
+ }
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Reached the end of log " + this.currentWAlIdentity
+ + ", and the length of the file is " + (stat == null ? "N/A" : stat.getLen()));
+ }
+ metrics.incrCompletedWAL();
+ return true;
+ }
+
+ private Path getArchivedLog(Path path) throws IOException {
+ Path rootDir = FSUtils.getRootDir(conf);
+
+ // Try to find the log in the old dir
+ Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+ Path archivedLogLocation = new Path(oldLogDir, path.getName());
+ if (fs.exists(archivedLogLocation)) {
+ LOG.info("Log " + path + " was moved to " + archivedLogLocation);
+ return archivedLogLocation;
+ }
+
+ // Try to find the log in the separate old log dir
+ oldLogDir = new Path(rootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
+ .append(Path.SEPARATOR).append(serverName.getServerName()).toString());
+ archivedLogLocation = new Path(oldLogDir, path.getName());
+ if (fs.exists(archivedLogLocation)) {
+ LOG.info("Log " + path + " was moved to " + archivedLogLocation);
+ return archivedLogLocation;
+ }
+
+ LOG.error("Couldn't locate log: " + path);
+ return path;
+ }
+
+ private void handleFileNotFound(FSWALIdentity walId, FileNotFoundException fnfe)
+ throws IOException {
+
+ // If the log was archived, continue reading from there
+ FSWALIdentity archivedLog = new FSWALIdentity(getArchivedLog(walId.getPath()));
+ if (!walId.equals(archivedLog)) {
+ openReader(archivedLog);
+ } else {
+ throw fnfe;
+ }
+ }
+
+ // For HBASE-15019
+ private void recoverLease(final Configuration conf, final Path path) {
+ try {
+ final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
+ FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
+ fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
+ @Override
+ public boolean progress() {
+ LOG.debug("recover WAL lease: " + path);
+ return true;
+ }
+ });
+ } catch (IOException e) {
+ LOG.warn("unable to recover lease for WAL: " + path, e);
+ }
+ }
+
+ @Override
+ protected void handleIOException(WALIdentity walId, IOException e) throws IOException {
+ try {
+ throw e;
+ } catch (FileNotFoundException fnfe) {
+ handleFileNotFound((FSWALIdentity) walId, fnfe);
+ } catch (EOFException eo) {
+ handleEofException(eo);
+ } catch (LeaseNotRecoveredException lnre) {
+ // HBASE-15019 the WAL was not closed due to some hiccup.
+ LOG.warn("Try to recover the WAL lease " + currentWAlIdentity, lnre);
+ recoverLease(conf, ((FSWALIdentity) currentWAlIdentity).getPath());
+ reader = null;
+ }
+ }
+
+ private long currentTrailerSize() {
+ long size = -1L;
+ if (reader instanceof ProtobufLogReader) {
+ final ProtobufLogReader pblr = (ProtobufLogReader) reader;
+ size = pblr.trailerSize();
+ }
+ return size;
+ }
+
+ // if we get an EOF due to a zero-length log, and there are other logs in queue
+ // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
+ // enabled, then dump the log
+ private void handleEofException(IOException e) {
+ if ((e instanceof EOFException || e.getCause() instanceof EOFException) && logQueue.size() > 1
+ && this.eofAutoRecovery) {
+ try {
+ if (fs.getFileStatus(((FSWALIdentity) logQueue.peek()).getPath()).getLen() == 0) {
+ LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
+ logQueue.remove();
+ currentPositionOfEntry = 0;
+ }
+ } catch (IOException ioe) {
+ LOG.warn("Couldn't get file length information about log " + logQueue.peek());
+ }
+ }
+ }
+
+ @Override
+ protected Reader createReader(WALIdentity walId, Configuration conf) throws IOException {
+ Path path = ((FSWALIdentity) walId).getPath();
+ return WALFactory.createReader(fs, path, conf);
+ }
+
+ /**
+ * Returns whether the file is opened for writing.
+ */
+ @Override
+ protected boolean readNextEntryAndRecordReaderPosition() throws IOException {
+ Entry readEntry = reader.next();
+ long readerPos = reader.getPosition();
+ OptionalLong fileLength = getWALFileLengthProvider()
+ .getLogFileSizeIfBeingWritten(((FSWALIdentity) currentWalIdentity).getPath());
+ if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) {
+ // see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted
+ // data, so we need to make sure that we do not read beyond the committed file length.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The provider tells us the valid length for " + currentWAlIdentity + " is "
+ + fileLength.getAsLong() + ", but we have advanced to " + readerPos);
+ }
+ resetReader();
+ return true;
+ }
+ if (readEntry != null) {
+ metrics.incrLogEditsRead();
+ metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry);
+ }
+ currentEntry = readEntry; // could be null
+ this.currentPositionOfReader = readerPos;
+ return fileLength.isPresent();
+ }
+
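+ // Asks each WAL managed by the provider for the committed length of the given
+ // path; empty when no WAL is currently writing that file.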
+ private WALFileLengthProvider getWALFileLengthProvider() {
+ return path -> this.walProvider.getWALs().stream()
+ .map(w -> w.getLogFileSizeIfBeingWritten(path)).filter(o -> o.isPresent()).findAny()
+ .orElse(OptionalLong.empty());
+ }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 7db53aa7c1..d0bb50c1c8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -86,7 +86,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;
private ClusterConnection conn;
- private Configuration localConf;
private Configuration conf;
// How long should we sleep for each retry
private long sleepForRetries;
@@ -117,7 +116,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
public void init(Context context) throws IOException {
super.init(context);
this.conf = HBaseConfiguration.create(ctx.getConfiguration());
- this.localConf = HBaseConfiguration.create(ctx.getLocalConfiguration());
decorateConf();
this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 4bb1fe3c5d..3a3b60d28d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -18,43 +18,33 @@
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
-import java.util.List;
import java.util.UUID;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hadoop.hbase.wal.FSWALIdentity;
import org.apache.hadoop.hbase.wal.WALIdentity;
+import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Class that handles the recovered source of a replication stream, which is transfered from
* another dead region server. This will be closed when all logs are pushed to peer cluster.
*/
@InterfaceAudience.Private
-public class RecoveredReplicationSource extends ReplicationSource {
-
- private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSource.class);
+public abstract class RecoveredReplicationSource extends ReplicationSource {
private String actualPeerId;
@Override
- public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
+ public void init(Configuration conf, ReplicationSourceManager manager,
ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
- String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
- MetricsSource metrics) throws IOException {
- super.init(conf, fs, manager, queueStorage, replicationPeer, server, peerClusterZnode,
- clusterId, walFileLengthProvider, metrics);
+ String peerClusterZnode, UUID clusterId, MetricsSource metrics, WALProvider walProvider)
+ throws IOException {
+ super.init(conf, manager, queueStorage, replicationPeer, server, peerClusterZnode, clusterId,
+ metrics, walProvider);
this.actualPeerId = this.replicationQueueInfo.getPeerId();
}
@@ -64,88 +54,6 @@ public class RecoveredReplicationSource extends ReplicationSource {
return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, queueStorage);
}
- public void locateRecoveredWalIds(PriorityBlockingQueue<WALIdentity> queue) throws IOException {
- boolean hasPathChanged = false;
- PriorityBlockingQueue<WALIdentity> newWalIds =
- new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
- pathsLoop: for (WALIdentity walId : queue) {
- if (fs.exists(((FSWALIdentity) walId).getPath())) {
- // still in same location, don't need to
- // do anything
- newWalIds.add(walId);
- continue;
- }
- // Path changed - try to find the right path.
- hasPathChanged = true;
- if (server instanceof ReplicationSyncUp.DummyServer) {
- // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data
- // from .logs to .oldlogs. Loop into .logs folders and check whether a match exists
- Path newPath = getReplSyncUpPath(((FSWALIdentity)walId).getPath());
- newWalIds.add(new FSWALIdentity(newPath));
- continue;
- } else {
- // See if Path exists in the dead RS folder (there could be a chain of failures
- // to look at)
- List<ServerName> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
- LOG.info("NB dead servers : " + deadRegionServers.size());
- final Path walDir = FSUtils.getWALRootDir(conf);
- for (ServerName curDeadServerName : deadRegionServers) {
- final Path deadRsDirectory =
- new Path(walDir, AbstractFSWALProvider.getWALDirectoryName(curDeadServerName
- .getServerName()));
- Path[] locs = new Path[] { new Path(deadRsDirectory, walId.getName()), new Path(
- deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), walId.getName()) };
- for (Path possibleLogLocation : locs) {
- LOG.info("Possible location " + possibleLogLocation.toUri().toString());
- if (manager.getFs().exists(possibleLogLocation)) {
- // We found the right new location
- LOG.info("Log " + walId + " still exists at " + possibleLogLocation);
- newWalIds.add(new FSWALIdentity(possibleLogLocation));
- continue pathsLoop;
- }
- }
- }
- // didn't find a new location
- LOG.error(
- String.format("WAL Path %s doesn't exist and couldn't find its new location", walId));
- newWalIds.add(walId);
- }
- }
-
- if (hasPathChanged) {
- if (newWalIds.size() != queue.size()) { // this shouldn't happen
- LOG.error("Recovery queue size is incorrect");
- throw new IOException("Recovery queue size error");
- }
- // put the correct locations in the queue
- // since this is a recovered queue with no new incoming logs,
- // there shouldn't be any concurrency issues
- queue.clear();
- for (WALIdentity walId : newWalIds) {
- queue.add(walId);
- }
- }
- }
-
- // N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of the wal
- // area rather than to the wal area for a particular region server.
- private Path getReplSyncUpPath(Path path) throws IOException {
- FileStatus[] rss = fs.listStatus(manager.getLogDir());
- for (FileStatus rs : rss) {
- Path p = rs.getPath();
- FileStatus[] logs = fs.listStatus(p);
- for (FileStatus log : logs) {
- p = new Path(p, log.getPath().getName());
- if (p.getName().equals(path.getName())) {
- LOG.info("Log " + p.getName() + " found at " + p);
- return p;
- }
- }
- }
- LOG.error("Didn't find path for: " + path.getName());
- return path;
- }
-
void tryFinish() {
if (workerThreads.isEmpty()) {
this.getSourceMetrics().clear();
@@ -167,4 +75,12 @@ public class RecoveredReplicationSource extends ReplicationSource {
public boolean isRecovered() {
return true;
}
+
+ /**
+ * Get the updated queue of the wals if the wals are moved to another location.
+ * @param queue Updated queue with the new walIds
+ * @throws IOException IOException
+ */
+ public abstract void locateRecoveredWalIds(PriorityBlockingQueue<WALIdentity> queue)
+ throws IOException;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 799d9750ed..eef9dddf2b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -20,13 +20,11 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.OptionalLong;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HConstants;
@@ -86,7 +84,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
}
@Override
- public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir,
+ public void initialize(Server server,
WALProvider walProvider) throws IOException {
this.server = server;
this.conf = this.server.getConfiguration();
@@ -125,9 +123,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
}
SyncReplicationPeerMappingManager mapping = new SyncReplicationPeerMappingManager();
this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers,
- replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId,
- walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(),
- mapping);
+ replicationTracker, conf, this.server, clusterId, mapping, walProvider);
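+ // WAL file lengths are now resolved through the WALProvider on demand
+ // instead of through an injected WALFileLengthProvider.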
this.syncReplicationPeerInfoProvider =
new SyncReplicationPeerInfoProviderImpl(replicationPeers, mapping);
PeerActionListener peerActionListener = PeerActionListener.DUMMY;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 12c63fd6a9..6359a2eac7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -17,9 +17,6 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
-import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath;
-
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
@@ -37,7 +34,6 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
@@ -61,9 +57,9 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hadoop.hbase.wal.FSWALIdentity;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALIdentity;
+import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -103,7 +99,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
protected Server server;
// How long should we sleep for each retry
private long sleepForRetries;
- protected FileSystem fs;
// id of this cluster
private UUID clusterId;
// total number of edits we replicated
@@ -126,7 +121,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
private ReplicationThrottler throttler;
private long defaultBandwidth;
private long currentBandwidth;
- private WALFileLengthProvider walFileLengthProvider;
@VisibleForTesting
protected final ConcurrentHashMap<String, ReplicationSourceShipper> workerThreads =
new ConcurrentHashMap<>();
@@ -139,6 +133,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
private int waitOnEndpointSeconds = -1;
private Thread initThread;
+ protected WALProvider walProvider;
/**
* Instantiation method used by region servers
@@ -151,10 +146,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
* @param metrics metrics for replication source
*/
@Override
- public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
+ public void init(Configuration conf, ReplicationSourceManager manager,
ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
- String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
- MetricsSource metrics) throws IOException {
+ String queueId, UUID clusterId, MetricsSource metrics, WALProvider walProvider)
+ throws IOException {
this.server = server;
this.conf = HBaseConfiguration.create(conf);
this.waitOnEndpointSeconds =
@@ -170,7 +165,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
this.manager = manager;
this.metrics = metrics;
this.clusterId = clusterId;
- this.fs = fs;
this.queueId = queueId;
this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
@@ -179,7 +173,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
currentBandwidth = getCurrentBandwidth();
this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
this.totalBufferUsed = manager.getTotalBufferUsed();
- this.walFileLengthProvider = walFileLengthProvider;
+ this.walProvider = walProvider;
LOG.info("queueId={}, ReplicationSource : {}, currentBandwidth={}", queueId,
replicationPeer.getId(), this.currentBandwidth);
}
@@ -283,7 +277,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
tableDescriptors = ((HRegionServer) server).getTableDescriptors();
}
replicationEndpoint
- .init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), fs,
+ .init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(),
replicationPeer.getId(), clusterId, replicationPeer, metrics, tableDescriptors, server));
replicationEndpoint.start();
replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
@@ -320,7 +314,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
@Override
public Map<String, ReplicationStatus> getWalGroupStatus() {
Map<String, ReplicationStatus> sourceReplicationStatus = new TreeMap<>();
- long lastTimeStamp, ageOfLastShippedOp, replicationDelay, fileSize;
+ long lastTimeStamp, ageOfLastShippedOp, replicationDelay;
+ long logSize = -1;
for (Map.Entry<String, ReplicationSourceShipper> walGroupShipper : workerThreads.entrySet()) {
String walGroupId = walGroupShipper.getKey();
ReplicationSourceShipper shipper = walGroupShipper.getValue();
@@ -330,19 +325,14 @@ public class ReplicationSource implements ReplicationSourceInterface {
replicationDelay =
ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize);
WALIdentity currentPath = shipper.getCurrentWALIdentity();
- try {
- fileSize = getFileSize(((FSWALIdentity)currentPath).getPath());
- } catch (IOException e) {
- LOG.warn("Ignore the exception as the file size of HLog only affects the web ui", e);
- fileSize = -1;
- }
+ // TODO: compute the log size through the WALProvider; for now report -1.
ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder();
statusBuilder.withPeerId(this.getPeerId())
.withQueueSize(queueSize)
.withWalGroup(walGroupId)
.withCurrentWalId(currentPath)
.withCurrentPosition(shipper.getCurrentPosition())
- .withFileSize(fileSize)
+ .withFileSize(logSize)
.withAgeOfLastShippedOp(ageOfLastShippedOp)
.withReplicationDelay(replicationDelay);
sourceReplicationStatus.put(this.getPeerId() + "=>" + walGroupId, statusBuilder.build());
@@ -350,16 +340,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
return sourceReplicationStatus;
}
- private long getFileSize(Path currentPath) throws IOException {
- long fileSize;
- try {
- fileSize = fs.getContentSummary(currentPath).getLength();
- } catch (FileNotFoundException e) {
- currentPath = getArchivedLogPath(currentPath, conf);
- fileSize = fs.getContentSummary(currentPath).getLength();
- }
- return fileSize;
- }
protected ReplicationSourceShipper createNewShipper(String walGroupId,
PriorityBlockingQueue<WALIdentity> queue) {
@@ -369,8 +349,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
private ReplicationSourceWALReader createNewWALReader(String walGroupId,
PriorityBlockingQueue<WALIdentity> queue, long startPosition) {
return replicationPeer.getPeerConfig().isSerial()
- ? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this)
- : new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
+ ? new SerialReplicationSourceWALReader(conf, queue, startPosition, walEntryFilter, this)
+ : new ReplicationSourceWALReader(conf, queue, startPosition, walEntryFilter, this);
}
protected final void uncaughtException(Thread t, Throwable e) {
@@ -671,11 +651,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
totalBufferUsed.addAndGet(-batchSize);
}
- @Override
- public WALFileLengthProvider getWALFileLengthProvider() {
- return walFileLengthProvider;
- }
-
@Override
public ServerName getServerWALsBelongTo() {
return server.getServerName();
@@ -697,4 +672,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
void removeWorker(ReplicationSourceShipper worker) {
workerThreads.remove(worker.walGroupId, worker);
}
+
+ public WALProvider getWalProvider() {
+ return walProvider;
+ }
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
index d613049d38..dff249893e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
@@ -23,6 +23,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.wal.WALProvider;
/**
* Constructs a {@link ReplicationSourceInterface}
@@ -32,14 +33,15 @@ public class ReplicationSourceFactory {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceFactory.class);
- static ReplicationSourceInterface create(Configuration conf, String queueId) {
+ static ReplicationSourceInterface create(Configuration conf, String queueId,
+ WALProvider walProvider) {
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId);
boolean isQueueRecovered = replicationQueueInfo.isQueueRecovered();
ReplicationSourceInterface src;
try {
- String defaultReplicationSourceImpl =
- isQueueRecovered ? RecoveredReplicationSource.class.getCanonicalName()
- : ReplicationSource.class.getCanonicalName();
+ String defaultReplicationSourceImpl = isQueueRecovered
+ ? walProvider.getRecoveredReplicationSource().getClass().getCanonicalName()
+ : ReplicationSource.class.getCanonicalName();
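+ // The provider supplies its own recovered-source implementation
+ // (e.g. FSRecoveredReplicationSource for file-system based WALs).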
Class<?> c = Class.forName(
conf.get("replication.replicationsource.implementation", defaultReplicationSourceImpl));
src = c.asSubclass(ReplicationSourceInterface.class).getDeclaredConstructor().newInstance();
@@ -47,7 +49,8 @@ public class ReplicationSourceFactory {
LOG.warn("Passed replication source implementation throws errors, "
+ "defaulting to ReplicationSource",
e);
- src = isQueueRecovered ? new RecoveredReplicationSource() : new ReplicationSource();
+ src =
+ isQueueRecovered ? walProvider.getRecoveredReplicationSource() : new ReplicationSource();
}
return src;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 3058fcc5a6..66861e3dfa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -23,9 +23,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
@@ -37,6 +35,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALIdentity;
+import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -48,14 +47,20 @@ public interface ReplicationSourceInterface {
/**
* Initializer for the source
* @param conf the configuration to use
- * @param fs the file system to use
* @param manager the manager to use
+ * @param queueStorage replication queue storage
+ * @param replicationPeer Replication Peer
* @param server the server for this region server
+ * @param queueId Id of the queue
+ * @param clusterId id of the cluster
+ * @param metrics metric source for publishing replication metrics
+ * @param walProvider wal provider
+ * @throws IOException IOException
*/
- void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
+ void init(Configuration conf, ReplicationSourceManager manager,
ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
- String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
- MetricsSource metrics) throws IOException;
+ String queueId, UUID clusterId, MetricsSource metrics, WALProvider walProvider)
+ throws IOException;
/**
* Add a log to the list of logs to replicate
@@ -159,11 +164,6 @@ public interface ReplicationSourceInterface {
*/
ReplicationSourceManager getSourceManager();
- /**
- * @return the wal file length provider
- */
- WALFileLengthProvider getWALFileLengthProvider();
-
/**
* Try to throttle when the peer config with a bandwidth
* @param batchSize entries size will be pushed
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index b948d7e2df..1a59d9fb52 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -61,11 +61,12 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hadoop.hbase.wal.FSWALIdentity;
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
import org.apache.hadoop.hbase.wal.WALIdentity;
+import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -148,14 +149,8 @@ public class ReplicationSourceManager implements ReplicationListener {
private final SyncReplicationPeerMappingManager syncReplicationPeerMappingManager;
private final Configuration conf;
- private final FileSystem fs;
// The paths to the latest log of each wal group, for new coming peers
private final Map<String, WALIdentity> latestWalIds;
- // Path to the wals directories
- private final Path logDir;
- // Path to the wal archive
- private final Path oldLogDir;
- private final WALFileLengthProvider walFileLengthProvider;
// The number of ms that we wait before moving znodes, HBASE-3596
private final long sleepBeforeFailover;
// Homemade executer service for replication
@@ -171,6 +166,7 @@ public class ReplicationSourceManager implements ReplicationListener {
// Maximum number of retries before taking bold actions when deleting remote wal files for sync
// replication peer.
private final int maxRetriesMultiplier;
+ private final WALProvider walProvider;
/**
* Creates a replication manager and sets the watch on all the other registered region servers
@@ -182,13 +178,14 @@ public class ReplicationSourceManager implements ReplicationListener {
* @param fs the file system to use
* @param logDir the directory that contains all wal directories of live RSs
* @param oldLogDir the directory where old logs are archived
- * @param clusterId
+ * @param clusterId id of the cluster
+ * @param walProvider the WAL provider
*/
public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
- Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
- WALFileLengthProvider walFileLengthProvider,
- SyncReplicationPeerMappingManager syncReplicationPeerMappingManager) throws IOException {
+ Server server, UUID clusterId,
+ SyncReplicationPeerMappingManager syncReplicationPeerMappingManager, WALProvider walProvider)
+ throws IOException {
this.sources = new ConcurrentHashMap<>();
this.queueStorage = queueStorage;
this.replicationPeers = replicationPeers;
@@ -198,13 +195,10 @@ public class ReplicationSourceManager implements ReplicationListener {
this.walsByIdRecoveredQueues = new ConcurrentHashMap<>();
this.oldsources = new ArrayList<>();
this.conf = conf;
- this.fs = fs;
- this.logDir = logDir;
- this.oldLogDir = oldLogDir;
+ this.walProvider = walProvider;
// 30 seconds
this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000);
this.clusterId = clusterId;
- this.walFileLengthProvider = walFileLengthProvider;
this.syncReplicationPeerMappingManager = syncReplicationPeerMappingManager;
this.replicationTracker.registerListener(this);
// It's preferable to failover 1 RS at a time, but with good zk servers
@@ -347,12 +341,12 @@ public class ReplicationSourceManager implements ReplicationListener {
*/
private ReplicationSourceInterface createSource(String queueId, ReplicationPeer replicationPeer)
throws IOException {
- ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);
+ ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId, walProvider);
MetricsSource metrics = new MetricsSource(queueId);
// init replication source
- src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId,
- walFileLengthProvider, metrics);
+ src.init(conf, this, queueStorage, replicationPeer, server, queueId, clusterId,
+ metrics, walProvider);
return src;
}
@@ -486,7 +480,8 @@ public class ReplicationSourceManager implements ReplicationListener {
toRemove.terminate(terminateMessage);
}
for (NavigableSet<String> walsByGroup : walsById.get(peerId).values()) {
- walsByGroup.forEach(wal -> src.enqueueLog(new FSWALIdentity(new Path(this.logDir, wal))));
+ walsByGroup.forEach(
+ wal -> src.enqueueLog(walProvider.createWalIdentity(server.getServerName(), wal, false)));
}
}
LOG.info("Startup replication source for " + src.getPeerId());
@@ -507,7 +502,8 @@ public class ReplicationSourceManager implements ReplicationListener {
ReplicationSourceInterface replicationSource = createSource(queueId, peer);
this.oldsources.add(replicationSource);
for (SortedSet<String> walsByGroup : walsByIdRecoveredQueues.get(queueId).values()) {
- walsByGroup.forEach(wal -> src.enqueueLog(new FSWALIdentity(wal)));
+ walsByGroup.forEach(wal -> src
+ .enqueueLog(walProvider.createWalIdentity(server.getServerName(), wal, false)));
}
toStartup.add(replicationSource);
}
@@ -674,6 +670,8 @@ public class ReplicationSourceManager implements ReplicationListener {
private void removeRemoteWALs(String peerId, String remoteWALDir, Collection wals)
throws IOException {
Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
+ // Currently sync replication is only supported on FS-based WALProviders.
+ // TODO: Abstract FileSystem once all APIs for sync replication are reconciled for remote calls.
FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
for (String wal : wals) {
Path walFile = new Path(remoteWALDirForPeer, wal);
@@ -963,7 +961,7 @@ public class ReplicationSourceManager implements ReplicationListener {
}
oldsources.add(src);
for (String wal : walsSet) {
- src.enqueueLog(new FSWALIdentity(new Path(oldLogDir, wal)));
+ src.enqueueLog(walProvider.createWalIdentity(server.getServerName(), wal, true));
}
src.startup();
}
@@ -1058,30 +1056,6 @@ public class ReplicationSourceManager implements ReplicationListener {
return totalBufferUsed;
}
- /**
- * Get the directory where wals are archived
- * @return the directory where wals are archived
- */
- public Path getOldLogDir() {
- return this.oldLogDir;
- }
-
- /**
- * Get the directory where wals are stored by their RSs
- * @return the directory where wals are stored by their RSs
- */
- public Path getLogDir() {
- return this.logDir;
- }
-
- /**
- * Get the handle on the local file system
- * @return Handle on the local file system
- */
- public FileSystem getFs() {
- return this.fs;
- }
-
/**
* Get the ReplicationPeers used by this ReplicationSourceManager
* @return the ReplicationPeers used by this ReplicationSourceManager
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 8ecd5bdc8e..5a1edd0bbe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -290,7 +290,7 @@ public class ReplicationSourceShipper extends Thread {
public void startup(UncaughtExceptionHandler handler) {
String name = Thread.currentThread().getName();
Threads.setDaemonThreadRunning(this,
- name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), handler);
+ name + ".replicationSource.shipper " + walGroupId + "," + source.getQueueId(), handler);
}
WALIdentity getCurrentWALIdentity() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index a0b2ecd10f..babeab7219 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -18,7 +18,6 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
-import java.io.EOFException;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.BlockingQueue;
@@ -27,14 +26,12 @@ import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.wal.FSWALIdentity;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALIdentity;
@@ -57,7 +54,6 @@ class ReplicationSourceWALReader extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class);
private final PriorityBlockingQueue<WALIdentity> logQueue;
- private final FileSystem fs;
private final Configuration conf;
private final WALEntryFilter filter;
private final ReplicationSource source;
@@ -71,7 +67,6 @@ class ReplicationSourceWALReader extends Thread {
private long currentPosition;
private final long sleepForRetries;
private final int maxRetriesMultiplier;
- private final boolean eofAutoRecovery;
//Indicates whether this particular worker is running
private boolean isReaderRunning = true;
@@ -82,19 +77,16 @@ class ReplicationSourceWALReader extends Thread {
/**
* Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
* entries, and puts them on a batch queue.
- * @param fs the files system to use
* @param conf configuration to use
* @param logQueue The WAL queue to read off of
* @param startPosition position in the first WAL to start reading from
* @param filter The filter to use while reading
* @param source replication source
*/
- public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
- PriorityBlockingQueue<WALIdentity> logQueue, long startPosition, WALEntryFilter filter,
- ReplicationSource source) {
+ public ReplicationSourceWALReader(Configuration conf, PriorityBlockingQueue<WALIdentity> logQueue,
+ long startPosition, WALEntryFilter filter, ReplicationSource source) {
this.logQueue = logQueue;
this.currentPosition = startPosition;
- this.fs = fs;
this.conf = conf;
this.filter = filter;
this.source = source;
@@ -111,7 +103,6 @@ class ReplicationSourceWALReader extends Thread {
this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
this.maxRetriesMultiplier =
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
- this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
LOG.info("peerClusterZnode=" + source.getQueueId()
+ ", ReplicationSourceWALReaderThread : " + source.getPeerId()
@@ -124,10 +115,9 @@ class ReplicationSourceWALReader extends Thread {
public void run() {
int sleepMultiplier = 1;
while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
- try (WALEntryStream entryStream =
- new WALEntryStream(logQueue, fs, conf, currentPosition,
- source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
- source.getSourceMetrics())) {
+ try (WALEntryStream entryStream = this.source.getWalProvider().getWalStream(logQueue, conf,
+ currentPosition, source.getServerWALsBelongTo(),
+ source.getSourceMetrics())) {
while (isReaderRunning()) { // loop here to keep reusing stream while we can
if (!source.isPeerEnabled()) {
Threads.sleep(sleepForRetries);
@@ -153,9 +143,6 @@ class ReplicationSourceWALReader extends Thread {
if (sleepMultiplier < maxRetriesMultiplier) {
LOG.debug("Failed to read stream of replication entries: " + e);
sleepMultiplier++;
- } else {
- LOG.error("Failed to read stream of replication entries", e);
- handleEofException(e);
}
Threads.sleep(sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) {
@@ -242,24 +229,6 @@ class ReplicationSourceWALReader extends Thread {
}
}
- // if we get an EOF due to a zero-length log, and there are other logs in queue
- // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
- // enabled, then dump the log
- private void handleEofException(IOException e) {
- if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
- logQueue.size() > 1 && this.eofAutoRecovery) {
- try {
- if (fs.getFileStatus(((FSWALIdentity)logQueue.peek()).getPath()).getLen() == 0) {
- LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
- logQueue.remove();
- currentPosition = 0;
- }
- } catch (IOException ioe) {
- LOG.warn("Couldn't get file length information about log " + logQueue.peek());
- }
- }
- }
-
public WALIdentity getCurrentWalId() {
// if we've read some WAL entries, get the walId we read from
WALEntryBatch batchQueueHead = entryBatchQueue.peek();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
index c7bccb3d67..2448403143 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
@@ -21,17 +21,15 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -75,14 +73,12 @@ public class ReplicationSyncUp extends Configured implements Tool {
Configuration conf = getConf();
try (ZKWatcher zkw =
new ZKWatcher(conf, "syncupReplication" + System.currentTimeMillis(), abortable, true)) {
- Path walRootDir = FSUtils.getWALRootDir(conf);
- FileSystem fs = FSUtils.getWALFileSystem(conf);
- Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
- Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
-
System.out.println("Start Replication Server start");
+ DummyServer dummyServer = new DummyServer(zkw);
+ WALFactory factory =
+ new WALFactory(conf, dummyServer.getServerName().toString());
Replication replication = new Replication();
- replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir, null);
+ replication.initialize(dummyServer, factory.getWALProvider());
ReplicationSourceManager manager = replication.getReplicationManager();
manager.init().get();
while (manager.activeFailoverTaskCount() > 0) {
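Since ReplicationSyncUp now builds its own WALFactory instead of computing log directories by hand, running it standalone stays simple. A hedged sketch of invoking the tool programmatically (it is a Hadoop Tool, so ToolRunner does the wiring):

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.hadoop.util.ToolRunner;

public class SyncUpRunner {
  public static void main(String[] args) throws Exception {
    // ToolRunner supplies the Configuration; ReplicationSyncUp then creates the
    // DummyServer and WALFactory shown in the hunk above.
    System.exit(ToolRunner.run(HBaseConfiguration.create(), new ReplicationSyncUp(), args));
  }
}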
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
index 5f33e73d95..d2464994cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
@@ -43,10 +42,10 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
private final SerialReplicationChecker checker;
- public SerialReplicationSourceWALReader(FileSystem fs, Configuration conf,
+ public SerialReplicationSourceWALReader(Configuration conf,
PriorityBlockingQueue<WALIdentity> logQueue, long startPosition, WALEntryFilter filter,
ReplicationSource source) {
- super(fs, conf, logQueue, startPosition, filter, source);
+ super(conf, logQueue, startPosition, filter, source);
checker = new SerialReplicationChecker(conf, source);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index 3d90153080..4d89190b74 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -18,31 +18,12 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
-import java.io.Closeable;
-import java.io.FileNotFoundException;
import java.io.IOException;
-import java.util.OptionalLong;
-import java.util.concurrent.PriorityBlockingQueue;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
-import org.apache.hadoop.hbase.util.CancelableProgressable;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
-import org.apache.hadoop.hbase.wal.FSWALIdentity;
+import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WAL.Reader;
-import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALIdentity;
-import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Streaming access to WAL entries. This class is given a queue of WAL {@link WALIdentity}, and
@@ -51,385 +32,19 @@ import org.slf4j.LoggerFactory;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-class WALEntryStream implements Closeable {
- private static final Logger LOG = LoggerFactory.getLogger(WALEntryStream.class);
-
- private Reader reader;
- private WALIdentity currentWAlIdentity;
- // cache of next entry for hasNext()
- private Entry currentEntry;
- // position for the current entry. As now we support peek, which means that the upper layer may
- // choose to return before reading the current entry, so it is not safe to return the value below
- // in getPosition.
- private long currentPositionOfEntry = 0;
- // position after reading current entry
- private long currentPositionOfReader = 0;
- private final PriorityBlockingQueue<WALIdentity> logQueue;
- private final FileSystem fs;
- private final Configuration conf;
- private final WALFileLengthProvider walFileLengthProvider;
- // which region server the WALs belong to
- private final ServerName serverName;
- private final MetricsSource metrics;
-
+public interface WALEntryStream extends WAL.Reader {
/**
- * Create an entry stream over the given queue at the given start position
- * @param logQueue the queue of WAL walIds
- * @param fs {@link FileSystem} to use to create {@link Reader} for this stream
- * @param conf {@link Configuration} to use to create {@link Reader} for this stream
- * @param startPosition the position in the first WAL to start reading at
- * @param serverName the server name which all WALs belong to
- * @param metrics replication metrics
- * @throws IOException
+ * @return the {@link WALIdentity} of the current WAL
*/
- public WALEntryStream(PriorityBlockingQueue<WALIdentity> logQueue, FileSystem fs,
- Configuration conf, long startPosition, WALFileLengthProvider walFileLengthProvider,
- ServerName serverName, MetricsSource metrics) throws IOException {
- this.logQueue = logQueue;
- this.fs = fs;
- this.conf = conf;
- this.currentPositionOfEntry = startPosition;
- this.walFileLengthProvider = walFileLengthProvider;
- this.serverName = serverName;
- this.metrics = metrics;
- }
+ WALIdentity getCurrentWalIdentity();
/**
* @return true if there is another WAL {@link Entry}
*/
- public boolean hasNext() throws IOException {
- if (currentEntry == null) {
- tryAdvanceEntry();
- }
- return currentEntry != null;
- }
+ boolean hasNext() throws IOException;
/**
* Returns the next WAL entry in this stream but does not advance.
*/
- public Entry peek() throws IOException {
- return hasNext() ? currentEntry: null;
- }
-
- /**
- * Returns the next WAL entry in this stream and advance the stream.
- */
- public Entry next() throws IOException {
- Entry save = peek();
- currentPositionOfEntry = currentPositionOfReader;
- currentEntry = null;
- return save;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void close() throws IOException {
- closeReader();
- }
-
- /**
- * @return the position of the last Entry returned by next()
- */
- public long getPosition() {
- return currentPositionOfEntry;
- }
-
- /**
- * @return the {@link WALIdentity} of the current WAL
- */
- public WALIdentity getCurrentWalIdentity() {
- return currentWAlIdentity;
- }
-
- private String getCurrentWalIdStat() {
- StringBuilder sb = new StringBuilder();
- if (currentWAlIdentity != null) {
- sb.append("currently replicating from: ").append(currentWAlIdentity).append(" at position: ")
- .append(currentPositionOfEntry).append("\n");
- } else {
- sb.append("no replication ongoing, waiting for new log");
- }
- return sb.toString();
- }
-
- /**
- * Should be called if the stream is to be reused (i.e. used again after hasNext() has returned
- * false)
- */
- public void reset() throws IOException {
- if (reader != null && currentWAlIdentity != null) {
- resetReader();
- }
- }
-
- private void setPosition(long position) {
- currentPositionOfEntry = position;
- }
-
- private void setCurrentWalId(WALIdentity walId) {
- this.currentWAlIdentity = walId;
- }
-
- private void tryAdvanceEntry() throws IOException {
- if (checkReader()) {
- boolean beingWritten = readNextEntryAndRecordReaderPosition();
- if (currentEntry == null && !beingWritten) {
- // no more entries in this log file, and the file is already closed, i.e, rolled
- // Before dequeueing, we should always get one more attempt at reading.
- // This is in case more entries came in after we opened the reader, and the log is rolled
- // while we were reading. See HBASE-6758
- resetReader();
- readNextEntryAndRecordReaderPosition();
- if (currentEntry == null) {
- if (checkAllBytesParsed()) { // now we're certain we're done with this log file
- dequeueCurrentLog();
- if (openNextLog()) {
- readNextEntryAndRecordReaderPosition();
- }
- }
- }
- }
- // if currentEntry != null then just return
- // if currentEntry == null but the file is still being written, then we should not switch to
- // the next log either, just return here and try next time to see if there are more entries in
- // the current file
- }
- // do nothing if we don't have a WAL Reader (e.g. if there's no logs in queue)
- }
-
- // HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file
- private boolean checkAllBytesParsed() throws IOException {
- // -1 means the wal wasn't closed cleanly.
- final long trailerSize = currentTrailerSize();
- FileStatus stat = null;
- try {
- stat = fs.getFileStatus(((FSWALIdentity)this.currentWAlIdentity).getPath());
- } catch (IOException exception) {
- LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}",
- currentWAlIdentity, trailerSize < 0 ? "was not" : "was", getCurrentWalIdStat());
- metrics.incrUnknownFileLengthForClosedWAL();
- }
- // Here we use currentPositionOfReader instead of currentPositionOfEntry.
- // We only call this method when currentEntry is null so usually they are the same, but there
- // are two exceptions. One is we have nothing in the file but only a header, in this way
- // the currentPositionOfEntry will always be 0 since we have no change to update it. The other
- // is that we reach the end of file, then currentPositionOfEntry will point to the tail of the
- // last valid entry, and the currentPositionOfReader will usually point to the end of the file.
- if (stat != null) {
- if (trailerSize < 0) {
- if (currentPositionOfReader < stat.getLen()) {
- final long skippedBytes = stat.getLen() - currentPositionOfReader;
- LOG.debug(
- "Reached the end of WAL file '{}'. It was not closed cleanly," +
- " so we did not parse {} bytes of data. This is normally ok.",
- currentWAlIdentity, skippedBytes);
- metrics.incrUncleanlyClosedWALs();
- metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
- }
- } else if (currentPositionOfReader + trailerSize < stat.getLen()) {
- LOG.warn(
- "Processing end of WAL file '{}'. At position {}, which is too far away from" +
- " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}",
- currentWAlIdentity, currentPositionOfReader, stat.getLen(), getCurrentWalIdStat());
- setPosition(0);
- resetReader();
- metrics.incrRestartedWALReading();
- metrics.incrRepeatedFileBytes(currentPositionOfReader);
- return false;
- }
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace("Reached the end of log " + this.currentWAlIdentity
- + ", and the length of the file is " + (stat == null ? "N/A" : stat.getLen()));
- }
- metrics.incrCompletedWAL();
- return true;
- }
-
- private void dequeueCurrentLog() throws IOException {
- LOG.debug("Reached the end of log {}", currentWAlIdentity);
- closeReader();
- logQueue.remove();
- setPosition(0);
- metrics.decrSizeOfLogQueue();
- }
-
- /**
- * Returns whether the file is opened for writing.
- */
- private boolean readNextEntryAndRecordReaderPosition() throws IOException {
- Entry readEntry = reader.next();
- long readerPos = reader.getPosition();
- OptionalLong fileLength =
- walFileLengthProvider.getLogFileSizeIfBeingWritten(currentWAlIdentity);
- if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) {
- // see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted
- // data, so we need to make sure that we do not read beyond the committed file length.
- if (LOG.isDebugEnabled()) {
- LOG.debug("The provider tells us the valid length for " + currentWAlIdentity + " is " +
- fileLength.getAsLong() + ", but we have advanced to " + readerPos);
- }
- resetReader();
- return true;
- }
- if (readEntry != null) {
- metrics.incrLogEditsRead();
- metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry);
- }
- currentEntry = readEntry; // could be null
- this.currentPositionOfReader = readerPos;
- return fileLength.isPresent();
- }
-
- private void closeReader() throws IOException {
- if (reader != null) {
- reader.close();
- reader = null;
- }
- }
-
- // if we don't have a reader, open a reader on the next log
- private boolean checkReader() throws IOException {
- if (reader == null) {
- return openNextLog();
- }
- return true;
- }
-
- // open a reader on the next log in queue
- private boolean openNextLog() throws IOException {
- WALIdentity nextWalId = logQueue.peek();
- if (nextWalId != null) {
- openReader((FSWALIdentity)nextWalId);
- if (reader != null) {
- return true;
- }
- } else {
- // no more files in queue, this could happen for recovered queue, or for a wal group of a sync
- // replication peer which has already been transited to DA or S.
- setCurrentWalId(null);
- }
- return false;
- }
-
- private Path getArchivedLog(Path path) throws IOException {
- Path rootDir = FSUtils.getRootDir(conf);
-
- // Try found the log in old dir
- Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
- Path archivedLogLocation = new Path(oldLogDir, path.getName());
- if (fs.exists(archivedLogLocation)) {
- LOG.info("Log " + path + " was moved to " + archivedLogLocation);
- return archivedLogLocation;
- }
-
- // Try found the log in the seperate old log dir
- oldLogDir =
- new Path(rootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
- .append(Path.SEPARATOR).append(serverName.getServerName()).toString());
- archivedLogLocation = new Path(oldLogDir, path.getName());
- if (fs.exists(archivedLogLocation)) {
- LOG.info("Log " + path + " was moved to " + archivedLogLocation);
- return archivedLogLocation;
- }
-
- LOG.error("Couldn't locate log: " + path);
- return path;
- }
-
- private void handleFileNotFound(FSWALIdentity walId, FileNotFoundException fnfe)
- throws IOException {
- // If the log was archived, continue reading from there
- FSWALIdentity archivedLog = new FSWALIdentity(getArchivedLog(walId.getPath()));
- if (!walId.equals(archivedLog)) {
- openReader(archivedLog);
- } else {
- throw fnfe;
- }
- }
-
- private void openReader(FSWALIdentity walId) throws IOException {
- try {
- // Detect if this is a new file, if so get a new reader else
- // reset the current reader so that we see the new data
- if (reader == null || !getCurrentWalIdentity().equals(walId)) {
- closeReader();
- reader = WALFactory.createReader(fs, walId.getPath(), conf);
- seek();
- setCurrentWalId(walId);
- } else {
- resetReader();
- }
- } catch (FileNotFoundException fnfe) {
- handleFileNotFound(walId, fnfe);
- } catch (RemoteException re) {
- IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
- if (!(ioe instanceof FileNotFoundException)) throw ioe;
- handleFileNotFound(walId, (FileNotFoundException)ioe);
- } catch (LeaseNotRecoveredException lnre) {
- // HBASE-15019 the WAL was not closed due to some hiccup.
- LOG.warn("Try to recover the WAL lease " + currentWAlIdentity, lnre);
- recoverLease(conf, ((FSWALIdentity)currentWAlIdentity).getPath());
- reader = null;
- } catch (NullPointerException npe) {
- // Workaround for race condition in HDFS-4380
- // which throws a NPE if we open a file before any data node has the most recent block
- // Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
- LOG.warn("Got NPE opening reader, will retry.");
- reader = null;
- }
- }
-
- // For HBASE-15019
- private void recoverLease(final Configuration conf, final Path path) {
- try {
- final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
- FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
- fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
- @Override
- public boolean progress() {
- LOG.debug("recover WAL lease: " + path);
- return true;
- }
- });
- } catch (IOException e) {
- LOG.warn("unable to recover lease for WAL: " + path, e);
- }
- }
-
- private void resetReader() throws IOException {
- try {
- currentEntry = null;
- reader.reset();
- seek();
- } catch (FileNotFoundException fnfe) {
- // If the log was archived, continue reading from there
- FSWALIdentity archivedLog =
- new FSWALIdentity(getArchivedLog(((FSWALIdentity) currentWAlIdentity).getPath()));
- if (!currentWAlIdentity.equals(archivedLog)) {
- openReader(archivedLog);
- } else {
- throw fnfe;
- }
- } catch (NullPointerException npe) {
- throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
- }
- }
-
- private void seek() throws IOException {
- if (currentPositionOfEntry != 0) {
- reader.seek(currentPositionOfEntry);
- }
- }
-
- private long currentTrailerSize() {
- long size = -1L;
- if (reader instanceof ProtobufLogReader) {
- final ProtobufLogReader pblr = (ProtobufLogReader) reader;
- size = pblr.trailerSize();
- }
- return size;
- }
+ Entry peek() throws IOException;
}
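With WALEntryStream reduced to an interface on top of WAL.Reader, consumers no longer construct the stream themselves but ask the provider for one. A minimal consumer sketch, assuming next() and close() come from the inherited WAL.Reader contract and that the queue, configuration, server name and metrics are set up as in ReplicationSourceWALReader above:

import java.io.IOException;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.wal.WALProvider;

public class WalStreamConsumer {
  // Drains whatever entries are currently readable from the given queue of wals.
  static void drain(WALProvider provider, PriorityBlockingQueue<WALIdentity> logQueue,
      Configuration conf, ServerName serverName, MetricsSource metrics) throws IOException {
    try (WALEntryStream stream = provider.getWalStream(logQueue, conf, 0L, serverName, metrics)) {
      while (stream.hasNext()) {
        Entry entry = stream.next(); // next() is inherited from WAL.Reader
        // ... filter, batch, or ship the entry here ...
      }
    }
  }
}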
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
index d0b63cc244..08bff9b56f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
@@ -18,8 +18,7 @@
package org.apache.hadoop.hbase.replication.regionserver;
import java.util.OptionalLong;
-
-import org.apache.hadoop.hbase.wal.WALIdentity;
+import org.apache.hadoop.fs.Path;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -30,5 +29,5 @@ import org.apache.yetus.audience.InterfaceAudience;
@FunctionalInterface
public interface WALFileLengthProvider {
- OptionalLong getLogFileSizeIfBeingWritten(WALIdentity walId);
+ OptionalLong getLogFileSizeIfBeingWritten(Path path);
}
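Because WALFileLengthProvider remains a @FunctionalInterface, now keyed by Path, a stub provider is a one-line lambda; this mirrors the `p -> OptionalLong.empty()` argument that the test change further below stops passing to init(). A minimal sketch:

import java.util.OptionalLong;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;

public class NoOpLengthProvider {
  // Reports "not currently being written" for every wal path.
  static final WALFileLengthProvider NO_OP = (Path wal) -> OptionalLong.empty();
}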
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 85ed2ae630..eddffb8910 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -1798,4 +1798,5 @@ public abstract class FSUtils extends CommonFSUtils {
}
return traversedPaths;
}
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index 1f24548cb9..c66cb56c0c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -23,12 +23,12 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -36,15 +36,23 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.replication.regionserver.FSRecoveredReplicationSource;
+import org.apache.hadoop.hbase.replication.regionserver.FSWALEntryStream;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
+import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource.LogsComparator;
+import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream;
import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -62,6 +70,6 @@
@InterfaceStability.Evolving
public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implements WALProvider {
private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWALProvider.class);
/** Separate old log into different dir by regionserver name **/
@@ -94,6 +102,12 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
*/
private final ReadWriteLock walCreateLock = new ReentrantReadWriteLock();
+ // Root directory under which the per-server wal directories live
+ private Path rootDir;
+
+ // Path to the wal archive
+ private Path oldLogDir;
+
/**
* @param factory factory that made us, identity used for FS layout. may not be null
* @param conf may not be null
@@ -118,6 +132,9 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
}
}
logPrefix = sb.toString();
+ rootDir = FSUtils.getRootDir(conf);
+ oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+
doInit(conf);
}
@@ -554,4 +571,30 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
public static long getWALStartTimeFromWALName(String name) {
return Long.parseLong(getWALNameGroupFromWALName(name, 2));
}
+
+ @Override
+ public WALEntryStream getWalStream(PriorityBlockingQueue<WALIdentity> logQueue,
+ Configuration conf, long startPosition, ServerName serverName, MetricsSource metrics)
+ throws IOException {
+ return new FSWALEntryStream(CommonFSUtils.getWALFileSystem(conf), logQueue, conf, startPosition,
+ serverName, metrics, this);
+ }
+
+ @Override
+ public RecoveredReplicationSource getRecoveredReplicationSource() {
+ return new FSRecoveredReplicationSource();
+ }
+
+ @Override
+ public WALIdentity createWalIdentity(ServerName serverName, String walName, boolean isArchive) {
+ Path walPath;
+ if (isArchive) {
+ walPath = new Path(oldLogDir, walName);
+ } else {
+ Path logDir =
+ new Path(rootDir, AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
+ walPath = new Path(logDir, walName);
+ }
+ return new FSWALIdentity(walPath);
+ }
}
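As a usage sketch for the new createWalIdentity() contract: an active wal resolves under the per-server wal directory, while isArchive=true resolves under the shared archive. Variable names are illustrative, and provider, serverName and walName are assumed to be in scope for the caller:

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.wal.WALProvider;

public class WalIdentityResolution {
  static void resolve(WALProvider provider, ServerName serverName, String walName) {
    // Resolves to <rootDir>/<getWALDirectoryName(serverName)>/<walName>.
    WALIdentity active = provider.createWalIdentity(serverName, walName, false);
    // Resolves to <rootDir>/oldWALs/<walName> (HConstants.HREGION_OLDLOGDIR_NAME).
    WALIdentity archived = provider.createWalIdentity(serverName, walName, true);
    System.out.println(active.getName() + " vs " + archived.getName());
  }
}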
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 8dee0121cd..85ba41ba29 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -24,18 +24,24 @@ import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
+import org.apache.hadoop.hbase.replication.regionserver.AbstractWALEntryStream;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
+import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
+import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,29 +69,8 @@ class DisabledWALProvider implements WALProvider {
if (null == providerId) {
providerId = "defaultDisabled";
}
- final Path path = new Path(FSUtils.getWALRootDir(conf), providerId);
- disabled = new DisabledWAL(new WALIdentity() {
-
- @Override
- public int compareTo(WALIdentity o) {
- return 0;
- }
-
- @Override
- public String getName() {
- return path.getName();
- }
-
- @Override
- public boolean equals(Object obj) {
- return true;
- }
-
- @Override
- public int hashCode() {
- return 0;
- }
- }, conf, null);
+ disabled = new DisabledWAL(createWalIdentity(null,
+ new Path(FSUtils.getWALRootDir(conf), providerId).toString(), false), conf, null);
}
@Override
@@ -265,7 +250,7 @@ class DisabledWALProvider implements WALProvider {
}
@Override
- public OptionalLong getLogFileSizeIfBeingWritten(WALIdentity path) {
+ public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
return OptionalLong.empty();
}
}
@@ -284,4 +269,65 @@ class DisabledWALProvider implements WALProvider {
public void addWALActionsListener(WALActionsListener listener) {
disabled.registerWALActionsListener(listener);
}
+
+ @Override
+ public WALEntryStream getWalStream(PriorityBlockingQueue<WALIdentity> logQueue,
+ Configuration conf, long startPosition, ServerName serverName, MetricsSource metrics)
+ throws IOException {
+ return new AbstractWALEntryStream(logQueue, conf, startPosition, serverName, metrics, this) {
+
+ @Override
+ protected boolean readNextEntryAndRecordReaderPosition() throws IOException {
+ return false;
+ }
+
+ @Override
+ protected void handleIOException(WALIdentity walId, IOException e) throws IOException {
+ // no-op: the disabled provider has no real wals to recover from
+ }
+
+ @Override
+ protected Reader createReader(WALIdentity walId, Configuration conf) throws IOException {
+ return null;
+ }
+ };
+ }
+
+ @Override
+ public RecoveredReplicationSource getRecoveredReplicationSource() {
+ return new RecoveredReplicationSource() {
+
+ @Override
+ public void locateRecoveredWalIds(PriorityBlockingQueue<WALIdentity> queue)
+ throws IOException {
+ }
+ };
+ }
+
+ @Override
+ public WALIdentity createWalIdentity(ServerName serverName, String walName, boolean isArchive) {
+ return new WALIdentity() {
+
+ @Override
+ public int compareTo(WALIdentity o) {
+ return 0;
+ }
+
+ @Override
+ public String getName() {
+ return walName;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof WALIdentity;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+ };
+ }
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
index 0b7b8dad6b..0bbe01b2d2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
@@ -26,15 +26,25 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
// imports for classes still in regionserver.wal
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.replication.regionserver.FSRecoveredReplicationSource;
+import org.apache.hadoop.hbase.replication.regionserver.FSWALEntryStream;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
+import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
+import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -88,6 +98,7 @@ public class RegionGroupingProvider implements WALProvider {
}
}
+
/**
* instantiate a strategy from a config property.
* requires conf to have already been set (as well as anything the provider might need to read).
@@ -137,6 +148,8 @@ public class RegionGroupingProvider implements WALProvider {
private List<WALActionsListener> listeners = new ArrayList<>();
private String providerId;
private Class<? extends WALProvider> providerClass;
+ private Path rootDir;
+ private Path oldLogDir;
@Override
public void init(WALFactory factory, Configuration conf, String providerId) throws IOException {
@@ -156,6 +169,8 @@ public class RegionGroupingProvider implements WALProvider {
this.providerId = sb.toString();
this.strategy = getStrategy(conf, REGION_GROUPING_STRATEGY, DEFAULT_REGION_GROUPING_STRATEGY);
this.providerClass = factory.getProviderClass(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER);
+ rootDir = FSUtils.getRootDir(conf);
+ oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
}
private WALProvider createProvider(String group) throws IOException {
@@ -285,4 +300,31 @@ public class RegionGroupingProvider implements WALProvider {
// extra code actually works, then we will have other big problems. So leave it as is.
listeners.add(listener);
}
+
+ @Override
+ public WALEntryStream getWalStream(PriorityBlockingQueue<WALIdentity> logQueue,
+ Configuration conf, long startPosition, ServerName serverName, MetricsSource metrics)
+ throws IOException {
+ return new FSWALEntryStream(CommonFSUtils.getWALFileSystem(conf), logQueue, conf, startPosition,
+ serverName, metrics, this);
+ }
+
+ @Override
+ public RecoveredReplicationSource getRecoveredReplicationSource() {
+ return new FSRecoveredReplicationSource();
+ }
+
+ @Override
+ public WALIdentity createWalIdentity(ServerName serverName, String walName, boolean isArchive) {
+ Path walPath;
+ if (isArchive) {
+ walPath = new Path(oldLogDir, walName);
+ } else {
+ Path logDir =
+ new Path(rootDir, AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
+ walPath = new Path(logDir, walName);
+ }
+ return new FSWALIdentity(walPath);
+ }
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
index 9859c20464..02eb776815 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.function.BiPredicate;
@@ -36,16 +37,25 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.replication.regionserver.FSRecoveredReplicationSource;
+import org.apache.hadoop.hbase.replication.regionserver.FSWALEntryStream;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener;
+import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
+import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
@@ -99,6 +109,10 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
private final KeyLocker<String> createLock = new KeyLocker<>();
+ private Path rootDir;
+
+ private Path oldLogDir;
+
SyncReplicationWALProvider(WALProvider provider) {
this.provider = provider;
}
@@ -119,6 +133,8 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf);
eventLoopGroup = eventLoopGroupAndChannelClass.getFirst();
channelClass = eventLoopGroupAndChannelClass.getSecond();
+ rootDir = FSUtils.getRootDir(conf);
+ oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
}
// Use a timestamp to make it identical. That means, after we transit the peer to DA/S and then
@@ -349,4 +365,30 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
return provider;
}
+ @Override
+ public WALEntryStream getWalStream(PriorityBlockingQueue<WALIdentity> logQueue,
+ Configuration conf, long startPosition, ServerName serverName, MetricsSource metrics)
+ throws IOException {
+ return new FSWALEntryStream(CommonFSUtils.getWALFileSystem(conf), logQueue, conf, startPosition,
+ serverName, metrics, this);
+ }
+
+ @Override
+ public RecoveredReplicationSource getRecoveredReplicationSource() {
+ return new FSRecoveredReplicationSource();
+ }
+
+ @Override
+ public WALIdentity createWalIdentity(ServerName serverName, String walName, boolean isArchive) {
+ Path walPath;
+ if (isArchive) {
+ walPath = new Path(oldLogDir, walName);
+ } else {
+ Path logDir =
+ new Path(rootDir, AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
+ walPath = new Path(logDir, walName);
+ }
+ return new FSWALIdentity(walPath);
+ }
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
index 244a636226..3b8033b290 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
@@ -21,12 +21,15 @@ package org.apache.hadoop.hbase.wal;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
-import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
+import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
+import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -109,8 +112,32 @@ public interface WALProvider {
*/
void addWALActionsListener(WALActionsListener listener);
- default WALFileLengthProvider getWALFileLengthProvider() {
- return path -> getWALs().stream().map(w -> w.getLogFileSizeIfBeingWritten(path))
- .filter(o -> o.isPresent()).findAny().orElse(OptionalLong.empty());
- }
+ /**
+ * Creates a streaming reader over the given queue of wals, used to retrieve WAL entries for
+ * replication.
+ * @param logQueue the queue of wals to read from
+ * @param conf configuration to use
+ * @param startPosition start position in the first wal of the queue
+ * @param serverName name of the server the wals belong to
+ * @param metrics the replication metrics source
+ * @return a {@link WALEntryStream} over the given queue
+ * @throws IOException if the stream cannot be created
+ */
+ WALEntryStream getWalStream(PriorityBlockingQueue<WALIdentity> logQueue, Configuration conf,
+ long startPosition, ServerName serverName, MetricsSource metrics) throws IOException;
+
+ /**
+ * Replication source used to replicate edits of a dead regionserver.
+ * @return the recovered replication source
+ */
+ RecoveredReplicationSource getRecoveredReplicationSource();
+
+ /**
+ * @param serverName name of the regionserver the wal belongs to
+ * @param walName name of the wal
+ * @param isArchive whether the wal lives in the archive directory
+ * @return the {@link WALIdentity} for the given wal name
+ */
+ WALIdentity createWalIdentity(ServerName serverName, String walName, boolean isArchive);
+
}
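Pulling the three new obligations together: every WALProvider must now supply a wal stream, a recovered replication source, and a wal-name-to-identity mapping. A skeletal sketch of a custom provider, declared abstract so the pre-existing WALProvider methods can stay elided, with placeholder bodies only:

import java.io.IOException;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream;
import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.wal.WALProvider;

public abstract class SketchWALProvider implements WALProvider {

  @Override
  public WALEntryStream getWalStream(PriorityBlockingQueue<WALIdentity> logQueue,
      Configuration conf, long startPosition, ServerName serverName, MetricsSource metrics)
      throws IOException {
    // A non-FS provider would return its own streaming reader here.
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  public RecoveredReplicationSource getRecoveredReplicationSource() {
    // Source used to drain the queues of a dead region server.
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  public WALIdentity createWalIdentity(ServerName serverName, String walName, boolean isArchive) {
    // Map a bare wal name (plus server and archive flag) into this provider's namespace.
    throw new UnsupportedOperationException("sketch only");
  }
}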
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index ed71e6e483..b169732b40 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -21,9 +21,7 @@ import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
-
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
@@ -31,10 +29,10 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
-import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALIdentity;
+import org.apache.hadoop.hbase.wal.WALProvider;
/**
* Source that does nothing at all, helpful to test ReplicationSourceManager
@@ -46,18 +44,15 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
private String peerClusterId;
private WALIdentity currentWalId;
private MetricsSource metrics;
- private WALFileLengthProvider walFileLengthProvider;
private AtomicBoolean startup = new AtomicBoolean(false);
@Override
- public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
- ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
- UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
- throws IOException {
+ public void init(Configuration conf, ReplicationSourceManager manager, ReplicationQueueStorage rq,
+ ReplicationPeer rp, Server server, String peerClusterId, UUID clusterId,
+ MetricsSource metrics, WALProvider walProvider) throws IOException {
this.manager = manager;
this.peerClusterId = peerClusterId;
this.metrics = metrics;
- this.walFileLengthProvider = walFileLengthProvider;
this.replicationPeer = rp;
}
@@ -147,11 +142,6 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
public void postShipEdits(List<Entry> entries, int batchSize) {
}
- @Override
- public WALFileLengthProvider getWALFileLengthProvider() {
- return walFileLengthProvider;
- }
-
@Override
public ServerName getServerWALsBelongTo() {
return null;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 23202058c1..6322f7903b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.io.IOException;
-import java.util.OptionalLong;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -48,12 +47,6 @@ import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
-import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSourceShipper;
-import org.apache.hadoop.hbase.replication.regionserver.Replication;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -176,8 +169,7 @@ public class TestReplicationSource {
testConf.setInt("replication.source.maxretriesmultiplier", 1);
ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
- source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null,
- p -> OptionalLong.empty(), null);
+ source.init(testConf, manager, null, mockPeer, null, "testPeer", null, null, null);
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> future = executor.submit(new Runnable() {
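A hedged restatement of the init() call above with the positional arguments labeled, following
the ReplicationSourceInterface signature shown via ReplicationSourceDummy earlier in this patch
(the inline comments are added here for readability):

  source.init(testConf /* conf */, manager /* manager */, null /* rq */,
    mockPeer /* rp */, null /* server */, "testPeer" /* peerClusterId */,
    null /* clusterId */, null /* metrics */, null /* walProvider */);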
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index def737eb06..f816ea694e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -77,6 +77,7 @@ import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp.DummyServer;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -90,6 +91,7 @@ import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@@ -196,7 +198,10 @@ public abstract class TestReplicationSourceManager {
logDir = utility.getDataTestDir(HConstants.HREGION_LOGDIR_NAME);
remoteLogDir = utility.getDataTestDir(ReplicationUtils.REMOTE_WAL_DIR_NAME);
replication = new Replication();
- replication.initialize(new DummyServer(), fs, logDir, oldLogDir, null);
+ DummyServer dummyServer = new DummyServer();
+ WALFactory factory =
+ new WALFactory(conf, dummyServer.getServerName().toString());
+ replication.initialize(dummyServer, factory.getWALProvider());
managerOfCluster = getManagerFromCluster();
if (managerOfCluster != null) {
// After replication procedure, we need to add peer by hand (other than by receiving
@@ -822,10 +827,9 @@ public abstract class TestReplicationSourceManager {
static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy {
@Override
- public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
+ public void init(Configuration conf, ReplicationSourceManager manager,
ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId,
- UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
- throws IOException {
+ UUID clusterId, MetricsSource metrics, WALProvider provider) throws IOException {
throw new IOException("Failing deliberately");
}
}
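A minimal standalone sketch of the two-argument initialize() used in the setup above, assuming
an existing Configuration conf and Server server (both names are illustrative):

  Replication replication = new Replication();
  WALFactory factory = new WALFactory(conf, server.getServerName().toString());
  // The WALProvider obtained from the factory stands in for the fs/logDir/oldLogDir
  // arguments that the old five-argument initialize() call above took explicitly.
  replication.initialize(server, factory.getWALProvider());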
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index d22f96ab7b..a40cb6eca7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -28,7 +28,6 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.NavigableMap;
-import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
@@ -62,6 +61,7 @@ import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALIdentity;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.AfterClass;
@@ -106,6 +106,8 @@ public class TestWALEntryStream {
public TestName tn = new TestName();
private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+ private WALProvider walProvider;
+
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL = new HBaseTestingUtility();
@@ -125,9 +127,9 @@ public class TestWALEntryStream {
public void setUp() throws Exception {
walQueue = new PriorityBlockingQueue<>();
pathWatcher = new PathWatcher();
- final WALFactory wals = new WALFactory(CONF, tn.getMethodName());
- wals.getWALProvider().addWALActionsListener(pathWatcher);
- log = wals.getWAL(info);
+ WALFactory wals = new WALFactory(CONF, tn.getMethodName());
+ walProvider = wals.getWALProvider();
+ walProvider.addWALActionsListener(pathWatcher);
}
@After
@@ -156,8 +158,8 @@ public class TestWALEntryStream {
log.rollWriter();
- try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
+ try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, 0, null,
+ new MetricsSource("1"), this.walProvider)) {
int i = 0;
while (entryStream.hasNext()) {
assertNotNull(entryStream.next());
@@ -184,7 +186,7 @@ public class TestWALEntryStream {
appendToLogAndSync();
long oldPos;
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
+ new FSWALEntryStream(fs, walQueue, CONF, 0, null, new MetricsSource("1"), walProvider)) {
// There's one edit in the log, read it. Reading past it needs to throw exception
assertTrue(entryStream.hasNext());
WAL.Entry entry = entryStream.peek();
@@ -198,8 +200,8 @@ public class TestWALEntryStream {
appendToLogAndSync();
- try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, oldPos,
- log, null, new MetricsSource("1"))) {
+ try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, oldPos, null,
+ new MetricsSource("1"), walProvider)) {
// Read the newly added entry, make sure we made progress
WAL.Entry entry = entryStream.next();
assertNotEquals(oldPos, entryStream.getPosition());
@@ -212,8 +214,8 @@ public class TestWALEntryStream {
log.rollWriter();
appendToLogAndSync();
- try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, oldPos,
- log, null, new MetricsSource("1"))) {
+ try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, oldPos, null,
+ new MetricsSource("1"), walProvider)) {
WAL.Entry entry = entryStream.next();
assertNotEquals(oldPos, entryStream.getPosition());
assertNotNull(entry);
@@ -238,7 +240,7 @@ public class TestWALEntryStream {
appendToLog("1");
appendToLog("2");// 2
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
+ new FSWALEntryStream(fs, walQueue, CONF, 0, null, new MetricsSource("1"), walProvider)) {
assertEquals("1", getRow(entryStream.next()));
appendToLog("3"); // 3 - comes in after reader opened
@@ -263,7 +265,7 @@ public class TestWALEntryStream {
public void testNewEntriesWhileStreaming() throws Exception {
appendToLog("1");
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
+ new FSWALEntryStream(fs, walQueue, CONF, 0, null, new MetricsSource("1"), walProvider)) {
entryStream.next(); // we've hit the end of the stream at this point
// some new entries come in while we're streaming
@@ -286,15 +288,15 @@ public class TestWALEntryStream {
long lastPosition = 0;
appendToLog("1");
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
+ new FSWALEntryStream(fs, walQueue, CONF, 0, null, new MetricsSource("1"), walProvider)) {
entryStream.next(); // we've hit the end of the stream at this point
appendToLog("2");
appendToLog("3");
lastPosition = entryStream.getPosition();
}
// next stream should pick up where we left off
- try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, CONF, lastPosition, log, null, new MetricsSource("1"))) {
+ try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, lastPosition, null,
+ new MetricsSource("1"), walProvider)) {
assertEquals("2", getRow(entryStream.next()));
assertEquals("3", getRow(entryStream.next()));
assertFalse(entryStream.hasNext()); // done
@@ -311,14 +313,14 @@ public class TestWALEntryStream {
long lastPosition = 0;
appendEntriesToLogAndSync(3);
// read only one element
- try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, lastPosition,
- log, null, new MetricsSource("1"))) {
+ try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, lastPosition, null,
+ new MetricsSource("1"), walProvider)) {
entryStream.next();
lastPosition = entryStream.getPosition();
}
// there should still be two more entries from where we left off
- try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, CONF, lastPosition, log, null, new MetricsSource("1"))) {
+ try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, lastPosition, null,
+ new MetricsSource("1"), walProvider)) {
assertNotNull(entryStream.next());
assertNotNull(entryStream.next());
assertFalse(entryStream.hasNext());
@@ -329,7 +331,7 @@ public class TestWALEntryStream {
@Test
public void testEmptyStream() throws Exception {
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
+ new FSWALEntryStream(fs, walQueue, CONF, 0, null, new MetricsSource("1"), walProvider)) {
assertFalse(entryStream.hasNext());
}
}
@@ -341,9 +343,9 @@ public class TestWALEntryStream {
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.getSourceManager()).thenReturn(mockSourceManager);
when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
- when(source.getWALFileLengthProvider()).thenReturn(log);
when(source.getServer()).thenReturn(mockServer);
when(source.isRecovered()).thenReturn(recovered);
+ when(source.getWalProvider()).thenReturn(this.walProvider);
return source;
}
@@ -351,7 +353,7 @@ public class TestWALEntryStream {
ReplicationSource source = mockReplicationSource(recovered, conf);
when(source.isPeerEnabled()).thenReturn(true);
ReplicationSourceWALReader reader =
- new ReplicationSourceWALReader(fs, conf, walQueue, 0, getDummyFilter(), source);
+ new ReplicationSourceWALReader(conf, walQueue, 0, getDummyFilter(), source);
reader.start();
return reader;
}
@@ -362,7 +364,7 @@ public class TestWALEntryStream {
// get ending position
long position;
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
+ new FSWALEntryStream(fs, walQueue, CONF, 0, null, new MetricsSource("1"), walProvider)) {
entryStream.next();
entryStream.next();
entryStream.next();
@@ -476,8 +478,8 @@ public class TestWALEntryStream {
appendEntriesToLogAndSync(3);
// get ending position
long position;
- try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
+ try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, 0, null,
+ new MetricsSource("1"), this.walProvider)) {
entryStream.next();
entryStream.next();
entryStream.next();
@@ -495,7 +497,7 @@ public class TestWALEntryStream {
});
ReplicationSourceWALReader reader =
- new ReplicationSourceWALReader(fs, CONF, walQueue, 0, getDummyFilter(), source);
+ new ReplicationSourceWALReader(CONF, walQueue, 0, getDummyFilter(), source);
reader.start();
Future<WALEntryBatch> future = ForkJoinPool.commonPool().submit(() -> {
return reader.take();
@@ -591,10 +593,11 @@ public class TestWALEntryStream {
public void testReadBeyondCommittedLength() throws IOException, InterruptedException {
appendToLog("1");
appendToLog("2");
- long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong();
+ long size =
+ log.getLogFileSizeIfBeingWritten(((FSWALIdentity) walQueue.peek()).getPath()).getAsLong();
AtomicLong fileLength = new AtomicLong(size - 1);
- try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, 0,
- p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"))) {
+ try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, 0,
+ null, new MetricsSource("1"), walProvider)) {
assertTrue(entryStream.hasNext());
assertNotNull(entryStream.next());
// can not get log 2
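A consolidated illustration of the FSWALEntryStream constructor order used throughout the
hunks above (the values are the test's own; the labeling comments are added here):

  try (WALEntryStream entryStream = new FSWALEntryStream(fs /* wal filesystem */,
      walQueue /* queue of WALIdentity */, CONF, 0 /* start position */,
      null /* server name */, new MetricsSource("1"), walProvider)) {
    while (entryStream.hasNext()) {
      WAL.Entry entry = entryStream.next();
      // consume entry ...
    }
  }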
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
index d062c77cb3..bb8c2915ed 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
@@ -27,18 +27,25 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
// imports for things that haven't moved from regionserver.wal yet.
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.replication.regionserver.FSRecoveredReplicationSource;
+import org.apache.hadoop.hbase.replication.regionserver.FSWALEntryStream;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
+import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
+import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream;
import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -92,6 +99,11 @@ public class IOTestProvider implements WALProvider {
protected AtomicBoolean initialized = new AtomicBoolean(false);
private List<WALActionsListener> listeners = new ArrayList<>();
+
+ private Path oldLogDir;
+
+ private Path rootDir;
+
/**
* @param factory factory that made us, identity used for FS layout. may not be null
* @param conf may not be null
@@ -106,6 +118,8 @@ public class IOTestProvider implements WALProvider {
this.factory = factory;
this.conf = conf;
this.providerId = providerId != null ? providerId : DEFAULT_PROVIDER_ID;
+ rootDir = FSUtils.getRootDir(conf);
+ oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
}
@Override
@@ -288,4 +302,31 @@ public class IOTestProvider implements WALProvider {
// TODO Implement WALProvider.addWALActionsListener
}
+
+ @Override
+ public WALEntryStream getWalStream(PriorityBlockingQueue<WALIdentity> logQueue,
+ Configuration conf, long startPosition, ServerName serverName, MetricsSource metrics)
+ throws IOException {
+ return new FSWALEntryStream(CommonFSUtils.getWALFileSystem(conf), logQueue, conf, startPosition,
+ serverName, metrics, this);
+ }
+
+ @Override
+ public RecoveredReplicationSource getRecoveredReplicationSource() {
+ return new FSRecoveredReplicationSource();
+ }
+
+ @Override
+ public WALIdentity createWalIdentity(ServerName serverName, String walName, boolean isArchive) {
+ Path walPath;
+ if (isArchive) {
+ walPath = new Path(oldLogDir, walName);
+ } else {
+ Path logDir =
+ new Path(rootDir, AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
+ walPath = new Path(logDir, walName);
+ }
+ return new FSWALIdentity(walPath);
+ }
+
}
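For reference, the layout produced by createWalIdentity() above, with illustrative values
(rootDir hdfs://ns/hbase, serverName "rs1,16020,1"): an archived wal resolves to
hdfs://ns/hbase/oldWALs/<walName>, while a live one resolves to
hdfs://ns/hbase/WALs/rs1,16020,1/<walName>.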