diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 4b6da53edd..0dde6c80fc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -4698,7 +4698,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi status.setStatus("Opening recovered edits"); WAL.Reader reader = null; try { - reader = WALFactory.createReader(fs, edits, conf); + reader = WALFactory.getInstance(conf).createReader(fs, edits, null, true); long currentEditSeqId = -1; long currentReplaySeqId = -1; long firstSeqIdInLog = -1; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 7c7b4cc02e..8f7f50d371 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2136,12 +2136,12 @@ public class HRegionServer extends HasThread implements @Override public List getWALs() throws IOException { - return walFactory.getWALs(); + return walFactory.getWALProvider().getWALs(); } @Override public WAL getWAL(RegionInfo regionInfo) throws IOException { - WAL wal = walFactory.getWAL(regionInfo); + WAL wal = walFactory.getWALProvider().getWAL(regionInfo); if (this.walRoller != null) { this.walRoller.addWAL(wal); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java index 13f5d6ef35..f1ebce5cef 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java @@ -72,7 +72,7 @@ public class Compressor { FileSystem inFS = input.getFileSystem(conf); FileSystem outFS = output.getFileSystem(conf); - WAL.Reader in = WALFactory.createReaderIgnoreCustomClass(inFS, input, conf); + WAL.Reader in = WALFactory.getInstance(conf).createReader(inFS, input, null, false); WALProvider.Writer out = null; try { @@ -82,7 +82,7 @@ public class Compressor { } boolean compress = ((ReaderBase)in).hasCompression(); conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, !compress); - out = WALFactory.createWALWriter(outFS, output, conf); + out = WALFactory.getInstance(conf).createWALWriter(outFS, output, false); WAL.Entry e = null; while ((e = in.next()) != null) out.append(e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSWALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSWALEntryStream.java index f544802582..3c30619591 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSWALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSWALEntryStream.java @@ -215,7 +215,7 @@ public class FSWALEntryStream extends AbstractWALEntryStream { @Override protected Reader createReader(WALIdentity walId, Configuration conf) throws IOException { Path path = ((FSWALIdentity) walId).getPath(); - return WALFactory.createReader(fs, path, conf); + return walProvider.createReader(fs, path, null, true); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java index 24963f1ad0..97446d1fca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java @@ -141,7 +141,7 @@ public class ReplaySyncReplicationWALCallable implements RSProcedureCallable { long length = rs.getWALFileSystem().getFileStatus(path).getLen(); try { FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf); - return WALFactory.createReader(rs.getWALFileSystem(), path, rs.getConfiguration()); + return WALFactory.getInstance(conf).createReader(rs.getWALFileSystem(), path, null, true); } catch (EOFException e) { if (length <= 0) { LOG.warn("File is empty. Could not open {} for reading because {}", path, e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index 1b33ce3f82..790db798a2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.wal; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -37,6 +38,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; +import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.regionserver.FSRecoveredReplicationSource; import org.apache.hadoop.hbase.replication.regionserver.FSWALEntryStream; @@ -45,6 +47,7 @@ import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSour import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream; import 
org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; import org.apache.yetus.audience.InterfaceAudience; @@ -106,6 +109,16 @@ public abstract class AbstractFSWALProvider> implemen private Path rootDir; private Path oldLogDir; + + /** + * How long to attempt opening in-recovery wals + */ + private int timeoutMillis; + + /** + * Configuration-specified WAL Reader used when a custom reader is requested + */ + private Class logReaderClass; /** * @param factory factory that made us, identity used for FS layout. may not be null @@ -133,6 +146,10 @@ public abstract class AbstractFSWALProvider> implemen logPrefix = sb.toString(); rootDir = FSUtils.getRootDir(conf); oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); + timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); + /* TODO Both of these are probably specific to the fs wal provider */ + logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, + AbstractFSWALProvider.Reader.class); doInit(conf); } @@ -476,7 +493,7 @@ public abstract class AbstractFSWALProvider> implemen * @return WAL Reader instance * @throws IOException */ - public static org.apache.hadoop.hbase.wal.WAL.Reader openReader(Path path, Configuration conf) + public org.apache.hadoop.hbase.wal.WAL.Reader openReader(Path path, Configuration conf) throws IOException { @@ -489,7 +506,7 @@ public abstract class AbstractFSWALProvider> implemen try { // Detect if this is a new file, if so get a new reader else // reset the current reader so that we see the new data - reader = WALFactory.createReader(path.getFileSystem(conf), path, conf); + reader = createReader(path.getFileSystem(conf), path, null, true); return reader; } catch (FileNotFoundException fnfe) { // If the log was 
archived, continue reading from there @@ -595,4 +612,69 @@ public abstract class AbstractFSWALProvider> implemen } return new FSWALIdentity(walPath); } + + @Override + public org.apache.hadoop.hbase.wal.WAL.Reader createReader(FileSystem fs, Path path, + CancelableProgressable reporter, boolean allowCustom) throws IOException { + Class lrClass = + allowCustom ? logReaderClass : ProtobufLogReader.class; + try { + // A wal file could be under recovery, so it may take several + // tries to get it open. Instead of claiming it is corrupted, retry + // to open it up to 5 minutes by default. + long startWaiting = EnvironmentEdgeManager.currentTime(); + long openTimeout = timeoutMillis + startWaiting; + int nbAttempt = 0; + AbstractFSWALProvider.Reader reader = null; + while (true) { + try { + reader = lrClass.getDeclaredConstructor().newInstance(); + reader.init(fs, path, conf, null); + return reader; + } catch (IOException e) { + if (reader != null) { + try { + reader.close(); + } catch (IOException exception) { + LOG.warn("Could not close FSDataInputStream" + exception.getMessage()); + LOG.debug("exception details", exception); + } + } + + String msg = e.getMessage(); + if (msg != null + && (msg.contains("Cannot obtain block length") + || msg.contains("Could not obtain the last block") || msg + .matches("Blocklist for [^ ]* has changed.*"))) { + if (++nbAttempt == 1) { + LOG.warn("Lease should have recovered. This is not expected. Will retry", e); + } + if (reporter != null && !reporter.progress()) { + throw new InterruptedIOException("Operation is cancelled"); + } + if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) { + LOG.error("Can't open after " + nbAttempt + " attempts and " + + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path); + } else { + try { + Thread.sleep(nbAttempt < 3 ? 
500 : 1000); + continue; // retry + } catch (InterruptedException ie) { + InterruptedIOException iioe = new InterruptedIOException(); + iioe.initCause(ie); + throw iioe; + } + } + throw new LeaseNotRecoveredException(e); + } else { + throw e; + } + } + } + } catch (IOException ie) { + throw ie; + } catch (Exception e) { + throw new IOException("Cannot get log reader", e); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java index 062b3688d3..9311e7aee7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java @@ -132,4 +132,11 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider { return false; } } + + @Override + public Writer createWriter(Configuration conf, FileSystem fs, Path path, boolean overwritable) + throws IOException { + return FSHLogProvider.createWriter(conf, fs, path, overwritable, + WALUtil.getWALBlockSize(conf, fs, path, overwritable)); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DelegateWalProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DelegateWalProvider.java new file mode 100644 index 0000000000..b2ca2cf297 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DelegateWalProvider.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.PriorityBlockingQueue; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; +import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource; +import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream; +import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.wal.WAL.Reader; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Public +public class DelegateWalProvider implements WALProvider { + protected WALProvider delegateProvider; + + public DelegateWalProvider(WALProvider provider) { + this.delegateProvider = provider; + } + + @Override + public void init(WALFactory factory, Configuration conf, String providerId) throws IOException { + delegateProvider.init(factory, conf, providerId); + } + + @Override + public WAL getWAL(RegionInfo region) throws IOException { + return delegateProvider.getWAL(region); + } + + @Override + public List getWALs() { + return delegateProvider.getWALs(); + } + + @Override + public void shutdown() throws IOException { + delegateProvider.shutdown(); + } + + @Override + public void close() throws IOException { + 
delegateProvider.close(); + } + + @Override + public long getNumLogFiles() { + return delegateProvider.getNumLogFiles(); + } + + @Override + public long getLogFileSize() { + return delegateProvider.getLogFileSize(); + } + + @Override + public void addWALActionsListener(WALActionsListener listener) { + delegateProvider.addWALActionsListener(listener); + } + + @Override + public WALEntryStream getWalStream(PriorityBlockingQueue logQueue, + Configuration conf, long startPosition, ServerName serverName, MetricsSource metrics) + throws IOException { + return delegateProvider.getWalStream(logQueue, conf, startPosition, serverName, metrics); + } + + @Override + public RecoveredReplicationSource getRecoveredReplicationSource() { + return delegateProvider.getRecoveredReplicationSource(); + } + + @Override + public WALIdentity createWalIdentity(ServerName serverName, String walName, boolean isArchive) { + return delegateProvider.createWalIdentity(serverName, walName, isArchive); + } + + @Override + public Writer createWriter(Configuration conf, FileSystem fs, Path path, boolean overwritable) + throws IOException { + return delegateProvider.createWriter(conf, fs, path, overwritable); + } + + @Override + public Reader createReader(FileSystem fs, Path path, CancelableProgressable reporter, + boolean allowCustom) throws IOException { + return delegateProvider.createReader(fs, path, reporter, allowCustom); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index 85ba41ba29..c8d300e8c8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -27,6 +27,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import 
org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; @@ -40,7 +41,9 @@ import org.apache.hadoop.hbase.replication.regionserver.AbstractWALEntryStream; import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource; import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream; +import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Reader; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -330,4 +333,68 @@ class DisabledWALProvider implements WALProvider { }; } + @Override + public Writer createWriter(Configuration conf, FileSystem fs, Path path, boolean overwritable) + throws IOException { + return new Writer() { + + @Override + public void close() throws IOException { + + } + + @Override + public long getLength() { + return 0; + } + + @Override + public void sync(boolean forceSync) throws IOException { + + } + + @Override + public void append(Entry entry) throws IOException { + + } + }; + } + + @Override + public Reader createReader(FileSystem fs, Path path, CancelableProgressable reporter, + boolean allowCustom) throws IOException { + return new Reader() { + + @Override + public void close() throws IOException { + + } + + @Override + public void seek(long pos) throws IOException { + + } + + @Override + public void reset() throws IOException { + + } + + @Override + public Entry next(Entry reuse) throws IOException { + return null; + } + + @Override + public Entry next() throws IOException { + return null; + } + + @Override + public long getPosition() throws IOException { + return 0; + } + }; + } + } diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java index b02a4d30fd..721997afbc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java @@ -58,7 +58,7 @@ public class FSHLogProvider extends AbstractFSWALProvider { * @param overwritable if the created writer can overwrite. For recovered edits, it is true and * for WAL it is false. Thus we can distinguish WAL and recovered edits by this. */ - public static Writer createWriter(final Configuration conf, final FileSystem fs, final Path path, + public Writer createWriter(final Configuration conf, final FileSystem fs, final Path path, final boolean overwritable) throws IOException { return createWriter(conf, fs, path, overwritable, WALUtil.getWALBlockSize(conf, fs, path, overwritable)); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java index 0bbe01b2d2..9e476e83a4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java @@ -26,25 +26,15 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.locks.Lock; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; // imports for classes still in regionserver.wal import 
org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; -import org.apache.hadoop.hbase.replication.regionserver.FSRecoveredReplicationSource; -import org.apache.hadoop.hbase.replication.regionserver.FSWALEntryStream; -import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; -import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource; -import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.CommonFSUtils; -import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.KeyLocker; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -67,7 +57,11 @@ import org.slf4j.LoggerFactory; * Optionally, a FQCN to a custom implementation may be given. */ @InterfaceAudience.Private -public class RegionGroupingProvider implements WALProvider { +public class RegionGroupingProvider extends DelegateWalProvider { + public RegionGroupingProvider(WALProvider provider) { + super(provider); + } + private static final Logger LOG = LoggerFactory.getLogger(RegionGroupingProvider.class); /** @@ -147,9 +141,7 @@ public class RegionGroupingProvider implements WALProvider { private Configuration conf; private List listeners = new ArrayList<>(); private String providerId; private Class providerClass; - private Path rootDir; - private Path oldLogDir; @Override public void init(WALFactory factory, Configuration conf, String providerId) throws IOException { @@ -169,13 +161,11 @@ public class RegionGroupingProvider implements WALProvider { this.providerId = sb.toString(); this.strategy = getStrategy(conf, REGION_GROUPING_STRATEGY, DEFAULT_REGION_GROUPING_STRATEGY); this.providerClass = factory.getProviderClass(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER); - rootDir = FSUtils.getRootDir(conf); - oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); } private 
WALProvider createProvider(String group) throws IOException { WALProvider provider = WALFactory.createProvider(providerClass); provider.init(factory, conf, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : group); provider.addWALActionsListener(new MetricsWAL()); return provider; @@ -301,30 +292,4 @@ public class RegionGroupingProvider implements WALProvider { listeners.add(listener); } - @Override - public WALEntryStream getWalStream(PriorityBlockingQueue logQueue, - Configuration conf, long startPosition, ServerName serverName, MetricsSource metrics) - throws IOException { - return new FSWALEntryStream(CommonFSUtils.getWALFileSystem(conf), logQueue, conf, startPosition, - serverName, metrics, this); - } - - @Override - public RecoveredReplicationSource getRecoveredReplicationSource() { - return new FSRecoveredReplicationSource(); - } - - @Override - public WALIdentity createWalIdentity(ServerName serverName, String walName, boolean isArchive) { - Path walPath; - if (isArchive) { - walPath = new Path(oldLogDir, walName); - } else { - Path logDir = - new Path(rootDir, AbstractFSWALProvider.getWALDirectoryName(serverName.toString())); - walPath = new Path(logDir, walName); - } - return new FSWALIdentity(walPath); - } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java index 02eb776815..818d8a26bb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java @@ -28,7 +28,6 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; 
import java.util.function.BiPredicate; @@ -37,25 +36,16 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; -import org.apache.hadoop.hbase.replication.regionserver.FSRecoveredReplicationSource; -import org.apache.hadoop.hbase.replication.regionserver.FSWALEntryStream; -import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener; -import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource; import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider; -import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.KeyLocker; import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; @@ -76,7 +66,7 @@ import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; * the request to the normal {@link WALProvider}. 
*/ @InterfaceAudience.Private -public class SyncReplicationWALProvider implements WALProvider, PeerActionListener { +public class SyncReplicationWALProvider extends DelegateWalProvider implements PeerActionListener { private static final Logger LOG = LoggerFactory.getLogger(SyncReplicationWALProvider.class); @@ -84,8 +74,6 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen @VisibleForTesting public static final String DUAL_WAL_IMPL = "hbase.wal.sync.impl"; - private final WALProvider provider; - private SyncReplicationPeerInfoProvider peerInfoProvider = new DefaultSyncReplicationPeerInfoProvider(); @@ -109,12 +97,8 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen private final KeyLocker createLock = new KeyLocker<>(); - private Path rootDir; - - private Path oldLogDir; - SyncReplicationWALProvider(WALProvider provider) { - this.provider = provider; + super(provider); } public void setPeerInfoProvider(SyncReplicationPeerInfoProvider peerInfoProvider) { @@ -126,15 +110,13 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen if (!initialized.compareAndSet(false, true)) { throw new IllegalStateException("WALProvider.init should only be called once."); } - provider.init(factory, conf, providerId); + delegateProvider.init(factory, conf, providerId); this.conf = conf; this.factory = factory; Pair> eventLoopGroupAndChannelClass = NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf); eventLoopGroup = eventLoopGroupAndChannelClass.getFirst(); channelClass = eventLoopGroupAndChannelClass.getSecond(); - rootDir = FSUtils.getRootDir(conf); - oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); } // Use a timestamp to make it identical. 
That means, after we transit the peer to DA/S and then @@ -208,7 +190,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen @Override public WAL getWAL(RegionInfo region) throws IOException { if (region == null) { - return provider.getWAL(null); + return delegateProvider.getWAL(null); } WAL wal = null; Optional> peerIdAndRemoteWALDir = @@ -217,13 +199,13 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen Pair pair = peerIdAndRemoteWALDir.get(); wal = getWAL(pair.getFirst(), pair.getSecond()); } - return wal != null ? wal : provider.getWAL(region); + return wal != null ? wal : delegateProvider.getWAL(region); } private Stream getWALStream() { return Streams.concat( peerId2WAL.values().stream().filter(Optional::isPresent).map(Optional::get), - provider.getWALs().stream()); + delegateProvider.getWALs().stream()); } @Override @@ -245,7 +227,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen } } } - provider.shutdown(); + delegateProvider.shutdown(); if (failure != null) { throw failure; } @@ -265,7 +247,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen } } } - provider.close(); + delegateProvider.close(); if (failure != null) { throw failure; } @@ -273,13 +255,13 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen @Override public long getNumLogFiles() { - return peerId2WAL.size() + provider.getNumLogFiles(); + return peerId2WAL.size() + delegateProvider.getNumLogFiles(); } @Override public long getLogFileSize() { return peerId2WAL.values().stream().filter(Optional::isPresent).map(Optional::get) - .mapToLong(DualAsyncFSWAL::getLogFileSize).sum() + provider.getLogFileSize(); + .mapToLong(DualAsyncFSWAL::getLogFileSize).sum() + delegateProvider.getLogFileSize(); } private void safeClose(WAL wal) { @@ -295,7 +277,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen @Override 
public void addWALActionsListener(WALActionsListener listener) { listeners.add(listener); - provider.addWALActionsListener(listener); + delegateProvider.addWALActionsListener(listener); } @Override @@ -362,33 +344,6 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen @VisibleForTesting WALProvider getWrappedProvider() { - return provider; - } - - @Override - public WALEntryStream getWalStream(PriorityBlockingQueue logQueue, - Configuration conf, long startPosition, ServerName serverName, MetricsSource metrics) - throws IOException { - return new FSWALEntryStream(CommonFSUtils.getWALFileSystem(conf), logQueue, conf, startPosition, - serverName, metrics, this); + return delegateProvider; } - - @Override - public RecoveredReplicationSource getRecoveredReplicationSource() { - return new FSRecoveredReplicationSource(); - } - - @Override - public WALIdentity createWalIdentity(ServerName serverName, String walName, boolean isArchive) { - Path walPath; - if (isArchive) { - walPath = new Path(oldLogDir, walName); - } else { - Path logDir = - new Path(rootDir, AbstractFSWALProvider.getWALDirectoryName(serverName.toString())); - walPath = new Path(logDir, walName); - } - return new FSWALIdentity(walPath); - } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java index 8bde6d2001..3f2269d4de 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java @@ -18,18 +18,13 @@ package org.apache.hadoop.hbase.wal; import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.List; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.RegionInfo; import 
org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; -import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; import org.apache.hadoop.hbase.util.CancelableProgressable; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; import org.apache.hadoop.hbase.wal.WAL.Reader; import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.apache.yetus.audience.InterfaceAudience; @@ -90,35 +85,8 @@ public class WALFactory { // lazily intialized; most RegionServers don't deal with META private final AtomicReference metaProvider = new AtomicReference<>(); - /** - * Configuration-specified WAL Reader used when a custom reader is requested - */ - private final Class logReaderClass; - - /** - * How long to attempt opening in-recovery wals - */ - private final int timeoutMillis; - private final Configuration conf; - // Used for the singleton WALFactory, see below. - private WALFactory(Configuration conf) { - // this code is duplicated here so we can keep our members final. - // until we've moved reader/writer construction down into providers, this initialization must - // happen prior to provider initialization, in case they need to instantiate a reader/writer. - timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); - /* TODO Both of these are probably specific to the fs wal provider */ - logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, - AbstractFSWALProvider.Reader.class); - this.conf = conf; - // end required early initialization - - // this instance can't create wals, just reader/writers. 
- provider = null; - factoryId = SINGLETON_ID; - } - @VisibleForTesting Providers getDefaultProvider() { return Providers.defaultProvider; @@ -153,9 +121,16 @@ public class WALFactory { } } - static WALProvider createProvider(Class clazz) throws IOException { + WALProvider createProvider(Class clazz) throws IOException { LOG.info("Instantiating WALProvider of type {}", clazz); try { + if (DelegateWalProvider.class.isAssignableFrom(clazz)) { + Class delegateClass = + getProviderClass(clazz.getField("DELEGATE_PROVIDER").get(null).toString(), + clazz.getField("DEFAULT_DELEGATE_PROVIDER").get(null).toString()); + WALProvider delegate = createProvider(delegateClass); + return clazz.getDeclaredConstructor(WALProvider.class).newInstance(delegate); + } return clazz.getDeclaredConstructor().newInstance(); } catch (Exception e) { LOG.error("couldn't set up WALProvider, the configured class is " + clazz); @@ -186,12 +161,6 @@ public class WALFactory { */ public WALFactory(Configuration conf, String factoryId, boolean enableSyncReplicationWALProvider) throws IOException { - // until we've moved reader/writer construction down into providers, this initialization must - // happen prior to provider initialization, in case they need to instantiate a reader/writer. 
- timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); - /* TODO Both of these are probably specific to the fs wal provider */ - logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, - AbstractFSWALProvider.Reader.class); this.conf = conf; this.factoryId = factoryId; // end required early initialization @@ -249,9 +218,6 @@ public class WALFactory { } } - public List getWALs() { - return provider.getWALs(); - } @VisibleForTesting WALProvider getMetaProvider() throws IOException { @@ -296,107 +262,10 @@ public class WALFactory { } } - public Reader createReader(final FileSystem fs, final Path path) throws IOException { - return createReader(fs, path, (CancelableProgressable)null); - } - - /** - * Create a reader for the WAL. If you are reading from a file that's being written to and need - * to reopen it multiple times, use {@link WAL.Reader#reset()} instead of this method - * then just seek back to the last known good position. - * @return A WAL reader. Close when done with it. - * @throws IOException - */ - public Reader createReader(final FileSystem fs, final Path path, - CancelableProgressable reporter) throws IOException { - return createReader(fs, path, reporter, true); - } public Reader createReader(final FileSystem fs, final Path path, CancelableProgressable reporter, boolean allowCustom) throws IOException { - Class lrClass = - allowCustom ? logReaderClass : ProtobufLogReader.class; - try { - // A wal file could be under recovery, so it may take several - // tries to get it open. Instead of claiming it is corrupted, retry - // to open it up to 5 minutes by default. 
- long startWaiting = EnvironmentEdgeManager.currentTime(); - long openTimeout = timeoutMillis + startWaiting; - int nbAttempt = 0; - AbstractFSWALProvider.Reader reader = null; - while (true) { - try { - reader = lrClass.getDeclaredConstructor().newInstance(); - reader.init(fs, path, conf, null); - return reader; - } catch (IOException e) { - if (reader != null) { - try { - reader.close(); - } catch (IOException exception) { - LOG.warn("Could not close FSDataInputStream" + exception.getMessage()); - LOG.debug("exception details", exception); - } - } - - String msg = e.getMessage(); - if (msg != null - && (msg.contains("Cannot obtain block length") - || msg.contains("Could not obtain the last block") || msg - .matches("Blocklist for [^ ]* has changed.*"))) { - if (++nbAttempt == 1) { - LOG.warn("Lease should have recovered. This is not expected. Will retry", e); - } - if (reporter != null && !reporter.progress()) { - throw new InterruptedIOException("Operation is cancelled"); - } - if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) { - LOG.error("Can't open after " + nbAttempt + " attempts and " - + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path); - } else { - try { - Thread.sleep(nbAttempt < 3 ? 500 : 1000); - continue; // retry - } catch (InterruptedException ie) { - InterruptedIOException iioe = new InterruptedIOException(); - iioe.initCause(ie); - throw iioe; - } - } - throw new LeaseNotRecoveredException(e); - } else { - throw e; - } - } - } - } catch (IOException ie) { - throw ie; - } catch (Exception e) { - throw new IOException("Cannot get log reader", e); - } - } - - /** - * Create a writer for the WAL. - * Uses defaults. - *

- * Should be package-private. public only for tests and - * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor} - * @return A WAL writer. Close when done with it. - */ - public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException { - return FSHLogProvider.createWriter(conf, fs, path, false); - } - - /** - * Should be package-private, visible for recovery testing. - * Uses defaults. - * @return an overwritable writer for recovered edits. caller should close. - */ - @VisibleForTesting - public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path) - throws IOException { - return FSHLogProvider.createWriter(conf, fs, path, true); + return provider.createReader(fs, path, reporter, allowCustom); } // These static methods are currently used where it's impractical to @@ -407,10 +276,10 @@ public class WALFactory { private static final String SINGLETON_ID = WALFactory.class.getName(); // Public only for FSHLog - public static WALFactory getInstance(Configuration configuration) { + public static WALFactory getInstance(Configuration configuration) throws IOException { WALFactory factory = singleton.get(); if (null == factory) { - WALFactory temp = new WALFactory(configuration); + WALFactory temp = new WALFactory(configuration, SINGLETON_ID); if (singleton.compareAndSet(null, temp)) { factory = temp; } else { @@ -426,58 +295,9 @@ public class WALFactory { return factory; } - /** - * Create a reader for the given path, accept custom reader classes from conf. - * If you already have a WALFactory, you should favor the instance method. - * @return a WAL Reader, caller must close. - */ - public static Reader createReader(final FileSystem fs, final Path path, - final Configuration configuration) throws IOException { - return getInstance(configuration).createReader(fs, path); - } - - /** - * Create a reader for the given path, accept custom reader classes from conf. 
- * If you already have a WALFactory, you should favor the instance method. - * @return a WAL Reader, caller must close. - */ - static Reader createReader(final FileSystem fs, final Path path, - final Configuration configuration, final CancelableProgressable reporter) throws IOException { - return getInstance(configuration).createReader(fs, path, reporter); - } - - /** - * Create a reader for the given path, ignore custom reader classes from conf. - * If you already have a WALFactory, you should favor the instance method. - * only public pending move of {@link org.apache.hadoop.hbase.regionserver.wal.Compressor} - * @return a WAL Reader, caller must close. - */ - public static Reader createReaderIgnoreCustomClass(final FileSystem fs, final Path path, - final Configuration configuration) throws IOException { - return getInstance(configuration).createReader(fs, path, null, false); - } - - /** - * If you already have a WALFactory, you should favor the instance method. - * Uses defaults. - * @return a Writer that will overwrite files. Caller must close. - */ - static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path, - final Configuration configuration) - throws IOException { - return FSHLogProvider.createWriter(configuration, fs, path, true); - } - - /** - * If you already have a WALFactory, you should favor the instance method. - * Uses defaults. - * @return a writer that won't overwrite files. Caller must close. 
- */ - @VisibleForTesting - public static Writer createWALWriter(final FileSystem fs, final Path path, - final Configuration configuration) + public Writer createWALWriter(final FileSystem fs, final Path path, boolean overwritable) throws IOException { - return FSHLogProvider.createWriter(configuration, fs, path, false); + return provider.createWriter(conf, fs, path, overwritable); } public final WALProvider getWALProvider() { @@ -487,4 +307,8 @@ public class WALFactory { public final WALProvider getMetaWALProvider() { return this.metaProvider.get(); } + + public Reader createReader(FileSystem fs, Path path) throws IOException { + return provider.createReader(fs, path, null, true); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java index 281f3c9c1d..2bb3417acb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java @@ -259,7 +259,7 @@ public class WALPrettyPrinter { throw new IOException(p + " is not a file"); } - WAL.Reader log = WALFactory.createReader(fs, p, conf); + WAL.Reader log = WALFactory.getInstance(conf).createReader(fs, p, null, false); if (log instanceof ProtobufLogReader) { List writerClsNames = ((ProtobufLogReader) log).getWriterClsNames(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java index 28efcd57dd..897429fece 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java @@ -24,12 +24,16 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.PriorityBlockingQueue; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; 
+import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource; import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream; +import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.wal.WAL.Reader; import org.apache.yetus.audience.InterfaceAudience; /** @@ -140,4 +144,28 @@ public interface WALProvider { * @return WALIdentity */ WALIdentity createWalIdentity(ServerName serverName, String walName, boolean isArchive); + + /** + * Create a writer + * @param conf configuration + * @param fs WAL FileSystem + * @param path Path of the wal + * @param overwritable is overwritable + * @return Writer + * @throws IOException IOException + */ + Writer createWriter(Configuration conf, FileSystem fs, Path path, boolean overwritable) + throws IOException; + + /** + * Create a reader + * @param fs WAL filesystem + * @param path Path of the wal + * @param reporter CancelableProgressable + * @param allowCustom allow custom reader class + * @return Reader + * @throws IOException IOException + */ + Reader createReader(final FileSystem fs, final Path path, CancelableProgressable reporter, + boolean allowCustom) throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index bc67d9863d..076fbf08f2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -796,7 +796,7 @@ public class WALSplitter { */ protected Writer createWriter(Path logfile) throws IOException { - return walFactory.createRecoveredEditsWriter(walFS, logfile); + 
return walFactory.createWALWriter(walFS, logfile, true); } /** @@ -804,7 +804,7 @@ public class WALSplitter { * @return new Reader instance, caller should close */ protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException { - return walFactory.createReader(walFS, curLogFile, reporter); + return walFactory.createReader(walFS, curLogFile, reporter, true); } /** @@ -1282,7 +1282,7 @@ public class WALSplitter { private void deleteOneWithFewerEntries(WriterAndPath wap, Path dst) throws IOException { long dstMinLogSeqNum = -1L; - try (WAL.Reader reader = walFactory.createReader(walFS, dst)) { + try (WAL.Reader reader = walFactory.createReader(walFS, dst, null, true)) { WAL.Entry entry = reader.next(); if (entry != null) { dstMinLogSeqNum = entry.getKey().getSequenceId(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSequenceIdMonotonicallyIncreasing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSequenceIdMonotonicallyIncreasing.java index e657d9c74a..ec22055833 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSequenceIdMonotonicallyIncreasing.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSequenceIdMonotonicallyIncreasing.java @@ -93,8 +93,8 @@ public class TestSequenceIdMonotonicallyIncreasing { private long getMaxSeqId(HRegionServer rs, RegionInfo region) throws IOException { Path walFile = ((AbstractFSWAL) rs.getWAL(null)).getCurrentFileName(); long maxSeqId = -1L; - try (WAL.Reader reader = - WALFactory.createReader(UTIL.getTestFileSystem(), walFile, UTIL.getConfiguration())) { + try (WAL.Reader reader = WALFactory.getInstance(UTIL.getConfiguration()) + .createReader(UTIL.getTestFileSystem(), walFile, null, true)) { for (;;) { WAL.Entry entry = reader.next(); if (entry == null) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java index 
388c53dc05..1d1f01543b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java @@ -639,7 +639,7 @@ public abstract class AbstractTestDLS { private int countWAL(Path log, FileSystem fs, Configuration conf) throws IOException { int count = 0; - try (WAL.Reader in = WALFactory.createReader(fs, log, conf)) { + try (WAL.Reader in = WALFactory.getInstance(conf).createReader(fs, log, null, true)) { WAL.Entry e; while ((e = in.next()) != null) { if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index b2d9a1bff2..029ec384c9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -689,7 +689,7 @@ public class TestHRegion { for (long i = minSeqId; i <= maxSeqId; i += 10) { Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i)); fs.create(recoveredEdits); - WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits); + WALProvider.Writer writer = wals.createWALWriter(fs, recoveredEdits, true); long time = System.nanoTime(); WALEdit edit = new WALEdit(); @@ -740,7 +740,7 @@ public class TestHRegion { for (long i = minSeqId; i <= maxSeqId; i += 10) { Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i)); fs.create(recoveredEdits); - WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits); + WALProvider.Writer writer = wals.createWALWriter(fs, recoveredEdits, true); long time = System.nanoTime(); WALEdit edit = new WALEdit(); @@ -826,7 +826,7 @@ public class TestHRegion { for (long i = minSeqId; i <= maxSeqId; i += 10) { Path recoveredEdits = new 
Path(recoveredEditsDir, String.format("%019d", i)); fs.create(recoveredEdits); - WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits); + WALProvider.Writer writer = wals.createWALWriter(fs, recoveredEdits, true); long time = System.nanoTime(); WALEdit edit = null; @@ -938,7 +938,7 @@ public class TestHRegion { Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000)); fs.create(recoveredEdits); - WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits); + WALProvider.Writer writer = wals.createWALWriter(fs, recoveredEdits, true); long time = System.nanoTime(); @@ -1018,8 +1018,8 @@ public class TestHRegion { // now verify that the flush markers are written wal.shutdown(); - WAL.Reader reader = WALFactory.createReader(fs, AbstractFSWALProvider.getCurrentFileName(wal), - TEST_UTIL.getConfiguration()); + WAL.Reader reader = WALFactory.getInstance(TEST_UTIL.getConfiguration()).createReader(fs, + AbstractFSWALProvider.getCurrentFileName(wal), null, true); try { List flushDescriptors = new ArrayList<>(); long lastFlushSeqId = -1; @@ -1063,7 +1063,7 @@ public class TestHRegion { Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000)); fs.create(recoveredEdits); - WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits); + WALProvider.Writer writer = wals.createWALWriter(fs, recoveredEdits, true); for (WAL.Entry entry : flushDescriptors) { writer.append(entry); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java index 3b3b8c39db..932f0c53cc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java @@ -317,9 +317,9 @@ public class TestHRegionReplayEvents { } 
WAL.Reader createWALReaderForPrimary() throws FileNotFoundException, IOException { - return WALFactory.createReader(TEST_UTIL.getTestFileSystem(), - AbstractFSWALProvider.getCurrentFileName(walPrimary), - TEST_UTIL.getConfiguration()); + return WALFactory.getInstance(TEST_UTIL.getConfiguration()).createReader( + TEST_UTIL.getTestFileSystem(), AbstractFSWALProvider.getCurrentFileName(walPrimary), null, + true); } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java index 543126e5b5..b6051e1bfd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java @@ -174,7 +174,7 @@ public class TestRecoveredEdits { // Based on HRegion#replayRecoveredEdits WAL.Reader reader = null; try { - reader = WALFactory.createReader(fs, edits, conf); + reader = WALFactory.getInstance(conf).createReader(fs, edits, null, true); WAL.Entry entry; while ((entry = reader.next()) != null) { WALKey key = entry.getKey(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java index 7aeff84a82..238544bc23 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java @@ -155,8 +155,7 @@ public class TestRecoveredEditsReplayAndAbort { String.format("%019d", i)); LOG.info("Begin to write recovered.edits : " + recoveredEdits); fs.create(recoveredEdits); - WALProvider.Writer writer = wals - .createRecoveredEditsWriter(fs, recoveredEdits); + WALProvider.Writer writer = wals.createWALWriter(fs, recoveredEdits, 
true); for (long j = i; j < i + 100; j++) { long time = System.nanoTime(); WALEdit edit = new WALEdit(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java index 8e7cacf0b9..394c4ab168 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java @@ -326,7 +326,8 @@ public class TestLogRolling extends AbstractTestLogRolling { LOG.debug("Reading WAL " + FSUtils.getPath(p)); WAL.Reader reader = null; try { - reader = WALFactory.createReader(fs, p, TEST_UTIL.getConfiguration()); + reader = + WALFactory.getInstance(TEST_UTIL.getConfiguration()).createReader(fs, p, null, true); WAL.Entry entry; while ((entry = reader.next()) != null) { LOG.debug("#" + entry.getKey().getSequenceId() + ": " + entry.getEdit().getCells()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java index d429a01fdb..502e8a1fa2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java @@ -1,19 +1,12 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. 
*/ package org.apache.hadoop.hbase.regionserver.wal; @@ -22,7 +15,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; -import org.apache.hadoop.hbase.wal.FSHLogProvider; +import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.junit.ClassRule; import org.junit.experimental.categories.Category; @@ -36,6 +29,6 @@ public class TestProtobufLog extends AbstractTestProtobufLog { @Override protected Writer createWriter(Path path) throws IOException { - return FSHLogProvider.createWriter(TEST_UTIL.getConfiguration(), fs, path, false); + return WALFactory.getInstance(TEST_UTIL.getConfiguration()).createWALWriter(fs, path, false); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java index 1b98518728..37b39dc340 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java @@ -184,7 +184,7 @@ public class SerialReplicationTestBase { protected final void setupWALWriter() throws IOException { logPath = new Path(LOG_DIR, name.getMethodName()); - WRITER = WALFactory.createWALWriter(FS, logPath, UTIL.getConfiguration()); + WRITER = WALFactory.getInstance(UTIL.getConfiguration()).createWALWriter(FS, logPath, false); } protected final void waitUntilReplicationDone(int expectedEntries) throws Exception { @@ -192,7 +192,8 @@ public class SerialReplicationTestBase { @Override public boolean evaluate() throws Exception { - try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) { + try (WAL.Reader reader = + 
WALFactory.getInstance(UTIL.getConfiguration()).createReader(FS, logPath, null, true)) { int count = 0; while (reader.next() != null) { count++; @@ -225,7 +226,8 @@ public class SerialReplicationTestBase { protected final void checkOrder(int expectedEntries) throws IOException { try (WAL.Reader reader = - WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) { + WALFactory.getInstance(UTIL.getConfiguration()).createReader(UTIL.getTestFileSystem(), + logPath, null, true)) { long seqId = -1L; int count = 0; for (Entry entry;;) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java index 07e626b3c8..6ebe31e268 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java @@ -108,7 +108,8 @@ public class TestSerialReplication extends SerialReplicationTestBase { regionsToSeqId.put(region.getEncodedName(), -1L); regions.stream().map(RegionInfo::getEncodedName).forEach(n -> regionsToSeqId.put(n, -1L)); try (WAL.Reader reader = - WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) { + WALFactory.getInstance(UTIL.getConfiguration()).createReader(UTIL.getTestFileSystem(), + logPath, null, true)) { int count = 0; for (Entry entry;;) { entry = reader.next(); @@ -169,7 +170,8 @@ public class TestSerialReplication extends SerialReplicationTestBase { regionsToSeqId.put(region.getEncodedName(), -1L); regions.stream().map(RegionInfo::getEncodedName).forEach(n -> regionsToSeqId.put(n, -1L)); try (WAL.Reader reader = - WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) { + WALFactory.getInstance(UTIL.getConfiguration()).createReader(UTIL.getTestFileSystem(), + logPath, null, true)) { int count = 0; for (Entry 
entry;;) { entry = reader.next(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java index 42adab60b5..05201c7607 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java @@ -124,7 +124,8 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase { Assert.assertTrue(files.length > 0); for (FileStatus file : files) { try ( - Reader reader = WALFactory.createReader(fs2, file.getPath(), utility.getConfiguration())) { + Reader reader = WALFactory.getInstance(utility.getConfiguration()).createReader(fs2, + file.getPath(), null, true)) { Entry entry = reader.next(); Assert.assertTrue(entry != null); while (entry != null) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java index bd800a841f..055883cc10 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java @@ -138,7 +138,7 @@ public class TestRaceWhenCreatingReplicationSource { Path dir = UTIL.getDataTestDirOnTestFS(); FS = UTIL.getTestFileSystem(); LOG_PATH = new Path(dir, "replicated"); - WRITER = WALFactory.createWALWriter(FS, LOG_PATH, UTIL.getConfiguration()); + WRITER = WALFactory.getInstance(UTIL.getConfiguration()).createWALWriter(FS, LOG_PATH, false); UTIL.getAdmin().addReplicationPeer(PEER_ID, ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase") 
.setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build(), @@ -184,7 +184,8 @@ public class TestRaceWhenCreatingReplicationSource { @Override public boolean evaluate() throws Exception { - try (WAL.Reader reader = WALFactory.createReader(FS, LOG_PATH, UTIL.getConfiguration())) { + try (WAL.Reader reader = WALFactory.getInstance(UTIL.getConfiguration()).createReader(FS, + LOG_PATH, null, true)) { return reader.next() != null; } catch (IOException e) { return false; @@ -196,7 +197,8 @@ public class TestRaceWhenCreatingReplicationSource { return "Replication has not catched up"; } }); - try (WAL.Reader reader = WALFactory.createReader(FS, LOG_PATH, UTIL.getConfiguration())) { + try (WAL.Reader reader = + WALFactory.getInstance(UTIL.getConfiguration()).createReader(FS, LOG_PATH, null, true)) { Cell cell = reader.next().getEdit().getCells().get(0); assertEquals(1, Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())); assertArrayEquals(CF, CellUtil.cloneFamily(cell)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java index 6322f7903b..5d5f0ad7b2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -113,8 +113,8 @@ public class TestReplicationSource { Path logPath = new Path(logDir, "log"); if (!FS.exists(logDir)) FS.mkdirs(logDir); if (!FS.exists(oldLogDir)) FS.mkdirs(oldLogDir); - WALProvider.Writer writer = WALFactory.createWALWriter(FS, logPath, - TEST_UTIL.getConfiguration()); + WALProvider.Writer writer = + WALFactory.getInstance(TEST_UTIL.getConfiguration()).createWALWriter(FS, logPath, false); for(int i = 0; i < 3; i++) { byte[] b = Bytes.toBytes(Integer.toString(i)); KeyValue kv = 
new KeyValue(b,b,b); @@ -127,7 +127,8 @@ public class TestReplicationSource { } writer.close(); - WAL.Reader reader = WALFactory.createReader(FS, logPath, TEST_UTIL.getConfiguration()); + WAL.Reader reader = + WALFactory.getInstance(TEST_UTIL.getConfiguration()).createReader(FS, logPath, null, true); WAL.Entry entry = reader.next(); assertNotNull(entry); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java index bb8c2915ed..e083e38378 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java @@ -77,8 +77,16 @@ import org.slf4j.LoggerFactory; * management over time, becaue the data set size may result in additional HDFS block allocations. */ @InterfaceAudience.Private -public class IOTestProvider implements WALProvider { +public class IOTestProvider extends DelegateWalProvider { + public IOTestProvider(WALProvider provider) { + super(provider); + } + private static final Logger LOG = LoggerFactory.getLogger(IOTestProvider.class); + /** delegate provider for WAL creation/roll/close */ + public static final String DELEGATE_PROVIDER = "hbase.wal.iotest.delegate.provider"; + public static final String DEFAULT_DELEGATE_PROVIDER = + WALFactory.Providers.defaultProvider.name(); private static final String ALLOWED_OPERATIONS = "hbase.wal.iotestprovider.operations"; private enum AllowedOperations { @@ -100,10 +108,6 @@ public class IOTestProvider implements WALProvider { private List listeners = new ArrayList<>(); - private Path oldLogDir; - - private Path rootDir; - /** * @param factory factory that made us, identity used for FS layout. may not be null * @param conf may not be null @@ -118,8 +122,6 @@ public class IOTestProvider implements WALProvider { this.factory = factory; this.conf = conf; this.providerId = providerId != null ? 
providerId : DEFAULT_PROVIDER_ID; - rootDir = FSUtils.getRootDir(conf); - oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); } @Override @@ -297,36 +299,4 @@ public class IOTestProvider implements WALProvider { return this.log.getLogFileSize(); } - @Override - public void addWALActionsListener(WALActionsListener listener) { - // TODO Implement WALProvider.addWALActionLister - - } - - @Override - public WALEntryStream getWalStream(PriorityBlockingQueue logQueue, - Configuration conf, long startPosition, ServerName serverName, MetricsSource metrics) - throws IOException { - return new FSWALEntryStream(CommonFSUtils.getWALFileSystem(conf), logQueue, conf, startPosition, - serverName, metrics, this); - } - - @Override - public RecoveredReplicationSource getRecoveredReplicationSource() { - return new FSRecoveredReplicationSource(); - } - - @Override - public WALIdentity createWalIdentity(ServerName serverName, String walName, boolean isArchive) { - Path walPath; - if (isArchive) { - walPath = new Path(oldLogDir, walName); - } else { - Path logDir = - new Path(rootDir, AbstractFSWALProvider.getWALDirectoryName(serverName.toString())); - walPath = new Path(logDir, walName); - } - return new FSWALIdentity(walPath); - } - } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestRaceBetweenGetWALAndGetWALs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestRaceBetweenGetWALAndGetWALs.java index 26ff11836f..396828d54a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestRaceBetweenGetWALAndGetWALs.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestRaceBetweenGetWALAndGetWALs.java @@ -26,8 +26,11 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; 
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; +import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Threads; @@ -65,6 +68,13 @@ public class TestRaceBetweenGetWALAndGetWALs { @Override protected void doInit(Configuration conf) throws IOException { } + + @Override + public Writer createWriter(Configuration conf, FileSystem fs, Path path, boolean overwritable) + throws IOException { + return FSHLogProvider.createWriter(conf, fs, path, overwritable, + WALUtil.getWALBlockSize(conf, fs, path)); + } } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java index 8189cef081..1552085bae 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java @@ -162,11 +162,11 @@ public class TestSyncReplicationWALProvider { WAL walNoRep = FACTORY.getWAL(REGION_NO_REP); assertThat(walNoRep, not(instanceOf(DualAsyncFSWAL.class))); DualAsyncFSWAL wal = (DualAsyncFSWAL) FACTORY.getWAL(REGION); - assertEquals(2, FACTORY.getWALs().size()); + assertEquals(2, FACTORY.getWALProvider().getWALs().size()); testReadWrite(wal); SyncReplicationWALProvider walProvider = (SyncReplicationWALProvider) FACTORY.getWALProvider(); walProvider.peerSyncReplicationStateChange(PEER_ID, SyncReplicationState.ACTIVE, SyncReplicationState.DOWNGRADE_ACTIVE, 1); - assertEquals(1, FACTORY.getWALs().size()); + assertEquals(1, FACTORY.getWALProvider().getWALs().size()); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALOpenAfterDNRollingStart.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALOpenAfterDNRollingStart.java index 7d7896c3f9..02a5210074 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALOpenAfterDNRollingStart.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALOpenAfterDNRollingStart.java @@ -122,8 +122,8 @@ public class TestWALOpenAfterDNRollingStart { currentFile = new Path(oldLogDir, currentFile.getName()); } // if the log is not rolled, then we can never open this wal forever. - try (WAL.Reader reader = WALFactory.createReader(TEST_UTIL.getTestFileSystem(), currentFile, - TEST_UTIL.getConfiguration())) { + try (WAL.Reader reader = WALFactory.getInstance(TEST_UTIL.getConfiguration()) + .createReader(TEST_UTIL.getTestFileSystem(), currentFile, null, true)) { reader.next(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java index e6644f07dd..d417493c03 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java @@ -423,7 +423,7 @@ public class TestWALSplit { FILENAME_BEING_SPLIT, TMPDIRNAME, conf); String parentOfParent = p.getParent().getParent().getName(); assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName()); - WALFactory.createRecoveredEditsWriter(fs, p, conf).close(); + wals.createWALWriter(fs, p, false).close(); } private void useDifferentDFSClient() throws IOException { @@ -687,7 +687,7 @@ public class TestWALSplit { assertEquals(1, splitLog.length); int actualCount = 0; - Reader in = wals.createReader(fs, splitLog[0]); + Reader in = wals.createReader(fs, splitLog[0], null, true); @SuppressWarnings("unused") Entry entry; while ((entry = in.next()) != null) ++actualCount; @@ -1151,7 +1151,7 @@ public class TestWALSplit { @Override protected Writer createWriter(Path logfile) 
throws IOException { - Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile); + Writer writer = wals.createWALWriter(this.walFS, logfile, false); // After creating writer, simulate region's // replayRecoveredEditsIfAny() which gets SplitEditFiles of this // region and delete them, excluding files with '.temp' suffix. @@ -1210,7 +1210,7 @@ public class TestWALSplit { int seq = 0; int numRegionEventsAdded = 0; for (int i = 0; i < writers; i++) { - ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i)); + ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i), false); for (int j = 0; j < entries; j++) { int prefix = 0; for (String region : REGIONS) { @@ -1339,7 +1339,7 @@ public class TestWALSplit { private int countWAL(Path log) throws IOException { int count = 0; - Reader in = wals.createReader(fs, log); + Reader in = wals.createReader(fs, log, null, true); while (in.next() != null) { count++; } @@ -1409,8 +1409,7 @@ public class TestWALSplit { } private void injectEmptyFile(String suffix, boolean closeFile) throws IOException { - Writer writer = - WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf); + Writer writer = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), false); if (closeFile) { writer.close(); } @@ -1418,8 +1417,8 @@ public class TestWALSplit { private boolean logsAreEqual(Path p1, Path p2) throws IOException { Reader in1, in2; - in1 = wals.createReader(fs, p1); - in2 = wals.createReader(fs, p2); + in1 = wals.createReader(fs, p1, null, true); + in2 = wals.createReader(fs, p2, null, true); Entry entry1; Entry entry2; while ((entry1 = in1.next()) != null) {