diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java index 899c633..1e1a2fa 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java @@ -1050,7 +1050,7 @@ public abstract class CommonFSUtils { * Helper exception for those cases where the place where we need to check a stream capability * is not where we have the needed context to explain the impact and mitigation for a lack. */ - public static class StreamLacksCapabilityException extends Exception { + public static class StreamLacksCapabilityException extends IOException { public StreamLacksCapabilityException(String message, Throwable cause) { super(message, cause); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index c10a824..4312f15 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -174,6 +174,7 @@ import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay; import org.apache.hadoop.io.MultipleIOException; @@ -327,6 +328,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi final LongAdder flushesQueued = new LongAdder(); private final WAL wal; + private final WALProvider walProvider; private final HRegionFileSystem fs; protected final Configuration conf; private final Configuration baseConf; @@ -778,6 +780,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.rsServices = rsServices; setHTableSpecificConf(); this.scannerReadPoints = new ConcurrentHashMap<>(); + try { + WALFactory factory = new WALFactory(conf, getRegionInfo().getRegionNameAsString()); + this.walProvider = factory.getWALProvider(); + } catch (IOException ioe) { + throw new IllegalArgumentException("cannot construct WALFactory for " + + getRegionInfo().getRegionNameAsString(), ioe); + } this.busyWaitDuration = conf.getLong( "hbase.busy.wait.duration", DEFAULT_BUSY_WAIT_DURATION); @@ -4699,7 +4708,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi status.setStatus("Opening recovered edits"); WAL.Reader reader = null; try { - reader = WALFactory.createReader(fs, edits, conf); + reader = walProvider.createReader(walProvider.createWALIdentity(edits.toString()), null, + true); long currentEditSeqId = -1; long currentReplaySeqId = -1; long firstSeqIdInLog = -1; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java index 4a9712c..7761ec0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java @@ -77,17 +77,19 @@ public class SplitLogWorker implements Runnable { // thread pool which executes recovery work private SplitLogWorkerCoordination coordination; private RegionServerServices 
server; + private WALFactory factory; public SplitLogWorker(Server hserver, Configuration conf, RegionServerServices server, - TaskExecutor splitTaskExecutor) { + WALFactory factory, TaskExecutor splitTaskExecutor) { this.server = server; + this.factory = factory; this.coordination = hserver.getCoordinatedStateManager().getSplitLogWorkerCoordination(); coordination.init(server, conf, splitTaskExecutor, this); } public SplitLogWorker(Configuration conf, RegionServerServices server, LastSequenceId sequenceIdChecker, WALFactory factory) { - this(server, conf, server, (f, p) -> splitLog(f, p, conf, server, sequenceIdChecker, factory)); + this(server, conf, server, factory, (f, p) -> splitLog(f, p, conf, server, sequenceIdChecker, factory)); } // returns whether we need to continue the split work @@ -175,7 +177,8 @@ // interrupted or has encountered a transient error and when it has // encountered a bad non-retry-able persistent error. try { - if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, name)), fs, conf, + if (!WALSplitter.splitLogFile(walDir, factory.getWALProvider().createWALIdentity( + new Path(walDir, name).toString()), fs, conf, p, sequenceIdChecker, server.getCoordinatedStateManager().getSplitLogWorkerCoordination(), factory)) { return Status.PREEMPTED; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 7915ac3..58c2b07 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALPrettyPrinter; import org.apache.hadoop.hbase.wal.WALProvider.WriterBase; @@ -994,7 +995,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL { * https://issues.apache.org/jira/browse/HBASE-14004 for more details. */ @Override - public OptionalLong getLogFileSizeIfBeingWritten(Path path) { + public OptionalLong getLogFileSizeIfBeingWritten(WALIdentity path) { rollWriterLock.lock(); try { Path currentPath = getOldPath(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java index ae084a4..dcbca04 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java @@ -49,6 +49,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer; import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; import org.apache.hadoop.hbase.util.EncryptionTest; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.FSWALIdentity; +import org.apache.hadoop.hbase.wal.WALIdentity; /** * Base class for Protobuf log writer. 
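Note: the WALIdentity and FSWALIdentity types used throughout this patch are introduced elsewhere in the HBASE-20952 series and are not part of this diff. A minimal, hypothetical sketch of the shape the call sites here assume — only getName() plus, for the FS-backed case, getPath() are actually relied on; everything else is illustrative:

    import org.apache.hadoop.fs.Path;

    /** Hypothetical sketch only; the real interface is defined outside this diff. */
    interface WALIdentity {
      /** Name of the WAL, e.g. "10.20.20.171%3A60020.1277499063250". */
      String getName();
    }

    /** FS-backed identity wrapping a Path, as casts like ((FSWALIdentity) walId).getPath() assume. */
    class FSWALIdentity implements WALIdentity {
      private final Path path;

      FSWALIdentity(Path path) {
        this.path = path;
      }

      /** Underlying filesystem location of this WAL. */
      public Path getPath() {
        return path;
      }

      @Override
      public String getName() {
        return path.getName();
      }
    }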
@@ -153,8 +155,10 @@ public abstract class AbstractProtobufLogWriter { return doCompress; } - public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable, - long blocksize) throws IOException, StreamLacksCapabilityException { + public void init(WALIdentity walId, Configuration conf, boolean overwritable, + long blocksize) throws IOException { + Path path = ((FSWALIdentity)walId).getPath(); + FileSystem fs = path.getFileSystem(conf); this.conf = conf; boolean doCompress = initializeCompressionContext(conf, path); this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java index 13f5d6e..32c23f2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java @@ -69,10 +69,12 @@ public class Compressor { throws IOException { Configuration conf = HBaseConfiguration.create(); - FileSystem inFS = input.getFileSystem(conf); FileSystem outFS = output.getFileSystem(conf); + WALFactory factory = new WALFactory(conf, "compressor"); + WALProvider provider = factory.getWALProvider(); - WAL.Reader in = WALFactory.createReaderIgnoreCustomClass(inFS, input, conf); + WAL.Reader in = provider.createReader(provider.createWALIdentity(input.toString()), null, + false); WALProvider.Writer out = null; try { @@ -82,7 +84,7 @@ public class Compressor { } boolean compress = ((ReaderBase)in).hasCompression(); conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, !compress); - out = WALFactory.createWALWriter(outFS, output, conf); + out = WALFactory.createWALWriter(provider.createWALIdentity(output.toString()), conf); WAL.Entry e = null; while ((e = in.next()) != null) out.append(e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index baa87a4..2224756 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -50,7 +50,9 @@ import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.FSHLogProvider; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.apache.hadoop.hdfs.DFSOutputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; @@ -271,7 +273,9 @@ public class FSHLog extends AbstractFSWAL { */ @Override protected Writer createWriterInstance(final Path path) throws IOException { - Writer writer = FSHLogProvider.createWriter(conf, fs, path, false, this.blocksize); + WALFactory factory = new WALFactory(conf, "FSHLog"); + WALProvider provider = factory.getWALProvider(); + Writer writer = provider.createWriter(conf, provider.createWALIdentity(path.toString()), false); if (writer instanceof ProtobufLogWriter) { preemptiveSync((ProtobufLogWriter) writer); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index f1bb538..b5eaaf7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -18,19 +18,17 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; -import java.util.List; import java.util.UUID; import java.util.concurrent.PriorityBlockingQueue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.WALIdentity; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,89 +38,36 @@ import org.slf4j.LoggerFactory; * another dead region server. This will be closed when all logs are pushed to peer cluster. */ @InterfaceAudience.Private -public class RecoveredReplicationSource extends ReplicationSource { +public abstract class RecoveredReplicationSource extends ReplicationSource { private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSource.class); private String actualPeerId; @Override - public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, + public void init(Configuration conf, ReplicationSourceManager manager, ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider, - MetricsSource metrics) throws IOException { - super.init(conf, fs, manager, queueStorage, replicationPeer, server, peerClusterZnode, - clusterId, walFileLengthProvider, metrics); + MetricsSource metrics, WALProvider walProvider) throws IOException { + super.init(conf, manager, queueStorage, replicationPeer, server, peerClusterZnode, - clusterId, walFileLengthProvider, metrics); + clusterId, walFileLengthProvider, metrics, walProvider); this.actualPeerId = this.replicationQueueInfo.getPeerId(); } @Override protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId, - PriorityBlockingQueue<Path> queue) { + PriorityBlockingQueue<WALIdentity> queue) { return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, queueStorage); } - public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException { - boolean hasPathChanged = false; - PriorityBlockingQueue<Path> newPaths = - new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator()); - pathsLoop: for (Path path : queue) { - if (fs.exists(path)) { // still in same location, don't need to do anything - newPaths.add(path); - continue; - } - // Path changed - try to find the right path. - hasPathChanged = true; - if (server instanceof ReplicationSyncUp.DummyServer) { - // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data - // from .logs to .oldlogs. 
Loop into .logs folders and check whether a match exists - Path newPath = getReplSyncUpPath(path); - newPaths.add(newPath); - continue; - } else { - // See if Path exists in the dead RS folder (there could be a chain of failures - // to look at) - List<ServerName> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers(); - LOG.info("NB dead servers : " + deadRegionServers.size()); - final Path walDir = FSUtils.getWALRootDir(conf); - for (ServerName curDeadServerName : deadRegionServers) { - final Path deadRsDirectory = - new Path(walDir, AbstractFSWALProvider.getWALDirectoryName(curDeadServerName .getServerName())); - Path[] locs = new Path[] { new Path(deadRsDirectory, path.getName()), new Path( - deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), path.getName()) }; - for (Path possibleLogLocation : locs) { - LOG.info("Possible location " + possibleLogLocation.toUri().toString()); - if (manager.getFs().exists(possibleLogLocation)) { - // We found the right new location - LOG.info("Log " + path + " still exists at " + possibleLogLocation); - newPaths.add(possibleLogLocation); - continue pathsLoop; - } - } - } - // didn't find a new location - LOG.error( - String.format("WAL Path %s doesn't exist and couldn't find its new location", path)); - newPaths.add(path); - } - } - - if (hasPathChanged) { - if (newPaths.size() != queue.size()) { // this shouldn't happen - LOG.error("Recovery queue size is incorrect"); - throw new IOException("Recovery queue size error"); - } - // put the correct locations in the queue - // since this is a recovered queue with no new incoming logs, - // there shouldn't be any concurrency issues - queue.clear(); - for (Path path : newPaths) { - queue.add(path); - } - } - } - + /** + * Update the queue with the current location of each WAL, in case the WALs have been moved + * to another location. + * @param queue queue to update with the new WALIdentity (path or stream) of each WAL that + * was moved or archived + * @throws IOException + */ + public abstract void locateRecoveredWALIdentities(PriorityBlockingQueue<WALIdentity> queue) + throws IOException; + // N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of the wal // area rather than to the wal area for a particular region server. 
private Path getReplSyncUpPath(Path path) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java index b0d4db0..9d0bbd3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java @@ -20,10 +20,10 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import java.util.concurrent.PriorityBlockingQueue; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +40,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper private final ReplicationQueueStorage replicationQueues; public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId, - PriorityBlockingQueue<Path> queue, RecoveredReplicationSource source, + PriorityBlockingQueue<WALIdentity> queue, RecoveredReplicationSource source, ReplicationQueueStorage queueStorage) { super(conf, walGroupId, queue, source); this.source = source; @@ -58,7 +58,7 @@ int numRetries = 0; while (numRetries <= maxRetriesMultiplier) { try { - source.locateRecoveredPaths(queue); + source.locateRecoveredWALIdentities(queue); break; } catch (IOException e) { LOG.error("Error while locating recovered queue paths, attempt #" + numRetries); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java index 24963f1..75d1eff 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Reader; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -139,9 +140,11 @@ public class ReplaySyncReplicationWALCallable implements RSProcedureCallable { private Reader getReader(String wal) throws IOException { Path path = new Path(rs.getWALRootDir(), wal); long length = rs.getWALFileSystem().getFileStatus(path).getLen(); + WALFactory factory = new WALFactory(conf, "replay-sync-replication"); + WALProvider walProvider = factory.getWALProvider(); try { FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf); - return WALFactory.createReader(rs.getWALFileSystem(), path, rs.getConfiguration()); + return walProvider.createReader(walProvider.createWALIdentity(path.toString()), null, true); } catch (EOFException e) { if (length <= 0) 
{ LOG.warn("File is empty. Could not open {} for reading because {}", path, e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index b04f0cb..15e6bae 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -126,7 +126,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId, walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(), - mapping); + mapping, walProvider); this.syncReplicationPeerInfoProvider = new SyncReplicationPeerInfoProviderImpl(replicationPeers, mapping); PeerActionListener peerActionListener = PeerActionListener.DUMMY; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 10fa50f..814c1f0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -17,9 +17,6 @@ */ package org.apache.hadoop.hbase.replication.regionserver; -import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath; - -import java.io.FileNotFoundException; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; @@ -35,6 +32,8 @@ import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Pattern; + import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -62,6 +61,8 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALIdentity; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,7 +87,7 @@ public class ReplicationSource implements ReplicationSourceInterface { private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class); // Queues of logs to process, entry in format of walGroupId->queue, // each presents a queue for one wal group - private Map> queues = new HashMap<>(); + private Map> queues = new HashMap<>(); // per group queue size, keep no more than this number of logs in each wal group protected int queueSizePerGroup; protected ReplicationQueueStorage queueStorage; @@ -137,6 +138,7 @@ public class ReplicationSource implements ReplicationSourceInterface { private int waitOnEndpointSeconds = -1; private Thread initThread; + protected WALProvider walProvider; /** * Instantiation method used by region servers @@ -149,10 +151,10 @@ public class ReplicationSource implements ReplicationSourceInterface { * @param metrics metrics for replication source */ @Override - public 
void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, + public void init(Configuration conf, ReplicationSourceManager manager, ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, - MetricsSource metrics) throws IOException { + MetricsSource metrics, WALProvider walProvider) throws IOException { this.server = server; this.conf = HBaseConfiguration.create(conf); this.waitOnEndpointSeconds = @@ -191,11 +193,11 @@ } @Override - public void enqueueLog(Path log) { + public void enqueueLog(WALIdentity log) { String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log.getName()); - PriorityBlockingQueue<Path> queue = queues.get(logPrefix); + PriorityBlockingQueue<WALIdentity> queue = queues.get(logPrefix); if (queue == null) { - queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator()); + queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator(walProvider)); // make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise // the shipper may quit immediately queue.put(log); @@ -300,7 +302,7 @@ this.walEntryFilter = new ChainWALEntryFilter(filters); } - private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) { + private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<WALIdentity> queue) { ReplicationSourceShipper worker = createNewShipper(walGroupId, queue); ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker); if (extant != null) { @@ -328,13 +330,8 @@ int queueSize = queues.get(walGroupId).size(); replicationDelay = ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize); - Path currentPath = shipper.getCurrentPath(); - try { - fileSize = getFileSize(currentPath); - } catch (IOException e) { - LOG.warn("Ignore the exception as the file size of HLog only affects the web ui", e); - fileSize = -1; - } + WALIdentity currentPath = shipper.getCurrentWALIdentity(); + fileSize = -1; ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder(); statusBuilder.withPeerId(this.getPeerId()) .withQueueSize(queueSize) @@ -349,24 +346,13 @@ return sourceReplicationStatus; } - private long getFileSize(Path currentPath) throws IOException { - long fileSize; - try { - fileSize = fs.getContentSummary(currentPath).getLength(); - } catch (FileNotFoundException e) { - currentPath = getArchivedLogPath(currentPath, conf); - fileSize = fs.getContentSummary(currentPath).getLength(); - } - return fileSize; - } - protected ReplicationSourceShipper createNewShipper(String walGroupId, - PriorityBlockingQueue<Path> queue) { + PriorityBlockingQueue<WALIdentity> queue) { return new ReplicationSourceShipper(conf, walGroupId, queue, this); } private ReplicationSourceWALReader createNewWALReader(String walGroupId, - PriorityBlockingQueue<Path> queue, long startPosition) { + PriorityBlockingQueue<WALIdentity> queue, long startPosition) { return replicationPeer.getPeerConfig().isSerial() ? 
new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this) : new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this); @@ -374,7 +360,8 @@ protected final void uncaughtException(Thread t, Throwable e) { RSRpcServices.exitIfOOME(e); - LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentPath(), e); + LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentWALIdentity(), + e); server.abort("Unexpected exception in " + t.getName(), e); } @@ -497,9 +484,9 @@ initializeWALEntryFilter(peerClusterId); // start workers - for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) { + for (Map.Entry<String, PriorityBlockingQueue<WALIdentity>> entry : queues.entrySet()) { String walGroupId = entry.getKey(); - PriorityBlockingQueue<Path> queue = entry.getValue(); + PriorityBlockingQueue<WALIdentity> queue = entry.getValue(); tryStartNewShipper(walGroupId, queue); } } @@ -593,11 +580,11 @@ } @Override - public Path getCurrentPath() { + public WALIdentity getCurrentWALIdentity() { // only for testing for (ReplicationSourceShipper worker : workerThreads.values()) { - if (worker.getCurrentPath() != null) { - return worker.getCurrentPath(); + if (worker.getCurrentWALIdentity() != null) { + return worker.getCurrentWALIdentity(); } } return null; } @@ -611,25 +598,16 @@ /** * Comparator used to compare logs together based on their start time */ - public static class LogsComparator implements Comparator<Path> { - - @Override - public int compare(Path o1, Path o2) { - return Long.compare(getTS(o1), getTS(o2)); + public static class LogsComparator implements Comparator<WALIdentity> { + private static Pattern WAL_FILE_NAME_PATTERN = + Pattern.compile("(.+)\\.(\\d+)(\\.[0-9A-Za-z]+)?"); + private WALProvider walProvider; + public LogsComparator(WALProvider walProvider) { + this.walProvider = walProvider; } - - /** - * <p> - * Split a path to get the start time - * </p> - * <p> - * For example: 10.20.20.171%3A60020.1277499063250 - * </p> - * @param p path to split - * @return start time - */ - private static long getTS(Path p) { - return AbstractFSWALProvider.getWALStartTimeFromWALName(p.getName()); + @Override + public int compare(WALIdentity o1, WALIdentity o2) { + return Long.compare(walProvider.getWALStartTime(o1), walProvider.getWALStartTime(o2)); } } @@ -642,7 +620,7 @@ String walGroupId = entry.getKey(); ReplicationSourceShipper worker = entry.getValue(); long position = worker.getCurrentPosition(); - Path currentPath = worker.getCurrentPath(); + WALIdentity currentPath = worker.getCurrentWALIdentity(); sb.append("walGroup [").append(walGroupId).append("]: "); if (currentPath != null) { sb.append("currently replicating from: ").append(currentPath).append(" at position: ") @@ -695,4 +673,7 @@ void removeWorker(ReplicationSourceShipper worker) { workerThreads.remove(worker.walGroupId, worker); } + public WALProvider getWalProvider() { + return walProvider; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java index d613049..65241e1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java @@ -47,7 +47,7 @@ LOG.warn("Passed replication source implementation throws errors, " + "defaulting to ReplicationSource", e); - src = isQueueRecovered ? new RecoveredReplicationSource() : new ReplicationSource(); + src = isQueueRecovered ? 
new FSRecoveredReplicationSource() : new ReplicationSource(); } return src; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index df7a8cc..fb09c31 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -25,7 +25,6 @@ import java.util.Map; import java.util.UUID; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; @@ -36,6 +35,8 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALIdentity; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.yetus.audience.InterfaceAudience; /** @@ -51,16 +52,16 @@ public interface ReplicationSourceInterface { * @param manager the manager to use * @param server the server for this region server */ - void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, + void init(Configuration conf, ReplicationSourceManager manager, ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, - MetricsSource metrics) throws IOException; + MetricsSource metrics, WALProvider walProvider) throws IOException; /** * Add a log to the list of logs to replicate * @param log path to the log to replicate */ - void enqueueLog(Path log); + void enqueueLog(WALIdentity log); /** * Add hfile names to the queue to be replicated. 
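For orientation, a hedged sketch of driving the reworked contract just shown: identities come from the provider's createWALIdentity and enqueueLog now takes the identity instead of a Path. This mirrors what postLogRoll does in the ReplicationSourceManager hunk further down; the class and method names here are stand-ins, not part of this patch:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
    import org.apache.hadoop.hbase.wal.WALIdentity;
    import org.apache.hadoop.hbase.wal.WALProvider;

    /** Illustrative stand-in, not part of this patch. */
    final class LogRollHandOff {
      private LogRollHandOff() {
      }

      /** Hand a freshly rolled WAL to a replication source under the new API. */
      static void handOff(WALProvider walProvider, ReplicationSourceInterface source,
          Path rolledWal) {
        WALIdentity id = walProvider.createWALIdentity(rolledWal.toString());
        source.enqueueLog(id);
      }
    }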
@@ -95,7 +96,7 @@ * Get the current log that's replicated * @return the current log */ - Path getCurrentPath(); + WALIdentity getCurrentWALIdentity(); /** * Get the queue id that the source is replicating to diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 5756cbc..d50bde9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -65,6 +65,8 @@ import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider; +import org.apache.hadoop.hbase.wal.WALIdentity; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; @@ -148,7 +150,7 @@ private final Configuration conf; private final FileSystem fs; // The paths to the latest log of each wal group, for new coming peers - private final Set<Path> latestPaths; + private final Set<WALIdentity> latestPaths; // Path to the wals directories private final Path logDir; // Path to the wal archive @@ -169,6 +171,8 @@ // Maximum number of retries before taking bold actions when deleting remote wal files for sync // replication peer. 
private final int maxRetriesMultiplier; + private final WALProvider walProvider; + private final ServerName serverName; /** * Creates a replication manager and sets the watch on all the other registered region servers @@ -186,7 +190,8 @@ ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf, Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId, WALFileLengthProvider walFileLengthProvider, - SyncReplicationPeerMappingManager syncReplicationPeerMappingManager) throws IOException { + SyncReplicationPeerMappingManager syncReplicationPeerMappingManager, + WALProvider walProvider) throws IOException { this.sources = new ConcurrentHashMap<>(); this.queueStorage = queueStorage; this.replicationPeers = replicationPeers; @@ -196,6 +201,7 @@ this.walsByIdRecoveredQueues = new ConcurrentHashMap<>(); this.oldsources = new ArrayList<>(); this.conf = conf; + this.walProvider = walProvider; this.fs = fs; this.logDir = logDir; this.oldLogDir = oldLogDir; @@ -212,11 +218,12 @@ // even if we fail, other region servers can take care of it this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); + this.serverName = this.server.getServerName(); ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); tfb.setNameFormat("ReplicationExecutor-%d"); tfb.setDaemon(true); this.executor.setThreadFactory(tfb.build()); - this.latestPaths = new HashSet<Path>(); + this.latestPaths = new HashSet<>(); this.replicationForBulkLoadDataEnabled = conf.getBoolean( HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000); @@ -349,8 +356,8 @@ MetricsSource metrics = new MetricsSource(queueId); // init replication source - src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId, - walFileLengthProvider, metrics); + src.init(conf, this, queueStorage, replicationPeer, server, queueId, clusterId, + walFileLengthProvider, metrics, walProvider); return src; } @@ -372,7 +379,7 @@ this.walsById.put(peerId, walsByGroup); // Add the latest wal to that source's queue if (this.latestPaths.size() > 0) { - for (Path logPath : latestPaths) { + for (WALIdentity logPath : latestPaths) { String name = logPath.getName(); String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name); NavigableSet<String> logs = new TreeSet<>(); @@ -485,7 +492,8 @@ toRemove.terminate(terminateMessage); } for (NavigableSet<String> walsByGroup : walsById.get(peerId).values()) { - walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal))); + walsByGroup.forEach(wal -> src.enqueueLog( + ((SyncReplicationWALProvider)this.walProvider).getFullPath(serverName, wal))); } } LOG.info("Startup replication source for " + src.getPeerId()); @@ -506,7 +514,7 @@ ReplicationSourceInterface replicationSource = createSource(queueId, peer); this.oldsources.add(replicationSource); for (SortedSet<String> walsByGroup : 
walsByIdRecoveredQueues.get(queueId).values()) { - walsByGroup.forEach(wal -> src.enqueueLog(new Path(wal))); + walsByGroup.forEach(wal -> src.enqueueLog(this.walProvider.createWALIdentity(wal))); } toStartup.add(replicationSource); } @@ -780,15 +788,15 @@ } // Add to latestPaths - Iterator<Path> iterator = latestPaths.iterator(); + Iterator<WALIdentity> iterator = latestPaths.iterator(); while (iterator.hasNext()) { - Path path = iterator.next(); + WALIdentity path = iterator.next(); if (path.getName().contains(logPrefix)) { iterator.remove(); break; } } - this.latestPaths.add(newLog); + this.latestPaths.add(walProvider.createWALIdentity(newLog.toString())); } } @@ -797,7 +805,7 @@ public void postLogRoll(Path newLog) throws IOException { // This only updates the sources we own, not the recovered ones for (ReplicationSourceInterface source : this.sources.values()) { - source.enqueueLog(newLog); + source.enqueueLog(walProvider.createWALIdentity(newLog.toString())); } } @@ -970,7 +978,11 @@ } oldsources.add(src); for (String wal : walsSet) { - src.enqueueLog(new Path(oldLogDir, wal)); + WALIdentity archivedWal = ((SyncReplicationWALProvider)walProvider) + .getWalFromArchivePath(wal); + if (archivedWal != null) { + src.enqueueLog(archivedWal); + } } src.startup(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 5d6198e..4b517ea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.PriorityBlockingQueue; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.TableName; @@ -32,6 +31,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,14 +56,14 @@ private final Configuration conf; protected final String walGroupId; - protected final PriorityBlockingQueue<Path> queue; + protected final PriorityBlockingQueue<WALIdentity> queue; private final ReplicationSource source; // Last position in the log that we sent to ZooKeeper // It will be accessed by the stats thread so make it volatile private volatile long currentPosition = -1; // Path of the current log - private Path currentPath; + private WALIdentity currentPath; // Current state of the worker thread private volatile WorkerState state; protected ReplicationSourceWALReader entryReader; @@ -76,7 +76,7 @@ private final int getEntriesTimeout; public ReplicationSourceShipper(Configuration conf, String walGroupId, - 
PriorityBlockingQueue<Path> queue, ReplicationSource source) { + PriorityBlockingQueue<WALIdentity> queue, ReplicationSource source) { this.conf = conf; this.walGroupId = walGroupId; this.queue = queue; @@ -293,7 +293,7 @@ name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), handler); } - Path getCurrentPath() { + WALIdentity getCurrentWALIdentity() { return entryReader.getCurrentPath(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index b3bdb02..ee410e8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -18,7 +18,6 @@ */ package org.apache.hadoop.hbase.replication.regionserver; -import java.io.EOFException; import java.io.IOException; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -28,7 +27,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; @@ -37,6 +35,7 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -55,7 +54,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescript class ReplicationSourceWALReader extends Thread { private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class); - private final PriorityBlockingQueue<Path> logQueue; + private final PriorityBlockingQueue<WALIdentity> logQueue; private final FileSystem fs; private final Configuration conf; private final WALEntryFilter filter; @@ -89,7 +88,7 @@ * @param source replication source */ public ReplicationSourceWALReader(FileSystem fs, Configuration conf, - PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter, + PriorityBlockingQueue<WALIdentity> logQueue, long startPosition, WALEntryFilter filter, ReplicationSource source) { this.logQueue = logQueue; this.currentPosition = startPosition; @@ -124,8 +123,8 @@ int sleepMultiplier = 1; while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream try (WALEntryStream entryStream = - new WALEntryStream(logQueue, fs, conf, currentPosition, - source.getWALFileLengthProvider(), source.getServerWALsBelongTo(), + this.source.getWalProvider().getWalStream(logQueue, conf, + currentPosition, source.getWALFileLengthProvider(), source.getServerWALsBelongTo(), source.getSourceMetrics())) { while (isReaderRunning()) { // loop here to keep reusing stream while we can if (!source.isPeerEnabled()) { @@ -152,9 +151,6 @@ if (sleepMultiplier < maxRetriesMultiplier) { LOG.debug("Failed to read stream of replication entries: " + e); sleepMultiplier++; - } else { - LOG.error("Failed to 
read stream of replication entries", e); - handleEofException(e); } Threads.sleep(sleepForRetries * sleepMultiplier); } catch (InterruptedException e) { @@ -181,14 +177,14 @@ class ReplicationSourceWALReader extends Thread { batch.getNbEntries() >= replicationBatchCountCapacity; } - protected static final boolean switched(WALEntryStream entryStream, Path path) { - Path newPath = entryStream.getCurrentPath(); + protected static final boolean switched(WALEntryStream entryStream, WALIdentity path) { + WALIdentity newPath = entryStream.getCurrentWALIdentity(); return newPath == null || !path.getName().equals(newPath.getName()); } protected WALEntryBatch readWALEntries(WALEntryStream entryStream) throws IOException, InterruptedException { - Path currentPath = entryStream.getCurrentPath(); + WALIdentity currentPath = entryStream.getCurrentWALIdentity(); if (!entryStream.hasNext()) { // check whether we have switched a file if (currentPath != null && switched(entryStream, currentPath)) { @@ -203,7 +199,7 @@ class ReplicationSourceWALReader extends Thread { } } else { // when reading from the entry stream first time we will enter here - currentPath = entryStream.getCurrentPath(); + currentPath = entryStream.getCurrentWALIdentity(); } WALEntryBatch batch = createBatch(entryStream); for (;;) { @@ -241,25 +237,7 @@ class ReplicationSourceWALReader extends Thread { } } - // if we get an EOF due to a zero-length log, and there are other logs in queue - // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is - // enabled, then dump the log - private void handleEofException(IOException e) { - if ((e instanceof EOFException || e.getCause() instanceof EOFException) && - logQueue.size() > 1 && this.eofAutoRecovery) { - try { - if (fs.getFileStatus(logQueue.peek()).getLen() == 0) { - LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek()); - logQueue.remove(); - currentPosition = 0; - } - } catch (IOException ioe) { - LOG.warn("Couldn't get file length information about log " + logQueue.peek()); - } - } - } - - public Path getCurrentPath() { + public WALIdentity getCurrentPath() { // if we've read some WAL entries, get the Path we read from WALEntryBatch batchQueueHead = entryBatchQueue.peek(); if (batchQueueHead != null) { @@ -280,7 +258,7 @@ class ReplicationSourceWALReader extends Thread { } protected final WALEntryBatch createBatch(WALEntryStream entryStream) { - return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()); + return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentWALIdentity()); } protected final Entry filterEntry(Entry entry) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java index 10d6cd5..94393cc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java @@ -17,14 +17,14 @@ */ package org.apache.hadoop.hbase.replication.regionserver; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public final class ReplicationStatus { private final String peerId; private final String walGroup; - private final Path currentPath; + private final WALIdentity currentPath; 
private final int queueSize; private final long ageOfLastShippedOp; private final long replicationDelay; @@ -70,7 +70,7 @@ return replicationDelay; } - public Path getCurrentPath() { + public WALIdentity getCurrentPath() { return currentPath; } @@ -81,7 +81,7 @@ public static class ReplicationStatusBuilder { private String peerId = "UNKNOWN"; private String walGroup = "UNKNOWN"; - private Path currentPath = new Path("UNKNOWN"); + private WALIdentity currentPath = null; private int queueSize = -1; private long ageOfLastShippedOp = -1; private long replicationDelay = -1; @@ -103,7 +103,7 @@ return this; } - public ReplicationStatusBuilder withCurrentPath(Path currentPath) { + public ReplicationStatusBuilder withCurrentPath(WALIdentity currentPath) { this.currentPath = currentPath; return this; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java index 9edcc8a..7a4d496 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java @@ -21,11 +21,11 @@ import java.io.IOException; import java.util.concurrent.PriorityBlockingQueue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.yetus.audience.InterfaceAudience; /** @@ -44,7 +44,7 @@ private final SerialReplicationChecker checker; public SerialReplicationSourceWALReader(FileSystem fs, Configuration conf, - PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter, + PriorityBlockingQueue<WALIdentity> logQueue, long startPosition, WALEntryFilter filter, ReplicationSource source) { super(fs, conf, logQueue, startPosition, filter, source); checker = new SerialReplicationChecker(conf, source); @@ -53,7 +53,7 @@ @Override protected WALEntryBatch readWALEntries(WALEntryStream entryStream) throws IOException, InterruptedException { - Path currentPath = entryStream.getCurrentPath(); + WALIdentity currentPath = entryStream.getCurrentWALIdentity(); if (!entryStream.hasNext()) { // check whether we have switched a file if (currentPath != null && switched(entryStream, currentPath)) { @@ -68,7 +68,7 @@ } } else { // when reading from the entry stream first time we will enter here - currentPath = entryStream.getCurrentPath(); + currentPath = entryStream.getCurrentWALIdentity(); } long positionBefore = entryStream.getPosition(); WALEntryBatch batch = createBatch(entryStream); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java index 22b2de7..b3b0978 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.yetus.audience.InterfaceAudience; /** @@ -36,7 +37,7 @@ class WALEntryBatch { private List<Entry> walEntries; // last WAL that was read - private Path lastWalPath; + private WALIdentity lastWalPath; // position in WAL of last entry in this batch private long lastWalPosition = 0; // number of distinct row keys in this batch @@ -53,13 +54,13 @@ /** * @param lastWalPath Path of the WAL the last entry in this batch was read from */ - WALEntryBatch(int maxNbEntries, Path lastWalPath) { + WALEntryBatch(int maxNbEntries, WALIdentity lastWalPath) { this.walEntries = new ArrayList<>(maxNbEntries); this.lastWalPath = lastWalPath; } - static WALEntryBatch endOfFile(Path lastWalPath) { + static WALEntryBatch endOfFile(WALIdentity lastWalPath) { WALEntryBatch batch = new WALEntryBatch(0, lastWalPath); batch.setLastWalPosition(-1L); batch.setEndOfFile(true); return batch; } @@ -80,7 +81,7 @@ /** * @return the path of the last WAL that was read. */ - public Path getLastWalPath() { + public WALIdentity getLastWalPath() { return lastWalPath; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java index 0393af4..074aca2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -18,29 +18,14 @@ */ package org.apache.hadoop.hbase.replication.regionserver; -import java.io.Closeable; -import java.io.FileNotFoundException; +import org.apache.hadoop.hbase.wal.WALIdentity; + import java.io.IOException; -import java.util.OptionalLong; -import java.util.concurrent.PriorityBlockingQueue; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; -import org.apache.hadoop.hbase.util.CancelableProgressable; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; + +import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hadoop.hbase.wal.WAL.Reader; -import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.ipc.RemoteException; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Streaming access to WAL entries. 
This class is given a queue of WAL {@link Path}, and continually @@ -49,382 +34,19 @@ import org.slf4j.LoggerFactory; */ @InterfaceAudience.Private @InterfaceStability.Evolving -class WALEntryStream implements Closeable { - private static final Logger LOG = LoggerFactory.getLogger(WALEntryStream.class); - - private Reader reader; - private Path currentPath; - // cache of next entry for hasNext() - private Entry currentEntry; - // position for the current entry. As now we support peek, which means that the upper layer may - // choose to return before reading the current entry, so it is not safe to return the value below - // in getPosition. - private long currentPositionOfEntry = 0; - // position after reading current entry - private long currentPositionOfReader = 0; - private final PriorityBlockingQueue<Path> logQueue; - private final FileSystem fs; - private final Configuration conf; - private final WALFileLengthProvider walFileLengthProvider; - // which region server the WALs belong to - private final ServerName serverName; - private final MetricsSource metrics; - +public interface WALEntryStream extends WAL.Reader { /** - * Create an entry stream over the given queue at the given start position - * @param logQueue the queue of WAL paths - * @param fs {@link FileSystem} to use to create {@link Reader} for this stream - * @param conf {@link Configuration} to use to create {@link Reader} for this stream - * @param startPosition the position in the first WAL to start reading at - * @param serverName the server name which all WALs belong to - * @param metrics replication metrics - * @throws IOException + * @return the {@link WALIdentity} of the current WAL */ - public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf, - long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName, - MetricsSource metrics) throws IOException { - this.logQueue = logQueue; - this.fs = fs; - this.conf = conf; - this.currentPositionOfEntry = startPosition; - this.walFileLengthProvider = walFileLengthProvider; - this.serverName = serverName; - this.metrics = metrics; - } + public WALIdentity getCurrentWALIdentity(); /** * @return true if there is another WAL {@link Entry} */ - public boolean hasNext() throws IOException { - if (currentEntry == null) { - tryAdvanceEntry(); - } - return currentEntry != null; - } + public boolean hasNext() throws IOException; /** * Returns the next WAL entry in this stream but does not advance. */ - public Entry peek() throws IOException { - return hasNext() ? currentEntry: null; - } - - /** - * Returns the next WAL entry in this stream and advance the stream. 
- */ - public Entry next() throws IOException { - Entry save = peek(); - currentPositionOfEntry = currentPositionOfReader; - currentEntry = null; - return save; - } - - /** - * {@inheritDoc} - */ - @Override - public void close() throws IOException { - closeReader(); - } - - /** - * @return the position of the last Entry returned by next() - */ - public long getPosition() { - return currentPositionOfEntry; - } - - /** - * @return the {@link Path} of the current WAL - */ - public Path getCurrentPath() { - return currentPath; - } - - private String getCurrentPathStat() { - StringBuilder sb = new StringBuilder(); - if (currentPath != null) { - sb.append("currently replicating from: ").append(currentPath).append(" at position: ") - .append(currentPositionOfEntry).append("\n"); - } else { - sb.append("no replication ongoing, waiting for new log"); - } - return sb.toString(); - } - - /** - * Should be called if the stream is to be reused (i.e. used again after hasNext() has returned - * false) - */ - public void reset() throws IOException { - if (reader != null && currentPath != null) { - resetReader(); - } - } - - private void setPosition(long position) { - currentPositionOfEntry = position; - } - - private void setCurrentPath(Path path) { - this.currentPath = path; - } - - private void tryAdvanceEntry() throws IOException { - if (checkReader()) { - boolean beingWritten = readNextEntryAndRecordReaderPosition(); - if (currentEntry == null && !beingWritten) { - // no more entries in this log file, and the file is already closed, i.e, rolled - // Before dequeueing, we should always get one more attempt at reading. - // This is in case more entries came in after we opened the reader, and the log is rolled - // while we were reading. See HBASE-6758 - resetReader(); - readNextEntryAndRecordReaderPosition(); - if (currentEntry == null) { - if (checkAllBytesParsed()) { // now we're certain we're done with this log file - dequeueCurrentLog(); - if (openNextLog()) { - readNextEntryAndRecordReaderPosition(); - } - } - } - } - // if currentEntry != null then just return - // if currentEntry == null but the file is still being written, then we should not switch to - // the next log either, just return here and try next time to see if there are more entries in - // the current file - } - // do nothing if we don't have a WAL Reader (e.g. if there's no logs in queue) - } - - // HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file - private boolean checkAllBytesParsed() throws IOException { - // -1 means the wal wasn't closed cleanly. - final long trailerSize = currentTrailerSize(); - FileStatus stat = null; - try { - stat = fs.getFileStatus(this.currentPath); - } catch (IOException exception) { - LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}", - currentPath, trailerSize < 0 ? "was not" : "was", getCurrentPathStat()); - metrics.incrUnknownFileLengthForClosedWAL(); - } - // Here we use currentPositionOfReader instead of currentPositionOfEntry. - // We only call this method when currentEntry is null so usually they are the same, but there - // are two exceptions. One is we have nothing in the file but only a header, in this way - // the currentPositionOfEntry will always be 0 since we have no change to update it. The other - // is that we reach the end of file, then currentPositionOfEntry will point to the tail of the - // last valid entry, and the currentPositionOfReader will usually point to the end of the file. 
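
The bookkeeping spelled out above is the reason the stream exposes both peek() and next(): a peeked entry must not move the externally visible position until next() actually consumes it. A minimal sketch of a caller relying on that contract (the wrapper class and stop condition below are illustrative, not part of the patch):

    import java.io.IOException;
    import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream;
    import org.apache.hadoop.hbase.wal.WAL;

    final class EntryStreamUsageSketch {
      /** Consumes entries until an empty edit is peeked; returns the resume offset. */
      static long drainUntilEmptyEdit(WALEntryStream stream) throws IOException {
        while (stream.hasNext()) {
          WAL.Entry entry = stream.peek();   // look ahead, do not advance
          if (entry.getEdit().isEmpty()) {
            break;                           // position still excludes 'entry'
          }
          stream.next();                     // consume; position now moves past it
        }
        return stream.getPosition();         // safe offset to persist for resuming
      }
    }
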
- if (stat != null) { - if (trailerSize < 0) { - if (currentPositionOfReader < stat.getLen()) { - final long skippedBytes = stat.getLen() - currentPositionOfReader; - LOG.debug( - "Reached the end of WAL file '{}'. It was not closed cleanly," + - " so we did not parse {} bytes of data. This is normally ok.", - currentPath, skippedBytes); - metrics.incrUncleanlyClosedWALs(); - metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes); - } - } else if (currentPositionOfReader + trailerSize < stat.getLen()) { - LOG.warn( - "Processing end of WAL file '{}'. At position {}, which is too far away from" + - " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}", - currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat()); - setPosition(0); - resetReader(); - metrics.incrRestartedWALReading(); - metrics.incrRepeatedFileBytes(currentPositionOfReader); - return false; - } - } - if (LOG.isTraceEnabled()) { - LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is " + - (stat == null ? "N/A" : stat.getLen())); - } - metrics.incrCompletedWAL(); - return true; - } - - private void dequeueCurrentLog() throws IOException { - LOG.debug("Reached the end of log {}", currentPath); - closeReader(); - logQueue.remove(); - setPosition(0); - metrics.decrSizeOfLogQueue(); - } - - /** - * Returns whether the file is opened for writing. - */ - private boolean readNextEntryAndRecordReaderPosition() throws IOException { - Entry readEntry = reader.next(); - long readerPos = reader.getPosition(); - OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath); - if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) { - // see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted - // data, so we need to make sure that we do not read beyond the committed file length. - if (LOG.isDebugEnabled()) { - LOG.debug("The provider tells us the valid length for " + currentPath + " is " + - fileLength.getAsLong() + ", but we have advanced to " + readerPos); - } - resetReader(); - return true; - } - if (readEntry != null) { - metrics.incrLogEditsRead(); - metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry); - } - currentEntry = readEntry; // could be null - this.currentPositionOfReader = readerPos; - return fileLength.isPresent(); - } - - private void closeReader() throws IOException { - if (reader != null) { - reader.close(); - reader = null; - } - } - - // if we don't have a reader, open a reader on the next log - private boolean checkReader() throws IOException { - if (reader == null) { - return openNextLog(); - } - return true; - } - - // open a reader on the next log in queue - private boolean openNextLog() throws IOException { - Path nextPath = logQueue.peek(); - if (nextPath != null) { - openReader(nextPath); - if (reader != null) { - return true; - } - } else { - // no more files in queue, this could happen for recovered queue, or for a wal group of a sync - // replication peer which has already been transited to DA or S. 
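
The HBASE-14004 guard just above boils down to a single comparison: if the length provider still vouches for a committed length (i.e. the file is being written) and the reader has advanced past it, the read must be discarded and retried. Distilled into a helper against the patch's WALFileLengthProvider signature (class and method names here are made up for illustration):

    import java.util.OptionalLong;
    import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
    import org.apache.hadoop.hbase.wal.WALIdentity;

    final class CommittedLengthGuard {
      /** True if the WAL is still being written and readerPos overran the committed length. */
      static boolean mustRetry(WALFileLengthProvider lengthProvider, WALIdentity wal,
          long readerPos) {
        OptionalLong committed = lengthProvider.getLogFileSizeIfBeingWritten(wal);
        // empty => the WAL is closed; whatever position the reader reports is trustworthy
        return committed.isPresent() && readerPos > committed.getAsLong();
      }
    }
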
- setCurrentPath(null); - } - return false; - } - - private Path getArchivedLog(Path path) throws IOException { - Path rootDir = FSUtils.getRootDir(conf); - - // Try found the log in old dir - Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); - Path archivedLogLocation = new Path(oldLogDir, path.getName()); - if (fs.exists(archivedLogLocation)) { - LOG.info("Log " + path + " was moved to " + archivedLogLocation); - return archivedLogLocation; - } - - // Try found the log in the seperate old log dir - oldLogDir = - new Path(rootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME) - .append(Path.SEPARATOR).append(serverName.getServerName()).toString()); - archivedLogLocation = new Path(oldLogDir, path.getName()); - if (fs.exists(archivedLogLocation)) { - LOG.info("Log " + path + " was moved to " + archivedLogLocation); - return archivedLogLocation; - } - - LOG.error("Couldn't locate log: " + path); - return path; - } - - private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException { - // If the log was archived, continue reading from there - Path archivedLog = getArchivedLog(path); - if (!path.equals(archivedLog)) { - openReader(archivedLog); - } else { - throw fnfe; - } - } - - private void openReader(Path path) throws IOException { - try { - // Detect if this is a new file, if so get a new reader else - // reset the current reader so that we see the new data - if (reader == null || !getCurrentPath().equals(path)) { - closeReader(); - reader = WALFactory.createReader(fs, path, conf); - seek(); - setCurrentPath(path); - } else { - resetReader(); - } - } catch (FileNotFoundException fnfe) { - handleFileNotFound(path, fnfe); - } catch (RemoteException re) { - IOException ioe = re.unwrapRemoteException(FileNotFoundException.class); - if (!(ioe instanceof FileNotFoundException)) throw ioe; - handleFileNotFound(path, (FileNotFoundException)ioe); - } catch (LeaseNotRecoveredException lnre) { - // HBASE-15019 the WAL was not closed due to some hiccup. - LOG.warn("Try to recover the WAL lease " + currentPath, lnre); - recoverLease(conf, currentPath); - reader = null; - } catch (NullPointerException npe) { - // Workaround for race condition in HDFS-4380 - // which throws a NPE if we open a file before any data node has the most recent block - // Just sleep and retry. Will require re-reading compressed WALs for compressionContext. 
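
The archive fallback removed here (and presumably re-homed in FSWALEntryStream) probes two locations before giving up: the shared oldWALs directory, then a per-server subdirectory beneath it. The same resolution order in isolation (assuming the standard layout; the helper class is illustrative):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.util.FSUtils;

    final class ArchivedWALLocator {
      /** Probes oldWALs, then oldWALs/serverName, else returns the input path. */
      static Path locate(Configuration conf, FileSystem fs, ServerName server, Path wal)
          throws IOException {
        Path oldLogDir = new Path(FSUtils.getRootDir(conf), HConstants.HREGION_OLDLOGDIR_NAME);
        Path candidate = new Path(oldLogDir, wal.getName());
        if (fs.exists(candidate)) {
          return candidate;            // archived into the shared old-log dir
        }
        candidate = new Path(new Path(oldLogDir, server.getServerName()), wal.getName());
        return fs.exists(candidate) ? candidate : wal;  // per-server dir, else unchanged
      }
    }
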
- LOG.warn("Got NPE opening reader, will retry."); - reader = null; - } - } - - // For HBASE-15019 - private void recoverLease(final Configuration conf, final Path path) { - try { - final FileSystem dfs = FSUtils.getCurrentFileSystem(conf); - FSUtils fsUtils = FSUtils.getInstance(dfs, conf); - fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() { - @Override - public boolean progress() { - LOG.debug("recover WAL lease: " + path); - return true; - } - }); - } catch (IOException e) { - LOG.warn("unable to recover lease for WAL: " + path, e); - } - } - - private void resetReader() throws IOException { - try { - currentEntry = null; - reader.reset(); - seek(); - } catch (FileNotFoundException fnfe) { - // If the log was archived, continue reading from there - Path archivedLog = getArchivedLog(currentPath); - if (!currentPath.equals(archivedLog)) { - openReader(archivedLog); - } else { - throw fnfe; - } - } catch (NullPointerException npe) { - throw new IOException("NPE resetting reader, likely HDFS-4380", npe); - } - } - - private void seek() throws IOException { - if (currentPositionOfEntry != 0) { - reader.seek(currentPositionOfEntry); - } - } - - private long currentTrailerSize() { - long size = -1L; - if (reader instanceof ProtobufLogReader) { - final ProtobufLogReader pblr = (ProtobufLogReader) reader; - size = pblr.trailerSize(); - } - return size; - } + public Entry peek() throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java index 010fa69..40772ff 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java @@ -19,7 +19,7 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.util.OptionalLong; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.wal.WALIdentity; import org.apache.yetus.audience.InterfaceAudience; /** @@ -30,5 +30,5 @@ import org.apache.yetus.audience.InterfaceAudience; @FunctionalInterface public interface WALFileLengthProvider { - OptionalLong getLogFileSizeIfBeingWritten(Path path); + OptionalLong getLogFileSizeIfBeingWritten(WALIdentity path); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index ccdc95f..b9e55b5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -19,10 +19,12 @@ package org.apache.hadoop.hbase.wal; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -39,8 +41,17 @@ import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; +import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; +import 
org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.regionserver.wal.WALUtil; +import org.apache.hadoop.hbase.replication.regionserver.FSWALEntryStream; +import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; +import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream; +import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; @@ -84,7 +95,11 @@ public abstract class AbstractFSWALProvider> implemen protected AtomicBoolean initialized = new AtomicBoolean(false); // for default wal provider, logPrefix won't change protected String logPrefix; - + private Class logReaderClass; + /** + * How long to attempt opening in-recovery wals + */ + private int timeoutMillis; /** * we synchronized on walCreateLock to prevent wal recreation in different threads */ @@ -101,6 +116,9 @@ public abstract class AbstractFSWALProvider> implemen if (!initialized.compareAndSet(false, true)) { throw new IllegalStateException("WALProvider.init should only be called once."); } + timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); + logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, + AbstractFSWALProvider.Reader.class); this.factory = factory; this.conf = conf; this.providerId = providerId; @@ -448,19 +466,20 @@ public abstract class AbstractFSWALProvider> implemen * @throws IOException */ public static org.apache.hadoop.hbase.wal.WAL.Reader openReader(Path path, Configuration conf) - throws IOException - - { + throws IOException { long retryInterval = 2000; // 2 sec int maxAttempts = 30; int attempt = 0; Exception ee = null; org.apache.hadoop.hbase.wal.WAL.Reader reader = null; + WALFactory factory = new WALFactory(conf, "pretty-printer"); + WALProvider provider = factory.getWALProvider(); + while (reader == null && attempt++ < maxAttempts) { try { // Detect if this is a new file, if so get a new reader else // reset the current reader so that we see the new data - reader = WALFactory.createReader(path.getFileSystem(conf), path, conf); + reader = provider.createReader(provider.createWALIdentity(path.toString()), null, true); return reader; } catch (FileNotFoundException fnfe) { // If the log was archived, continue reading from there @@ -537,7 +556,139 @@ public abstract class AbstractFSWALProvider> implemen return getWALNameGroupFromWALName(name, 1); } + @Override + public long getWALStartTime(WALIdentity id) { + return getWALStartTimeFromWALName(id.getName()); + } + + /** + * Public because of FSHLog. Should be package-private + * @param overwritable if the created writer can overwrite. For recovered edits, it is true and + * for WAL it is false. Thus we can distinguish WAL and recovered edits by this. + */ + public Writer createWriter(final Configuration conf, final WALIdentity walId, + final boolean overwritable) throws IOException { + Path path = ((FSWALIdentity)walId).getPath(); + FileSystem fs = path.getFileSystem(conf); + // Configuration already does caching for the Class lookup. 
+ Class<? extends Writer> logWriterClass = + conf.getClass("hbase.regionserver.hlog.writer.impl", ProtobufLogWriter.class, + Writer.class); + Writer writer = null; + try { + writer = logWriterClass.getDeclaredConstructor().newInstance(); + long blocksize = WALUtil.getWALBlockSize(conf, fs, path, overwritable); + writer.init(walId, conf, overwritable, blocksize); + return writer; + } catch (Exception e) { + if (e instanceof CommonFSUtils.StreamLacksCapabilityException) { + LOG.error("The RegionServer write ahead log provider for FileSystem implementations " + + "relies on the ability to call " + e.getMessage() + " for proper operation during " + + "component failures, but the current FileSystem does not support doing so. Please " + + "check the config value of '" + CommonFSUtils.HBASE_WAL_DIR + "' and ensure " + + "it points to a FileSystem mount that has suitable capabilities for output streams."); + } else { + LOG.debug("Error instantiating log writer.", e); + } + if (writer != null) { + try { + writer.close(); + } catch (IOException ee) { + LOG.error("cannot close log writer", ee); + } + } + throw new IOException("cannot get log writer", e); + } + } + public static long getWALStartTimeFromWALName(String name) { return Long.parseLong(getWALNameGroupFromWALName(name, 2)); } + + @Override + public WALEntryStream getWalStream(PriorityBlockingQueue logQueue, Configuration conf, + long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName, + MetricsSource metrics) throws IOException { + return new FSWALEntryStream(CommonFSUtils.getWALFileSystem(conf), logQueue, conf, startPosition, + walFileLengthProvider, serverName, metrics, this); + } + @Override + public void preRecovery(WALIdentity walId, Configuration conf, CancelableProgressable reporter) + throws IOException { + FileSystem walFS = CommonFSUtils.getWALFileSystem(conf); + FSUtils.getInstance(walFS, conf).recoverFileLease(walFS, + ((FSWALIdentity)walId).getPath(), conf, reporter); + } + + @Override + public Reader createReader(final WALIdentity walId, CancelableProgressable reporter, + boolean allowCustom) throws IOException { + Class<? extends AbstractFSWALProvider.Reader> lrClass = + allowCustom ? logReaderClass : ProtobufLogReader.class; + try { + // A wal file could be under recovery, so it may take several + // tries to get it open. Instead of claiming it is corrupted, retry + // to open it up to 5 minutes by default. + long startWaiting = EnvironmentEdgeManager.currentTime(); + long openTimeout = timeoutMillis + startWaiting; + int nbAttempt = 0; + Path path = ((FSWALIdentity)walId).getPath(); + FileSystem fs = path.getFileSystem(conf); + AbstractFSWALProvider.Reader reader = null; + while (true) { + try { + reader = lrClass.getDeclaredConstructor().newInstance(); + reader.init(fs, path, conf, null); + return reader; + } catch (IOException e) { + if (reader != null) { + try { + reader.close(); + } catch (IOException exception) { + LOG.warn("Could not close FSDataInputStream: " + exception.getMessage()); + LOG.debug("exception details", exception); + } + } + + String msg = e.getMessage(); + if (msg != null + && (msg.contains("Cannot obtain block length") + || msg.contains("Could not obtain the last block") || msg + .matches("Blocklist for [^ ]* has changed.*"))) { + if (++nbAttempt == 1) { + LOG.warn("Lease should have recovered. This is not expected. Will retry", e); + } + if (reporter != null && !reporter.progress()) { + throw new InterruptedIOException("Operation is cancelled"); + } + if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) { + LOG.error("Can't open after " + nbAttempt + " attempts and " + + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms for " + path); + } else { + try { + Thread.sleep(nbAttempt < 3 ? 500 : 1000); + continue; // retry + } catch (InterruptedException ie) { + InterruptedIOException iioe = new InterruptedIOException(); + iioe.initCause(ie); + throw iioe; + } + } + throw new LeaseNotRecoveredException(e); + } else { + throw e; + } + } + } + } catch (IOException ie) { + throw ie; + } catch (Exception e) { + throw new IOException("Cannot get log reader", e); + } + } + + @Override + public WALIdentity createWALIdentity(String wal) { + return new FSWALIdentity(wal); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java index 062b368..5e821d3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL; import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.util.CommonFSUtils; -import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -52,11 +51,9 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> { public interface AsyncWriter extends WALProvider.AsyncWriter { /** * @throws IOException if something goes wrong initializing an output stream - * @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that - * meet the needs of the given Writer implementation. */ - void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long blocksize) - throws IOException, CommonFSUtils.StreamLacksCapabilityException; + void init(WALIdentity walId, Configuration c, boolean overwritable, long blocksize) + throws IOException; } private EventLoopGroup eventLoopGroup; @@ -99,10 +96,12 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> { // Configuration already does caching for the Class lookup. 
Class logWriterClass = conf.getClass( "hbase.regionserver.hlog.async.writer.impl", AsyncProtobufLogWriter.class, AsyncWriter.class); + WALFactory factory = new WALFactory(conf, "AsyncFSWAL"); + WALProvider provider = factory.getWALProvider(); try { AsyncWriter writer = logWriterClass.getConstructor(EventLoopGroup.class, Class.class) .newInstance(eventLoopGroup, channelClass); - writer.init(fs, path, conf, overwritable, blocksize); + writer.init(provider.createWALIdentity(path.toString()), conf, overwritable, blocksize); return writer; } catch (Exception e) { if (e instanceof CommonFSUtils.StreamLacksCapabilityException) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index 75439fe..99a8c95 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; @@ -31,11 +32,19 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.PrivateCellUtil; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; +import org.apache.hadoop.hbase.replication.regionserver.FSWALEntryStream; +import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; +import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream; +import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; +import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.WAL.Reader; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -243,7 +252,7 @@ class DisabledWALProvider implements WALProvider { } @Override - public OptionalLong getLogFileSizeIfBeingWritten(Path path) { + public OptionalLong getLogFileSizeIfBeingWritten(WALIdentity path) { return OptionalLong.empty(); } } @@ -262,4 +271,38 @@ class DisabledWALProvider implements WALProvider { public void addWALActionsListener(WALActionsListener listener) { disabled.registerWALActionsListener(listener); } + + @Override + public WALEntryStream getWalStream(PriorityBlockingQueue logQueue, Configuration conf, + long startPosition, WALFileLengthProvider walFileSizeProvider, ServerName serverName, + MetricsSource metrics) throws IOException { + return new FSWALEntryStream(CommonFSUtils.getWALFileSystem(conf), logQueue, conf, startPosition, + walFileSizeProvider, serverName, metrics, this); + } + @Override + public void preRecovery(WALIdentity walId, Configuration conf, CancelableProgressable reporter) + throws IOException { + } + + @Override + public Reader createReader(final WALIdentity path, CancelableProgressable reporter, + boolean allowCustom) throws IOException { + return null; + } + + @Override + 
public Writer createWriter(final Configuration conf, final WALIdentity walId, + final boolean overwritable) throws IOException { + return null; + } + + @Override + public long getWALStartTime(WALIdentity id) { + return 0; + } + + @Override + public WALIdentity createWALIdentity(String wal) { + return new FSWALIdentity(wal); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java index 7cd39ea..9275798 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java @@ -20,14 +20,8 @@ package org.apache.hadoop.hbase.wal; import java.io.IOException; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.regionserver.wal.FSHLog; -import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter; -import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.util.CommonFSUtils; -import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; -import org.apache.hadoop.hbase.util.FSUtils; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; @@ -42,64 +36,6 @@ public class FSHLogProvider extends AbstractFSWALProvider { private static final Logger LOG = LoggerFactory.getLogger(FSHLogProvider.class); - // Only public so classes back in regionserver.wal can access - public interface Writer extends WALProvider.Writer { - /** - * @throws IOException if something goes wrong initializing an output stream - * @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that - * meet the needs of the given Writer implementation. - */ - void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long blocksize) - throws IOException, CommonFSUtils.StreamLacksCapabilityException; - } - - /** - * Public because of FSHLog. Should be package-private - * @param overwritable if the created writer can overwrite. For recovered edits, it is true and - * for WAL it is false. Thus we can distinguish WAL and recovered edits by this. - */ - public static Writer createWriter(final Configuration conf, final FileSystem fs, final Path path, - final boolean overwritable) throws IOException { - return createWriter(conf, fs, path, overwritable, - WALUtil.getWALBlockSize(conf, fs, path, overwritable)); - } - - /** - * Public because of FSHLog. Should be package-private - */ - public static Writer createWriter(final Configuration conf, final FileSystem fs, final Path path, - final boolean overwritable, long blocksize) throws IOException { - // Configuration already does caching for the Class lookup. 
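
With FSHLogProvider's static createWriter helpers being deleted here, call sites are expected to go through the provider-level API introduced on AbstractFSWALProvider earlier in this patch. A hedged sketch of the equivalent call (the wrapper class and method are illustrative only):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.wal.WALFactory;
    import org.apache.hadoop.hbase.wal.WALProvider;

    final class CreateWriterMigration {
      /** Old: FSHLogProvider.createWriter(conf, fs, path, false). New: via the provider. */
      static WALProvider.Writer openWalWriter(WALFactory factory, Configuration conf, Path path)
          throws IOException {
        WALProvider provider = factory.getWALProvider();
        // overwritable=false marks this as a WAL; recovered-edits writers pass true
        return provider.createWriter(conf, provider.createWALIdentity(path.toString()), false);
      }
    }
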
- Class logWriterClass = - conf.getClass("hbase.regionserver.hlog.writer.impl", ProtobufLogWriter.class, - Writer.class); - Writer writer = null; - try { - writer = logWriterClass.getDeclaredConstructor().newInstance(); - FileSystem rootFs = FileSystem.get(path.toUri(), conf); - writer.init(rootFs, path, conf, overwritable, blocksize); - return writer; - } catch (Exception e) { - if (e instanceof CommonFSUtils.StreamLacksCapabilityException) { - LOG.error("The RegionServer write ahead log provider for FileSystem implementations " + - "relies on the ability to call " + e.getMessage() + " for proper operation during " + - "component failures, but the current FileSystem does not support doing so. Please " + - "check the config value of '" + CommonFSUtils.HBASE_WAL_DIR + "' and ensure " + - "it points to a FileSystem mount that has suitable capabilities for output streams."); - } else { - LOG.debug("Error instantiating log writer.", e); - } - if (writer != null) { - try{ - writer.close(); - } catch(IOException ee){ - LOG.error("cannot close log writer", ee); - } - } - throw new IOException("cannot get log writer", e); - } - } - @Override protected FSHLog createWAL() throws IOException { return new FSHLog(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf), diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java index 0b7b8da..52852a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java @@ -26,16 +26,25 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.locks.Lock; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; // imports for classes still in regionserver.wal import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.replication.regionserver.FSWALEntryStream; +import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; +import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream; +import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.KeyLocker; +import org.apache.hadoop.hbase.wal.WAL.Reader; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,6 +97,7 @@ public class RegionGroupingProvider implements WALProvider { } } + static private final String GROUP_ID = "RegionGrouping"; /** * instantiate a strategy from a config property. * requires conf to have already been set (as well as anything the provider might need to read). @@ -285,4 +295,57 @@ public class RegionGroupingProvider implements WALProvider { // extra code actually works, then we will have other big problems. So leave it as is. 
listeners.add(listener); } + + @Override + public WALEntryStream getWalStream(PriorityBlockingQueue logQueue, Configuration conf, + long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName, + MetricsSource metrics) throws IOException { + return new FSWALEntryStream(CommonFSUtils.getWALFileSystem(conf), logQueue, conf, startPosition, + walFileLengthProvider, serverName, metrics, this); + } + @Override + public void preRecovery(WALIdentity walId, Configuration conf, CancelableProgressable reporter) + throws IOException { + if (cached.isEmpty()) { + getWAL(GROUP_ID); + } + cached.values().iterator().next().preRecovery(walId, conf, reporter); + } + @Override + public Reader createReader(final WALIdentity path, CancelableProgressable reporter, + boolean allowCustom) throws IOException { + if (cached.isEmpty()) { + getWAL(GROUP_ID); + } + return cached.values().iterator().next().createReader(path, reporter, allowCustom); + } + + private void ensureWALPresent() { + try { + if (cached.isEmpty()) { + getWAL(GROUP_ID); + } + } catch (IOException ioe) { + throw new IllegalStateException(ioe); + } + } + + @Override + public Writer createWriter(final Configuration conf, final WALIdentity walId, + final boolean overwritable) throws IOException { + ensureWALPresent(); + return cached.values().iterator().next().createWriter(conf, walId, overwritable); + } + + @Override + public long getWALStartTime(WALIdentity id) { + ensureWALPresent(); + return cached.values().iterator().next().getWALStartTime(id); + } + + @Override + public WALIdentity createWALIdentity(String wal) { + ensureWALPresent(); + return cached.values().iterator().next().createWALIdentity(wal); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java index 9859c20..3a1326d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.function.BiPredicate; @@ -36,18 +37,28 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; +import org.apache.hadoop.hbase.replication.regionserver.FSWALEntryStream; +import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener; import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider; +import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream; +import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; 
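
Common to all of these provider implementations is the same bridge between the old Path world and the new abstraction: createWALIdentity(String) wraps a path string, and the filesystem-backed code unwraps it again with a cast. The round trip in isolation (assuming FSWALIdentity exposes getPath(), as its uses elsewhere in this patch imply):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.wal.FSWALIdentity;
    import org.apache.hadoop.hbase.wal.WALIdentity;
    import org.apache.hadoop.hbase.wal.WALProvider;

    final class IdentityRoundTrip {
      /** Path -> WALIdentity -> Path, mirroring the FS-backed providers. */
      static Path roundTrip(WALProvider provider, Path walPath) {
        WALIdentity id = provider.createWALIdentity(walPath.toString());
        // FS-backed providers hand back FSWALIdentity; this cast mirrors
        // AbstractFSWALProvider.createWriter/createReader in this patch.
        return ((FSWALIdentity) id).getPath();
      }
    }
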
+import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.KeyLocker; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.WAL.Reader; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,6 +101,8 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen private Class channelClass; private AtomicBoolean initialized = new AtomicBoolean(false); + private Path oldLogDir; + private Path walRootDir; // when switching from A to DA, we will put a Optional.empty into this map if there is no WAL for // the peer yet. When getting WAL from this map the caller should know that it should not use @@ -115,6 +128,8 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen provider.init(factory, conf, providerId); this.conf = conf; this.factory = factory; + walRootDir = FSUtils.getWALRootDir(conf); + this.oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); Pair> eventLoopGroupAndChannelClass = NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf); eventLoopGroup = eventLoopGroupAndChannelClass.getFirst(); @@ -349,4 +364,47 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen return provider; } + @Override + public WALEntryStream getWalStream(PriorityBlockingQueue logQueue, Configuration conf, + long startPosition, WALFileLengthProvider walFileLengthProvider, + ServerName serverName, MetricsSource metrics) + throws IOException { + return new FSWALEntryStream(CommonFSUtils.getWALFileSystem(conf), logQueue, conf, startPosition, + walFileLengthProvider, serverName, metrics, this); + } + + @Override + public long getWALStartTime(WALIdentity id) { + return provider.getWALStartTime(id); + } + + @Override + public WALIdentity createWALIdentity(String wal) { + return provider.createWALIdentity(wal); + } + + @Override + public Writer createWriter(final Configuration conf, final WALIdentity walId, + final boolean overwritable) throws IOException { + return provider.createWriter(conf, walId, overwritable); + } + + @Override + public void preRecovery(WALIdentity walId, Configuration conf, CancelableProgressable reporter) + throws IOException { + provider.preRecovery(walId, conf, reporter); + } + + @Override + public Reader createReader(final WALIdentity path, CancelableProgressable reporter, + boolean allowCustom) throws IOException { + return provider.createReader(path, reporter, allowCustom); + } + public WALIdentity getWalFromArchivePath(String wal) { + return new FSWALIdentity(new Path(oldLogDir, wal)); + } + public WALIdentity getFullPath(ServerName serverName, String wal) { + Path walWithServerName = new Path(getWALDirectoryName(serverName.toString()), wal); + return new FSWALIdentity(new Path(walRootDir,walWithServerName)); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java index 0e6e365..fe57f8b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java @@ -18,18 +18,13 @@ package org.apache.hadoop.hbase.wal; import java.io.IOException; -import java.io.InterruptedIOException; import java.util.List; import 
java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; import org.apache.hadoop.hbase.util.CancelableProgressable; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; import org.apache.hadoop.hbase.wal.WAL.Reader; import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.apache.yetus.audience.InterfaceAudience; @@ -103,7 +98,7 @@ public class WALFactory { private final Configuration conf; // Used for the singleton WALFactory, see below. - private WALFactory(Configuration conf) { + private WALFactory(Configuration conf) throws IOException { // this code is duplicated here so we can keep our members final. // until we've moved reader/writer construction down into providers, this initialization must // happen prior to provider initialization, in case they need to instantiate a reader/writer. @@ -115,8 +110,18 @@ public class WALFactory { // end required early initialization // this instance can't create wals, just reader/writers. - provider = null; factoryId = SINGLETON_ID; + if (conf.getBoolean("hbase.regionserver.hlog.enabled", true)) { + WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER)); + provider.init(this, conf, null); + provider.addWALActionsListener(new MetricsWAL()); + this.provider = provider; + } else { + // special handling of existing configuration behavior. + LOG.warn("Running with WAL disabled."); + provider = new DisabledWALProvider(); + provider.init(this, conf, factoryId); + } } @VisibleForTesting @@ -286,10 +291,6 @@ public class WALFactory { } } - public Reader createReader(final FileSystem fs, final Path path) throws IOException { - return createReader(fs, path, (CancelableProgressable)null); - } - /** * Create a reader for the WAL. If you are reading from a file that's being written to and need * to reopen it multiple times, use {@link WAL.Reader#reset()} instead of this method @@ -297,85 +298,10 @@ public class WALFactory { * @return A WAL reader. Close when done with it. * @throws IOException */ - public Reader createReader(final FileSystem fs, final Path path, + public Reader createReader(final WALIdentity walId, CancelableProgressable reporter) throws IOException { - return createReader(fs, path, reporter, true); - } - - public Reader createReader(final FileSystem fs, final Path path, CancelableProgressable reporter, - boolean allowCustom) throws IOException { - Class lrClass = - allowCustom ? logReaderClass : ProtobufLogReader.class; - try { - // A wal file could be under recovery, so it may take several - // tries to get it open. Instead of claiming it is corrupted, retry - // to open it up to 5 minutes by default. 
- long startWaiting = EnvironmentEdgeManager.currentTime(); - long openTimeout = timeoutMillis + startWaiting; - int nbAttempt = 0; - AbstractFSWALProvider.Reader reader = null; - while (true) { - try { - reader = lrClass.getDeclaredConstructor().newInstance(); - reader.init(fs, path, conf, null); - return reader; - } catch (IOException e) { - if (reader != null) { - try { - reader.close(); - } catch (IOException exception) { - LOG.warn("Could not close FSDataInputStream" + exception.getMessage()); - LOG.debug("exception details", exception); - } - } - - String msg = e.getMessage(); - if (msg != null - && (msg.contains("Cannot obtain block length") - || msg.contains("Could not obtain the last block") || msg - .matches("Blocklist for [^ ]* has changed.*"))) { - if (++nbAttempt == 1) { - LOG.warn("Lease should have recovered. This is not expected. Will retry", e); - } - if (reporter != null && !reporter.progress()) { - throw new InterruptedIOException("Operation is cancelled"); - } - if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) { - LOG.error("Can't open after " + nbAttempt + " attempts and " - + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path); - } else { - try { - Thread.sleep(nbAttempt < 3 ? 500 : 1000); - continue; // retry - } catch (InterruptedException ie) { - InterruptedIOException iioe = new InterruptedIOException(); - iioe.initCause(ie); - throw iioe; - } - } - throw new LeaseNotRecoveredException(e); - } else { - throw e; - } - } - } - } catch (IOException ie) { - throw ie; - } catch (Exception e) { - throw new IOException("Cannot get log reader", e); - } - } - - /** - * Create a writer for the WAL. - * Uses defaults. - *

- * Should be package-private. public only for tests and - * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor} - * @return A WAL writer. Close when done with it. - */ - public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException { - return FSHLogProvider.createWriter(conf, fs, path, false); + WALProvider provider = getWALProvider(); + return provider.createReader(walId, reporter, true); } /** @@ -384,9 +310,10 @@ public class WALFactory { * @return an overwritable writer for recovered edits. caller should close. */ @VisibleForTesting - public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path) + public Writer createRecoveredEditsWriter(final WALIdentity walId) throws IOException { - return FSHLogProvider.createWriter(conf, fs, path, true); + WALProvider provider = getWALProvider(); + return provider.createWriter(conf, walId, true); } // These static methods are currently used where it's impractical to @@ -400,7 +327,12 @@ public class WALFactory { public static WALFactory getInstance(Configuration configuration) { WALFactory factory = singleton.get(); if (null == factory) { - WALFactory temp = new WALFactory(configuration); + WALFactory temp = null; + try { + temp = new WALFactory(configuration); + } catch (IOException ioe) { + throw new IllegalStateException(ioe); + } if (singleton.compareAndSet(null, temp)) { factory = temp; } else { @@ -421,41 +353,10 @@ public class WALFactory { * If you already have a WALFactory, you should favor the instance method. * @return a WAL Reader, caller must close. */ - public static Reader createReader(final FileSystem fs, final Path path, + public static Reader createReader(final WALIdentity walId, final Configuration configuration) throws IOException { - return getInstance(configuration).createReader(fs, path); - } - - /** - * Create a reader for the given path, accept custom reader classes from conf. - * If you already have a WALFactory, you should favor the instance method. - * @return a WAL Reader, caller must close. - */ - static Reader createReader(final FileSystem fs, final Path path, - final Configuration configuration, final CancelableProgressable reporter) throws IOException { - return getInstance(configuration).createReader(fs, path, reporter); - } - - /** - * Create a reader for the given path, ignore custom reader classes from conf. - * If you already have a WALFactory, you should favor the instance method. - * only public pending move of {@link org.apache.hadoop.hbase.regionserver.wal.Compressor} - * @return a WAL Reader, caller must close. - */ - public static Reader createReaderIgnoreCustomClass(final FileSystem fs, final Path path, - final Configuration configuration) throws IOException { - return getInstance(configuration).createReader(fs, path, null, false); - } - - /** - * If you already have a WALFactory, you should favor the instance method. - * Uses defaults. - * @return a Writer that will overwrite files. Caller must close. - */ - static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path, - final Configuration configuration) - throws IOException { - return FSHLogProvider.createWriter(configuration, fs, path, true); + WALProvider provider = getInstance(configuration).getWALProvider(); + return provider.createReader(walId, null, true); } /** @@ -464,10 +365,10 @@ public class WALFactory { * @return a writer that won't overwrite files. Caller must close. 
*/ @VisibleForTesting - public static Writer createWALWriter(final FileSystem fs, final Path path, - final Configuration configuration) - throws IOException { - return FSHLogProvider.createWriter(configuration, fs, path, false); + public static Writer createWALWriter(final WALIdentity walId, + final Configuration configuration) throws IOException { + WALProvider provider = getInstance(configuration).getWALProvider(); + return provider.createWriter(configuration, walId, false); } public final WALProvider getWALProvider() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java index 281f3c9..9192160 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java @@ -259,7 +259,9 @@ public class WALPrettyPrinter { throw new IOException(p + " is not a file"); } - WAL.Reader log = WALFactory.createReader(fs, p, conf); + WALFactory factory = new WALFactory(conf, "pretty-printer"); + WALProvider provider = factory.getWALProvider(); + WAL.Reader log = provider.createReader(provider.createWALIdentity(p.toString()), null, true); if (log instanceof ProtobufLogReader) { List writerClsNames = ((ProtobufLogReader) log).getWriterClsNames(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java index 244a636..3775000 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java @@ -23,10 +23,17 @@ import java.io.IOException; import java.util.List; import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.PriorityBlockingQueue; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; +import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream; import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; +import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.wal.WAL.Reader; import org.apache.yetus.audience.InterfaceAudience; /** @@ -82,6 +89,11 @@ public interface WALProvider { void sync(boolean forceSync) throws IOException; void append(WAL.Entry entry) throws IOException; + /** + * @throws IOException if something goes wrong initializing an output stream + */ + void init(WALIdentity path, Configuration c, boolean overwritable, long blocksize) + throws IOException; } interface AsyncWriter extends WriterBase { @@ -113,4 +125,61 @@ public interface WALProvider { return path -> getWALs().stream().map(w -> w.getLogFileSizeIfBeingWritten(path)) .filter(o -> o.isPresent()).findAny().orElse(OptionalLong.empty()); } + + /** + * Streaming implementation to retrieve WAL entries from given set of Wals. 
The provider is handed a queue of WAL identities and streams entries from them in order. + * @param logQueue Queue of WALs + * @param conf configuration + * @param startPosition start position for the first WAL in the queue + * @param walFileLengthProvider provider of the committed length of a WAL that is still being written + * @param serverName name of the server + * @param metrics metric source + * @return WALEntryStream instance + * @throws IOException + */ + WALEntryStream getWalStream(PriorityBlockingQueue logQueue, Configuration conf, + long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName, + MetricsSource metrics) throws IOException; + + /** + * Create a reader for the given WAL. + * @param walId identity of the WAL + * @param reporter progress reporter + * @param allowCustom true if accepting custom reader classes from conf. + * @return a WAL Reader, caller must close. + */ + Reader createReader(final WALIdentity walId, CancelableProgressable reporter, + boolean allowCustom) throws IOException; + + /** + * Hook for performing recovery work needed for the given WAL. + * The implementation must prevent further appends to the WAL, identified by id, + * after this method returns. Otherwise there may be data loss if some region server + * continues to write to the WAL. + * + * @param id identity of the WAL + * @param conf Configuration instance + * @param reporter progress reporter + */ + void preRecovery(WALIdentity id, Configuration conf, CancelableProgressable reporter) + throws IOException; + + /** + * Creates a WALIdentity from a WAL path/name. + * The name must uniquely identify a WAL within this WALProvider. + * + * Note: this method is subject to change / removal upon future WAL refactoring + * + * @param wal the WAL + * @return WALIdentity instance for the WAL + */ + WALIdentity createWALIdentity(String wal); + + public Writer createWriter(final Configuration conf, final WALIdentity walId, + final boolean overwritable) throws IOException; + + /** + * @return start time of the underlying WAL + */ + long getWALStartTime(WALIdentity id); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index bc67d98..4c16d3e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -140,7 +140,7 @@ public class WALSplitter { protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores = new ConcurrentHashMap<>(); // the file being split currently - private FileStatus fileBeingSplit; + private WALIdentity fileBeingSplit; // if we limit the number of writers opened for sinking recovered edits private final boolean splitWriterCreationBounded; @@ -150,14 +150,18 @@ public class WALSplitter { @VisibleForTesting WALSplitter(final WALFactory factory, Configuration conf, Path walDir, - FileSystem walFS, LastSequenceId idChecker, + LastSequenceId idChecker, SplitLogWorkerCoordination splitLogWorkerCoordination) { this.conf = HBaseConfiguration.create(conf); String codecClassName = conf .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName()); this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName); this.walDir = walDir; - this.walFS = walFS; + try { + this.walFS = walDir.getFileSystem(conf); + } catch (IOException ioe) { + throw new IllegalArgumentException("unable to get FS for " + walDir, ioe); + } this.sequenceIdChecker = idChecker; this.splitLogWorkerCoordination = splitLogWorkerCoordination; @@ -187,11 +191,11 @@ public class WALSplitter { *

* @return false if it is interrupted by the progress-able. */ - public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS, + public static boolean splitLogFile(Path walDir, WALIdentity logfile, FileSystem walFS, Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker, SplitLogWorkerCoordination splitLogWorkerCoordination, final WALFactory factory) throws IOException { - WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, idChecker, + WALSplitter s = new WALSplitter(factory, conf, walDir, idChecker, splitLogWorkerCoordination); return s.splitLogFile(logfile, reporter); } @@ -206,10 +210,11 @@ public class WALSplitter { final FileStatus[] logfiles = SplitLogManager.getFileList(conf, Collections.singletonList(logDir), null); List splits = new ArrayList<>(); + WALProvider provider = factory.getWALProvider(); if (ArrayUtils.isNotEmpty(logfiles)) { for (FileStatus logfile: logfiles) { - WALSplitter s = new WALSplitter(factory, conf, rootDir, walFS, null, null); - if (s.splitLogFile(logfile, null)) { + WALSplitter s = new WALSplitter(factory, conf, rootDir, null, null); + if (s.splitLogFile(provider.createWALIdentity(logfile.getPath().toString()), null)) { finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf); if (s.outputSink.splits != null) { splits.addAll(s.outputSink.splits); @@ -228,27 +233,26 @@ public class WALSplitter { * @param logfile should be an actual log file. */ @VisibleForTesting - boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException { + boolean splitLogFile(WALIdentity logfile, CancelableProgressable reporter) throws IOException { Preconditions.checkState(status == null); - Preconditions.checkArgument(logfile.isFile(), - "passed in file status is for something other than a regular file."); boolean isCorrupted = false; boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", SPLIT_SKIP_ERRORS_DEFAULT); int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024); - Path logPath = logfile.getPath(); + String logPath = logfile.getName(); boolean outputSinkStarted = false; boolean progress_failed = false; int editsCount = 0; int editsSkipped = 0; status = TaskMonitor.get().createStatus( "Splitting log file " + logPath + " into a temporary staging area."); Reader logFileReader = null; this.fileBeingSplit = logfile; try { - long logLength = logfile.getLen(); - LOG.info("Splitting WAL={}, length={}", logPath, logLength); + // a WALIdentity does not expose a file length, so it is no longer logged + LOG.info("Splitting WAL={}", logPath); status.setStatus("Opening log file"); if (reporter != null && !reporter.progress()) { progress_failed = true; @@ -266,7 +270,7 @@ public class WALSplitter { outputSinkStarted = true; Entry entry; Long lastFlushedSequenceId = -1L; - while ((entry = getNextLogLine(logFileReader, logPath, skipErrors)) != null) { + while ((entry = getNextLogLine(logFileReader, logfile, skipErrors)) != null) { byte[] region = entry.getKey().getEncodedRegionName(); String encodedRegionNameAsStr = Bytes.toString(region); lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr); @@ -323,10 +327,10 @@ public class WALSplitter { LOG.warn("Could not parse, corrupted WAL={}", logPath, e); if (splitLogWorkerCoordination != null) { // Some tests pass in a csm of null. 
-        splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), walFS);
+        splitLogWorkerCoordination.markCorrupted(walDir, logPath, walFS);
       } else {
         // for tests only
-        ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), walFS);
+        ZKSplitLog.markCorrupted(walDir, logPath, walFS);
       }
       isCorrupted = true;
     } catch (IOException e) {
@@ -352,7 +356,7 @@ public class WALSplitter {
       String msg = "Processed " + editsCount + " edits across " +
           outputSink.getNumberOfRecoveredRegions() + " regions; edits skipped=" +
           editsSkipped + "; log file=" + logPath +
-          ", length=" + logfile.getLen() + // See if length got updated post lease recovery
           ", corrupted=" + isCorrupted + ", progress failed=" + progress_failed;
       LOG.info(msg);
       status.markComplete(msg);
@@ -715,39 +719,37 @@ public class WALSplitter {
    * @throws IOException
    * @throws CorruptedLogFileException
    */
-  protected Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter)
+  protected Reader getReader(WALIdentity file, boolean skipErrors, CancelableProgressable reporter)
       throws IOException, CorruptedLogFileException {
-    Path path = file.getPath();
-    long length = file.getLen();
     Reader in;

-    // Check for possibly empty file. With appends, currently Hadoop reports a
-    // zero length even if the file has been sync'd. Revisit if HDFS-376 or
-    // HDFS-878 is committed.
-    if (length <= 0) {
-      LOG.warn("File {} might be still open, length is 0", path);
-    }
+    // A WALIdentity does not expose a length, so the old empty-file warning
+    // (see HDFS-376 / HDFS-878) no longer applies here; an empty file simply
+    // surfaces as the EOFException handled below.

     try {
-      FSUtils.getInstance(walFS, conf).recoverFileLease(walFS, path, conf, reporter);
-      try {
-        in = getReader(path, reporter);
-      } catch (EOFException e) {
-        if (length <= 0) {
-          // TODO should we ignore an empty, not-last log file if skip.errors
-          // is false? Either way, the caller should decide what to do. E.g.
-          // ignore if this is the last log in sequence.
-          // TODO is this scenario still possible if the log has been
-          // recovered (i.e. closed)
-          LOG.warn("Could not open {} for reading. File is empty", path, e);
-        }
-        // EOFException being ignored
-        return null;
-      }
+      // Lease recovery now happens in WALProvider#preRecovery, invoked from
+      // getReader(WALIdentity, CancelableProgressable) below.
+      in = getReader(file, reporter);
+    } catch (EOFException e) {
+      // EOFException being ignored
+      return null;
     } catch (IOException e) {
       if (e instanceof FileNotFoundException) {
         // A wal file may not exist anymore. Nothing can be recovered so move on
-        LOG.warn("File {} does not exist anymore", path, e);
+        LOG.warn("File {} does not exist anymore", file, e);
         return null;
       }
       if (!skipErrors || e instanceof InterruptedIOException) {
@@ -755,14 +757,14 @@ public class WALSplitter {
       }
       CorruptedLogFileException t =
         new CorruptedLogFileException("skipErrors=true Could not open wal " +
-            path + " ignoring");
+            file + " ignoring");
       t.initCause(e);
       throw t;
     }
     return in;
   }

-  static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
+  private static Entry getNextLogLine(Reader in, WALIdentity path, boolean skipErrors)
       throws CorruptedLogFileException, IOException {
     try {
       return in.next();
@@ -796,15 +798,19 @@ public class WALSplitter {
    */
   protected Writer createWriter(Path logfile)
       throws IOException {
-    return walFactory.createRecoveredEditsWriter(walFS, logfile);
+    WALProvider provider = walFactory.getWALProvider();
+    return walFactory.createRecoveredEditsWriter(provider.createWALIdentity(logfile.toString()));
   }

   /**
    * Create a new {@link Reader} for reading logs to split.
   * @return new Reader instance, caller should close
   */
-  protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
-    return walFactory.createReader(walFS, curLogFile, reporter);
+  protected Reader getReader(WALIdentity curLogFile, CancelableProgressable reporter)
+      throws IOException {
+    WALProvider provider = walFactory.getWALProvider();
+    provider.preRecovery(curLogFile, conf, reporter);
+    return provider.createReader(curLogFile, reporter, true);
   }

   /**
@@ -1282,7 +1288,9 @@ public class WALSplitter {
   private void deleteOneWithFewerEntries(WriterAndPath wap, Path dst) throws IOException {
     long dstMinLogSeqNum = -1L;
-    try (WAL.Reader reader = walFactory.createReader(walFS, dst)) {
+    WALProvider provider = walFactory.getWALProvider();
+    try (WAL.Reader reader = provider.createReader(provider.createWALIdentity(dst.toString()),
+        null, true)) {
       WAL.Entry entry = reader.next();
       if (entry != null) {
         dstMinLogSeqNum = entry.getKey().getSequenceId();
@@ -1508,7 +1516,7 @@ public class WALSplitter {
     String tmpDirName = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
       HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
     Path regionedits = getRegionSplitEditsPath(entry,
-      fileBeingSplit.getPath().getName(), tmpDirName, conf);
+      fileBeingSplit.getName(), tmpDirName, conf);
     if (regionedits == null) {
       return null;
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSWALIdentity.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSWALIdentity.java
new file mode 100644
index 0000000..cef0a75
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSWALIdentity.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Distributed-filesystem-backed implementation of {@link WALIdentity}, wrapping a {@link Path}.
+ */
+@InterfaceAudience.Private
+public class FSWALIdentity implements WALIdentity {
+  private final String name;
+  private final Path path;
+
+  /**
+   * @param walPath full path string of the WAL
+   */
+  public FSWALIdentity(String walPath) {
+    this.path = new Path(walPath);
+    this.name = path.getName();
+  }
+
+  public FSWALIdentity(Path path) {
+    if (path == null) {
+      throw new IllegalArgumentException("path cannot be null");
+    }
+    this.path = path;
+    this.name = path.getName();
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * @return the {@link Path} this identity wraps
+   */
+  public Path getPath() {
+    return path;
+  }
+
+  @Override
+  public int compareTo(WALIdentity o) {
+    FSWALIdentity that = (FSWALIdentity) o;
+    return this.path.compareTo(that.getPath());
+  }
+
+  @Override
+  public String toString() {
+    return this.path.toString();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof FSWALIdentity)) {
+      return false;
+    }
+    FSWALIdentity that = (FSWALIdentity) obj;
+    return this.path.equals(that.getPath());
+  }
+
+  @Override
+  public int hashCode() {
+    return this.path.hashCode();
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALIdentity.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALIdentity.java
new file mode 100644
index 0000000..fa7d2fa
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALIdentity.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Identifies a WAL in both stream-based and distributed-filesystem-based
+ * environments. See the {@link #getName()} method.
+ */
+@InterfaceAudience.Private
+public interface WALIdentity extends Comparable<WALIdentity> {
+
+  /**
+   * The name uniquely identifies a WAL within its WALProvider.
+   * It can be thought of as a human-readable, serialized form of the WALIdentity.
+   *
+   * The same value should be returned across calls to this method.
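+   *
+   * Illustrative example (assuming the FS-backed FSWALIdentity added in this patch):
+   * <pre>
+   * WALIdentity id = new FSWALIdentity(new Path("/hbase/WALs/rs1/wal.1234"));
+   * id.getName(); // returns "wal.1234"
+   * </pre>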
+   *
+   * @return name of the WAL
+   */
+  String getName();
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/AbstractWALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/AbstractWALEntryStream.java
new file mode 100644
index 0000000..6707b8e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/AbstractWALEntryStream.java
@@ -0,0 +1,300 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.OptionalLong;
+import java.util.concurrent.PriorityBlockingQueue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WAL.Reader;
+import org.apache.hadoop.hbase.wal.WALIdentity;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Streaming access to WAL entries. This class is given a queue of {@link WALIdentity}s and
+ * continually iterates through all the WAL {@link Entry}s in the queue. When it's done reading
+ * from a WAL, it dequeues that WAL and starts reading from the next one.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class AbstractWALEntryStream implements WALEntryStream {
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractWALEntryStream.class);
+
+  protected Reader reader;
+  protected WALIdentity currentPath;
+  // cache of next entry for hasNext()
+  protected Entry currentEntry;
+  // Position of the current entry. Because we support peek(), the upper layer may return
+  // before actually consuming the cached entry, so it is not safe for getPosition() to
+  // report currentPositionOfReader (declared below); it reports this value instead.
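+  // (Illustration: after peek() the next entry is cached but this position is unchanged;
+  // only next() advances it to currentPositionOfReader.)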
+  protected long currentPositionOfEntry = 0;
+  // position after reading current entry
+  protected long currentPositionOfReader = 0;
+  protected final PriorityBlockingQueue<WALIdentity> logQueue;
+  protected final Configuration conf;
+  protected final WALFileLengthProvider walFileLengthProvider;
+  // which region server the WALs belong to
+  protected final ServerName serverName;
+  protected final MetricsSource metrics;
+
+  protected boolean eofAutoRecovery;
+  private final WALProvider provider;
+
+  /**
+   * Create an entry stream over the given queue at the given start position.
+   * @param logQueue the queue of WALs to stream
+   * @param conf {@link Configuration} to use to create {@link Reader} for this stream
+   * @param startPosition the position in the first WAL to start reading at
+   * @param walFileLengthProvider provides the current length of WALs that are still being written
+   * @param serverName the server name which all WALs belong to
+   * @param metrics replication metrics
+   * @param provider the WALProvider readers are obtained from
+   * @throws IOException if the stream cannot be set up
+   */
+  public AbstractWALEntryStream(PriorityBlockingQueue<WALIdentity> logQueue, Configuration conf,
+      long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
+      MetricsSource metrics, WALProvider provider) throws IOException {
+    this.logQueue = logQueue;
+    this.conf = conf;
+    this.currentPositionOfEntry = startPosition;
+    this.walFileLengthProvider = walFileLengthProvider;
+    this.serverName = serverName;
+    this.metrics = metrics;
+    this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
+    this.provider = provider;
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    if (currentEntry == null) {
+      try {
+        tryAdvanceEntry();
+      } catch (IOException e) {
+        handleIOException(logQueue.peek(), e);
+      }
+    }
+    return currentEntry != null;
+  }
+
+  @Override
+  public Entry peek() throws IOException {
+    return hasNext() ? currentEntry : null;
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    reader.seek(pos);
+  }
+
+  @Override
+  public Entry next(Entry reuse) throws IOException {
+    return next();
+  }
+
+  @Override
+  public Entry next() throws IOException {
+    Entry save = peek();
+    currentPositionOfEntry = currentPositionOfReader;
+    currentEntry = null;
+    return save;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void close() throws IOException {
+    closeReader();
+  }
+
+  @Override
+  public WALIdentity getCurrentWALIdentity() {
+    return currentPath;
+  }
+
+  @Override
+  public long getPosition() {
+    return currentPositionOfEntry;
+  }
+
+  @Override
+  public void reset() throws IOException {
+    if (reader != null && currentPath != null) {
+      resetReader();
+    }
+  }
+
+  protected void setPosition(long position) {
+    currentPositionOfEntry = position;
+  }
+
+  abstract void setCurrentPath(WALIdentity path);
+
+  private void tryAdvanceEntry() throws IOException {
+    if (checkReader()) {
+      boolean beingWritten = readNextEntryAndRecordReaderPosition();
+      if (currentEntry == null && !beingWritten) {
+        // no more entries in this log file, and the file is already closed, i.e., rolled.
+        // Before dequeueing, we should always get one more attempt at reading.
+        // This is in case more entries came in after we opened the reader, and the log is rolled
+        // while we were reading. See HBASE-6758.
+        resetReader();
+        readNextEntryAndRecordReaderPosition();
+        if (currentEntry == null) {
+          if (checkAllBytesParsed()) { // now we're certain we're done with this log file
+            dequeueCurrentLog();
+            if (openNextLog()) {
+              readNextEntryAndRecordReaderPosition();
+            }
+          }
+        }
+      }
+      // if currentEntry != null then just return
+      // if currentEntry == null but the file is still being written, then we should not switch to
+      // the next log either, just return here and try next time to see if there are more entries
+      // in the current file
+    }
+    // do nothing if we don't have a WAL Reader (e.g. if there are no logs in the queue)
+  }
+
+  private void dequeueCurrentLog() throws IOException {
+    LOG.debug("Reached the end of log {}", currentPath);
+    closeReader();
+    logQueue.remove();
+    setPosition(0);
+    metrics.decrSizeOfLogQueue();
+  }
+
+  /**
+   * Returns whether the file is opened for writing.
+   */
+  private boolean readNextEntryAndRecordReaderPosition() throws IOException {
+    Entry readEntry = reader.next();
+    long readerPos = reader.getPosition();
+    OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath);
+    if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) {
+      // see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read
+      // uncommitted data, so we need to make sure that we do not read beyond the committed
+      // file length.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("The provider tells us the valid length for " + currentPath + " is " +
+            fileLength.getAsLong() + ", but we have advanced to " + readerPos);
+      }
+      resetReader();
+      return true;
+    }
+    if (readEntry != null) {
+      metrics.incrLogEditsRead();
+      metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry);
+    }
+    currentEntry = readEntry; // could be null
+    this.currentPositionOfReader = readerPos;
+    return fileLength.isPresent();
+  }
+
+  private void closeReader() throws IOException {
+    if (reader != null) {
+      reader.close();
+      reader = null;
+    }
+  }
+
+  // if we don't have a reader, open a reader on the next log
+  private boolean checkReader() throws IOException {
+    if (reader == null) {
+      return openNextLog();
+    }
+    return true;
+  }
+
+  // open a reader on the next log in queue
+  abstract boolean openNextLog() throws IOException;
+
+  protected void openReader(WALIdentity path) throws IOException {
+    try {
+      // Detect if this is a new file, if so get a new reader else
+      // reset the current reader so that we see the new data
+      if (reader == null || !currentPath.equals(path)) {
+        closeReader();
+        reader = createReader(path, conf);
+        seek();
+        setCurrentPath(path);
+      } else {
+        resetReader();
+      }
+    } catch (RemoteException re) {
+      IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
+      handleIOException(path, ioe);
+    } catch (IOException ioe) {
+      handleIOException(path, ioe);
+    }
+  }
+
+  /**
+   * Creates a reader for a WAL.
+   *
+   * @param walId identity of the WAL: a path for FS-based providers, a stream name for
+   *          stream-based providers
+   * @param conf configuration
+   * @return a reader for the WAL
+   * @throws IOException if the reader cannot be created
+   */
+  protected Reader createReader(WALIdentity walId, Configuration conf)
+      throws IOException {
+    return provider.createReader(walId, null, false);
+  }
+
+  protected void resetReader() throws IOException {
+    try {
+      currentEntry = null;
+      reader.reset();
+      seek();
+    } catch (NullPointerException npe) {
+      throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
+    } catch (IOException e) {
+      handleIOException(currentPath, e);
+    }
+  }
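+
+  // The abstract hooks below are where provider-specific behavior plugs in. For example
+  // (see FSWALEntryStream later in this patch), the FS-backed stream maps a
+  // FileNotFoundException to a lookup in the archived-logs directory, while a
+  // stream-based provider might simply rethrow.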
+
+  /**
+   * Handle the given IOException; implementations should rethrow it if it cannot be handled.
+   * @param walId identity of the WAL that was being read when the exception occurred
+   * @param e the exception to handle
+   * @throws IOException if the exception cannot be handled
+   */
+  protected abstract void handleIOException(WALIdentity walId, IOException e) throws IOException;
+
+  protected void seek() throws IOException {
+    if (currentPositionOfEntry != 0) {
+      reader.seek(currentPositionOfEntry);
+    }
+  }
+
+  protected boolean checkAllBytesParsed() throws IOException {
+    return true;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSRecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSRecoveredReplicationSource.java
new file mode 100644
index 0000000..3bc5fd5
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSRecoveredReplicationSource.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.PriorityBlockingQueue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.FSWALIdentity;
+import org.apache.hadoop.hbase.wal.WALIdentity;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class FSRecoveredReplicationSource extends RecoveredReplicationSource {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FSRecoveredReplicationSource.class);
+  private String logDir;
+
+  @Override
+  public void init(Configuration conf, ReplicationSourceManager manager,
+      ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
+      String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
+      MetricsSource metrics, WALProvider walProvider) throws IOException {
+    super.init(conf, manager, queueStorage, replicationPeer, server, peerClusterZnode, clusterId,
+      walFileLengthProvider, metrics, walProvider);
+    this.logDir = AbstractFSWALProvider.getWALDirectoryName(server.getServerName().toString());
+  }
+
+  @Override
+  public void locateRecoveredWALIdentities(PriorityBlockingQueue<WALIdentity> queue)
+      throws IOException {
+    boolean hasWALIdentityChanged = false;
+    PriorityBlockingQueue<WALIdentity> newWALIdentities =
+        new PriorityBlockingQueue<>(queueSizePerGroup,
+            new LogsComparator(this.walProvider));
+    FileSystem fs = null;
+    WALIdentityLoop:
+    for (WALIdentity walIdentity : queue) {
+      FSWALIdentity fsWal = (FSWALIdentity) walIdentity;
+      if (fs == null) {
+        fs = fsWal.getPath().getFileSystem(conf);
+      }
+      if (fs.exists(fsWal.getPath())) {
+        // still in same location, don't need to do anything
+        newWALIdentities.add(walIdentity);
+        continue;
+      }
+      // WALIdentity changed - try to find the right WALIdentity.
+      hasWALIdentityChanged = true;
+      if (server instanceof ReplicationSyncUp.DummyServer) {
+        // In the case of disaster recovery, HMaster may be shut down or have crashed before
+        // the WALs were moved from .logs to .oldlogs. Loop over the .logs folders and check
+        // whether a match exists.
+        WALIdentity newWALIdentity = getReplSyncUpPath(fs, fsWal.getPath());
+        newWALIdentities.add(newWALIdentity);
+        continue;
+      } else {
+        // See if Path exists in the dead RS folder (there could be a chain of failures
+        // to look at)
+        List<ServerName> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
+        LOG.info("Number of dead servers: " + deadRegionServers.size());
+        final Path walDir = FSUtils.getWALRootDir(conf);
+        for (ServerName curDeadServerName : deadRegionServers) {
+          final Path deadRsDirectory =
+              new Path(walDir, AbstractFSWALProvider.getWALDirectoryName(curDeadServerName
+                  .getServerName()));
+          Path[] locs = new Path[] { new Path(deadRsDirectory, walIdentity.getName()), new Path(
+              deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), walIdentity.getName()) };
+          for (Path possibleLogLocation : locs) {
+            LOG.info("Possible location " + possibleLogLocation.toUri().toString());
+            if (fs.exists(possibleLogLocation)) {
+              // We found the right new location
+              LOG.info("Log " + walIdentity + " still exists at " + possibleLogLocation);
+              newWALIdentities.add(new FSWALIdentity(possibleLogLocation));
+              continue WALIdentityLoop;
+            }
+          }
+        }
+        // didn't find a new location
+        LOG.error(String.format("WAL Path %s doesn't exist and couldn't find its new location",
+          walIdentity));
+        newWALIdentities.add(walIdentity);
+      }
+    }
+
+    if (hasWALIdentityChanged) {
+      if (newWALIdentities.size() != queue.size()) { // this shouldn't happen
+        LOG.error("Recovery queue size is incorrect");
+        throw new IOException("Recovery queue size error");
+      }
+      // put the correct locations in the queue
+      // since this is a recovered queue with no new incoming logs,
+      // there shouldn't be any concurrency issues
+      queue.clear();
+      for (WALIdentity walId : newWALIdentities) {
+        queue.add(walId);
+      }
+    }
+  }
+
+  // N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of the wal
+  //      area rather than to the wal area for a particular region server.
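+  // For example (hypothetical layout): with manager.getLogDir() = /hbase/WALs, the scan
+  // below visits /hbase/WALs/<rs-dir>/<wal-name> and matches on the file name only.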
+  private WALIdentity getReplSyncUpPath(FileSystem fs, Path path) throws IOException {
+    FileStatus[] rss = fs.listStatus(manager.getLogDir());
+    for (FileStatus rs : rss) {
+      Path rsDir = rs.getPath();
+      FileStatus[] logs = fs.listStatus(rsDir);
+      for (FileStatus log : logs) {
+        // Build each candidate from the region server directory; reusing a single
+        // Path variable here would wrongly nest candidates after the first iteration.
+        Path p = new Path(rsDir, log.getPath().getName());
+        if (p.getName().equals(path.getName())) {
+          LOG.info("Log " + p.getName() + " found at " + p);
+          return this.walProvider.createWALIdentity(p.toString());
+        }
+      }
+    }
+    LOG.error("Didn't find path for: " + path.getName());
+    return this.walProvider.createWALIdentity(path.toString());
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSWALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSWALEntryStream.java
new file mode 100644
index 0000000..4ef009d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSWALEntryStream.java
@@ -0,0 +1,253 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.concurrent.PriorityBlockingQueue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
+import org.apache.hadoop.hbase.wal.FSWALIdentity;
+import org.apache.hadoop.hbase.wal.WALIdentity;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Streaming access to WAL entries. This class is given a queue of {@link WALIdentity}s and
+ * continually iterates through all the WAL {@link Entry}s in the queue. When it's done reading
+ * from a WAL, it dequeues that WAL and starts reading from the next one.
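+ * If a WAL has been moved out from under the stream (e.g. archived), the stream looks for it
+ * in the oldWALs locations before giving up; see getArchivedLog below.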
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class FSWALEntryStream extends AbstractWALEntryStream {
+  private static final Logger LOG = LoggerFactory.getLogger(FSWALEntryStream.class);
+
+  private FileSystem fs;
+
+  public FSWALEntryStream(FileSystem fs, PriorityBlockingQueue<WALIdentity> logQueue,
+      Configuration conf, long startPosition, WALFileLengthProvider walFileLengthProvider,
+      ServerName serverName, MetricsSource metrics, WALProvider provider) throws IOException {
+    super(logQueue, conf, startPosition, walFileLengthProvider, serverName, metrics, provider);
+    this.fs = fs;
+  }
+
+  @Override
+  // HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file
+  protected boolean checkAllBytesParsed() throws IOException {
+    // -1 means the wal wasn't closed cleanly.
+    final long trailerSize = currentTrailerSize();
+    FileStatus stat = null;
+    try {
+      stat = fs.getFileStatus(((FSWALIdentity) this.currentPath).getPath());
+    } catch (IOException exception) {
+      LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}",
+        currentPath, trailerSize < 0 ? "was not" : "was", getCurrentPathStat());
+      metrics.incrUnknownFileLengthForClosedWAL();
+    }
+    // Here we use currentPositionOfReader instead of currentPositionOfEntry.
+    // We only call this method when currentEntry is null so usually they are the same, but there
+    // are two exceptions. One is we have nothing in the file but only a header, in this way
+    // the currentPositionOfEntry will always be 0 since we have no chance to update it. The other
+    // is that we reach the end of file, then currentPositionOfEntry will point to the tail of the
+    // last valid entry, and the currentPositionOfReader will usually point to the end of the file.
+    if (stat != null) {
+      if (trailerSize < 0) {
+        if (currentPositionOfReader < stat.getLen()) {
+          final long skippedBytes = stat.getLen() - currentPositionOfReader;
+          LOG.debug(
+            "Reached the end of WAL file '{}'. It was not closed cleanly," +
+                " so we did not parse {} bytes of data. This is normally ok.",
+            currentPath, skippedBytes);
+          metrics.incrUncleanlyClosedWALs();
+          metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
+        }
+      } else if (currentPositionOfReader + trailerSize < stat.getLen()) {
+        LOG.warn(
+          "Processing end of WAL file '{}'. At position {}, which is too far away from" +
+              " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}",
+          currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat());
+        setPosition(0);
+        resetReader();
+        metrics.incrRestartedWALReading();
+        metrics.incrRepeatedFileBytes(currentPositionOfReader);
+        return false;
+      }
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is " +
+          (stat == null ?
"N/A" : stat.getLen())); + } + metrics.incrCompletedWAL(); + return true; + } + + private Path getArchivedLog(Path path) throws IOException { + Path rootDir = FSUtils.getRootDir(conf); + + // Try found the log in old dir + Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); + Path archivedLogLocation = new Path(oldLogDir, path.getName()); + if (fs.exists(archivedLogLocation)) { + LOG.info("Log " + path + " was moved to " + archivedLogLocation); + return archivedLogLocation; + } + + // Try found the log in the seperate old log dir + oldLogDir = new Path(rootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME) + .append(Path.SEPARATOR).append(serverName.getServerName()).toString()); + archivedLogLocation = new Path(oldLogDir, path.getName()); + if (fs.exists(archivedLogLocation)) { + LOG.info("Log " + path + " was moved to " + archivedLogLocation); + return archivedLogLocation; + } + + LOG.error("Couldn't locate log: " + path); + return path; + } + + @Override + void setCurrentPath(WALIdentity path) { + this.currentPath = path; + } + + @Override + // open a reader on the next log in queue + boolean openNextLog() throws IOException { + WALIdentity nextPath = logQueue.peek(); + if (nextPath != null) { + openReader(nextPath); + if (reader != null) { + return true; + } + } else { + // no more files in queue, this could happen for recovered queue, or for a wal group of a sync + // replication peer which has already been transited to DA or S. + setCurrentPath(null); + } + return false; + } + + @Override + protected void openReader(WALIdentity walId) throws IOException { + try { + super.openReader(walId); + } catch (NullPointerException npe) { + // Workaround for race condition in HDFS-4380 + // which throws a NPE if we open a file before any data node has the most recent block + // Just sleep and retry. Will require re-reading compressed WALs for compressionContext. + LOG.warn("Got NPE opening reader, will retry."); + reader = null; + } + } + + private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException { + // If the log was archived, continue reading from there + Path archivedLog = getArchivedLog(path); + if (!path.equals(archivedLog)) { + openReader(new FSWALIdentity(archivedLog)); + } else { + throw fnfe; + } + } + + // For HBASE-15019 + private void recoverLease(final Configuration conf, final Path path) { + try { + final FileSystem dfs = FSUtils.getCurrentFileSystem(conf); + FSUtils fsUtils = FSUtils.getInstance(dfs, conf); + fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() { + @Override + public boolean progress() { + LOG.debug("recover WAL lease: " + path); + return true; + } + }); + } catch (IOException e) { + LOG.warn("unable to recover lease for WAL: " + path, e); + } + } + + @Override + protected void handleIOException(WALIdentity path, IOException e) throws IOException { + try { + throw e; + } catch (FileNotFoundException fnfe) { + handleFileNotFound(((FSWALIdentity)path).getPath(), fnfe); + } catch (EOFException eo) { + handleEofException(eo); + } catch (LeaseNotRecoveredException lnre) { + // HBASE-15019 the WAL was not closed due to some hiccup. 
+ LOG.warn("Try to recover the WAL lease " + currentPath, lnre); + recoverLease(conf, ((FSWALIdentity)currentPath).getPath()); + reader = null; + } + } + + private long currentTrailerSize() { + long size = -1L; + if (reader instanceof ProtobufLogReader) { + final ProtobufLogReader pblr = (ProtobufLogReader) reader; + size = pblr.trailerSize(); + } + return size; + } + + // if we get an EOF due to a zero-length log, and there are other logs in queue + // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is + // enabled, then dump the log + private void handleEofException(IOException e) { + if ((e instanceof EOFException || e.getCause() instanceof EOFException) && logQueue.size() > 1 + && this.eofAutoRecovery) { + try { + if (fs.getFileStatus(((FSWALIdentity)logQueue.peek()).getPath()).getLen() == 0) { + LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek()); + logQueue.remove(); + setPosition(0); + } + } catch (IOException ioe) { + LOG.warn("Couldn't get file length information about log " + logQueue.peek()); + } + } + } + + private String getCurrentPathStat() { + StringBuilder sb = new StringBuilder(); + if (currentPath != null) { + sb.append("currently replicating from: ").append(currentPath).append(" at position: ") + .append(currentPositionOfEntry).append("\n"); + } else { + sb.append("no replication ongoing, waiting for new log"); + } + return sb.toString(); + } + +}