From 67538b2125d5d5e609008102f807d72862c15341 Mon Sep 17 00:00:00 2001 From: Vladimir Rodionov Date: Wed, 27 Mar 2019 17:11:36 -0700 Subject: [PATCH] HBASE-21688: Address WAL filesystem issues --- .../hadoop/hbase/util/CommonFSUtils.java | 1 + .../test/IntegrationTestBigLinkedList.java | 6 ++--- .../test/IntegrationTestLoadAndVerify.java | 6 ++--- .../org/apache/hadoop/hbase/io/WALLink.java | 3 ++- .../hadoop/hbase/master/MasterWalManager.java | 16 +++++++----- .../ReplicationSourceWALReader.java | 2 +- .../regionserver/WALEntryStream.java | 15 ++++++----- .../apache/hadoop/hbase/util/HBaseFsck.java | 6 ++--- .../hbase/wal/AbstractFSWALProvider.java | 6 ++--- .../hbase/fs/TestBlockReorderMultiBlocks.java | 2 +- .../hadoop/hbase/master/AbstractTestDLS.java | 2 +- .../hbase/master/TestMasterWALManager.java | 23 ++++++++++++---- .../regionserver/TestWALEntryStream.java | 26 +++++++++---------- 13 files changed, 66 insertions(+), 48 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java index a08f9f22c8..bf0d792bed 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java @@ -379,6 +379,7 @@ public abstract class CommonFSUtils { * @throws IOException e */ public static Path getWALRootDir(final Configuration c) throws IOException { + Path p = new Path(c.get(HBASE_WAL_DIR, c.get(HConstants.HBASE_DIR))); if (!isValidWALRootDir(p, c)) { return getRootDir(c); diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java index 35bc7a1a42..f9839355da 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java +++ 
b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java @@ -79,6 +79,7 @@ import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory; import org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.util.AbstractHBaseTool; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.RegionSplitter; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKey; @@ -944,10 +945,9 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { if (keys.isEmpty()) throw new RuntimeException("No keys to find"); LOG.info("Count of keys to find: " + keys.size()); for(byte [] key: keys) LOG.info("Key: " + Bytes.toStringBinary(key)); - Path hbaseDir = new Path(getConf().get(HConstants.HBASE_DIR)); // Now read all WALs. In two dirs. Presumes certain layout. - Path walsDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME); - Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME); + Path walsDir = new Path(CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_LOGDIR_NAME); + Path oldWalsDir = new Path(CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_OLDLOGDIR_NAME); LOG.info("Running Search with keys inputDir=" + inputDir +", numMappers=" + numMappers + " against " + getConf().get(HConstants.HBASE_DIR)); int ret = ToolRunner.run(getConf(), new WALSearcher(getConf()), diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java index ffdfa86acc..55d85f1622 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.mapreduce.TableMapper; import 
org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl; import org.apache.hadoop.hbase.util.AbstractHBaseTool; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; @@ -506,10 +507,9 @@ public void cleanUpCluster() throws Exception { if (keys.isEmpty()) throw new RuntimeException("No keys to find"); LOG.info("Count of keys to find: " + keys.size()); for(byte [] key: keys) LOG.info("Key: " + Bytes.toStringBinary(key)); - Path hbaseDir = new Path(getConf().get(HConstants.HBASE_DIR)); // Now read all WALs. In two dirs. Presumes certain layout. - Path walsDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME); - Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME); + Path walsDir = new Path(CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_LOGDIR_NAME); + Path oldWalsDir = new Path(CommonFSUtils.getWALRootDir(getConf()), HConstants.HREGION_OLDLOGDIR_NAME); LOG.info("Running Search with keys inputDir=" + inputDir + " against " + getConf().get(HConstants.HBASE_DIR)); int ret = ToolRunner.run(new WALSearcher(getConf()), new String [] {walsDir.toString(), ""}); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/WALLink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/WALLink.java index aac2a87e17..5409bcd725 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/WALLink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/WALLink.java @@ -24,6 +24,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.FSUtils; /** @@ -45,7 +46,7 @@ public class WALLink extends FileLink { */ public WALLink(final Configuration conf, final String 
serverName, final String logName) throws IOException { - this(FSUtils.getWALRootDir(conf), serverName, logName); + this(CommonFSUtils.getWALRootDir(conf), serverName, logName); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java index 7ba5709c68..dd6396def6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java @@ -36,6 +36,7 @@ import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; @@ -93,15 +94,14 @@ public class MasterWalManager { private volatile boolean fsOk = true; public MasterWalManager(MasterServices services) throws IOException { - this(services.getConfiguration(), services.getMasterFileSystem().getWALFileSystem(), - services.getMasterFileSystem().getWALRootDir(), services); + this(services.getConfiguration(), services.getMasterFileSystem().getWALFileSystem(), services); } - public MasterWalManager(Configuration conf, FileSystem fs, Path rootDir, MasterServices services) + public MasterWalManager(Configuration conf, FileSystem fs, MasterServices services) throws IOException { this.fs = fs; this.conf = conf; - this.rootDir = rootDir; + this.rootDir = CommonFSUtils.getWALRootDir(conf); this.services = services; this.splitLogManager = new SplitLogManager(services, conf); @@ -190,9 +190,10 @@ public class MasterWalManager { /** * @return Returns the WALs dir under rootDir + * @throws IOException */ - Path getWALDirPath() { - return new Path(this.rootDir, 
HConstants.HREGION_LOGDIR_NAME); + Path getWALDirPath() throws IOException { + return new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_LOGDIR_NAME); } /** @@ -207,13 +208,14 @@ public class MasterWalManager { /** * Inspect the log directory to find dead servers which need recovery work * @return A set of ServerNames which aren't running but still have WAL files left in file system + * @throws IOException * @deprecated With proc-v2, we can record the crash server with procedure store, so do not need * to scan the wal directory to find out the splitting wal directory any more. Leave * it here only because {@code RecoverMetaProcedure}(which is also deprecated) uses * it. */ @Deprecated - public Set getFailedServersFromLogFolders() { + public Set getFailedServersFromLogFolders() throws IOException { boolean retrySplitting = !conf.getBoolean("hbase.hlog.split.skip.errors", WALSplitter.SPLIT_SKIP_ERRORS_DEFAULT); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index b6b50adc85..2a2c9dea2b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -123,7 +123,7 @@ public class ReplicationSourceWALReader extends Thread { int sleepMultiplier = 1; while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream try (WALEntryStream entryStream = - new WALEntryStream(logQueue, fs, conf, currentPosition, + new WALEntryStream(logQueue, conf, currentPosition, source.getWALFileLengthProvider(), source.getServerWALsBelongTo(), source.getSourceMetrics())) { while (isReaderRunning()) { // loop here to keep reusing stream while we can diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java index 7c83c0c1fa..fe708ce6a1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -37,6 +37,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; @@ -72,18 +73,17 @@ class WALEntryStream implements Closeable { /** * Create an entry stream over the given queue at the given start position * @param logQueue the queue of WAL paths - * @param fs {@link FileSystem} to use to create {@link Reader} for this stream * @param conf {@link Configuration} to use to create {@link Reader} for this stream * @param startPosition the position in the first WAL to start reading at * @param serverName the server name which all WALs belong to * @param metrics replication metrics * @throws IOException */ - public WALEntryStream(PriorityBlockingQueue logQueue, FileSystem fs, Configuration conf, + public WALEntryStream(PriorityBlockingQueue logQueue, Configuration conf, long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName, MetricsSource metrics) throws IOException { this.logQueue = logQueue; - this.fs = fs; + this.fs = CommonFSUtils.getWALFileSystem(conf); this.conf = conf; this.currentPosition = startPosition; this.walFileLengthProvider = walFileLengthProvider; @@ -301,10 +301,10 @@ class WALEntryStream implements Closeable { } private Path 
getArchivedLog(Path path) throws IOException { - Path rootDir = FSUtils.getRootDir(conf); + Path walRootDir = CommonFSUtils.getWALRootDir(conf); // Try found the log in old dir - Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); + Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); Path archivedLogLocation = new Path(oldLogDir, path.getName()); if (fs.exists(archivedLogLocation)) { LOG.info("Log " + path + " was moved to " + archivedLogLocation); @@ -313,7 +313,7 @@ class WALEntryStream implements Closeable { // Try found the log in the seperate old log dir oldLogDir = - new Path(rootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME) + new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME) .append(Path.SEPARATOR).append(serverName.getServerName()).toString()); archivedLogLocation = new Path(oldLogDir, path.getName()); if (fs.exists(archivedLogLocation)) { @@ -370,7 +370,8 @@ class WALEntryStream implements Closeable { // For HBASE-15019 private void recoverLease(final Configuration conf, final Path path) { try { - final FileSystem dfs = FSUtils.getCurrentFileSystem(conf); + + final FileSystem dfs = CommonFSUtils.getWALFileSystem(conf); FSUtils fsUtils = FSUtils.getInstance(dfs, conf); fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() { @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java index ce2a2adda8..0bba04173f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java @@ -1669,9 +1669,9 @@ public class HBaseFsck extends Configured implements Closeable { * Meta recovery WAL directory inside WAL directory path. 
*/ private void removeHBCKMetaRecoveryWALDir(String walFactoryId) throws IOException { - Path rootdir = FSUtils.getRootDir(getConf()); - Path walLogDir = new Path(new Path(rootdir, HConstants.HREGION_LOGDIR_NAME), walFactoryId); - FileSystem fs = FSUtils.getCurrentFileSystem(getConf()); + Path walLogDir = new Path(new Path(CommonFSUtils.getWALRootDir(getConf()), + HConstants.HREGION_LOGDIR_NAME), walFactoryId); + FileSystem fs = CommonFSUtils.getWALFileSystem(getConf()); FileStatus[] walFiles = FSUtils.listStatus(fs, walLogDir, null); if (walFiles == null || walFiles.length == 0) { LOG.info("HBCK meta recovery WAL directory is empty, removing it now."); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index fea05ded77..6387b038b3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -418,8 +418,8 @@ public abstract class AbstractFSWALProvider> implemen * @throws IOException exception */ public static Path getArchivedLogPath(Path path, Configuration conf) throws IOException { - Path rootDir = FSUtils.getRootDir(conf); - Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); + Path walRootDir = FSUtils.getWALRootDir(conf); + Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) { ServerName serverName = getServerNameFromWALDirectoryName(path); if (serverName == null) { @@ -429,7 +429,7 @@ public abstract class AbstractFSWALProvider> implemen oldLogDir = new Path(oldLogDir, serverName.getServerName()); } Path archivedLogLocation = new Path(oldLogDir, path.getName()); - final FileSystem fs = FSUtils.getCurrentFileSystem(conf); + final FileSystem fs = FSUtils.getWALFileSystem(conf); if 
(fs.exists(archivedLogLocation)) { LOG.info("Log " + path + " was moved to " + archivedLogLocation); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorderMultiBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorderMultiBlocks.java index 937b5dd662..acc5d05a2f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorderMultiBlocks.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorderMultiBlocks.java @@ -141,7 +141,7 @@ public class TestBlockReorderMultiBlocks { // Now we need to find the log file, its locations, and look at it - String rootDir = new Path(FSUtils.getRootDir(conf) + "/" + HConstants.HREGION_LOGDIR_NAME + + String rootDir = new Path(FSUtils.getWALRootDir(conf) + "/" + HConstants.HREGION_LOGDIR_NAME + "/" + targetRs.getServerName().toString()).toUri().getPath(); DistributedFileSystem mdfs = (DistributedFileSystem) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java index f36b38c3cf..43847dd252 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java @@ -416,7 +416,7 @@ public abstract class AbstractTestDLS { startCluster(1); final SplitLogManager slm = master.getMasterWalManager().getSplitLogManager(); final FileSystem fs = master.getMasterFileSystem().getFileSystem(); - final Path logDir = new Path(new Path(FSUtils.getRootDir(conf), HConstants.HREGION_LOGDIR_NAME), + final Path logDir = new Path(new Path(FSUtils.getWALRootDir(conf), HConstants.HREGION_LOGDIR_NAME), ServerName.valueOf("x", 1, 1).toString()); fs.mkdirs(logDir); ExecutorService executor = null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterWALManager.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterWALManager.java index 4f8a240526..bb460384ed 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterWALManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterWALManager.java @@ -25,7 +25,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -33,6 +32,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; @@ -60,14 +60,27 @@ public class TestMasterWALManager { public void before() throws IOException { MasterFileSystem mfs = Mockito.mock(MasterFileSystem.class); Mockito.when(mfs.getWALFileSystem()).thenReturn(HTU.getTestFileSystem()); - Path walRootDir = HTU.createWALRootDir(); + final Path walRootDir = HTU.getDataTestDir(); + Mockito.when(mfs.getWALRootDir()).thenReturn(walRootDir); this.masterServices = Mockito.mock(MasterServices.class); Mockito.when(this.masterServices.getConfiguration()).thenReturn(HTU.getConfiguration()); Mockito.when(this.masterServices.getMasterFileSystem()).thenReturn(mfs); Mockito.when(this.masterServices.getServerName()). 
- thenReturn(ServerName.parseServerName("master.example.org,0123,456")); - this.mwm = new MasterWalManager(this.masterServices); + Mockito.when(this.masterServices.getServerName()) + .thenReturn(ServerName.parseServerName("master.example.org,0123,456")); + this.mwm = new MasterWalManager(this.masterServices) { + + @Override + Path getWALDirPath() throws IOException { + return walRootDir; + } + + @Override + Path getWALDirectoryName(ServerName serverName) { + return new Path(walRootDir, + AbstractFSWALProvider.getWALDirectoryName(serverName.toString())); + } + }; } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index 2146e474a2..ce257020bf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -150,7 +150,7 @@ public class TestWALEntryStream { log.rollWriter(); try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) { + new WALEntryStream(walQueue, conf, 0, log, null, new MetricsSource("1"))) { int i = 0; while (entryStream.hasNext()) { assertNotNull(entryStream.next()); @@ -177,7 +177,7 @@ public class TestWALEntryStream { appendToLog(); long oldPos; try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) { + new WALEntryStream(walQueue, conf, 0, log, null, new MetricsSource("1"))) { // There's one edit in the log, read it. 
Reading past it needs to throw exception assertTrue(entryStream.hasNext()); WAL.Entry entry = entryStream.next(); @@ -194,7 +194,7 @@ public class TestWALEntryStream { appendToLog(); - try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, oldPos, + try (WALEntryStream entryStream = new WALEntryStream(walQueue, conf, oldPos, log, null, new MetricsSource("1"))) { // Read the newly added entry, make sure we made progress WAL.Entry entry = entryStream.next(); @@ -208,7 +208,7 @@ public class TestWALEntryStream { log.rollWriter(); appendToLog(); - try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, oldPos, + try (WALEntryStream entryStream = new WALEntryStream(walQueue, conf, oldPos, log, null, new MetricsSource("1"))) { WAL.Entry entry = entryStream.next(); assertNotEquals(oldPos, entryStream.getPosition()); @@ -234,7 +234,7 @@ public class TestWALEntryStream { appendToLog("1"); appendToLog("2");// 2 try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) { + new WALEntryStream(walQueue, conf, 0, log, null, new MetricsSource("1"))) { assertEquals("1", getRow(entryStream.next())); appendToLog("3"); // 3 - comes in after reader opened @@ -259,7 +259,7 @@ public class TestWALEntryStream { public void testNewEntriesWhileStreaming() throws Exception { appendToLog("1"); try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) { + new WALEntryStream(walQueue, conf, 0, log, null, new MetricsSource("1"))) { entryStream.next(); // we've hit the end of the stream at this point // some new entries come in while we're streaming @@ -282,7 +282,7 @@ public class TestWALEntryStream { long lastPosition = 0; appendToLog("1"); try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) { + new WALEntryStream(walQueue, conf, 0, log, null, new MetricsSource("1"))) { 
entryStream.next(); // we've hit the end of the stream at this point appendToLog("2"); appendToLog("3"); @@ -290,7 +290,7 @@ public class TestWALEntryStream { } // next stream should picks up where we left off try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, lastPosition, log, null, new MetricsSource("1"))) { + new WALEntryStream(walQueue, conf, lastPosition, log, null, new MetricsSource("1"))) { assertEquals("2", getRow(entryStream.next())); assertEquals("3", getRow(entryStream.next())); assertFalse(entryStream.hasNext()); // done @@ -307,14 +307,14 @@ public class TestWALEntryStream { long lastPosition = 0; appendEntriesToLog(3); // read only one element - try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, lastPosition, + try (WALEntryStream entryStream = new WALEntryStream(walQueue, conf, lastPosition, log, null, new MetricsSource("1"))) { entryStream.next(); lastPosition = entryStream.getPosition(); } // there should still be two more entries from where we left off try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, lastPosition, log, null, new MetricsSource("1"))) { + new WALEntryStream(walQueue, conf, lastPosition, log, null, new MetricsSource("1"))) { assertNotNull(entryStream.next()); assertNotNull(entryStream.next()); assertFalse(entryStream.hasNext()); @@ -325,7 +325,7 @@ public class TestWALEntryStream { @Test public void testEmptyStream() throws Exception { try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) { + new WALEntryStream(walQueue, conf, 0, log, null, new MetricsSource("1"))) { assertFalse(entryStream.hasNext()); } } @@ -336,7 +336,7 @@ public class TestWALEntryStream { // get ending position long position; try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) { + new WALEntryStream(walQueue, conf, 0, log, null, new MetricsSource("1"))) { 
entryStream.next(); entryStream.next(); entryStream.next(); @@ -440,7 +440,7 @@ public class TestWALEntryStream { appendToLog("2"); long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong(); AtomicLong fileLength = new AtomicLong(size - 1); - try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, 0, + try (WALEntryStream entryStream = new WALEntryStream(walQueue, conf, 0, p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"))) { assertTrue(entryStream.hasNext()); assertNotNull(entryStream.next()); -- 2.17.2 (Apple Git-113)