diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ListWal.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ListWal.java new file mode 100644 index 0000000..68f6ea3 --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ListWal.java @@ -0,0 +1,352 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; + +import java.io.IOException; +import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.PrivateCellUtil; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry; +import org.apache.hadoop.hbase.wal.ListWalProvider.ListWalMetaDataProvider; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALInfo; +import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class ListWal implements WAL { + + protected final AtomicBoolean shutdown = new AtomicBoolean(false); + protected final AtomicLong filenum = new AtomicLong(-1); + protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting(); + List activeWal; + private WALCoprocessorHost coprocessorHost; + protected final List listeners = new CopyOnWriteArrayList<>(); + private String walFilePrefix; + private String walFileSuffix; + private boolean closed; + private ListWalMetaDataProvider metaDataProvider; + + public ListWal(Configuration conf, String prefix, String suffix, + ListWalMetaDataProvider metaDataProvider, List listeners) throws IOException { + this.coprocessorHost = new WALCoprocessorHost(this, conf); + // If prefix is null||empty then just name it wal + this.walFilePrefix = + prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8"); + // we only correctly differentiate suffices when numeric ones start with '.' 
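For review context, a self-contained sketch of the log-name scheme the ListWal constructor enforces: an encoded prefix, the '.' delimiter, a timestamp file number, and an optional suffix that must itself start with the delimiter. The sketch is not part of the patch; the class name and main driver are illustrative, and the constructor's actual suffix check continues right after it.

import java.net.URLEncoder;

public class WalNameSketch {
  // Mirrors AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER
  static final String DELIMITER = ".";

  static String logName(String prefix, long filenum, String suffix) throws Exception {
    // If prefix is null/empty then just name it "wal", as in the constructor above
    String p = (prefix == null || prefix.isEmpty()) ? "wal" : URLEncoder.encode(prefix, "UTF8");
    if (suffix != null && !suffix.isEmpty() && !suffix.startsWith(DELIMITER)) {
      throw new IllegalArgumentException("WAL suffix must start with '" + DELIMITER + "'");
    }
    String s = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8");
    // Same shape as ListWal.computeLogName: prefix + "." + filenum + suffix
    return p + DELIMITER + filenum + s;
  }

  public static void main(String[] args) throws Exception {
    // prints e.g. "host1%2C16020%2C1530000000000.1530000123456.meta"
    System.out.println(logName("host1,16020,1530000000000", 1530000123456L, ".meta"));
  }
}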
+ if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) { + throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER + + "' but instead was '" + suffix + "'"); + } + this.walFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8"); + this.metaDataProvider = metaDataProvider; + // Register listeners. TODO: Should this exist anymore? We have CPs? + if (listeners != null) { + for (WALActionsListener i : listeners) { + registerWALActionsListener(i); + } + } + } + + @Override + public OptionalLong getLogFileSizeIfBeingWritten(WALInfo walInfo) { + return OptionalLong.of(metaDataProvider.get(walInfo).size()); + } + + @Override + public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException { + if (this.closed) { + throw new WALClosedException("WAL has been closed"); + } + WALInfo oldPath = getOldPath(); + WALInfo newPath = getNewPath(); + tellListenersAboutPreLogRoll(oldPath, newPath); + activeWal = createWal(newPath); + tellListenersAboutPostLogRoll(oldPath, newPath); + return null; + } + + List createWal(WALInfo newPath) { + return metaDataProvider.createList(newPath); + } + + @Override + public void shutdown() throws IOException { + if (!shutdown.compareAndSet(false, true)) { + return; + } + closed = true; + // Tell our listeners that the log is closing + if (!this.listeners.isEmpty()) { + for (WALActionsListener i : this.listeners) { + i.logCloseRequested(); + } + } + metaDataProvider.clear(); + } + + @Override + public void close() throws IOException { + shutdown(); + } + + @Override + public long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore) + throws IOException { + if (this.closed) { + throw new IOException( + "Cannot append; log is closed, regionName = " + key.getEncodedRegionName()); + } + Entry entry = new Entry(key, edits); + MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(); + stampRegionSequenceId(entry,we, inMemstore); + activeWal.add(entry); + return activeWal.size(); + } + + + + private void stampRegionSequenceId(Entry entry, WriteEntry we, boolean inMemstore) throws IOException { + long regionSequenceId = we.getWriteNumber(); + if (!entry.getEdit().isReplay() && inMemstore) { + for (Cell c : entry.getEdit().getCells()) { + PrivateCellUtil.setSequenceId(c, regionSequenceId); + } + } + entry.getKey().setWriteEntry(we); + } + + @Override + public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid, + boolean onlyIfGreater) { + sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater); + } + + @Override + public void sync() throws IOException { + + } + + @Override + public void sync(long txid) throws IOException { + } + + /** + * Used to initialize the WAL. Usually just call rollWriter to create the first log writer. 
+ */ + public void init() throws IOException { + rollWriter(false); + } + + @Override + public void registerWALActionsListener(WALActionsListener listener) { + this.listeners.add(listener); + } + + @Override + public boolean unregisterWALActionsListener(WALActionsListener listener) { + return this.listeners.remove(listener); + } + + @Override + public WALCoprocessorHost getCoprocessorHost() { + return coprocessorHost; + } + + @Override + public Long startCacheFlush(byte[] encodedRegionName, Set families) { + return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families); + } + + @Override + public Long startCacheFlush(byte[] encodedRegionName, Map familyToSeq) { + return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq); + } + + @Override + public void completeCacheFlush(byte[] encodedRegionName) { + this.sequenceIdAccounting.completeCacheFlush(encodedRegionName); + } + + @Override + public void abortCacheFlush(byte[] encodedRegionName) { + this.sequenceIdAccounting.abortCacheFlush(encodedRegionName); + } + + @Override + public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) { + // Used by tests. Deprecated as too subtle for general usage. + return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName); + } + + @Override + public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) { + return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName); + } + + /** + * This is a convenience method that computes a new filename with a given file-number. + * @param filenum to use + * @return WALInfo + */ + protected WALInfo computeLogName(final long filenum) { + if (filenum < 0) { + throw new RuntimeException("WAL file number can't be < 0"); + } + String child = walFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + walFileSuffix; + return new WalInfoImpl(child); + } + + /** + * This is a convenience method that computes a new filename with a given using the current WAL + * file-number + * @return WALInfo + */ + public WALInfo getCurrentFileName() { + return computeLogName(this.filenum.get()); + } + + /** + * retrieve the next path to use for writing. Increments the internal filenum. + */ + private WALInfo getNewPath() throws IOException { + this.filenum.set(System.currentTimeMillis()); + WALInfo newPath = getCurrentFileName(); + while (metaDataProvider.exists(newPath.getName())) { + this.filenum.incrementAndGet(); + newPath = getCurrentFileName(); + } + return newPath; + } + + @VisibleForTesting + WALInfo getOldPath() { + long currentFilenum = this.filenum.get(); + WALInfo oldPath = null; + if (currentFilenum > 0) { + // ComputeFilename will take care of meta wal filename + oldPath = computeLogName(currentFilenum); + } // I presume if currentFilenum is <= 0, this is first file and null for oldPath if fine? + return oldPath; + } + + /** + * Tell listeners about post log roll. + */ + private void tellListenersAboutPostLogRoll(final WALInfo oldPath, final WALInfo newPath) + throws IOException { + if (!this.listeners.isEmpty()) { + for (WALActionsListener i : this.listeners) { + i.postLogRoll(oldPath, newPath); + } + } + coprocessorHost.postWALRoll(oldPath, newPath); + } + + /** + * Tell listeners about pre log roll. 
+ */ + private void tellListenersAboutPreLogRoll(final WALInfo oldPath, final WALInfo newPath) + throws IOException { + coprocessorHost.preWALRoll(oldPath, newPath); + + if (!this.listeners.isEmpty()) { + for (WALActionsListener i : this.listeners) { + i.preLogRoll(oldPath, newPath); + } + } + } + + public int getLogFileSize() { + return activeWal.size(); + } + + public int getNumLogFiles() { + return metaDataProvider.getNumLogFiles(); + } + + public static class ListReader implements Reader { + + private List list; + int count = 0; + + public ListReader(WALInfo info, ListWalMetaDataProvider metaDataProvider) { + list = metaDataProvider.get(info); + } + + @Override + public void close() throws IOException { + + } + + @Override + public Entry next() throws IOException { + if (count >= list.size()) { + return null; + } + Entry entry = list.get(count); + ++count; + WALKeyImpl key = entry.getKey(); + WALKeyImpl keyModify = new WALKeyImpl(key.getEncodedRegionName(), key.getTableName(), + key.getLogSeqNum(), key.getWriteTime(), new ArrayList<>(), key.getNonceGroup(), + key.getNonce(), key.getMvcc(), key.getReplicationScopes()); + // Modifying Key with modifiable list of UUIDs , so that replication can edit them + return new Entry(keyModify, entry.getEdit()); + } + + @Override + public Entry next(Entry reuse) throws IOException { + return next(); + } + + @Override + public void seek(long pos) throws IOException { + count = (int)pos; + } + + @Override + public long getPosition() throws IOException { + return count; + } + + @Override + public void reset() throws IOException { + seek(0); + } + + } + +} \ No newline at end of file diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/wal/ListWalProvider.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/wal/ListWalProvider.java new file mode 100644 index 0000000..6bfaf7b --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/wal/ListWalProvider.java @@ -0,0 +1,238 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.wal; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.regionserver.wal.ListWal; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.regionserver.wal.WalInfoImpl; +import org.apache.hadoop.hbase.replication.regionserver.AbstractWALEntryStream; +import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; +import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource; +import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream; +import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WAL.Reader; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class ListWalProvider implements WALProvider { + + private Configuration conf; + private String providerId; + protected List listeners = new ArrayList<>(); + protected AtomicBoolean initialized = new AtomicBoolean(false); + private String logPrefix; + private final Object walCreateLock = new Object(); + public static final String WAL_FILE_NAME_DELIMITER = "."; + public static final String META_WAL_PROVIDER_ID = ".meta"; + protected volatile ListWal wal; + private ListWalMetaDataProvider listWalMetaDataProvider; + + @Override + public void init(WALFactory factory, Configuration conf, String providerId) throws IOException { + if (!initialized.compareAndSet(false, true)) { + throw new IllegalStateException("WALProvider.init should only be called once."); + } + this.conf = conf; + this.providerId = providerId; + // get log prefix + StringBuilder sb = new StringBuilder().append(factory.factoryId); + if (providerId != null) { + if (providerId.startsWith(WAL_FILE_NAME_DELIMITER)) { + sb.append(providerId); + } else { + sb.append(WAL_FILE_NAME_DELIMITER).append(providerId); + } + } + logPrefix = sb.toString(); + listWalMetaDataProvider = new ListWalMetaDataProvider(); + } + + @Override + public WAL getWAL(RegionInfo region) throws IOException { + ListWal walCopy = wal; + if (walCopy == null) { + // only lock when need to create wal, and need to lock since + // creating hlog on fs is time consuming + synchronized (walCreateLock) { + walCopy = wal; + if (walCopy == null) { + walCopy = createWAL(); + boolean succ = false; + try { + walCopy.init(); + succ = true; + } finally { + if (!succ) { + walCopy.close(); + } + } + wal = walCopy; + } + } + } + return walCopy; + } + + private ListWal createWAL() throws IOException { + return new ListWal(conf, logPrefix, + META_WAL_PROVIDER_ID.equals(providerId) ? 
META_WAL_PROVIDER_ID : null, + listWalMetaDataProvider, listeners); + } + + @Override + public List getWALs() { + if (wal == null) { + return Collections.emptyList(); + } + List wals = new ArrayList<>(1); + wals.add(wal); + return wals; + } + + @Override + public void shutdown() throws IOException { + WAL log = this.wal; + if (log != null) { + log.shutdown(); + } + } + + @Override + public void close() throws IOException { + WAL log = this.wal; + if (log != null) { + log.close(); + } + } + + @Override + public long getNumLogFiles() { + ListWal log = this.wal; + return log == null ? 0 : log.getNumLogFiles(); + } + + /** + * iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta, count the + * size of files (only rolled). if either of them aren't, count 0 for that provider. + */ + @Override + public long getLogFileSize() { + ListWal log = this.wal; + return log == null ? 0 : log.getLogFileSize(); + } + + @Override + public void addWALActionsListener(WALActionsListener listener) { + listeners.add(listener); + } + + @Override + public WALEntryStream getWalStream(PriorityBlockingQueue logQueue, Configuration conf, + long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName, + MetricsSource metrics) throws IOException { + return new AbstractWALEntryStream(logQueue, conf, startPosition, walFileLengthProvider, + serverName, metrics) { + + @Override + protected void handleIOException(WALInfo walInfo, IOException e) throws IOException { + throw e; + } + + @Override + protected Reader createReader(WALInfo walInfo, Configuration conf) throws IOException { + return listWalMetaDataProvider.createReader(walInfo); + } + }; + } + + @Override + public WALMetaDataProvider getWalMetaDataProvider() throws IOException { + return listWalMetaDataProvider; + } + + public class ListWalMetaDataProvider implements WALMetaDataProvider { + ConcurrentHashMap> map = new ConcurrentHashMap>(); + + @Override + public boolean exists(String log) throws IOException { + return map.containsKey(new WalInfoImpl(log)); + } + + public Reader createReader(WALInfo walInfo) { + return new ListWal.ListReader(walInfo, this); + } + + @Override + public WALInfo[] list(WALInfo walInfo) throws IOException { + WALInfo[] walInfos = new WALInfo[1]; + walInfos[0] = walInfo; + return walInfos; + } + + public List createList(WALInfo info) { + List list = map.putIfAbsent(info, new ArrayList()); + if (list == null) { + return map.get(info); + } + return list; + } + + public void clear() { + map.clear(); + } + + public int getNumLogFiles() { + return map.keySet().size(); + } + + public List get(WALInfo walInfo) { + return map.get(walInfo); + } + + } + + @Override + public WALInfo createWalInfo(String wal) { + return new WalInfoImpl(wal); + } + + @Override + public RecoveredReplicationSource getRecoveredReplicationSource() { + return new RecoveredReplicationSource() { + @Override + public void locateRecoveredWALInfos(PriorityBlockingQueue queue) throws IOException { + + } + }; + } + +} diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java index 0dfe863..7a9e37e 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java @@ -133,7 +133,7 @@ public class TestWALPlayer { // replay the WAL, map table 1 to table 2 WAL log = 
cluster.getRegionServer(0).getWAL(null); - log.rollWriter(); + log.rollWriter(false); String walInputDir = new Path(cluster.getMaster().getMasterFileSystem() .getWALRootDir(), HConstants.HREGION_LOGDIR_NAME).toString(); diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java index 449c4b7..69be98c 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java @@ -141,7 +141,7 @@ public class TestWALRecordReader { log.append(info, getWalKeyImpl(ts+1, scopes), edit, true); log.sync(); LOG.info("Before 1st WAL roll " + log.toString()); - log.rollWriter(); + log.rollWriter(false); LOG.info("Past 1st WAL roll " + log.toString()); Thread.sleep(1); @@ -198,7 +198,7 @@ public class TestWALRecordReader { Thread.sleep(1); // make sure 2nd log gets a later timestamp long secondTs = System.currentTimeMillis(); - log.rollWriter(); + log.rollWriter(false); edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java index b2fa7ca..1b89d00 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/WALObserver.java @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALInfo; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -96,7 +97,7 @@ public interface WALObserver { * @param newPath the path of the wal we are going to create */ default void preWALRoll(ObserverContext ctx, - Path oldPath, Path newPath) throws IOException {} + WALInfo oldPath, WALInfo newPath) throws IOException {} /** * Called after rolling the current WAL @@ -104,6 +105,6 @@ public interface WALObserver { * @param newPath the path of the wal we have created and now is the current */ default void postWALRoll(ObserverContext ctx, - Path oldPath, Path newPath) throws IOException {} + WALInfo oldPath, WALInfo newPath) throws IOException {} } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 85175be..abeff5f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2981,7 +2981,7 @@ public class HRegionServer extends HasThread implements throw new IOException("Could not find class for " + classname); } T service = ReflectionUtils.newInstance(clazz, conf); - service.initialize(server, walFs, logDir, oldLogDir, walProvider); + service.initialize(server, walProvider); return service; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java index e9bbaea..702ace6 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java @@ -38,7 +38,7 @@ public interface ReplicationService { * @param walProvider can be null if not initialized inside a live region server environment, for * example, {@code ReplicationSyncUp}. */ - void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir, WALProvider walProvider) + void initialize(Server rs, WALProvider walProvider) throws IOException; /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSProtobufLogWriter.java new file mode 100644 index 0000000..a8f4af8 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSProtobufLogWriter.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.DEFAULT_WAL_TRAILER_WARN_SIZE; +import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.WAL_TRAILER_WARN_SIZE; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.io.crypto.Encryptor; +import org.apache.hadoop.hbase.io.util.LRUDictionary; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer; +import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Base class for Protobuf log writer. 
+ */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public abstract class AbstractFSProtobufLogWriter extends AbstractProtobufLogWriter { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractFSProtobufLogWriter.class); + + private boolean initializeCompressionContext(Configuration conf, Path path) throws IOException { + boolean doCompress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false); + if (doCompress) { + try { + this.compressionContext = new CompressionContext(LRUDictionary.class, + FSUtils.isRecoveredEdits(path), + conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true)); + } catch (Exception e) { + throw new IOException("Failed to initiate CompressionContext", e); + } + } + return doCompress; + } + + public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable, + long blocksize) throws IOException, StreamLacksCapabilityException { + this.conf = conf; + boolean doCompress = initializeCompressionContext(conf, path); + this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE); + int bufferSize = FSUtils.getDefaultBufferSize(fs); + short replication = (short) conf.getInt("hbase.regionserver.hlog.replication", + FSUtils.getDefaultReplication(fs, path)); + + initOutput(fs, path, overwritable, bufferSize, replication, blocksize); + + boolean doTagCompress = doCompress + && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true); + length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC, buildWALHeader(conf, + WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress)))); + + initAfterHeader(doCompress); + + // instantiate trailer to default value. + trailer = WALTrailer.newBuilder().build(); + if (LOG.isTraceEnabled()) { + LOG.trace("Initialized protobuf WAL=" + path + ", compression=" + doCompress); + } + } + + private void initAfterHeader0(boolean doCompress) throws IOException { + WALCellCodec codec = getCodec(conf, this.compressionContext); + this.cellEncoder = codec.getEncoder(getOutputStreamForCellEncoder()); + if (doCompress) { + this.compressor = codec.getByteStringCompressor(); + } else { + this.compressor = WALCellCodec.getNoneCompressor(); + } + } + + protected void initAfterHeader(boolean doCompress) throws IOException { + initAfterHeader0(doCompress); + } + + // should be called in sub classes's initAfterHeader method to init SecureWALCellCodec. 
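The comment above introduces secureInitAfterHeader, which follows this sketch. As a compilable illustration of the selection rule it implements (hypothetical stand-in types, not the patch's classes): encryption, when enabled and an Encryptor is supplied, takes precedence, and the compression context is dropped since encrypted WALs do not support compression; otherwise the compressing or none codec path applies.

public class CodecChoiceSketch {
  enum Codec { SECURE, COMPRESSING, NONE } // stand-ins for the WALCellCodec variants

  static Codec choose(boolean encryptionEnabled, boolean hasEncryptor, boolean doCompress) {
    if (encryptionEnabled && hasEncryptor) {
      // compressionContext is nulled out in secureInitAfterHeader:
      // no compression together with encryption
      return Codec.SECURE;
    }
    return doCompress ? Codec.COMPRESSING : Codec.NONE;
  }

  public static void main(String[] args) {
    System.out.println(choose(true, true, true));   // SECURE (compression ignored)
    System.out.println(choose(false, false, true)); // COMPRESSING
  }
}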
+ protected final void secureInitAfterHeader(boolean doCompress, Encryptor encryptor) + throws IOException { + if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false) && encryptor != null) { + WALCellCodec codec = SecureWALCellCodec.getCodec(this.conf, encryptor); + this.cellEncoder = codec.getEncoder(getOutputStreamForCellEncoder()); + // We do not support compression + this.compressionContext = null; + this.compressor = WALCellCodec.getNoneCompressor(); + } else { + initAfterHeader0(doCompress); + } + } + + protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, + short replication, long blockSize) throws IOException, StreamLacksCapabilityException; +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 2b45a04..a941267 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -18,13 +18,10 @@ package org.apache.hadoop.hbase.regionserver.wal; import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; -import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument; import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull; -import com.lmax.disruptor.RingBuffer; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InterruptedIOException; import java.lang.management.MemoryType; import java.net.URLEncoder; import java.util.ArrayList; @@ -33,57 +30,40 @@ import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.OptionalLong; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantLock; -import org.apache.commons.lang3.mutable.MutableLong; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.PrivateCellUtil; -import org.apache.hadoop.hbase.client.RegionInfo; -import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.io.util.MemorySizeUtil; import org.apache.hadoop.hbase.log.HBaseMarkers; -import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.CollectionUtils; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import 
org.apache.hadoop.hbase.wal.FSWALInfo; import org.apache.hadoop.hbase.wal.WAL; -import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.apache.hadoop.hbase.wal.WALInfo; import org.apache.hadoop.hbase.wal.WALPrettyPrinter; import org.apache.hadoop.hbase.wal.WALProvider.WriterBase; -import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.hbase.wal.WALSplitter; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.htrace.core.TraceScope; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; - /** * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one * WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled. @@ -111,14 +91,10 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti * (Need to keep our own file lengths, not rely on HDFS). */ @InterfaceAudience.Private -public abstract class AbstractFSWAL implements WAL { +public abstract class AbstractFSWAL extends AbstractWAL { private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWAL.class); - protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms - - private static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min - /** * file system instance */ @@ -140,99 +116,16 @@ public abstract class AbstractFSWAL implements WAL { protected final PathFilter ourFiles; /** - * Prefix of a WAL file, usually the region server name it is hosted on. - */ - protected final String walFilePrefix; - - /** - * Suffix included on generated wal file names - */ - protected final String walFileSuffix; - - /** * Prefix used when checking for wal membership. */ protected final String prefixPathStr; - protected final WALCoprocessorHost coprocessorHost; - - /** - * conf object - */ - protected final Configuration conf; - - /** Listeners that are called on WAL events. */ - protected final List listeners = new CopyOnWriteArrayList<>(); - - /** - * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence - * id as yet not flushed as well as the most recent edit sequence id appended to the WAL. Has - * facility for answering questions such as "Is it safe to GC a WAL?". - */ - protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting(); - - protected final long slowSyncNs; - - private final long walSyncTimeoutNs; - // If > than this size, roll the log. - protected final long logrollsize; - - /** - * Block size to use writing files. - */ - protected final long blocksize; - - /* - * If more than this many logs, force flush of oldest region to oldest edit goes to disk. If too - * many and we crash, then will take forever replaying. Keep the number of logs tidy. - */ - protected final int maxLogs; - - /** - * This lock makes sure only one log roll runs at a time. Should not be taken while any other lock - * is held. We don't just use synchronized because that results in bogus and tedious findbugs - * warning when it thinks synchronized controls writer thread safety. It is held when we are - * actually rolling the log. 
It is checked when we are looking to see if we should roll the log or - * not. - */ - protected final ReentrantLock rollWriterLock = new ReentrantLock(true); - - // The timestamp (in ms) when the log file was created. - protected final AtomicLong filenum = new AtomicLong(-1); - - // Number of transactions in the current Wal. - protected final AtomicInteger numEntries = new AtomicInteger(0); - - /** - * The highest known outstanding unsync'd WALEdit transaction id. Usually, we use a queue to pass - * WALEdit to background consumer thread, and the transaction id is the sequence number of the - * corresponding entry in queue. - */ - protected volatile long highestUnsyncedTxid = -1; - - /** - * Updated to the transaction id of the last successful sync call. This can be less than - * {@link #highestUnsyncedTxid} for case where we have an append where a sync has not yet come in - * for it. - */ - protected final AtomicLong highestSyncedTxid = new AtomicLong(0); - - /** - * The total size of wal - */ - protected final AtomicLong totalLogSize = new AtomicLong(0); - /** - * Current log file. - */ - volatile W writer; + protected long logrollsize; // Last time to check low replication on hlog's pipeline private long lastTimeCheckLowReplication = EnvironmentEdgeManager.currentTime(); - protected volatile boolean closed = false; - - protected final AtomicBoolean shutdown = new AtomicBoolean(false); /** * WAL Comparator; it compares the timestamp (log filenum), present in the log file name. Throws * an IllegalArgumentException if used to compare paths from different wals. @@ -240,26 +133,6 @@ public abstract class AbstractFSWAL implements WAL { final Comparator LOG_NAME_COMPARATOR = (o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2)); - private static final class WalProps { - - /** - * Map the encoded region name to the highest sequence id. Contain all the regions it has - * entries of - */ - public final Map encodedName2HighestSequenceId; - - /** - * The log file size. Notice that the size may not be accurate if we do asynchronous close in - * sub classes. - */ - public final long logSize; - - public WalProps(Map encodedName2HighestSequenceId, long logSize) { - this.encodedName2HighestSequenceId = encodedName2HighestSequenceId; - this.logSize = logSize; - } - } - /** * Map of WAL log file to properties. The map is sorted by the log file creation timestamp * (contained in the log file name). @@ -267,82 +140,14 @@ public abstract class AbstractFSWAL implements WAL { protected ConcurrentNavigableMap walFile2Props = new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR); - /** - * Map of {@link SyncFuture}s keyed by Handler objects. Used so we reuse SyncFutures. - *
<p>
- * TODO: Reuse FSWALEntry's rather than create them anew each time as we do SyncFutures here. - *
<p>
- * TODO: Add a FSWalEntry and SyncFuture as thread locals on handlers rather than have them get - * them from this Map? - */ - private final ConcurrentMap syncFuturesByHandler; - - /** - * The class name of the runtime implementation, used as prefix for logging/tracing. - *
<p>
- * Performance testing shows getClass().getSimpleName() might be a bottleneck so we store it here, - * refer to HBASE-17676 for more details - *
</p>
- */ - protected final String implClassName; - - public long getFilenum() { - return this.filenum.get(); - } - - /** - * A log file has a creation timestamp (in ms) in its file name ({@link #filenum}. This helper - * method returns the creation timestamp from a given log file. It extracts the timestamp assuming - * the filename is created with the {@link #computeFilename(long filenum)} method. - * @return timestamp, as in the log file name. - */ - protected long getFileNumFromFileName(Path fileName) { - checkNotNull(fileName, "file name can't be null"); - if (!ourFiles.accept(fileName)) { - throw new IllegalArgumentException( - "The log file " + fileName + " doesn't belong to this WAL. (" + toString() + ")"); - } - final String fileNameString = fileName.toString(); - String chompedPath = fileNameString.substring(prefixPathStr.length(), - (fileNameString.length() - walFileSuffix.length())); - return Long.parseLong(chompedPath); - } - - private int calculateMaxLogFiles(Configuration conf, long logRollSize) { - Pair globalMemstoreSize = MemorySizeUtil.getGlobalMemStoreSize(conf); - return (int) ((globalMemstoreSize.getFirst() * 2) / logRollSize); - } - - // must be power of 2 - protected final int getPreallocatedEventCount() { - // Preallocate objects to use on the ring buffer. The way that appends and syncs work, we will - // be stuck and make no progress if the buffer is filled with appends only and there is no - // sync. If no sync, then the handlers will be outstanding just waiting on sync completion - // before they return. - int preallocatedEventCount = - this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16); - checkArgument(preallocatedEventCount >= 0, - "hbase.regionserver.wal.disruptor.event.count must > 0"); - int floor = Integer.highestOneBit(preallocatedEventCount); - if (floor == preallocatedEventCount) { - return floor; - } - // max capacity is 1 << 30 - if (floor >= 1 << 29) { - return 1 << 30; - } - return floor << 1; - } - protected AbstractFSWAL(final FileSystem fs, final Path rootDir, final String logDir, final String archiveDir, final Configuration conf, final List listeners, final boolean failIfWALExists, final String prefix, final String suffix) throws FailedLogCloseException, IOException { - this.fs = fs; + super(conf, listeners, prefix, suffix); this.walDir = new Path(rootDir, logDir); this.walArchiveDir = new Path(rootDir, archiveDir); - this.conf = conf; - + this.fs = fs; if (!fs.exists(walDir) && !fs.mkdirs(walDir)) { throw new IOException("Unable to mkdir " + walDir); } @@ -353,20 +158,6 @@ public abstract class AbstractFSWAL implements WAL { } } - // If prefix is null||empty then just name it wal - this.walFilePrefix = - prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8"); - // we only correctly differentiate suffices when numeric ones start with '.' - if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) { - throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER + - "' but instead was '" + suffix + "'"); - } - // Now that it exists, set the storage policy for the entire directory of wal files related to - // this FSHLog instance - String storagePolicy = - conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY); - CommonFSUtils.setStoragePolicy(fs, this.walDir, storagePolicy); - this.walFileSuffix = (suffix == null) ? 
"" : URLEncoder.encode(suffix, "UTF8"); this.prefixPathStr = new Path(walDir, walFilePrefix + WAL_FILE_NAME_DELIMITER).toString(); this.ourFiles = new PathFilter() { @@ -387,6 +178,19 @@ public abstract class AbstractFSWAL implements WAL { return true; } }; + // If prefix is null||empty then just name it wal + this.walFilePrefix = + prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8"); + // we only correctly differentiate suffices when numeric ones start with '.' + if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) { + throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER + + "' but instead was '" + suffix + "'"); + } + // Now that it exists, set the storage policy for the entire directory of wal files related to + // this FSHLog instance + String storagePolicy = + conf.get(HConstants.WAL_STORAGE_POLICY, HConstants.DEFAULT_WAL_STORAGE_POLICY); + CommonFSUtils.setStoragePolicy(fs, this.walDir, storagePolicy); if (failIfWALExists) { final FileStatus[] walFiles = CommonFSUtils.listStatus(fs, walDir, ourFiles); @@ -395,14 +199,6 @@ public abstract class AbstractFSWAL implements WAL { } } - // Register listeners. TODO: Should this exist anymore? We have CPs? - if (listeners != null) { - for (WALActionsListener i : listeners) { - registerWALActionsListener(i); - } - } - this.coprocessorHost = new WALCoprocessorHost(this, conf); - // Schedule a WAL roll when the WAL is 50% of the HDFS block size. Scheduling at 50% of block // size should make it so WAL rolls before we get to the end-of-block (Block transitions cost // some latency). In hbase-1 we did this differently. We scheduled a roll when we hit 95% of @@ -421,84 +217,6 @@ public abstract class AbstractFSWAL implements WAL { } this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", Math.max(32, calculateMaxLogFiles(conf, logrollsize))); - - LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" + - StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" + - walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir); - this.slowSyncNs = TimeUnit.MILLISECONDS - .toNanos(conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS)); - this.walSyncTimeoutNs = TimeUnit.MILLISECONDS - .toNanos(conf.getLong("hbase.regionserver.hlog.sync.timeout", DEFAULT_WAL_SYNC_TIMEOUT_MS)); - int maxHandlersCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200); - // Presize our map of SyncFutures by handler objects. - this.syncFuturesByHandler = new ConcurrentHashMap<>(maxHandlersCount); - this.implClassName = getClass().getSimpleName(); - } - - /** - * Used to initialize the WAL. Usually just call rollWriter to create the first log writer. 
- */ - public void init() throws IOException { - rollWriter(); - } - - @Override - public void registerWALActionsListener(WALActionsListener listener) { - this.listeners.add(listener); - } - - @Override - public boolean unregisterWALActionsListener(WALActionsListener listener) { - return this.listeners.remove(listener); - } - - @Override - public WALCoprocessorHost getCoprocessorHost() { - return coprocessorHost; - } - - @Override - public Long startCacheFlush(byte[] encodedRegionName, Set families) { - return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families); - } - - @Override - public Long startCacheFlush(byte[] encodedRegionName, Map familyToSeq) { - return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq); - } - - @Override - public void completeCacheFlush(byte[] encodedRegionName) { - this.sequenceIdAccounting.completeCacheFlush(encodedRegionName); - } - - @Override - public void abortCacheFlush(byte[] encodedRegionName) { - this.sequenceIdAccounting.abortCacheFlush(encodedRegionName); - } - - @Override - public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) { - // Used by tests. Deprecated as too subtle for general usage. - return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName); - } - - @Override - public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) { - // This method is used by tests and for figuring if we should flush or not because our - // sequenceids are too old. It is also used reporting the master our oldest sequenceid for use - // figuring what edits can be skipped during log recovery. getEarliestMemStoreSequenceId - // from this.sequenceIdAccounting is looking first in flushingOldestStoreSequenceIds, the - // currently flushing sequence ids, and if anything found there, it is returning these. This is - // the right thing to do for the reporting oldest sequenceids to master; we won't skip edits if - // we crash during the flush. For figuring what to flush, we might get requeued if our sequence - // id is old even though we are currently flushing. This may mean we do too much flushing. - return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName); - } - - @Override - public byte[][] rollWriter() throws FailedLogCloseException, IOException { - return rollWriter(false); } /** @@ -523,19 +241,6 @@ public abstract class AbstractFSWAL implements WAL { return computeFilename(this.filenum.get()); } - /** - * retrieve the next path to use for writing. Increments the internal filenum. - */ - private Path getNewPath() throws IOException { - this.filenum.set(System.currentTimeMillis()); - Path newPath = getCurrentFileName(); - while (fs.exists(newPath)) { - this.filenum.incrementAndGet(); - newPath = getCurrentFileName(); - } - return newPath; - } - @VisibleForTesting Path getOldPath() { long currentFilenum = this.filenum.get(); @@ -548,31 +253,26 @@ public abstract class AbstractFSWAL implements WAL { } /** - * Tell listeners about pre log roll. - */ - private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath) - throws IOException { - coprocessorHost.preWALRoll(oldPath, newPath); - - if (!this.listeners.isEmpty()) { - for (WALActionsListener i : this.listeners) { - i.preLogRoll(oldPath, newPath); - } - } - } - - /** - * Tell listeners about post log roll. + * if the given {@code path} is being written currently, then return its length. + *
<p>
+ * This is used by replication to prevent replicating unacked log entries. See + * https://issues.apache.org/jira/browse/HBASE-14004 for more details. */ - private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath) - throws IOException { - if (!this.listeners.isEmpty()) { - for (WALActionsListener i : this.listeners) { - i.postLogRoll(oldPath, newPath); + @Override + public OptionalLong getLogFileSizeIfBeingWritten(WALInfo path) { + rollWriterLock.lock(); + try { + Path currentPath = getOldPath(); + if (path instanceof FSWALInfo && + ((FSWALInfo)path).getPath().equals(currentPath)) { + W writer = this.writer; + return writer != null ? OptionalLong.of(writer.getLength()) : OptionalLong.empty(); + } else { + return OptionalLong.empty(); } + } finally { + rollWriterLock.unlock(); } - - coprocessorHost.postWALRoll(oldPath, newPath); } // public only until class moves to o.a.h.h.wal @@ -616,49 +316,63 @@ public abstract class AbstractFSWAL implements WAL { return regions; } + protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long oldFileLen) { + int oldNumEntries = this.numEntries.getAndSet(0); + String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null; + if (oldPath != null) { + this.walFile2Props.put(oldPath, + new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen)); + this.totalLogSize.addAndGet(oldFileLen); + LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}", + CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen), + newPathString); + } else { + LOG.info("New WAL {}", newPathString); + } + } + /** - * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed. + * A log file has a creation timestamp (in ms) in its file name ({@link #filenum}. This helper + * method returns the creation timestamp from a given log file. It extracts the timestamp assuming + * the filename is created with the {@link #computeFilename(long filenum)} method. + * @return timestamp, as in the log file name. */ - private void cleanOldLogs() throws IOException { - List> logsToArchive = null; - // For each log file, look at its Map of regions to highest sequence id; if all sequence ids - // are older than what is currently in memory, the WAL can be GC'd. - for (Map.Entry e : this.walFile2Props.entrySet()) { - Path log = e.getKey(); - Map sequenceNums = e.getValue().encodedName2HighestSequenceId; - if (this.sequenceIdAccounting.areAllLower(sequenceNums)) { - if (logsToArchive == null) { - logsToArchive = new ArrayList<>(); - } - logsToArchive.add(Pair.newPair(log, e.getValue().logSize)); - if (LOG.isTraceEnabled()) { - LOG.trace("WAL file ready for archiving " + log); - } - } - } - if (logsToArchive != null) { - for (Pair logAndSize : logsToArchive) { - this.totalLogSize.addAndGet(-logAndSize.getSecond()); - archiveLogFile(logAndSize.getFirst()); - this.walFile2Props.remove(logAndSize.getFirst()); - } + protected long getFileNumFromFileName(Path fileName) { + checkNotNull(fileName, "file name can't be null"); + if (!ourFiles.accept(fileName)) { + throw new IllegalArgumentException( + "The log file " + fileName + " doesn't belong to this WAL. (" + toString() + ")"); } + final String fileNameString = fileName.toString(); + String chompedPath = fileNameString.substring(prefixPathStr.length(), + (fileNameString.length() - walFileSuffix.length())); + return Long.parseLong(chompedPath); } - /* - * only public so WALSplitter can use. 
- * @return archived location of a WAL file with the given path p + private int calculateMaxLogFiles(Configuration conf, long logRollSize) { + Pair globalMemstoreSize = MemorySizeUtil.getGlobalMemStoreSize(conf); + return (int) ((globalMemstoreSize.getFirst() * 2) / logRollSize); + } + + /** + * retrieve the next path to use for writing. Increments the internal filenum. */ - public static Path getWALArchivePath(Path archiveDir, Path p) { - return new Path(archiveDir, p.getName()); + private Path getNewPath() throws IOException { + this.filenum.set(System.currentTimeMillis()); + Path newPath = getCurrentFileName(); + while (fs.exists(newPath)) { + this.filenum.incrementAndGet(); + newPath = getCurrentFileName(); + } + return newPath; } private void archiveLogFile(final Path p) throws IOException { Path newPath = getWALArchivePath(this.walArchiveDir, p); // Tell our listeners that a log is going to be archived. if (!this.listeners.isEmpty()) { - for (WALActionsListener i : this.listeners) { - i.preLogArchive(p, newPath); + for (Object i : this.listeners) { + ((WALActionsListener)i).preLogArchive(new FSWALInfo(p), new FSWALInfo(newPath)); } } LOG.info("Archiving " + p + " to " + newPath); @@ -667,88 +381,40 @@ public abstract class AbstractFSWAL implements WAL { } // Tell our listeners that a log has been archived. if (!this.listeners.isEmpty()) { - for (WALActionsListener i : this.listeners) { - i.postLogArchive(p, newPath); + for (Object i : this.listeners) { + ((WALActionsListener)i).postLogArchive(new FSWALInfo(p), new FSWALInfo(newPath)); } } } - protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long oldFileLen) { - int oldNumEntries = this.numEntries.getAndSet(0); - String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null; - if (oldPath != null) { - this.walFile2Props.put(oldPath, - new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen)); - this.totalLogSize.addAndGet(oldFileLen); - LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}", - CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen), - newPathString); - } else { - LOG.info("New WAL {}", newPathString); - } - } - /** - *
<p>
- * Cleans up current writer closing it and then puts in place the passed in
- * <code>nextWriter</code>.
- * </p>
- * <p>
- * <ul>
- * <li>In the case of creating a new WAL, oldPath will be null.</li>
- * <li>In the case of rolling over from one file to the next, none of the parameters will be
- * null.</li>
- * <li>In the case of closing out this FSHLog with no further use newPath and nextWriter will be
- * null.</li>
- * </ul>
- * </p>
- * @param oldPath may be null - * @param newPath may be null - * @param nextWriter may be null - * @return the passed in newPath - * @throws IOException if there is a problem flushing or closing the underlying FS + * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed. */ - @VisibleForTesting - Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException { - try (TraceScope scope = TraceUtil.createTrace("FSHFile.replaceWriter")) { - doReplaceWriter(oldPath, newPath, nextWriter); - return newPath; - } - } - - protected final void blockOnSync(SyncFuture syncFuture) throws IOException { - // Now we have published the ringbuffer, halt the current thread until we get an answer back. - try { - if (syncFuture != null) { - if (closed) { - throw new IOException("WAL has been closed"); - } else { - syncFuture.get(walSyncTimeoutNs); + protected void cleanOldLogs() throws IOException { + List> logsToArchive = null; + // For each log file, look at its Map of regions to highest sequence id; if all sequence ids + // are older than what is currently in memory, the WAL can be GC'd. + for (Object o : this.walFile2Props.entrySet()) { + Map.Entry e = (Map.Entry) o; + Path log = e.getKey(); + Map sequenceNums = e.getValue().encodedName2HighestSequenceId; + if (this.sequenceIdAccounting.areAllLower(sequenceNums)) { + if (logsToArchive == null) { + logsToArchive = new ArrayList<>(); + } + logsToArchive.add(Pair.newPair(log, e.getValue().logSize)); + if (LOG.isTraceEnabled()) { + LOG.trace("WAL file ready for archiving " + log); } } - } catch (TimeoutIOException tioe) { - // SyncFuture reuse by thread, if TimeoutIOException happens, ringbuffer - // still refer to it, so if this thread use it next time may get a wrong - // result. - this.syncFuturesByHandler.remove(Thread.currentThread()); - throw tioe; - } catch (InterruptedException ie) { - LOG.warn("Interrupted", ie); - throw convertInterruptedExceptionToIOException(ie); - } catch (ExecutionException e) { - throw ensureIOException(e.getCause()); } - } - - private static IOException ensureIOException(final Throwable t) { - return (t instanceof IOException) ? (IOException) t : new IOException(t); - } - - private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) { - Thread.currentThread().interrupt(); - IOException ioe = new InterruptedIOException(); - ioe.initCause(ie); - return ioe; + if (logsToArchive != null) { + for (Pair logAndSize : logsToArchive) { + this.totalLogSize.addAndGet(-logAndSize.getSecond()); + archiveLogFile(logAndSize.getFirst()); + this.walFile2Props.remove(logAndSize.getFirst()); + } + } } @Override @@ -767,11 +433,11 @@ public abstract class AbstractFSWAL implements WAL { Path oldPath = getOldPath(); Path newPath = getNewPath(); // Any exception from here on is catastrophic, non-recoverable so we currently abort. - W nextWriter = this.createWriterInstance(newPath); - tellListenersAboutPreLogRoll(oldPath, newPath); + W nextWriter = (W) this.createWriterInstance(newPath); + tellListenersAboutPreLogRoll(new FSWALInfo(oldPath), new FSWALInfo(newPath)); // NewPath could be equal to oldPath if replaceWriter fails. 
newPath = replaceWriter(oldPath, newPath, nextWriter); - tellListenersAboutPostLogRoll(oldPath, newPath); + tellListenersAboutPostLogRoll(new FSWALInfo(oldPath), new FSWALInfo(newPath)); if (LOG.isDebugEnabled()) { LOG.debug("Create new " + implClassName + " writer with pipeline: " + Arrays.toString(getPipeline())); @@ -794,17 +460,6 @@ public abstract class AbstractFSWAL implements WAL { } } - // public only until class moves to o.a.h.h.wal - /** @return the size of log files in use */ - public long getLogFileSize() { - return this.totalLogSize.get(); - } - - // public only until class moves to o.a.h.h.wal - public void requestLogRoll() { - requestLogRoll(false); - } - /** * Get the backing files associated with this WAL. * @return may be null if there are no files. @@ -814,24 +469,12 @@ public abstract class AbstractFSWAL implements WAL { return CommonFSUtils.listStatus(fs, walDir, ourFiles); } - @Override - public void shutdown() throws IOException { - if (!shutdown.compareAndSet(false, true)) { - return; - } - closed = true; - // Tell our listeners that the log is closing - if (!this.listeners.isEmpty()) { - for (WALActionsListener i : this.listeners) { - i.logCloseRequested(); - } - } - rollWriterLock.lock(); - try { - doShutdown(); - } finally { - rollWriterLock.unlock(); - } + /* + * only public so WALSplitter can use. + * @return archived location of a WAL file with the given path p + */ + public static Path getWALArchivePath(Path archiveDir, Path p) { + return new Path(archiveDir, p.getName()); } @Override @@ -843,8 +486,8 @@ public abstract class AbstractFSWAL implements WAL { Path p = getWALArchivePath(this.walArchiveDir, file.getPath()); // Tell our listeners that a log is going to be archived. if (!this.listeners.isEmpty()) { - for (WALActionsListener i : this.listeners) { - i.preLogArchive(file.getPath(), p); + for (Object i : this.listeners) { + ((WALActionsListener)i).preLogArchive(new FSWALInfo(file.getPath()), new FSWALInfo(p)); } } @@ -853,8 +496,8 @@ public abstract class AbstractFSWAL implements WAL { } // Tell our listeners that a log was archived. if (!this.listeners.isEmpty()) { - for (WALActionsListener i : this.listeners) { - i.postLogArchive(file.getPath(), p); + for (Object i : this.listeners) { + ((WALActionsListener)i).postLogArchive(new FSWALInfo(file.getPath()), new FSWALInfo(p)); } } } @@ -864,220 +507,6 @@ public abstract class AbstractFSWAL implements WAL { LOG.info("Closed WAL: " + toString()); } - /** - * updates the sequence number of a specific store. 
depending on the flag: replaces current seq - * number if the given seq id is bigger, or even if it is lower than existing one - * @param encodedRegionName - * @param familyName - * @param sequenceid - * @param onlyIfGreater - */ - @Override - public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid, - boolean onlyIfGreater) { - sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater); - } - - protected final SyncFuture getSyncFuture(long sequence) { - return CollectionUtils - .computeIfAbsent(syncFuturesByHandler, Thread.currentThread(), SyncFuture::new) - .reset(sequence); - } - - protected final void requestLogRoll(boolean tooFewReplicas) { - if (!this.listeners.isEmpty()) { - for (WALActionsListener i : this.listeners) { - i.logRollRequested(tooFewReplicas); - } - } - } - - long getUnflushedEntriesCount() { - long highestSynced = this.highestSyncedTxid.get(); - long highestUnsynced = this.highestUnsyncedTxid; - return highestSynced >= highestUnsynced ? 0 : highestUnsynced - highestSynced; - } - - boolean isUnflushedEntries() { - return getUnflushedEntriesCount() > 0; - } - - /** - * Exposed for testing only. Use to tricks like halt the ring buffer appending. - */ - @VisibleForTesting - void atHeadOfRingBufferEventHandlerAppend() { - // Noop - } - - protected final boolean append(W writer, FSWALEntry entry) throws IOException { - // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc. - atHeadOfRingBufferEventHandlerAppend(); - long start = EnvironmentEdgeManager.currentTime(); - byte[] encodedRegionName = entry.getKey().getEncodedRegionName(); - long regionSequenceId = entry.getKey().getSequenceId(); - - // Edits are empty, there is nothing to append. Maybe empty when we are looking for a - // region sequence id only, a region edit/sequence id that is not associated with an actual - // edit. It has to go through all the rigmarole to be sure we have the right ordering. - if (entry.getEdit().isEmpty()) { - return false; - } - - // Coprocessor hook. - coprocessorHost.preWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit()); - if (!listeners.isEmpty()) { - for (WALActionsListener i : listeners) { - i.visitLogEntryBeforeWrite(entry.getKey(), entry.getEdit()); - } - } - doAppend(writer, entry); - assert highestUnsyncedTxid < entry.getTxid(); - highestUnsyncedTxid = entry.getTxid(); - sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId, - entry.isInMemStore()); - coprocessorHost.postWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit()); - // Update metrics. 
- postAppend(entry, EnvironmentEdgeManager.currentTime() - start); - numEntries.incrementAndGet(); - return true; - } - - private long postAppend(final Entry e, final long elapsedTime) throws IOException { - long len = 0; - if (!listeners.isEmpty()) { - for (Cell cell : e.getEdit().getCells()) { - len += PrivateCellUtil.estimatedSerializedSizeOf(cell); - } - for (WALActionsListener listener : listeners) { - listener.postAppend(len, elapsedTime, e.getKey(), e.getEdit()); - } - } - return len; - } - - protected final void postSync(final long timeInNanos, final int handlerSyncs) { - if (timeInNanos > this.slowSyncNs) { - String msg = new StringBuilder().append("Slow sync cost: ").append(timeInNanos / 1000000) - .append(" ms, current pipeline: ").append(Arrays.toString(getPipeline())).toString(); - TraceUtil.addTimelineAnnotation(msg); - LOG.info(msg); - } - if (!listeners.isEmpty()) { - for (WALActionsListener listener : listeners) { - listener.postSync(timeInNanos, handlerSyncs); - } - } - } - - protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key, - WALEdit edits, boolean inMemstore, RingBuffer ringBuffer) - throws IOException { - if (this.closed) { - throw new IOException( - "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString()); - } - MutableLong txidHolder = new MutableLong(); - MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> { - txidHolder.setValue(ringBuffer.next()); - }); - long txid = txidHolder.longValue(); - try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) { - FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore); - entry.stampRegionSequenceId(we); - ringBuffer.get(txid).load(entry); - } finally { - ringBuffer.publish(txid); - } - return txid; - } - - @Override - public String toString() { - return implClassName + " " + walFilePrefix + ":" + walFileSuffix + "(num " + filenum + ")"; - } - - /** - * if the given {@code path} is being written currently, then return its length. - *
- * <p>
- * This is used by replication to prevent replicating unacked log entries. See - * https://issues.apache.org/jira/browse/HBASE-14004 for more details. - */ - @Override - public OptionalLong getLogFileSizeIfBeingWritten(Path path) { - rollWriterLock.lock(); - try { - Path currentPath = getOldPath(); - if (path.equals(currentPath)) { - W writer = this.writer; - return writer != null ? OptionalLong.of(writer.getLength()) : OptionalLong.empty(); - } else { - return OptionalLong.empty(); - } - } finally { - rollWriterLock.unlock(); - } - } - - /** - * NOTE: This append, at a time that is usually after this call returns, starts an mvcc - * transaction by calling 'begin' wherein which we assign this update a sequenceid. At assignment - * time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must - * 'complete' the transaction this mvcc transaction by calling - * MultiVersionConcurrencyControl#complete(...) or a variant otherwise mvcc will get stuck. Do it - * in the finally of a try/finally block within which this append lives and any subsequent - * operations like sync or update of memstore, etc. Get the WriteEntry to pass mvcc out of the - * passed in WALKey walKey parameter. Be warned that the WriteEntry is not - * immediately available on return from this method. It WILL be available subsequent to a sync of - * this append; otherwise, you will just have to wait on the WriteEntry to get filled in. - */ - @Override - public abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore) - throws IOException; - - protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException; - - protected abstract W createWriterInstance(Path path) - throws IOException, CommonFSUtils.StreamLacksCapabilityException; - - protected abstract void doReplaceWriter(Path oldPath, Path newPath, W nextWriter) - throws IOException; - - protected abstract void doShutdown() throws IOException; - - protected abstract boolean doCheckLogLowReplication(); - - public void checkLogLowReplication(long checkInterval) { - long now = EnvironmentEdgeManager.currentTime(); - if (now - lastTimeCheckLowReplication < checkInterval) { - return; - } - // Will return immediately if we are in the middle of a WAL log roll currently. - if (!rollWriterLock.tryLock()) { - return; - } - try { - lastTimeCheckLowReplication = now; - if (doCheckLogLowReplication()) { - requestLogRoll(true); - } - } finally { - rollWriterLock.unlock(); - } - } - - /** - * This method gets the pipeline for the current WAL. - */ - @VisibleForTesting - abstract DatanodeInfo[] getPipeline(); - - /** - * This method gets the datanode replication count for the current WAL. - */ - @VisibleForTesting - abstract int getLogReplication(); - private static void split(final Configuration conf, final Path p) throws IOException { FileSystem fs = FSUtils.getWALFileSystem(conf); if (!fs.exists(p)) { @@ -1141,4 +570,54 @@ public abstract class AbstractFSWAL implements WAL { System.exit(-1); } } + + protected abstract W createWriterInstance(Path path) + throws IOException, CommonFSUtils.StreamLacksCapabilityException; + + protected abstract boolean doCheckLogLowReplication(); + + public void checkLogLowReplication(long checkInterval) { + long now = EnvironmentEdgeManager.currentTime(); + if (now - lastTimeCheckLowReplication < checkInterval) { + return; + } + // Will return immediately if we are in the middle of a WAL log roll currently. 
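
checkLogLowReplication above applies two cheap guards before the expensive pipeline probe: an interval gate, plus a tryLock that skips the probe while a roll is already running. A self-contained sketch of just that gating; pipelineBelowTargetReplication() and requestLogRoll() are hypothetical stand-ins for doCheckLogLowReplication() and requestLogRoll(true):

    import java.util.concurrent.locks.ReentrantLock;

    // Sketch of the gating only; the probe and the roll request are stand-ins.
    public class LowReplicationChecker {
      private final ReentrantLock rollWriterLock = new ReentrantLock(true);
      private volatile long lastCheckMillis;

      void checkLogLowReplication(long checkIntervalMillis) {
        long now = System.currentTimeMillis();
        if (now - lastCheckMillis < checkIntervalMillis) {
          return; // checked recently enough, skip
        }
        if (!rollWriterLock.tryLock()) {
          return; // a roll is in progress; it replaces the pipeline anyway
        }
        try {
          lastCheckMillis = now;
          if (pipelineBelowTargetReplication()) {
            requestLogRoll();
          }
        } finally {
          rollWriterLock.unlock();
        }
      }

      private boolean pipelineBelowTargetReplication() { return false; }

      private void requestLogRoll() { System.out.println("roll requested"); }
    }
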
+ if (!rollWriterLock.tryLock()) { + return; + } + try { + lastTimeCheckLowReplication = now; + if (doCheckLogLowReplication()) { + requestLogRoll(true); + } + } finally { + rollWriterLock.unlock(); + } + } + + /** + * This method gets the pipeline for the current WAL. + */ + @VisibleForTesting + abstract DatanodeInfo[] getPipeline(); + + protected final void postSync(final long timeInNanos, final int handlerSyncs) { + if (timeInNanos > this.slowSyncNs) { + String msg = new StringBuilder().append("Slow sync cost: ").append(timeInNanos / 1000000) + .append(" ms, current pipeline: ").append(Arrays.toString(getPipeline())).toString(); + TraceUtil.addTimelineAnnotation(msg); + LOG.info(msg); + } + if (!listeners.isEmpty()) { + for (WALActionsListener listener : listeners) { + listener.postSync(timeInNanos, handlerSyncs); + } + } + } + + /** + * This method gets the datanode replication count for the current WAL. + */ + @VisibleForTesting + abstract int getLogReplication(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java index ae084a4..7fa35ba 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java @@ -17,9 +17,6 @@ */ package org.apache.hadoop.hbase.regionserver.wal; -import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.DEFAULT_WAL_TRAILER_WARN_SIZE; -import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.WAL_TRAILER_WARN_SIZE; - import java.io.IOException; import java.io.OutputStream; import java.security.Key; @@ -29,8 +26,6 @@ import java.util.concurrent.atomic.AtomicLong; import javax.crypto.spec.SecretKeySpec; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.yetus.audience.InterfaceAudience; @@ -40,15 +35,12 @@ import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.io.crypto.Cipher; import org.apache.hadoop.hbase.io.crypto.Encryption; import org.apache.hadoop.hbase.io.crypto.Encryptor; -import org.apache.hadoop.hbase.io.util.LRUDictionary; import org.apache.hadoop.hbase.security.EncryptionUtil; import org.apache.hadoop.hbase.security.User; import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer; -import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; import org.apache.hadoop.hbase.util.EncryptionTest; -import org.apache.hadoop.hbase.util.FSUtils; /** * Base class for Protobuf log writer. 
@@ -70,7 +62,7 @@ public abstract class AbstractProtobufLogWriter { protected AtomicLong length = new AtomicLong(); - private WALCellCodec getCodec(Configuration conf, CompressionContext compressionContext) + protected WALCellCodec getCodec(Configuration conf, CompressionContext compressionContext) throws IOException { return WALCellCodec.create(conf, null, compressionContext); } @@ -139,73 +131,6 @@ public abstract class AbstractProtobufLogWriter { return getClass().getSimpleName(); } - private boolean initializeCompressionContext(Configuration conf, Path path) throws IOException { - boolean doCompress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false); - if (doCompress) { - try { - this.compressionContext = new CompressionContext(LRUDictionary.class, - FSUtils.isRecoveredEdits(path), - conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true)); - } catch (Exception e) { - throw new IOException("Failed to initiate CompressionContext", e); - } - } - return doCompress; - } - - public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable, - long blocksize) throws IOException, StreamLacksCapabilityException { - this.conf = conf; - boolean doCompress = initializeCompressionContext(conf, path); - this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE); - int bufferSize = FSUtils.getDefaultBufferSize(fs); - short replication = (short) conf.getInt("hbase.regionserver.hlog.replication", - FSUtils.getDefaultReplication(fs, path)); - - initOutput(fs, path, overwritable, bufferSize, replication, blocksize); - - boolean doTagCompress = doCompress - && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true); - length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC, buildWALHeader(conf, - WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress)))); - - initAfterHeader(doCompress); - - // instantiate trailer to default value. - trailer = WALTrailer.newBuilder().build(); - if (LOG.isTraceEnabled()) { - LOG.trace("Initialized protobuf WAL=" + path + ", compression=" + doCompress); - } - } - - private void initAfterHeader0(boolean doCompress) throws IOException { - WALCellCodec codec = getCodec(conf, this.compressionContext); - this.cellEncoder = codec.getEncoder(getOutputStreamForCellEncoder()); - if (doCompress) { - this.compressor = codec.getByteStringCompressor(); - } else { - this.compressor = WALCellCodec.getNoneCompressor(); - } - } - - protected void initAfterHeader(boolean doCompress) throws IOException { - initAfterHeader0(doCompress); - } - - // should be called in sub classes's initAfterHeader method to init SecureWALCellCodec. 
- protected final void secureInitAfterHeader(boolean doCompress, Encryptor encryptor) - throws IOException { - if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false) && encryptor != null) { - WALCellCodec codec = SecureWALCellCodec.getCodec(this.conf, encryptor); - this.cellEncoder = codec.getEncoder(getOutputStreamForCellEncoder()); - // We do not support compression - this.compressionContext = null; - this.compressor = WALCellCodec.getNoneCompressor(); - } else { - initAfterHeader0(doCompress); - } - } - void setWALTrailer(WALTrailer walTrailer) { this.trailer = walTrailer; } @@ -238,9 +163,6 @@ public abstract class AbstractProtobufLogWriter { } } - protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, - short replication, long blockSize) throws IOException, StreamLacksCapabilityException; - /** * return the file length after written. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractWAL.java new file mode 100644 index 0000000..b96de96 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractWAL.java @@ -0,0 +1,605 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.wal; + +import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; +import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.URLEncoder; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.lang3.mutable.MutableLong; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.PrivateCellUtil; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.exceptions.TimeoutIOException; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; +import org.apache.hadoop.hbase.trace.TraceUtil; +import org.apache.hadoop.hbase.util.CollectionUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALInfo; +import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.apache.hadoop.hbase.wal.WALProvider.WriterBase; +import org.apache.hadoop.util.StringUtils; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.htrace.core.TraceScope; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.lmax.disruptor.RingBuffer; + +/** + *
+ * <p>
+ * As data is flushed from the MemStore to other on-disk structures (files sorted by key, hfiles), a
+ * WAL becomes obsolete. We can let go of all the log edits/entries for a given HRegion-sequence id.
+ * A bunch of work in the below is done keeping account of these region sequence ids -- what is
+ * flushed out to hfiles, and what is yet in WAL and in memory only.
+ * <p>
+ * It is only practical to delete entire files. Thus, we delete an entire on-disk file
+ * <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older
+ * (smaller) than the most-recent flush.
+ * <p>
+ * <h2>Failure Semantic</h2> If an exception on append or sync, roll the WAL because the current WAL
+ * is now a lame duck; any more appends or syncs will fail also with the same original exception. If
+ * we have made successful appends to the WAL and we then are unable to sync them, our current
+ * semantic is to return error to the client that the appends failed but also to abort the current
+ * context, usually the hosting server. We need to replay the WALs. <br>
+ * TODO: Change this semantic. A roll of WAL may be sufficient as long as we have flagged client
+ * that the append failed. <br>
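
A rough sketch of the lame-duck behaviour this failure semantic describes, under the simplifying assumption that a single volatile flag stands in for the failed writer; this is an illustration, not this patch's code:

    import java.io.IOException;

    // Illustration only: one volatile flag stands in for the failed writer.
    public class LameDuckWal {
      private volatile boolean lameDuck;

      long append(byte[] edit) throws IOException {
        if (lameDuck) {
          throw new IOException("WAL is a lame duck; waiting for a roll");
        }
        // ... write the edit and return its transaction id ...
        return 1L;
      }

      void sync(long txid) throws IOException {
        try {
          flushUpTo(txid); // stand-in for the underlying filesystem sync
        } catch (RuntimeException e) {
          lameDuck = true;  // any later append or sync would fail the same way
          requestLogRoll(); // a fresh writer clears the lame-duck state
          throw new IOException("sync failed at txid " + txid, e);
        }
      }

      private void flushUpTo(long txid) { /* pretend to flush */ }

      private void requestLogRoll() { lameDuck = false; /* simplified */ }
    }
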
+ * TODO: replication may pick up these last edits though they have been marked as failed append + * (Need to keep our own file lengths, not rely on HDFS). + */ +@InterfaceAudience.Private +public abstract class AbstractWAL implements WAL { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractWAL.class); + + protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms + + private static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min + + /** + * Prefix of a WAL file, usually the region server name it is hosted on. + */ + protected String walFilePrefix; + + /** + * Suffix included on generated wal file names + */ + protected final String walFileSuffix; + + protected final WALCoprocessorHost coprocessorHost; + + /** + * conf object + */ + protected final Configuration conf; + + /** Listeners that are called on WAL events. */ + protected final List listeners = new CopyOnWriteArrayList<>(); + + /** + * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence + * id as yet not flushed as well as the most recent edit sequence id appended to the WAL. Has + * facility for answering questions such as "Is it safe to GC a WAL?". + */ + protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting(); + + protected final long slowSyncNs; + + private final long walSyncTimeoutNs; + + /** + * Block size to use writing files. + */ + protected long blocksize; + + /* + * If more than this many logs, force flush of oldest region to oldest edit goes to disk. If too + * many and we crash, then will take forever replaying. Keep the number of logs tidy. + */ + protected int maxLogs; + + /** + * This lock makes sure only one log roll runs at a time. Should not be taken while any other lock + * is held. We don't just use synchronized because that results in bogus and tedious findbugs + * warning when it thinks synchronized controls writer thread safety. It is held when we are + * actually rolling the log. It is checked when we are looking to see if we should roll the log or + * not. + */ + protected final ReentrantLock rollWriterLock = new ReentrantLock(true); + + // The timestamp (in ms) when the log file was created. + protected final AtomicLong filenum = new AtomicLong(-1); + + // Number of transactions in the current Wal. + protected final AtomicInteger numEntries = new AtomicInteger(0); + + /** + * The highest known outstanding unsync'd WALEdit transaction id. Usually, we use a queue to pass + * WALEdit to background consumer thread, and the transaction id is the sequence number of the + * corresponding entry in queue. + */ + protected volatile long highestUnsyncedTxid = -1; + + /** + * Updated to the transaction id of the last successful sync call. This can be less than + * {@link #highestUnsyncedTxid} for case where we have an append where a sync has not yet come in + * for it. + */ + protected final AtomicLong highestSyncedTxid = new AtomicLong(0); + + /** + * The total size of wal + */ + protected final AtomicLong totalLogSize = new AtomicLong(0); + /** + * Current log file. + */ + volatile W writer; + + protected volatile boolean closed = false; + + protected final AtomicBoolean shutdown = new AtomicBoolean(false); + protected static final class WalProps { + + /** + * Map the encoded region name to the highest sequence id. Contain all the regions it has + * entries of + */ + public final Map encodedName2HighestSequenceId; + + /** + * The log file size. 
Notice that the size may not be accurate if we do asynchronous close in + * sub classes. + */ + public final long logSize; + + public WalProps(Map encodedName2HighestSequenceId, long logSize) { + this.encodedName2HighestSequenceId = encodedName2HighestSequenceId; + this.logSize = logSize; + } + } + + /** + * Map of {@link SyncFuture}s keyed by Handler objects. Used so we reuse SyncFutures. + *
+ * TODO: Reuse FSWALEntry's rather than create them anew each time as we do SyncFutures here.
+ * <p>
+ * TODO: Add a FSWalEntry and SyncFuture as thread locals on handlers rather than have them get
+ * them from this Map?
+ */
+ private final ConcurrentMap<Thread, SyncFuture> syncFuturesByHandler;
+
+ /**
+ * The class name of the runtime implementation, used as prefix for logging/tracing.
+ * <p>
+ * Performance testing shows getClass().getSimpleName() might be a bottleneck so we store it here,
+ * refer to HBASE-17676 for more details.
+ */ + protected final String implClassName; + + public long getFilenum() { + return this.filenum.get(); + } + + // must be power of 2 + protected final int getPreallocatedEventCount() { + // Preallocate objects to use on the ring buffer. The way that appends and syncs work, we will + // be stuck and make no progress if the buffer is filled with appends only and there is no + // sync. If no sync, then the handlers will be outstanding just waiting on sync completion + // before they return. + int preallocatedEventCount = + this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16); + checkArgument(preallocatedEventCount >= 0, + "hbase.regionserver.wal.disruptor.event.count must > 0"); + int floor = Integer.highestOneBit(preallocatedEventCount); + if (floor == preallocatedEventCount) { + return floor; + } + // max capacity is 1 << 30 + if (floor >= 1 << 29) { + return 1 << 30; + } + return floor << 1; + } + + protected AbstractWAL( + final Configuration conf, final List listeners, + final String prefix, final String suffix) + throws FailedLogCloseException, IOException { + this.conf = conf; + + // If prefix is null||empty then just name it wal + this.walFilePrefix = + prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8"); + // we only correctly differentiate suffices when numeric ones start with '.' + if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) { + throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER + + "' but instead was '" + suffix + "'"); + } + this.walFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8"); + + // Register listeners. TODO: Should this exist anymore? We have CPs? + if (listeners != null) { + for (WALActionsListener i : listeners) { + registerWALActionsListener(i); + } + } + this.coprocessorHost = new WALCoprocessorHost(this, conf); + + LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", prefix=" + + this.walFilePrefix + ", suffix=" + walFileSuffix); + this.slowSyncNs = TimeUnit.MILLISECONDS + .toNanos(conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS)); + this.walSyncTimeoutNs = TimeUnit.MILLISECONDS + .toNanos(conf.getLong("hbase.regionserver.hlog.sync.timeout", DEFAULT_WAL_SYNC_TIMEOUT_MS)); + int maxHandlersCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200); + // Presize our map of SyncFutures by handler objects. + this.syncFuturesByHandler = new ConcurrentHashMap<>(maxHandlersCount); + this.implClassName = getClass().getSimpleName(); + } + + /** + * Used to initialize the WAL. Usually just call rollWriter to create the first log writer. 
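
The getPreallocatedEventCount() logic in the hunk above rounds the configured count to a power of two (the LMAX Disruptor ring buffer requires one) and caps it at 1 << 30. The same rounding, extracted into a hypothetical standalone helper:

    // Hypothetical standalone version of the rounding in getPreallocatedEventCount().
    public class RingBufferSizing {
      static int roundToPowerOfTwo(int requested) {
        if (requested < 0) {
          throw new IllegalArgumentException("event count must be >= 0");
        }
        int floor = Integer.highestOneBit(requested); // largest power of two <= requested
        if (floor == requested) {
          return floor; // already a power of two
        }
        if (floor >= 1 << 29) {
          return 1 << 30; // doubling would exceed the 1 << 30 cap
        }
        return floor << 1; // round up to the next power of two
      }

      public static void main(String[] args) {
        System.out.println(roundToPowerOfTwo(16384)); // 16384
        System.out.println(roundToPowerOfTwo(16385)); // 32768
      }
    }
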
+ */ + public void init() throws IOException { + rollWriter(false); + } + + @Override + public void registerWALActionsListener(WALActionsListener listener) { + this.listeners.add(listener); + } + + @Override + public boolean unregisterWALActionsListener(WALActionsListener listener) { + return this.listeners.remove(listener); + } + + @Override + public WALCoprocessorHost getCoprocessorHost() { + return coprocessorHost; + } + + @Override + public Long startCacheFlush(byte[] encodedRegionName, Set families) { + return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families); + } + + @Override + public Long startCacheFlush(byte[] encodedRegionName, Map familyToSeq) { + return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq); + } + + @Override + public void completeCacheFlush(byte[] encodedRegionName) { + this.sequenceIdAccounting.completeCacheFlush(encodedRegionName); + } + + @Override + public void abortCacheFlush(byte[] encodedRegionName) { + this.sequenceIdAccounting.abortCacheFlush(encodedRegionName); + } + + @Override + public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) { + // Used by tests. Deprecated as too subtle for general usage. + return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName); + } + + @Override + public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) { + // This method is used by tests and for figuring if we should flush or not because our + // sequenceids are too old. It is also used reporting the master our oldest sequenceid for use + // figuring what edits can be skipped during log recovery. getEarliestMemStoreSequenceId + // from this.sequenceIdAccounting is looking first in flushingOldestStoreSequenceIds, the + // currently flushing sequence ids, and if anything found there, it is returning these. This is + // the right thing to do for the reporting oldest sequenceids to master; we won't skip edits if + // we crash during the flush. For figuring what to flush, we might get requeued if our sequence + // id is old even though we are currently flushing. This may mean we do too much flushing. + return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName); + } + + /** + * Tell listeners about pre log roll. + */ + protected void tellListenersAboutPreLogRoll(final WALInfo oldInfo, final WALInfo newInfo) + throws IOException { + coprocessorHost.preWALRoll(oldInfo, newInfo); + + if (!this.listeners.isEmpty()) { + for (WALActionsListener i : this.listeners) { + i.preLogRoll(oldInfo, newInfo); + } + } + } + + /** + * Tell listeners about post log roll. + */ + protected void tellListenersAboutPostLogRoll(final WALInfo oldPath, final WALInfo newPath) + throws IOException { + if (!this.listeners.isEmpty()) { + for (WALActionsListener i : this.listeners) { + i.postLogRoll(oldPath, newPath); + } + } + + coprocessorHost.postWALRoll(oldPath, newPath); + } + + /** + *
+ * Cleans up current writer closing it and then puts in place the passed in
+ * <code>nextWriter</code>.
+ * <p>
+ * <ul>
+ * <li>In the case of creating a new WAL, oldPath will be null.</li>
+ * <li>In the case of rolling over from one file to the next, none of the parameters will be
+ * null.</li>
+ * <li>In the case of closing out this FSHLog with no further use newPath and nextWriter will be
+ * null.</li>
+ * </ul>
+ * @param oldPath may be null + * @param newPath may be null + * @param nextWriter may be null + * @return the passed in newPath + * @throws IOException if there is a problem flushing or closing the underlying FS + */ + @VisibleForTesting + Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException { + try (TraceScope scope = TraceUtil.createTrace("FSHFile.replaceWriter")) { + doReplaceWriter(oldPath, newPath, nextWriter); + return newPath; + } + } + + protected final void blockOnSync(SyncFuture syncFuture) throws IOException { + // Now we have published the ringbuffer, halt the current thread until we get an answer back. + try { + if (syncFuture != null) { + if (closed) { + throw new IOException("WAL has been closed"); + } else { + syncFuture.get(walSyncTimeoutNs); + } + } + } catch (TimeoutIOException tioe) { + // SyncFuture reuse by thread, if TimeoutIOException happens, ringbuffer + // still refer to it, so if this thread use it next time may get a wrong + // result. + this.syncFuturesByHandler.remove(Thread.currentThread()); + throw tioe; + } catch (InterruptedException ie) { + LOG.warn("Interrupted", ie); + throw convertInterruptedExceptionToIOException(ie); + } catch (ExecutionException e) { + throw ensureIOException(e.getCause()); + } + } + + private static IOException ensureIOException(final Throwable t) { + return (t instanceof IOException) ? (IOException) t : new IOException(t); + } + + private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) { + Thread.currentThread().interrupt(); + IOException ioe = new InterruptedIOException(); + ioe.initCause(ie); + return ioe; + } + + // public only until class moves to o.a.h.h.wal + /** @return the size of log files in use */ + public long getLogFileSize() { + return this.totalLogSize.get(); + } + + // public only until class moves to o.a.h.h.wal + public void requestLogRoll() { + requestLogRoll(false); + } + + @Override + public void shutdown() throws IOException { + if (!shutdown.compareAndSet(false, true)) { + return; + } + closed = true; + // Tell our listeners that the log is closing + if (!this.listeners.isEmpty()) { + for (WALActionsListener i : this.listeners) { + i.logCloseRequested(); + } + } + rollWriterLock.lock(); + try { + doShutdown(); + } finally { + rollWriterLock.unlock(); + } + } + + /** + * updates the sequence number of a specific store. depending on the flag: replaces current seq + * number if the given seq id is bigger, or even if it is lower than existing one + * @param encodedRegionName + * @param familyName + * @param sequenceid + * @param onlyIfGreater + */ + @Override + public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid, + boolean onlyIfGreater) { + sequenceIdAccounting.updateStore(encodedRegionName, familyName, sequenceid, onlyIfGreater); + } + + protected final SyncFuture getSyncFuture(long sequence) { + return CollectionUtils + .computeIfAbsent(syncFuturesByHandler, Thread.currentThread(), SyncFuture::new) + .reset(sequence); + } + + protected final void requestLogRoll(boolean tooFewReplicas) { + if (!this.listeners.isEmpty()) { + for (WALActionsListener i : this.listeners) { + i.logRollRequested(tooFewReplicas); + } + } + } + + long getUnflushedEntriesCount() { + long highestSynced = this.highestSyncedTxid.get(); + long highestUnsynced = this.highestUnsyncedTxid; + return highestSynced >= highestUnsynced ? 
0 : highestUnsynced - highestSynced; + } + + boolean isUnflushedEntries() { + return getUnflushedEntriesCount() > 0; + } + + /** + * Exposed for testing only. Use to tricks like halt the ring buffer appending. + */ + @VisibleForTesting + void atHeadOfRingBufferEventHandlerAppend() { + // Noop + } + + protected final boolean append(W writer, FSWALEntry entry) throws IOException { + // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc. + atHeadOfRingBufferEventHandlerAppend(); + long start = EnvironmentEdgeManager.currentTime(); + byte[] encodedRegionName = entry.getKey().getEncodedRegionName(); + long regionSequenceId = entry.getKey().getSequenceId(); + + // Edits are empty, there is nothing to append. Maybe empty when we are looking for a + // region sequence id only, a region edit/sequence id that is not associated with an actual + // edit. It has to go through all the rigmarole to be sure we have the right ordering. + if (entry.getEdit().isEmpty()) { + return false; + } + + // Coprocessor hook. + coprocessorHost.preWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit()); + if (!listeners.isEmpty()) { + for (WALActionsListener i : listeners) { + i.visitLogEntryBeforeWrite(entry.getKey(), entry.getEdit()); + } + } + doAppend(writer, entry); + assert highestUnsyncedTxid < entry.getTxid(); + highestUnsyncedTxid = entry.getTxid(); + sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId, + entry.isInMemStore()); + coprocessorHost.postWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit()); + // Update metrics. + postAppend(entry, EnvironmentEdgeManager.currentTime() - start); + numEntries.incrementAndGet(); + return true; + } + + private long postAppend(final Entry e, final long elapsedTime) throws IOException { + long len = 0; + if (!listeners.isEmpty()) { + for (Cell cell : e.getEdit().getCells()) { + len += PrivateCellUtil.estimatedSerializedSizeOf(cell); + } + for (WALActionsListener listener : listeners) { + listener.postAppend(len, elapsedTime, e.getKey(), e.getEdit()); + } + } + return len; + } + + protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key, + WALEdit edits, boolean inMemstore, RingBuffer ringBuffer) + throws IOException { + if (this.closed) { + throw new IOException( + "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString()); + } + MutableLong txidHolder = new MutableLong(); + MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> { + txidHolder.setValue(ringBuffer.next()); + }); + long txid = txidHolder.longValue(); + try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) { + FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore); + entry.stampRegionSequenceId(we); + ringBuffer.get(txid).load(entry); + } finally { + ringBuffer.publish(txid); + } + return txid; + } + + @Override + public String toString() { + return implClassName + " " + walFilePrefix + ":" + walFileSuffix + "(num " + filenum + ")"; + } + + /** + * NOTE: This append, at a time that is usually after this call returns, starts an mvcc + * transaction by calling 'begin' wherein which we assign this update a sequenceid. At assignment + * time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must + * 'complete' the transaction this mvcc transaction by calling + * MultiVersionConcurrencyControl#complete(...) or a variant otherwise mvcc will get stuck. 
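
The append() javadoc above imposes a contract on callers: the mvcc transaction it begins must be completed, typically in a finally block. A hedged caller-side sketch against the API shown in this patch; the wrapper method and variable names are invented:

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.RegionInfo;
    import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
    import org.apache.hadoop.hbase.wal.WAL;
    import org.apache.hadoop.hbase.wal.WALEdit;
    import org.apache.hadoop.hbase.wal.WALKeyImpl;

    // Invented wrapper; only the append/sync/complete sequence is the point.
    public class AppendContract {
      static void appendAndSync(WAL wal, RegionInfo info, WALKeyImpl key, WALEdit edits)
          throws IOException {
        long txid = wal.append(info, key, edits, true);
        MultiVersionConcurrencyControl.WriteEntry we = null;
        try {
          wal.sync(txid); // the WriteEntry is filled in by the time sync returns
          we = key.getWriteEntry();
          // ... apply the edits to the memstore at we.getWriteNumber() ...
        } finally {
          if (we != null) {
            key.getMvcc().complete(we); // skip this and mvcc read points stall
          }
        }
      }
    }
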
Do it + * in the finally of a try/finally block within which this append lives and any subsequent + * operations like sync or update of memstore, etc. Get the WriteEntry to pass mvcc out of the + * passed in WALKey walKey parameter. Be warned that the WriteEntry is not + * immediately available on return from this method. It WILL be available subsequent to a sync of + * this append; otherwise, you will just have to wait on the WriteEntry to get filled in. + */ + @Override + public abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore) + throws IOException; + + protected void doAppend(W writer, FSWALEntry entry) throws IOException { + } + + protected void doReplaceWriter(Path oldPath, Path newPath, W nextWriter) + throws IOException { + } + + protected void doShutdown() throws IOException { + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index 6368fb7..0a7ed85 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer; * AsyncWriter for protobuf-based WAL. */ @InterfaceAudience.Private -public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter +public class AsyncProtobufLogWriter extends AbstractFSProtobufLogWriter implements AsyncFSWALProvider.AsyncWriter { private static final Logger LOG = LoggerFactory.getLogger(AsyncProtobufLogWriter.class); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java index 13f5d6e..3d42d19 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/Compressor.java @@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.WritableUtils; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; - +import org.apache.hadoop.hbase.wal.FSHLogProvider; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALProvider; @@ -82,7 +82,7 @@ public class Compressor { } boolean compress = ((ReaderBase)in).hasCompression(); conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, !compress); - out = WALFactory.createWALWriter(outFS, output, conf); + out = FSHLogProvider.createWriter(conf, outFS, output, false); WAL.Entry e = null; while ((e = in.next()) != null) out.append(e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index baa87a4..93d4f9c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.io.OutputStream; import java.util.Arrays; import java.util.List; +import java.util.OptionalLong; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; @@ -50,6 +51,7 @@ import 
org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.FSHLogProvider; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALInfo; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.apache.hadoop.hdfs.DFSOutputStream; @@ -1112,4 +1114,5 @@ public class FSHLog extends AbstractFSWAL { } return new DatanodeInfo[0]; } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java index 778a9db..6f56f54 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java @@ -28,6 +28,7 @@ import java.util.TreeSet; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; @@ -41,15 +42,15 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; /** - * A WAL Entry for {@link AbstractFSWAL} implementation. Immutable. + * A WAL Entry for {@link AbstractWAL} implementation. Immutable. * A subclass of {@link Entry} that carries extra info across the ring buffer such as * region sequence id (we want to use this later, just before we write the WAL to ensure region * edits maintain order). The extra info added here is not 'serialized' as part of the WALEdit * hence marked 'transient' to underline this fact. It also adds mechanism so we can wait on * the assign of the region sequence id. See #stampRegionSequenceId(). */ -@InterfaceAudience.Private -class FSWALEntry extends Entry { +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) +public class FSWALEntry extends Entry { // The below data members are denoted 'transient' just to highlight these are not persisted; // they are only in memory and held here while passing over the ring buffer. private final transient long txid; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index 5c8e0d2..5e13931 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer; * Writer for protobuf-based WAL. 
*/ @InterfaceAudience.Private -public class ProtobufLogWriter extends AbstractProtobufLogWriter +public class ProtobufLogWriter extends AbstractFSProtobufLogWriter implements FSHLogProvider.Writer { private static final Logger LOG = LoggerFactory.getLogger(ProtobufLogWriter.class); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java index 13ffac7..2cd9980 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALInfo; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.yetus.audience.InterfaceAudience; @@ -38,7 +39,7 @@ public interface WALActionsListener { * @param oldPath the path to the old wal * @param newPath the path to the new wal */ - default void preLogRoll(Path oldPath, Path newPath) throws IOException {} + default void preLogRoll(WALInfo oldPath, WALInfo newPath) throws IOException {} /** * The WAL has been rolled. The oldPath can be null if this is @@ -46,21 +47,21 @@ public interface WALActionsListener { * @param oldPath the path to the old wal * @param newPath the path to the new wal */ - default void postLogRoll(Path oldPath, Path newPath) throws IOException {} + default void postLogRoll(WALInfo oldPath, WALInfo newPath) throws IOException {} /** * The WAL is going to be archived. * @param oldPath the path to the old wal * @param newPath the path to the new wal */ - default void preLogArchive(Path oldPath, Path newPath) throws IOException {} + default void preLogArchive(WALInfo oldPath, WALInfo newPath) throws IOException {} /** * The WAL has been archived. * @param oldPath the path to the old wal * @param newPath the path to the new wal */ - default void postLogArchive(Path oldPath, Path newPath) throws IOException {} + default void postLogArchive(WALInfo oldPath, WALInfo newPath) throws IOException {} /** * A request was made that the WAL be rolled. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java index 40d6d0f..43c0673 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.coprocessor.WALObserver; import org.apache.hadoop.hbase.metrics.MetricRegistry; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALInfo; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -173,7 +174,7 @@ public class WALCoprocessorHost * @param oldPath the path of the current wal that we are replacing * @param newPath the path of the wal we are going to create */ - public void preWALRoll(Path oldPath, Path newPath) throws IOException { + public void preWALRoll(WALInfo oldPath, WALInfo newPath) throws IOException { execOperation(coprocEnvironments.isEmpty() ? 
null : new WALObserverOperation() { @Override protected void call(WALObserver observer) throws IOException { @@ -187,7 +188,7 @@ public class WALCoprocessorHost * @param oldPath the path of the wal that we replaced * @param newPath the path of the wal we have created and now is the current */ - public void postWALRoll(Path oldPath, Path newPath) throws IOException { + public void postWALRoll(WALInfo oldPath, WALInfo newPath) throws IOException { execOperation(coprocEnvironments.isEmpty() ? null : new WALObserverOperation() { @Override protected void call(WALObserver observer) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WalInfoImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WalInfoImpl.java new file mode 100644 index 0000000..35d1375 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WalInfoImpl.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import java.io.IOException; + +import org.apache.hadoop.hbase.wal.WALInfo; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class WalInfoImpl implements WALInfo { + + private String name; + + public WalInfoImpl(String name) { + this.name=name; + } + + @Override + public int compareTo(WALInfo o) { + return this.getName().compareTo(o.getName()); + } + + @Override + public String getName() { + return this.name; + } + + @Override + public long getWalStartTime() { + // TODO Implement WALInfo.getWalStartTime + return 0; + } + + @Override + public long getSize() throws IOException { + // TODO Implement WALInfo.getSize + return 0; + } + + @Override + public boolean equals(Object obj) { + if (obj == null) return false; + if (obj instanceof WALInfo) { + WALInfo info = (WALInfo) obj; + if (this.name.equals(info.getName())) return true; + } + return false; + } + + @Override + public int hashCode() { + return this.name.hashCode(); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java index f4c37b1..97e5bde 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java @@ -55,7 +55,6 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener { class Context { private final Configuration localConf; private final Configuration conf; - private final FileSystem fs; private final TableDescriptors 
tableDescriptors; private final ReplicationPeer replicationPeer; private final String peerId; @@ -67,7 +66,6 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener { public Context( final Configuration localConf, final Configuration conf, - final FileSystem fs, final String peerId, final UUID clusterId, final ReplicationPeer replicationPeer, @@ -76,7 +74,6 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener { final Abortable abortable) { this.localConf = localConf; this.conf = conf; - this.fs = fs; this.clusterId = clusterId; this.peerId = peerId; this.replicationPeer = replicationPeer; @@ -90,9 +87,6 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener { public Configuration getLocalConfiguration() { return localConf; } - public FileSystem getFilesystem() { - return fs; - } public UUID getClusterId() { return clusterId; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/AbstractWALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/AbstractWALEntryStream.java new file mode 100644 index 0000000..9f25084 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/AbstractWALEntryStream.java @@ -0,0 +1,308 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.OptionalLong; +import java.util.concurrent.PriorityBlockingQueue; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WAL.Reader; +import org.apache.hadoop.hbase.wal.WALInfo; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Streaming access to WAL entries. This class is given a queue of WAL {@link WALInfo}, and continually + * iterates through all the WAL {@link Entry} in the queue. When it's done reading from a Path, it + * dequeues it and starts reading from the next. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class AbstractWALEntryStream implements WALEntryStream { + private static final Logger LOG = LoggerFactory.getLogger(AbstractWALEntryStream.class); + + protected Reader reader; + protected WALInfo currentPath; + // cache of next entry for hasNext() + protected Entry currentEntry; + // position for the current entry. 
As now we support peek, which means that the upper layer may + // choose to return before reading the current entry, so it is not safe to return the value below + // in getPosition. + protected long currentPositionOfEntry = 0; + // position after reading current entry + protected long currentPositionOfReader = 0; + protected final PriorityBlockingQueue logQueue; + protected final Configuration conf; + protected final WALFileLengthProvider walFileLengthProvider; + // which region server the WALs belong to + protected final ServerName serverName; + protected final MetricsSource metrics; + + protected boolean eofAutoRecovery; + + /** + * Create an entry stream over the given queue at the given start position + * @param logQueue the queue of WAL paths + * @param conf {@link Configuration} to use to create {@link Reader} for this stream + * @param startPosition the position in the first WAL to start reading at + * @param serverName the server name which all WALs belong to + * @param metrics replication metrics + * @throws IOException + */ + public AbstractWALEntryStream(PriorityBlockingQueue logQueue, Configuration conf, + long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName, + MetricsSource metrics) throws IOException { + this.logQueue = logQueue; + this.conf = conf; + this.currentPositionOfEntry = startPosition; + this.walFileLengthProvider = walFileLengthProvider; + this.serverName = serverName; + this.metrics = metrics; + this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false); + + } + + @Override + public boolean hasNext() throws IOException { + if (currentEntry == null) { + try { + tryAdvanceEntry(); + } catch (IOException e) { + handleIOException(logQueue.peek(), e); + } + } + return currentEntry != null; + } + + @Override + public Entry peek() throws IOException { + return hasNext() ? currentEntry: null; + } + + @Override + public Entry next() throws IOException { + Entry save = peek(); + currentPositionOfEntry = currentPositionOfReader; + currentEntry = null; + return save; + } + + /** + * {@inheritDoc} + */ + @Override + public void close() throws IOException { + closeReader(); + } + + @Override + public long getPosition() { + return currentPositionOfEntry; + } + + @Override + public WALInfo getCurrentWalInfo() { + return currentPath; + } + + + @Override + public void reset() throws IOException { + if (reader != null && currentPath != null) { + resetReader(); + } + } + + protected void setPosition(long position) { + currentPositionOfEntry = position; + } + + private void setCurrentPath(WALInfo path) { + this.currentPath = path; + } + + private void tryAdvanceEntry() throws IOException { + if (checkReader()) { + boolean beingWritten = readNextEntryAndRecordReaderPosition(); + if (currentEntry == null && !beingWritten) { + // no more entries in this log file, and the file is already closed, i.e, rolled + // Before dequeueing, we should always get one more attempt at reading. + // This is in case more entries came in after we opened the reader, and the log is rolled + // while we were reading. 
See HBASE-6758 + resetReader(); + readNextEntryAndRecordReaderPosition(); + if (currentEntry == null) { + if (checkAllBytesParsed()) { // now we're certain we're done with this log file + dequeueCurrentLog(); + if (openNextLog()) { + readNextEntryAndRecordReaderPosition(); + } + } + } + } + // if currentEntry != null then just return + // if currentEntry == null but the file is still being written, then we should not switch to + // the next log either, just return here and try next time to see if there are more entries in + // the current file + } + // do nothing if we don't have a WAL Reader (e.g. if there's no logs in queue) + } + + + + private void dequeueCurrentLog() throws IOException { + LOG.debug("Reached the end of log {}", currentPath); + closeReader(); + logQueue.remove(); + setPosition(0); + metrics.decrSizeOfLogQueue(); + } + + /** + * Returns whether the file is opened for writing. + */ + private boolean readNextEntryAndRecordReaderPosition() throws IOException { + Entry readEntry = reader.next(); + long readerPos = reader.getPosition(); + OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath); + if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) { + // see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted + // data, so we need to make sure that we do not read beyond the committed file length. + if (LOG.isDebugEnabled()) { + LOG.debug("The provider tells us the valid length for " + currentPath + " is " + + fileLength.getAsLong() + ", but we have advanced to " + readerPos); + } + resetReader(); + return true; + } + if (readEntry != null) { + metrics.incrLogEditsRead(); + metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry); + } + currentEntry = readEntry; // could be null + this.currentPositionOfReader = readerPos; + return fileLength.isPresent(); + } + + private void closeReader() throws IOException { + if (reader != null) { + reader.close(); + reader = null; + } + } + + // if we don't have a reader, open a reader on the next log + private boolean checkReader() throws IOException { + if (reader == null) { + return openNextLog(); + } + return true; + } + + // open a reader on the next log in queue + private boolean openNextLog() throws IOException { + WALInfo nextPath = logQueue.peek(); + if (nextPath != null) { + openReader(nextPath); + if (reader != null) { + return true; + } + } else { + // no more files in queue, this could happen for recovered queue, or for a wal group of a sync + // replication peer which has already been transited to DA or S. + setCurrentPath(null); + } + return false; + } + + + protected void openReader(WALInfo path) throws IOException { + try { + // Detect if this is a new file, if so get a new reader else + // reset the current reader so that we see the new data + if (reader == null || !getCurrentWalInfo().equals(path)) { + closeReader(); + reader = createReader(path, conf); + seek(); + setCurrentPath(path); + } else { + resetReader(); + } + } catch (RemoteException re) { + IOException ioe = re.unwrapRemoteException(FileNotFoundException.class); + handleIOException (path, ioe); + } catch (IOException ioe) { + handleIOException(path, ioe); + } catch (NullPointerException npe) { + // Workaround for race condition in HDFS-4380 + // which throws a NPE if we open a file before any data node has the most recent block + // Just sleep and retry. Will require re-reading compressed WALs for compressionContext. 
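+ // Leaving the reader null (below) makes checkReader()/openNextLog() retry the open on the
+ // caller's next hasNext() attempt instead of failing this read outright.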
+ LOG.warn("Got NPE opening reader, will retry."); + reader = null; + } + } + + /** + * Creates a reader for a wal info + * + * @param walInfo path for FS based or stream name for stream based wal provider + * @param conf + * @return return a reader for the file + * @throws IOException + */ + protected abstract Reader createReader(WALInfo walInfo, Configuration conf) throws IOException; + + protected void resetReader() throws IOException { + try { + currentEntry = null; + reader.reset(); + seek(); + } catch (NullPointerException npe) { + throw new IOException("NPE resetting reader, likely HDFS-4380", npe); + } catch (IOException e) { + handleIOException(currentPath, e); + } + } + + /** + * Implement for handling IO exceptions , throw back if doesn't need to be handled + * @param walInfo + * @param ioe IOException + * @throws IOException + */ + protected abstract void handleIOException(WALInfo walInfo, IOException e) throws IOException; + + protected void seek() throws IOException { + if (currentPositionOfEntry != 0) { + reader.seek(currentPositionOfEntry); + } + } + + + protected boolean checkAllBytesParsed() throws IOException { + return true; + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSRecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSRecoveredReplicationSource.java new file mode 100644 index 0000000..8ddcde3 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSRecoveredReplicationSource.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication.regionserver; + + +import java.io.IOException; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.PriorityBlockingQueue; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.replication.ReplicationPeer; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.FSWALInfo; +import org.apache.hadoop.hbase.wal.WALInfo; +import org.apache.hadoop.hbase.wal.WALProvider; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +public class FSRecoveredReplicationSource extends RecoveredReplicationSource { + + private static final Logger LOG = LoggerFactory.getLogger(FSRecoveredReplicationSource.class); + private String logDir; + + @Override + public void init(Configuration conf, ReplicationSourceManager manager, + ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, + String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider, + MetricsSource metrics, WALProvider walProvider) throws IOException { + super.init(conf, manager, queueStorage, replicationPeer, server, peerClusterZnode, clusterId, + walFileLengthProvider, metrics, walProvider); + this.logDir = AbstractFSWALProvider.getWALDirectoryName(server.getServerName().toString()); + } + + @Override + public void locateRecoveredWALInfos(PriorityBlockingQueue queue) throws IOException { + boolean hasWALInfoChanged = false; + PriorityBlockingQueue newWALInfos = + new PriorityBlockingQueue(queueSizePerGroup, new LogsComparator()); + walinfoLoop: + for (WALInfo walinfo : queue) { + if (walProvider.getWalMetaDataProvider().exists(((FSWALInfo)walinfo).getPath().toString())) { // still in same location, don't need to do anything + newWALInfos.add(walinfo); + continue; + } + // WALInfo changed - try to find the right WALInfo. + hasWALInfoChanged = true; + if (server instanceof ReplicationSyncUp.DummyServer) { + // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data + // from .logs to .oldlogs. 
Loop into .logs folders and check whether a match exists + WALInfo newWALInfo = getReplSyncUpPath(walinfo); + newWALInfos.add(newWALInfo); + continue; + } else { + // See if Path exists in the dead RS folder (there could be a chain of failures + // to look at) + List deadRegionServers = this.replicationQueueInfo.getDeadRegionServers(); + LOG.info("NB dead servers : " + deadRegionServers.size()); + final Path walDir = FSUtils.getWALRootDir(conf); + for (ServerName curDeadServerName : deadRegionServers) { + final Path deadRsDirectory = + new Path(walDir, AbstractFSWALProvider.getWALDirectoryName(curDeadServerName + .getServerName())); + Path[] locs = new Path[] { new Path(deadRsDirectory, walinfo.getName()), new Path( + deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), walinfo.getName()) }; + for (Path possibleLogLocation : locs) { + LOG.info("Possible location " + possibleLogLocation.toUri().toString()); + if (walProvider.getWalMetaDataProvider().exists(possibleLogLocation.toString())) { + // We found the right new location + LOG.info("Log " + walinfo + " still exists at " + possibleLogLocation); + newWALInfos.add(new FSWALInfo(possibleLogLocation)); + continue walinfoLoop; + } + } + } + // didn't find a new location + LOG.error( + String.format("WAL Path %s doesn't exist and couldn't find its new location", walinfo)); + newWALInfos.add(walinfo); + } + } + + if (hasWALInfoChanged) { + if (newWALInfos.size() != queue.size()) { // this shouldn't happen + LOG.error("Recovery queue size is incorrect"); + throw new IOException("Recovery queue size error"); + } + // put the correct locations in the queue + // since this is a recovered queue with no new incoming logs, + // there shouldn't be any concurrency issues + queue.clear(); + for (WALInfo walinfo : newWALInfos) { + queue.add(walinfo); + } + } + } + + // N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of the wal + // area rather than to the wal area for a particular region server. + private WALInfo getReplSyncUpPath(WALInfo path) throws IOException { + WALInfo[] rss = walProvider.getWalMetaDataProvider().list(this.walProvider.createWalInfo(logDir)); + for (WALInfo rs : rss) { + WALInfo[] logs = walProvider.getWalMetaDataProvider().list(rs); + for (WALInfo log : logs) { + WALInfo p = this.walProvider.createWalInfo(new Path(((FSWALInfo)rs).getPath(), log.getName()).toString()); + if (p.getName().equals(path.getName())) { + LOG.info("Log " + p.getName() + " found at " + p); + return p; + } + } + } + LOG.error("Didn't find path for: " + path.getName()); + return path; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSWALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSWALEntryStream.java new file mode 100644 index 0000000..e24e14f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/FSWALEntryStream.java @@ -0,0 +1,224 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.io.EOFException; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.concurrent.PriorityBlockingQueue; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; +import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; +import org.apache.hadoop.hbase.wal.FSWALInfo; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALInfo; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WAL.Reader; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Streaming access to WAL entries. This class is given a queue of {@link WALInfo}, and continually + * iterates through every WAL {@link Entry} in the queue. When it is done reading from one WAL, it + * dequeues it and starts reading from the next. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class FSWALEntryStream extends AbstractWALEntryStream { + private static final Logger LOG = LoggerFactory.getLogger(FSWALEntryStream.class); + + private FileSystem fs; + + public FSWALEntryStream(FileSystem fs, PriorityBlockingQueue logQueue, Configuration conf, + long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName, + MetricsSource metrics) throws IOException { + super(logQueue, conf, startPosition, walFileLengthProvider, serverName, metrics); + this.fs = fs; + } + + @Override + // HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file + protected boolean checkAllBytesParsed() throws IOException { + // -1 means the wal wasn't closed cleanly. + final long trailerSize = currentTrailerSize(); + FileStatus stat = null; + try { + stat = fs.getFileStatus(((FSWALInfo)this.currentPath).getPath()); + } catch (IOException exception) { + LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}", + currentPath, trailerSize < 0 ? "was not" : "was", getCurrentPathStat()); + metrics.incrUnknownFileLengthForClosedWAL(); + } + // Here we use currentPositionOfReader instead of currentPositionOfEntry. + // We only call this method when currentEntry is null so usually they are the same, but there + // are two exceptions. One is when the file contains nothing but a header; in that case + // currentPositionOfEntry will always be 0 since we never get a chance to update it. The other + // is when we reach the end of the file; then currentPositionOfEntry points to the tail of the + // last valid entry, and the currentPositionOfReader will usually point to the end of the file.
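+ // In both cases currentPositionOfReader is the right value to compare against the file
+ // length: a missing trailer (unclean close) only means some trailing bytes were skipped,
+ // while stopping short of an existing trailer means a bad read, so we replay the file.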
+ if (stat != null) { + if (trailerSize < 0) { + if (currentPositionOfReader < stat.getLen()) { + final long skippedBytes = stat.getLen() - currentPositionOfReader; + LOG.debug( + "Reached the end of WAL file '{}'. It was not closed cleanly," + + " so we did not parse {} bytes of data. This is normally ok.", + currentPath, skippedBytes); + metrics.incrUncleanlyClosedWALs(); + metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes); + } + } else if (currentPositionOfReader + trailerSize < stat.getLen()) { + LOG.warn( + "Processing end of WAL file '{}'. At position {}, which is too far away from" + + " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}", + currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat()); + setPosition(0); + resetReader(); + metrics.incrRestartedWALReading(); + metrics.incrRepeatedFileBytes(currentPositionOfReader); + return false; + } + } + if (LOG.isTraceEnabled()) { + LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is " + + (stat == null ? "N/A" : stat.getLen())); + } + metrics.incrCompletedWAL(); + return true; + } + + private Path getArchivedLog(Path path) throws IOException { + Path rootDir = FSUtils.getRootDir(conf); + + // Try to find the log in the old log dir + Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); + Path archivedLogLocation = new Path(oldLogDir, path.getName()); + if (fs.exists(archivedLogLocation)) { + LOG.info("Log " + path + " was moved to " + archivedLogLocation); + return archivedLogLocation; + } + + // Try to find the log in the separate per-server old log dir + oldLogDir = new Path(rootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME) + .append(Path.SEPARATOR).append(serverName.getServerName()).toString()); + archivedLogLocation = new Path(oldLogDir, path.getName()); + if (fs.exists(archivedLogLocation)) { + LOG.info("Log " + path + " was moved to " + archivedLogLocation); + return archivedLogLocation; + } + + LOG.error("Couldn't locate log: " + path); + return path; + } + + private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException { + // If the log was archived, continue reading from there + Path archivedLog = getArchivedLog(path); + if (!path.equals(archivedLog)) { + openReader(new FSWALInfo(archivedLog)); + } else { + throw fnfe; + } + } + + // For HBASE-15019 + private void recoverLease(final Configuration conf, final Path path) { + try { + final FileSystem dfs = FSUtils.getCurrentFileSystem(conf); + FSUtils fsUtils = FSUtils.getInstance(dfs, conf); + fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() { + @Override + public boolean progress() { + LOG.debug("recover WAL lease: " + path); + return true; + } + }); + } catch (IOException e) { + LOG.warn("unable to recover lease for WAL: " + path, e); + } + } + + @Override + protected void handleIOException(WALInfo path, IOException e) throws IOException { + try { + throw e; + } catch (FileNotFoundException fnfe) { + handleFileNotFound(((FSWALInfo)path).getPath(), fnfe); + } catch (EOFException eo) { + handleEofException(eo); + } catch (LeaseNotRecoveredException lnre) { + // HBASE-15019 the WAL was not closed due to some hiccup.
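+ // Kick off lease recovery and drop the reader; the stream will reopen the WAL on the
+ // next read attempt, once the lease has been released.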
+ LOG.warn("Try to recover the WAL lease " + currentPath, lnre); + recoverLease(conf, ((FSWALInfo)currentPath).getPath()); + reader = null; + } + } + + private long currentTrailerSize() { + long size = -1L; + if (reader instanceof ProtobufLogReader) { + final ProtobufLogReader pblr = (ProtobufLogReader) reader; + size = pblr.trailerSize(); + } + return size; + } + + // if we get an EOF due to a zero-length log, and there are other logs in queue + // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is + // enabled, then dump the log + private void handleEofException(IOException e) { + if ((e instanceof EOFException || e.getCause() instanceof EOFException) && logQueue.size() > 1 + && this.eofAutoRecovery) { + try { + if (fs.getFileStatus(((FSWALInfo)logQueue.peek()).getPath()).getLen() == 0) { + LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek()); + logQueue.remove(); + setPosition(0); + } + } catch (IOException ioe) { + LOG.warn("Couldn't get file length information about log " + logQueue.peek()); + } + } + } + + private String getCurrentPathStat() { + StringBuilder sb = new StringBuilder(); + if (currentPath != null) { + sb.append("currently replicating from: ").append(currentPath).append(" at position: ") + .append(currentPositionOfEntry).append("\n"); + } else { + sb.append("no replication ongoing, waiting for new log"); + } + return sb.toString(); + } + + @Override + protected Reader createReader(WALInfo path, Configuration conf) throws IOException { + return WALFactory.createReader(fs, ((FSWALInfo)path).getPath(), conf); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 7db53aa..d0bb50c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -86,7 +86,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2; private ClusterConnection conn; - private Configuration localConf; private Configuration conf; // How long should we sleep for each retry private long sleepForRetries; @@ -117,7 +116,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi public void init(Context context) throws IOException { super.init(context); this.conf = HBaseConfiguration.create(ctx.getConfiguration()); - this.localConf = HBaseConfiguration.create(ctx.getLocalConfiguration()); decorateConf(); this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier", diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index f1bb538..c363a5d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -18,130 +18,42 @@ package 
org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; -import java.util.List; import java.util.UUID; import java.util.concurrent.PriorityBlockingQueue; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.WALInfo; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Class that handles the recovered source of a replication stream, which is transfered from * another dead region server. This will be closed when all logs are pushed to peer cluster. */ @InterfaceAudience.Private -public class RecoveredReplicationSource extends ReplicationSource { - - private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSource.class); +public abstract class RecoveredReplicationSource extends ReplicationSource { private String actualPeerId; @Override - public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, + public void init(Configuration conf, ReplicationSourceManager manager, ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider, - MetricsSource metrics) throws IOException { - super.init(conf, fs, manager, queueStorage, replicationPeer, server, peerClusterZnode, - clusterId, walFileLengthProvider, metrics); + MetricsSource metrics, WALProvider walProvider) throws IOException { + super.init(conf, manager, queueStorage, replicationPeer, server, peerClusterZnode, + clusterId, walFileLengthProvider, metrics, walProvider); this.actualPeerId = this.replicationQueueInfo.getPeerId(); } @Override protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId, - PriorityBlockingQueue queue) { + PriorityBlockingQueue queue) { return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, queueStorage); } - public void locateRecoveredPaths(PriorityBlockingQueue queue) throws IOException { - boolean hasPathChanged = false; - PriorityBlockingQueue newPaths = - new PriorityBlockingQueue(queueSizePerGroup, new LogsComparator()); - pathsLoop: for (Path path : queue) { - if (fs.exists(path)) { // still in same location, don't need to do anything - newPaths.add(path); - continue; - } - // Path changed - try to find the right path. - hasPathChanged = true; - if (server instanceof ReplicationSyncUp.DummyServer) { - // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data - // from .logs to .oldlogs. 
Loop into .logs folders and check whether a match exists - Path newPath = getReplSyncUpPath(path); - newPaths.add(newPath); - continue; - } else { - // See if Path exists in the dead RS folder (there could be a chain of failures - // to look at) - List deadRegionServers = this.replicationQueueInfo.getDeadRegionServers(); - LOG.info("NB dead servers : " + deadRegionServers.size()); - final Path walDir = FSUtils.getWALRootDir(conf); - for (ServerName curDeadServerName : deadRegionServers) { - final Path deadRsDirectory = - new Path(walDir, AbstractFSWALProvider.getWALDirectoryName(curDeadServerName - .getServerName())); - Path[] locs = new Path[] { new Path(deadRsDirectory, path.getName()), new Path( - deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), path.getName()) }; - for (Path possibleLogLocation : locs) { - LOG.info("Possible location " + possibleLogLocation.toUri().toString()); - if (manager.getFs().exists(possibleLogLocation)) { - // We found the right new location - LOG.info("Log " + path + " still exists at " + possibleLogLocation); - newPaths.add(possibleLogLocation); - continue pathsLoop; - } - } - } - // didn't find a new location - LOG.error( - String.format("WAL Path %s doesn't exist and couldn't find its new location", path)); - newPaths.add(path); - } - } - - if (hasPathChanged) { - if (newPaths.size() != queue.size()) { // this shouldn't happen - LOG.error("Recovery queue size is incorrect"); - throw new IOException("Recovery queue size error"); - } - // put the correct locations in the queue - // since this is a recovered queue with no new incoming logs, - // there shouldn't be any concurrency issues - queue.clear(); - for (Path path : newPaths) { - queue.add(path); - } - } - } - - // N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of the wal - // area rather than to the wal area for a particular region server. - private Path getReplSyncUpPath(Path path) throws IOException { - FileStatus[] rss = fs.listStatus(manager.getLogDir()); - for (FileStatus rs : rss) { - Path p = rs.getPath(); - FileStatus[] logs = fs.listStatus(p); - for (FileStatus log : logs) { - p = new Path(p, log.getPath().getName()); - if (p.getName().equals(path.getName())) { - LOG.info("Log " + p.getName() + " found at " + p); - return p; - } - } - } - LOG.error("Didn't find path for: " + path.getName()); - return path; - } - void tryFinish() { if (workerThreads.isEmpty()) { this.getSourceMetrics().clear(); @@ -163,4 +75,13 @@ public class RecoveredReplicationSource extends ReplicationSource { public boolean isRecovered() { return true; } + + + /** + * Get the updated queue of the wals if the wals are moved to another location. 
+ * @param queue Updated queue with the new WalInfo(paths or stream) if wals are archived + * @throws IOException + */ + public abstract void locateRecoveredWALInfos(PriorityBlockingQueue queue) + throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java index b0d4db0..061b52f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java @@ -20,10 +20,10 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import java.util.concurrent.PriorityBlockingQueue; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.WALInfo; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +40,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper private final ReplicationQueueStorage replicationQueues; public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId, - PriorityBlockingQueue queue, RecoveredReplicationSource source, + PriorityBlockingQueue queue, RecoveredReplicationSource source, ReplicationQueueStorage queueStorage) { super(conf, walGroupId, queue, source); this.source = source; @@ -58,7 +58,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper int numRetries = 0; while (numRetries <= maxRetriesMultiplier) { try { - source.locateRecoveredPaths(queue); + source.locateRecoveredWALInfos(queue); break; } catch (IOException e) { LOG.error("Error while locating recovered queue paths, attempt #" + numRetries); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index b04f0cb..5598595 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -26,7 +26,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HConstants; @@ -85,7 +84,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer } @Override - public void initialize(Server server, FileSystem fs, Path logDir, Path oldLogDir, + public void initialize(Server server, WALProvider walProvider) throws IOException { this.server = server; this.conf = this.server.getConfiguration(); @@ -124,9 +123,9 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer } SyncReplicationPeerMappingManager mapping = new SyncReplicationPeerMappingManager(); this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, - 
replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId, + replicationTracker, conf, this.server,clusterId, walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(), - mapping); + mapping, walProvider); this.syncReplicationPeerInfoProvider = new SyncReplicationPeerInfoProviderImpl(replicationPeers, mapping); PeerActionListener peerActionListener = PeerActionListener.DUMMY; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 10fa50f..62f29e3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -17,9 +17,6 @@ */ package org.apache.hadoop.hbase.replication.regionserver; -import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getArchivedLogPath; - -import java.io.FileNotFoundException; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; @@ -37,7 +34,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; @@ -62,6 +58,8 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALInfo; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,7 +84,7 @@ public class ReplicationSource implements ReplicationSourceInterface { private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class); // Queues of logs to process, entry in format of walGroupId->queue, // each presents a queue for one wal group - private Map> queues = new HashMap<>(); + private Map> queues = new HashMap<>(); // per group queue size, keep no more than this number of logs in each wal group protected int queueSizePerGroup; protected ReplicationQueueStorage queueStorage; @@ -101,7 +99,6 @@ public class ReplicationSource implements ReplicationSourceInterface { protected Server server; // How long should we sleep for each retry private long sleepForRetries; - protected FileSystem fs; // id of this cluster private UUID clusterId; // total number of edits we replicated @@ -137,6 +134,7 @@ public class ReplicationSource implements ReplicationSourceInterface { private int waitOnEndpointSeconds = -1; private Thread initThread; + protected WALProvider walProvider; /** * Instantiation method used by region servers @@ -149,10 +147,10 @@ public class ReplicationSource implements ReplicationSourceInterface { * @param metrics metrics for replication source */ @Override - public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, + public void init(Configuration conf, ReplicationSourceManager manager, ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, - MetricsSource metrics) throws 
IOException { + MetricsSource metrics, WALProvider walProvider) throws IOException { this.server = server; this.conf = HBaseConfiguration.create(conf); this.waitOnEndpointSeconds = @@ -166,7 +164,6 @@ public class ReplicationSource implements ReplicationSourceInterface { this.queueStorage = queueStorage; this.replicationPeer = replicationPeer; this.manager = manager; - this.fs = fs; this.metrics = metrics; this.clusterId = clusterId; @@ -179,6 +176,7 @@ this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0); this.totalBufferUsed = manager.getTotalBufferUsed(); this.walFileLengthProvider = walFileLengthProvider; + this.walProvider = walProvider; LOG.info("queueId={}, ReplicationSource : {}, currentBandwidth={}", queueId, replicationPeer.getId(), this.currentBandwidth); } @@ -191,9 +189,9 @@ } @Override - public void enqueueLog(Path log) { + public void enqueueLog(WALInfo log) { String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log.getName()); - PriorityBlockingQueue queue = queues.get(logPrefix); + PriorityBlockingQueue queue = queues.get(logPrefix); if (queue == null) { queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator()); // make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise @@ -282,7 +280,7 @@ tableDescriptors = ((HRegionServer) server).getTableDescriptors(); } replicationEndpoint - .init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), fs, + .init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), replicationPeer.getId(), clusterId, replicationPeer, metrics, tableDescriptors, server)); replicationEndpoint.start(); replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS); @@ -300,7 +298,7 @@ this.walEntryFilter = new ChainWALEntryFilter(filters); } - private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue queue) { + private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue queue) { ReplicationSourceShipper worker = createNewShipper(walGroupId, queue); ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker); if (extant != null) { @@ -328,13 +326,13 @@ int queueSize = queues.get(walGroupId).size(); replicationDelay = ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize); - Path currentPath = shipper.getCurrentPath(); + WALInfo currentPath = shipper.getCurrentWALInfo(); try { - fileSize = getFileSize(currentPath); + fileSize = currentPath.getSize(); } catch (IOException e) { LOG.warn("Ignore the exception as the file size of HLog only affects the web ui", e); fileSize = -1; } ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder(); statusBuilder.withPeerId(this.getPeerId()) .withQueueSize(queueSize) @@ -349,32 +348,21 @@ return sourceReplicationStatus; } - private long getFileSize(Path currentPath) throws IOException { - long fileSize; - try { - fileSize = fs.getContentSummary(currentPath).getLength(); - } catch (FileNotFoundException e) { - currentPath =
getArchivedLogPath(currentPath, conf); - fileSize = fs.getContentSummary(currentPath).getLength(); - } - return fileSize; - } - protected ReplicationSourceShipper createNewShipper(String walGroupId, - PriorityBlockingQueue queue) { + PriorityBlockingQueue queue) { return new ReplicationSourceShipper(conf, walGroupId, queue, this); } private ReplicationSourceWALReader createNewWALReader(String walGroupId, - PriorityBlockingQueue queue, long startPosition) { + PriorityBlockingQueue queue, long startPosition) { return replicationPeer.getPeerConfig().isSerial() - ? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this) - : new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this); + ? new SerialReplicationSourceWALReader(conf, queue, startPosition, walEntryFilter, this) + : new ReplicationSourceWALReader(conf, queue, startPosition, walEntryFilter, this); } protected final void uncaughtException(Thread t, Throwable e) { RSRpcServices.exitIfOOME(e); - LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentPath(), e); + LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentWALInfo(), e); server.abort("Unexpected exception in " + t.getName(), e); } @@ -497,9 +485,9 @@ public class ReplicationSource implements ReplicationSourceInterface { initializeWALEntryFilter(peerClusterId); // start workers - for (Map.Entry> entry : queues.entrySet()) { + for (Map.Entry> entry : queues.entrySet()) { String walGroupId = entry.getKey(); - PriorityBlockingQueue queue = entry.getValue(); + PriorityBlockingQueue queue = entry.getValue(); tryStartNewShipper(walGroupId, queue); } } @@ -593,11 +581,11 @@ public class ReplicationSource implements ReplicationSourceInterface { } @Override - public Path getCurrentPath() { + public WALInfo getCurrentWALInfo() { // only for testing for (ReplicationSourceShipper worker : workerThreads.values()) { - if (worker.getCurrentPath() != null) { - return worker.getCurrentPath(); + if (worker.getCurrentWALInfo() != null) { + return worker.getCurrentWALInfo(); } } return null; @@ -611,10 +599,10 @@ public class ReplicationSource implements ReplicationSourceInterface { /** * Comparator used to compare logs together based on their start time */ - public static class LogsComparator implements Comparator { + public static class LogsComparator implements Comparator { @Override - public int compare(Path o1, Path o2) { + public int compare(WALInfo o1, WALInfo o2) { return Long.compare(getTS(o1), getTS(o2)); } @@ -628,8 +616,8 @@ public class ReplicationSource implements ReplicationSourceInterface { * @param p path to split * @return start time */ - private static long getTS(Path p) { - return AbstractFSWALProvider.getWALStartTimeFromWALName(p.getName()); + private static long getTS(WALInfo p) { + return p.getWalStartTime(); } } @@ -642,7 +630,7 @@ public class ReplicationSource implements ReplicationSourceInterface { String walGroupId = entry.getKey(); ReplicationSourceShipper worker = entry.getValue(); long position = worker.getCurrentPosition(); - Path currentPath = worker.getCurrentPath(); + WALInfo currentPath = worker.getCurrentWALInfo(); sb.append("walGroup [").append(walGroupId).append("]: "); if (currentPath != null) { sb.append("currently replicating from: ").append(currentPath).append(" at position: ") @@ -695,4 +683,8 @@ public class ReplicationSource implements ReplicationSourceInterface { void removeWorker(ReplicationSourceShipper worker) { 
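// Two-arg remove: only unregister the shipper if it is still the worker mapped to this group.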
workerThreads.remove(worker.walGroupId, worker); } + + public WALProvider getWalProvider() { + return walProvider; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java index d613049..ff804d0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java @@ -23,6 +23,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; +import org.apache.hadoop.hbase.wal.WALProvider; /** * Constructs a {@link ReplicationSourceInterface} @@ -32,13 +33,13 @@ public class ReplicationSourceFactory { private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceFactory.class); - static ReplicationSourceInterface create(Configuration conf, String queueId) { + static ReplicationSourceInterface create(Configuration conf, String queueId, WALProvider walProvider) { ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId); boolean isQueueRecovered = replicationQueueInfo.isQueueRecovered(); ReplicationSourceInterface src; try { String defaultReplicationSourceImpl = - isQueueRecovered ? RecoveredReplicationSource.class.getCanonicalName() + isQueueRecovered ? walProvider.getRecoveredReplicationSource().getClass().getCanonicalName() : ReplicationSource.class.getCanonicalName(); Class c = Class.forName( conf.get("replication.replicationsource.implementation", defaultReplicationSourceImpl)); @@ -47,7 +48,7 @@ public class ReplicationSourceFactory { LOG.warn("Passed replication source implementation throws errors, " + "defaulting to ReplicationSource", e); - src = isQueueRecovered ? new RecoveredReplicationSource() : new ReplicationSource(); + src = isQueueRecovered ? 
walProvider.getRecoveredReplicationSource() : new ReplicationSource(); } return src; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index df7a8cc..b386b26 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -23,9 +23,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; - import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; @@ -36,6 +34,8 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALInfo; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.yetus.audience.InterfaceAudience; /** @@ -50,17 +50,18 @@ public interface ReplicationSourceInterface { * @param fs the file system to use * @param manager the manager to use * @param server the server for this region server + * @param walProvider */ - void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, + void init(Configuration conf, ReplicationSourceManager manager, ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, String queueId, UUID clusterId, WALFileLengthProvider walFileLengthProvider, - MetricsSource metrics) throws IOException; + MetricsSource metrics, WALProvider walProvider) throws IOException; /** * Add a log to the list of logs to replicate * @param log path to the log to replicate */ - void enqueueLog(Path log); + void enqueueLog(WALInfo log); /** * Add hfile names to the queue to be replicated. 
@@ -95,7 +96,7 @@ public interface ReplicationSourceInterface { * Get the current log that's replicated * @return the current log */ - Path getCurrentPath(); + WALInfo getCurrentWALInfo(); /** * Get the queue id that the source is replicating to diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 428ec98..5980014 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -61,9 +61,12 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider; +import org.apache.hadoop.hbase.wal.WALInfo; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; @@ -145,13 +148,8 @@ public class ReplicationSourceManager implements ReplicationListener { private final SyncReplicationPeerMappingManager syncReplicationPeerMappingManager; private final Configuration conf; - private final FileSystem fs; // The paths to the latest log of each wal group, for new coming peers - private final Set latestPaths; - // Path to the wals directories - private final Path logDir; - // Path to the wal archive - private final Path oldLogDir; + private final Set latestPaths; private final WALFileLengthProvider walFileLengthProvider; // The number of ms that we wait before moving znodes, HBASE-3596 private final long sleepBeforeFailover; @@ -168,6 +166,8 @@ public class ReplicationSourceManager implements ReplicationListener { // Maximum number of retries before taking bold actions when deleting remote wal files for sync // replication peer. 
private final int maxRetriesMultiplier; + private final WALProvider walProvider; + private final ServerName serverName; /** * Creates a replication manager and sets the watch on all the other registered region servers @@ -180,12 +180,13 @@ public class ReplicationSourceManager implements ReplicationListener { * @param logDir the directory that contains all wal directories of live RSs * @param oldLogDir the directory where old logs are archived * @param clusterId + * @param walProvider */ public ReplicationSourceManager(ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf, - Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId, + Server server, UUID clusterId, WALFileLengthProvider walFileLengthProvider, - SyncReplicationPeerMappingManager syncReplicationPeerMappingManager) throws IOException { + SyncReplicationPeerMappingManager syncReplicationPeerMappingManager, WALProvider walProvider) throws IOException { this.sources = new ConcurrentHashMap<>(); this.queueStorage = queueStorage; this.replicationPeers = replicationPeers; @@ -195,9 +196,7 @@ public class ReplicationSourceManager implements ReplicationListener { this.walsByIdRecoveredQueues = new ConcurrentHashMap<>(); this.oldsources = new ArrayList<>(); this.conf = conf; - this.fs = fs; - this.logDir = logDir; - this.oldLogDir = oldLogDir; + this.walProvider = walProvider; // 30 seconds this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000); this.clusterId = clusterId; @@ -211,16 +210,18 @@ public class ReplicationSourceManager implements ReplicationListener { // even if we fail, other region servers can take care of it this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); + this.serverName = this.server.getServerName(); ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); tfb.setNameFormat("ReplicationExecutor-%d"); tfb.setDaemon(true); this.executor.setThreadFactory(tfb.build()); - this.latestPaths = new HashSet(); + this.latestPaths = new HashSet(); this.replicationForBulkLoadDataEnabled = conf.getBoolean( HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT); this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000); this.maxRetriesMultiplier = this.conf.getInt("replication.source.sync.maxretriesmultiplier", 60); + } /** @@ -344,12 +345,12 @@ public class ReplicationSourceManager implements ReplicationListener { */ private ReplicationSourceInterface createSource(String queueId, ReplicationPeer replicationPeer) throws IOException { - ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId); + ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId, walProvider); MetricsSource metrics = new MetricsSource(queueId); // init replication source - src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId, - walFileLengthProvider, metrics); + src.init(conf, this, queueStorage, replicationPeer, server, queueId, clusterId, + walFileLengthProvider, metrics, walProvider); return src; } @@ -371,7 +372,7 @@ public class ReplicationSourceManager implements ReplicationListener { this.walsById.put(peerId, walsByGroup); // Add the latest wal to that source's queue if (this.latestPaths.size() > 0) { - for (Path logPath : latestPaths) { + for (WALInfo logPath : latestPaths) { String name = logPath.getName(); String 
walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name); NavigableSet logs = new TreeSet<>(); @@ -484,7 +485,8 @@ toRemove.terminate(terminateMessage); } for (NavigableSet walsByGroup : walsById.get(peerId).values()) { - walsByGroup.forEach(wal -> src.enqueueLog(new Path(this.logDir, wal))); + walsByGroup.forEach(wal -> src + .enqueueLog(((SyncReplicationWALProvider)this.walProvider).getFullPath(serverName, wal))); } } LOG.info("Startup replication source for " + src.getPeerId()); @@ -505,7 +507,7 @@ ReplicationSourceInterface replicationSource = createSource(queueId, peer); this.oldsources.add(replicationSource); for (SortedSet walsByGroup : walsByIdRecoveredQueues.get(queueId).values()) { - walsByGroup.forEach(wal -> src.enqueueLog(new Path(wal))); + walsByGroup.forEach(wal -> src.enqueueLog(this.walProvider.createWalInfo(wal))); } toStartup.add(replicationSource); } @@ -672,6 +674,8 @@ private void removeRemoteWALs(String peerId, String remoteWALDir, Collection wals) throws IOException { Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId); + // Currently sync replication is only supported on FS-based WALProviders. + // TODO: abstract FileSystem once all APIs for sync replication are reconciled for remote calls. FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir); for (String wal : wals) { Path walFile = new Path(remoteWALDirForPeer, wal); @@ -735,7 +739,7 @@ // public because of we call it in TestReplicationEmptyWALRecovery @VisibleForTesting - public void preLogRoll(Path newLog) throws IOException { + public void preLogRoll(WALInfo newLog) throws IOException { String logName = newLog.getName(); String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName); // synchronized on latestPaths to avoid the new open source miss the new log @@ -779,9 +783,9 @@ } // Add to latestPaths - Iterator iterator = latestPaths.iterator(); + Iterator iterator = latestPaths.iterator(); while (iterator.hasNext()) { - Path path = iterator.next(); + WALInfo path = iterator.next(); if (path.getName().contains(logPrefix)) { iterator.remove(); break; @@ -793,7 +797,7 @@ // public because of we call it in TestReplicationEmptyWALRecovery @VisibleForTesting - public void postLogRoll(Path newLog) throws IOException { + public void postLogRoll(WALInfo newLog) throws IOException { // This only updates the sources we own, not the recovered ones for (ReplicationSourceInterface source : this.sources.values()) { source.enqueueLog(newLog); @@ -962,7 +966,11 @@ } oldsources.add(src); for (String wal : walsSet) { - src.enqueueLog(new Path(oldLogDir, wal)); + WALInfo archivedWal = ((SyncReplicationWALProvider)walProvider) + .getWalFromArchivePath(wal); + if (archivedWal != null) { + src.enqueueLog(archivedWal); + } } src.startup(); } @@ -1051,30 +1059,6 @@ } /** - * Get the directory where wals are archived - * @return the directory where wals are archived - */ - public Path getOldLogDir() { -
return this.oldLogDir; - } - - /** - * Get the directory where wals are stored by their RSs - * @return the directory where wals are stored by their RSs - */ - public Path getLogDir() { - return this.logDir; - } - - /** - * Get the handle on the local file system - * @return Handle on the local file system - */ - public FileSystem getFs() { - return this.fs; - } - - /** * Get the ReplicationPeers used by this ReplicationSourceManager * @return the ReplicationPeers used by this ReplicationSourceManager */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 5d6198e..389f599 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.PriorityBlockingQueue; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.TableName; @@ -32,6 +31,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALInfo; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,14 +56,14 @@ public class ReplicationSourceShipper extends Thread { private final Configuration conf; protected final String walGroupId; - protected final PriorityBlockingQueue queue; + protected final PriorityBlockingQueue queue; private final ReplicationSource source; // Last position in the log that we sent to ZooKeeper // It will be accessed by the stats thread so make it volatile private volatile long currentPosition = -1; // Path of the current log - private Path currentPath; + private WALInfo currentPath; // Current state of the worker thread private volatile WorkerState state; protected ReplicationSourceWALReader entryReader; @@ -76,7 +76,7 @@ public class ReplicationSourceShipper extends Thread { private final int getEntriesTimeout; public ReplicationSourceShipper(Configuration conf, String walGroupId, - PriorityBlockingQueue queue, ReplicationSource source) { + PriorityBlockingQueue queue, ReplicationSource source) { this.conf = conf; this.walGroupId = walGroupId; this.queue = queue; @@ -293,7 +293,7 @@ public class ReplicationSourceShipper extends Thread { name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(), handler); } - Path getCurrentPath() { + WALInfo getCurrentWALInfo() { return entryReader.getCurrentPath(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java index 27b25c4..8fe41ce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java @@ -19,11 +19,11 @@ package org.apache.hadoop.hbase.replication.regionserver; 
import java.io.IOException; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALInfo; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.yetus.audience.InterfaceAudience; @@ -46,12 +46,12 @@ class ReplicationSourceWALActionListener implements WALActionsListener { } @Override - public void preLogRoll(Path oldPath, Path newPath) throws IOException { + public void preLogRoll(WALInfo oldPath, WALInfo newPath) throws IOException { manager.preLogRoll(newPath); } @Override - public void postLogRoll(Path oldPath, Path newPath) throws IOException { + public void postLogRoll(WALInfo oldPath, WALInfo newPath) throws IOException { manager.postLogRoll(newPath); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index b3bdb02..227e15a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -18,7 +18,6 @@ */ package org.apache.hadoop.hbase.replication.regionserver; -import java.io.EOFException; import java.io.IOException; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -27,8 +26,6 @@ import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; @@ -37,6 +34,7 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALInfo; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -55,8 +53,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescript class ReplicationSourceWALReader extends Thread { private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class); - private final PriorityBlockingQueue logQueue; - private final FileSystem fs; + private final PriorityBlockingQueue logQueue; private final Configuration conf; private final WALEntryFilter filter; private final ReplicationSource source; @@ -70,7 +67,6 @@ class ReplicationSourceWALReader extends Thread { private long currentPosition; private final long sleepForRetries; private final int maxRetriesMultiplier; - private final boolean eofAutoRecovery; //Indicates whether this particular worker is running private boolean isReaderRunning = true; @@ -88,12 +84,11 @@ class ReplicationSourceWALReader extends Thread { * @param filter The filter to use while reading * @param source replication source */ - public ReplicationSourceWALReader(FileSystem fs, Configuration conf, - PriorityBlockingQueue logQueue, long startPosition, 
WALEntryFilter filter, + public ReplicationSourceWALReader(Configuration conf, + PriorityBlockingQueue logQueue, long startPosition, WALEntryFilter filter, ReplicationSource source) { this.logQueue = logQueue; this.currentPosition = startPosition; - this.fs = fs; this.conf = conf; this.filter = filter; this.source = source; @@ -110,7 +105,6 @@ class ReplicationSourceWALReader extends Thread { this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per - this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false); this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount); LOG.info("peerClusterZnode=" + source.getQueueId() + ", ReplicationSourceWALReaderThread : " + source.getPeerId() @@ -123,10 +117,9 @@ class ReplicationSourceWALReader extends Thread { public void run() { int sleepMultiplier = 1; while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream - try (WALEntryStream entryStream = - new WALEntryStream(logQueue, fs, conf, currentPosition, - source.getWALFileLengthProvider(), source.getServerWALsBelongTo(), - source.getSourceMetrics())) { + try (WALEntryStream entryStream = this.source.getWalProvider().getWalStream(logQueue, conf, + currentPosition, source.getWALFileLengthProvider(), source.getServerWALsBelongTo(), + source.getSourceMetrics())) { while (isReaderRunning()) { // loop here to keep reusing stream while we can if (!source.isPeerEnabled()) { Threads.sleep(sleepForRetries); @@ -152,9 +145,6 @@ class ReplicationSourceWALReader extends Thread { if (sleepMultiplier < maxRetriesMultiplier) { LOG.debug("Failed to read stream of replication entries: " + e); sleepMultiplier++; - } else { - LOG.error("Failed to read stream of replication entries", e); - handleEofException(e); } Threads.sleep(sleepForRetries * sleepMultiplier); } catch (InterruptedException e) { @@ -181,14 +171,14 @@ class ReplicationSourceWALReader extends Thread { batch.getNbEntries() >= replicationBatchCountCapacity; } - protected static final boolean switched(WALEntryStream entryStream, Path path) { - Path newPath = entryStream.getCurrentPath(); + protected static final boolean switched(WALEntryStream entryStream, WALInfo path) { + WALInfo newPath = entryStream.getCurrentWalInfo(); return newPath == null || !path.getName().equals(newPath.getName()); } protected WALEntryBatch readWALEntries(WALEntryStream entryStream) throws IOException, InterruptedException { - Path currentPath = entryStream.getCurrentPath(); + WALInfo currentPath = entryStream.getCurrentWalInfo(); if (!entryStream.hasNext()) { // check whether we have switched a file if (currentPath != null && switched(entryStream, currentPath)) { @@ -203,7 +193,7 @@ class ReplicationSourceWALReader extends Thread { } } else { // when reading from the entry stream first time we will enter here - currentPath = entryStream.getCurrentPath(); + currentPath = entryStream.getCurrentWalInfo(); } WALEntryBatch batch = createBatch(entryStream); for (;;) { @@ -241,25 +231,8 @@ class ReplicationSourceWALReader extends Thread { } } - // if we get an EOF due to a zero-length log, and there are other logs in queue - // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is - // enabled, then dump the log - private void handleEofException(IOException e) { - if ((e instanceof EOFException || e.getCause() instanceof EOFException) && - 
logQueue.size() > 1 && this.eofAutoRecovery) { - try { - if (fs.getFileStatus(logQueue.peek()).getLen() == 0) { - LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek()); - logQueue.remove(); - currentPosition = 0; - } - } catch (IOException ioe) { - LOG.warn("Couldn't get file length information about log " + logQueue.peek()); - } - } - } - public Path getCurrentPath() { + public WALInfo getCurrentPath() { // if we've read some WAL entries, get the Path we read from WALEntryBatch batchQueueHead = entryBatchQueue.peek(); if (batchQueueHead != null) { @@ -280,7 +253,7 @@ class ReplicationSourceWALReader extends Thread { } protected final WALEntryBatch createBatch(WALEntryStream entryStream) { - return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()); + return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentWalInfo()); } protected final Entry filterEntry(Entry entry) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java index 10d6cd5..e57628c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatus.java @@ -17,14 +17,14 @@ */ package org.apache.hadoop.hbase.replication.regionserver; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.wal.WALInfo; import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public final class ReplicationStatus { private final String peerId; private final String walGroup; - private final Path currentPath; + private final WALInfo currentPath; private final int queueSize; private final long ageOfLastShippedOp; private final long replicationDelay; @@ -70,7 +70,7 @@ public final class ReplicationStatus { return replicationDelay; } - public Path getCurrentPath() { + public WALInfo getCurrentPath() { return currentPath; } @@ -81,7 +81,7 @@ public final class ReplicationStatus { public static class ReplicationStatusBuilder { private String peerId = "UNKNOWN"; private String walGroup = "UNKNOWN"; - private Path currentPath = new Path("UNKNOWN"); + private WALInfo currentPath = WALInfo.UNKNOWN; private int queueSize = -1; private long ageOfLastShippedOp = -1; private long replicationDelay = -1; @@ -103,7 +103,7 @@ public final class ReplicationStatus { return this; } - public ReplicationStatusBuilder withCurrentPath(Path currentPath) { + public ReplicationStatusBuilder withCurrentPath(WALInfo currentPath) { this.currentPath = currentPath; return this; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java index 62068fd..9338907 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java @@ -21,17 +21,15 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.CoordinatedStateManager; import 
org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.util.Tool; @@ -76,14 +74,12 @@ public class ReplicationSyncUp extends Configured implements Tool { Configuration conf = getConf(); try (ZKWatcher zkw = new ZKWatcher(conf, "syncupReplication" + System.currentTimeMillis(), abortable, true)) { - Path walRootDir = FSUtils.getWALRootDir(conf); - FileSystem fs = FSUtils.getWALFileSystem(conf); - Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); - Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME); - System.out.println("Start Replication Server start"); + DummyServer dummyServer = new DummyServer(zkw); + WALFactory factory = + new WALFactory(conf, dummyServer.getServerName().toString()); Replication replication = new Replication(); - replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir, null); + replication.initialize(dummyServer, factory.getWALProvider()); ReplicationSourceManager manager = replication.getReplicationManager(); manager.init().get(); while (manager.activeFailoverTaskCount() > 0) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java index 9edcc8a..e35d2a9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java @@ -20,12 +20,11 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import java.util.concurrent.PriorityBlockingQueue; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALInfo; import org.apache.yetus.audience.InterfaceAudience; /** @@ -43,17 +42,17 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader private final SerialReplicationChecker checker; - public SerialReplicationSourceWALReader(FileSystem fs, Configuration conf, - PriorityBlockingQueue logQueue, long startPosition, WALEntryFilter filter, + public SerialReplicationSourceWALReader(Configuration conf, + PriorityBlockingQueue logQueue, long startPosition, WALEntryFilter filter, ReplicationSource source) { - super(fs, conf, logQueue, startPosition, filter, source); + super(conf, logQueue, startPosition, filter, source); checker = new SerialReplicationChecker(conf, source); } @Override protected WALEntryBatch readWALEntries(WALEntryStream entryStream) throws IOException, InterruptedException { - Path currentPath = entryStream.getCurrentPath(); + WALInfo currentPath = entryStream.getCurrentWalInfo(); if (!entryStream.hasNext()) { // check whether we have switched a file if (currentPath != 
null && switched(entryStream, currentPath)) { @@ -68,7 +67,7 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader } } else { // when reading from the entry stream first time we will enter here - currentPath = entryStream.getCurrentPath(); + currentPath = entryStream.getCurrentWalInfo(); } long positionBefore = entryStream.getPosition(); WALEntryBatch batch = createBatch(entryStream); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java index 22b2de7..5fe688a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java @@ -21,8 +21,8 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALInfo; import org.apache.yetus.audience.InterfaceAudience; /** @@ -36,7 +36,7 @@ class WALEntryBatch { private List walEntries; // last WAL that was read - private Path lastWalPath; + private WALInfo lastWalPath; // position in WAL of last entry in this batch private long lastWalPosition = 0; // number of distinct row keys in this batch @@ -53,13 +53,13 @@ class WALEntryBatch { /** * @param lastWalPath Path of the WAL the last entry in this batch was read from */ - WALEntryBatch(int maxNbEntries, Path lastWalPath) { + WALEntryBatch(int maxNbEntries, WALInfo lastWalPath) { this.walEntries = new ArrayList<>(maxNbEntries); this.lastWalPath = lastWalPath; } - static WALEntryBatch endOfFile(Path lastWalPath) { + static WALEntryBatch endOfFile(WALInfo lastWalPath) { WALEntryBatch batch = new WALEntryBatch(0, lastWalPath); batch.setLastWalPosition(-1L); batch.setEndOfFile(true); @@ -80,7 +80,7 @@ class WALEntryBatch { /** * @return the path of the last WAL that was read. */ - public Path getLastWalPath() { + public WALInfo getLastWalPath() { return lastWalPath; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java index 0393af4..463679b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -1,46 +1,11 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ package org.apache.hadoop.hbase.replication.regionserver; import java.io.Closeable; -import java.io.FileNotFoundException; import java.io.IOException; -import java.util.OptionalLong; -import java.util.concurrent.PriorityBlockingQueue; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; -import org.apache.hadoop.hbase.util.CancelableProgressable; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; +import org.apache.hadoop.hbase.wal.WALInfo; import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hadoop.hbase.wal.WAL.Reader; -import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.ipc.RemoteException; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Streaming access to WAL entries. This interface is given a queue of WAL {@link WALInfo}, and continually @@ -48,383 +13,44 @@ import org.slf4j.LoggerFactory; * dequeues it and starts reading from the next. */ @InterfaceAudience.Private -@InterfaceStability.Evolving -class WALEntryStream implements Closeable { - private static final Logger LOG = LoggerFactory.getLogger(WALEntryStream.class); - - private Reader reader; - private Path currentPath; - // cache of next entry for hasNext() - private Entry currentEntry; - // position for the current entry. As now we support peek, which means that the upper layer may - // choose to return before reading the current entry, so it is not safe to return the value below - // in getPosition. 
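The removed comment above pins down a contract the new WALEntryStream interface keeps: peek() must not advance the position that getPosition() reports, so an implementation has to track the position of the last entry returned by next() separately from the raw reader position. A minimal sketch of that bookkeeping, restated from the removed implementation (fields and methods abbreviated):

    // Fragment, restated from the removed WALEntryStream implementation: the two
    // positions needed so that peek() stays side-effect free.
    private Entry currentEntry;               // cached by hasNext()/peek()
    private long currentPositionOfEntry = 0;  // position of last entry returned by next()
    private long currentPositionOfReader = 0; // raw reader position, past the cached entry

    public Entry peek() throws IOException {
      return hasNext() ? currentEntry : null; // does not move currentPositionOfEntry
    }

    public Entry next() throws IOException {
      Entry save = peek();
      currentPositionOfEntry = currentPositionOfReader; // only now expose the new position
      currentEntry = null;
      return save;
    }

    public long getPosition() {
      return currentPositionOfEntry; // never the raw reader position
    }
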
- private long currentPositionOfEntry = 0; - // position after reading current entry - private long currentPositionOfReader = 0; - private final PriorityBlockingQueue logQueue; - private final FileSystem fs; - private final Configuration conf; - private final WALFileLengthProvider walFileLengthProvider; - // which region server the WALs belong to - private final ServerName serverName; - private final MetricsSource metrics; - - /** - * Create an entry stream over the given queue at the given start position - * @param logQueue the queue of WAL paths - * @param fs {@link FileSystem} to use to create {@link Reader} for this stream - * @param conf {@link Configuration} to use to create {@link Reader} for this stream - * @param startPosition the position in the first WAL to start reading at - * @param serverName the server name which all WALs belong to - * @param metrics replication metrics - * @throws IOException - */ - public WALEntryStream(PriorityBlockingQueue logQueue, FileSystem fs, Configuration conf, - long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName, - MetricsSource metrics) throws IOException { - this.logQueue = logQueue; - this.fs = fs; - this.conf = conf; - this.currentPositionOfEntry = startPosition; - this.walFileLengthProvider = walFileLengthProvider; - this.serverName = serverName; - this.metrics = metrics; - } + @InterfaceStability.Evolving +public interface WALEntryStream extends Closeable { /** * @return true if there is another WAL {@link Entry} */ - public boolean hasNext() throws IOException { - if (currentEntry == null) { - tryAdvanceEntry(); - } - return currentEntry != null; - } + public boolean hasNext() throws IOException; /** * Returns the next WAL entry in this stream but does not advance. */ - public Entry peek() throws IOException { - return hasNext() ? currentEntry: null; - } + public Entry peek() throws IOException; /** * Returns the next WAL entry in this stream and advance the stream. */ - public Entry next() throws IOException { - Entry save = peek(); - currentPositionOfEntry = currentPositionOfReader; - currentEntry = null; - return save; - } + public Entry next() throws IOException; /** * {@inheritDoc} */ @Override - public void close() throws IOException { - closeReader(); - } + public void close() throws IOException; /** * @return the position of the last Entry returned by next() */ - public long getPosition() { - return currentPositionOfEntry; - } + public long getPosition(); /** - * @return the {@link Path} of the current WAL + * @return the {@link WALInfo} of the current WAL */ - public Path getCurrentPath() { - return currentPath; - } - - private String getCurrentPathStat() { - StringBuilder sb = new StringBuilder(); - if (currentPath != null) { - sb.append("currently replicating from: ").append(currentPath).append(" at position: ") - .append(currentPositionOfEntry).append("\n"); - } else { - sb.append("no replication ongoing, waiting for new log"); - } - return sb.toString(); - } + public WALInfo getCurrentWalInfo(); /** * Should be called if the stream is to be reused (i.e. 
used again after hasNext() has returned * false) */ - public void reset() throws IOException { - if (reader != null && currentPath != null) { - resetReader(); - } - } - - private void setPosition(long position) { - currentPositionOfEntry = position; - } - - private void setCurrentPath(Path path) { - this.currentPath = path; - } - - private void tryAdvanceEntry() throws IOException { - if (checkReader()) { - boolean beingWritten = readNextEntryAndRecordReaderPosition(); - if (currentEntry == null && !beingWritten) { - // no more entries in this log file, and the file is already closed, i.e, rolled - // Before dequeueing, we should always get one more attempt at reading. - // This is in case more entries came in after we opened the reader, and the log is rolled - // while we were reading. See HBASE-6758 - resetReader(); - readNextEntryAndRecordReaderPosition(); - if (currentEntry == null) { - if (checkAllBytesParsed()) { // now we're certain we're done with this log file - dequeueCurrentLog(); - if (openNextLog()) { - readNextEntryAndRecordReaderPosition(); - } - } - } - } - // if currentEntry != null then just return - // if currentEntry == null but the file is still being written, then we should not switch to - // the next log either, just return here and try next time to see if there are more entries in - // the current file - } - // do nothing if we don't have a WAL Reader (e.g. if there's no logs in queue) - } - - // HBASE-15984 check to see we have in fact parsed all data in a cleanly closed file - private boolean checkAllBytesParsed() throws IOException { - // -1 means the wal wasn't closed cleanly. - final long trailerSize = currentTrailerSize(); - FileStatus stat = null; - try { - stat = fs.getFileStatus(this.currentPath); - } catch (IOException exception) { - LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}", - currentPath, trailerSize < 0 ? "was not" : "was", getCurrentPathStat()); - metrics.incrUnknownFileLengthForClosedWAL(); - } - // Here we use currentPositionOfReader instead of currentPositionOfEntry. - // We only call this method when currentEntry is null so usually they are the same, but there - // are two exceptions. One is we have nothing in the file but only a header, in this way - // the currentPositionOfEntry will always be 0 since we have no change to update it. The other - // is that we reach the end of file, then currentPositionOfEntry will point to the tail of the - // last valid entry, and the currentPositionOfReader will usually point to the end of the file. - if (stat != null) { - if (trailerSize < 0) { - if (currentPositionOfReader < stat.getLen()) { - final long skippedBytes = stat.getLen() - currentPositionOfReader; - LOG.debug( - "Reached the end of WAL file '{}'. It was not closed cleanly," + - " so we did not parse {} bytes of data. This is normally ok.", - currentPath, skippedBytes); - metrics.incrUncleanlyClosedWALs(); - metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes); - } - } else if (currentPositionOfReader + trailerSize < stat.getLen()) { - LOG.warn( - "Processing end of WAL file '{}'. At position {}, which is too far away from" + - " reported file length {}. Restarting WAL reading (see HBASE-15983 for details). 
{}", - currentPath, currentPositionOfReader, stat.getLen(), getCurrentPathStat()); - setPosition(0); - resetReader(); - metrics.incrRestartedWALReading(); - metrics.incrRepeatedFileBytes(currentPositionOfReader); - return false; - } - } - if (LOG.isTraceEnabled()) { - LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is " + - (stat == null ? "N/A" : stat.getLen())); - } - metrics.incrCompletedWAL(); - return true; - } - - private void dequeueCurrentLog() throws IOException { - LOG.debug("Reached the end of log {}", currentPath); - closeReader(); - logQueue.remove(); - setPosition(0); - metrics.decrSizeOfLogQueue(); - } - - /** - * Returns whether the file is opened for writing. - */ - private boolean readNextEntryAndRecordReaderPosition() throws IOException { - Entry readEntry = reader.next(); - long readerPos = reader.getPosition(); - OptionalLong fileLength = walFileLengthProvider.getLogFileSizeIfBeingWritten(currentPath); - if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) { - // see HBASE-14004, for AsyncFSWAL which uses fan-out, it is possible that we read uncommitted - // data, so we need to make sure that we do not read beyond the committed file length. - if (LOG.isDebugEnabled()) { - LOG.debug("The provider tells us the valid length for " + currentPath + " is " + - fileLength.getAsLong() + ", but we have advanced to " + readerPos); - } - resetReader(); - return true; - } - if (readEntry != null) { - metrics.incrLogEditsRead(); - metrics.incrLogReadInBytes(readerPos - currentPositionOfEntry); - } - currentEntry = readEntry; // could be null - this.currentPositionOfReader = readerPos; - return fileLength.isPresent(); - } - - private void closeReader() throws IOException { - if (reader != null) { - reader.close(); - reader = null; - } - } - - // if we don't have a reader, open a reader on the next log - private boolean checkReader() throws IOException { - if (reader == null) { - return openNextLog(); - } - return true; - } - - // open a reader on the next log in queue - private boolean openNextLog() throws IOException { - Path nextPath = logQueue.peek(); - if (nextPath != null) { - openReader(nextPath); - if (reader != null) { - return true; - } - } else { - // no more files in queue, this could happen for recovered queue, or for a wal group of a sync - // replication peer which has already been transited to DA or S. 
- setCurrentPath(null); - } - return false; - } - - private Path getArchivedLog(Path path) throws IOException { - Path rootDir = FSUtils.getRootDir(conf); - - // Try found the log in old dir - Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); - Path archivedLogLocation = new Path(oldLogDir, path.getName()); - if (fs.exists(archivedLogLocation)) { - LOG.info("Log " + path + " was moved to " + archivedLogLocation); - return archivedLogLocation; - } - - // Try found the log in the seperate old log dir - oldLogDir = - new Path(rootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME) - .append(Path.SEPARATOR).append(serverName.getServerName()).toString()); - archivedLogLocation = new Path(oldLogDir, path.getName()); - if (fs.exists(archivedLogLocation)) { - LOG.info("Log " + path + " was moved to " + archivedLogLocation); - return archivedLogLocation; - } - - LOG.error("Couldn't locate log: " + path); - return path; - } - - private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException { - // If the log was archived, continue reading from there - Path archivedLog = getArchivedLog(path); - if (!path.equals(archivedLog)) { - openReader(archivedLog); - } else { - throw fnfe; - } - } - - private void openReader(Path path) throws IOException { - try { - // Detect if this is a new file, if so get a new reader else - // reset the current reader so that we see the new data - if (reader == null || !getCurrentPath().equals(path)) { - closeReader(); - reader = WALFactory.createReader(fs, path, conf); - seek(); - setCurrentPath(path); - } else { - resetReader(); - } - } catch (FileNotFoundException fnfe) { - handleFileNotFound(path, fnfe); - } catch (RemoteException re) { - IOException ioe = re.unwrapRemoteException(FileNotFoundException.class); - if (!(ioe instanceof FileNotFoundException)) throw ioe; - handleFileNotFound(path, (FileNotFoundException)ioe); - } catch (LeaseNotRecoveredException lnre) { - // HBASE-15019 the WAL was not closed due to some hiccup. - LOG.warn("Try to recover the WAL lease " + currentPath, lnre); - recoverLease(conf, currentPath); - reader = null; - } catch (NullPointerException npe) { - // Workaround for race condition in HDFS-4380 - // which throws a NPE if we open a file before any data node has the most recent block - // Just sleep and retry. Will require re-reading compressed WALs for compressionContext. 
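The removed getArchivedLog() above encodes the lookup order any filesystem-backed stream has to honour when a WAL it is reading gets archived mid-read: the shared archive directory first, then the per-regionserver archive. A condensed restatement of that fallback, using the same names as the removed code:

    // Condensed restatement of the removed getArchivedLog() lookup order.
    Path oldLogDir = new Path(FSUtils.getRootDir(conf), HConstants.HREGION_OLDLOGDIR_NAME);
    Path shared = new Path(oldLogDir, path.getName());
    if (fs.exists(shared)) {
      return shared;                           // archived to the shared oldWALs dir
    }
    Path perServer = new Path(new Path(oldLogDir, serverName.getServerName()), path.getName());
    if (fs.exists(perServer)) {
      return perServer;                        // archived under the regionserver's name
    }
    return path;                               // not archived (or already gone)
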
- LOG.warn("Got NPE opening reader, will retry."); - reader = null; - } - } - - // For HBASE-15019 - private void recoverLease(final Configuration conf, final Path path) { - try { - final FileSystem dfs = FSUtils.getCurrentFileSystem(conf); - FSUtils fsUtils = FSUtils.getInstance(dfs, conf); - fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() { - @Override - public boolean progress() { - LOG.debug("recover WAL lease: " + path); - return true; - } - }); - } catch (IOException e) { - LOG.warn("unable to recover lease for WAL: " + path, e); - } - } - - private void resetReader() throws IOException { - try { - currentEntry = null; - reader.reset(); - seek(); - } catch (FileNotFoundException fnfe) { - // If the log was archived, continue reading from there - Path archivedLog = getArchivedLog(currentPath); - if (!currentPath.equals(archivedLog)) { - openReader(archivedLog); - } else { - throw fnfe; - } - } catch (NullPointerException npe) { - throw new IOException("NPE resetting reader, likely HDFS-4380", npe); - } - } - - private void seek() throws IOException { - if (currentPositionOfEntry != 0) { - reader.seek(currentPositionOfEntry); - } - } + public void reset() throws IOException; - private long currentTrailerSize() { - long size = -1L; - if (reader instanceof ProtobufLogReader) { - final ProtobufLogReader pblr = (ProtobufLogReader) reader; - size = pblr.trailerSize(); - } - return size; - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java index 010fa69..685570d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java @@ -18,17 +18,12 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.util.OptionalLong; - -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.wal.WALInfo; import org.apache.yetus.audience.InterfaceAudience; -/** - * Used by replication to prevent replicating unacked log entries. See - * https://issues.apache.org/jira/browse/HBASE-14004 for more details. 
- */ @InterfaceAudience.Private @FunctionalInterface public interface WALFileLengthProvider { - OptionalLong getLogFileSizeIfBeingWritten(Path path); + OptionalLong getLogFileSizeIfBeingWritten(WALInfo path); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index ccdc95f..ed9966c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -40,7 +41,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.replication.regionserver.FSRecoveredReplicationSource; +import org.apache.hadoop.hbase.replication.regionserver.FSWALEntryStream; +import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; +import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource; +import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream; +import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; @@ -59,6 +67,9 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti @InterfaceStability.Evolving public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implements WALProvider { + // Path to the wal archive, under the wal root directory + + private Path oldLogDir; private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWALProvider.class); /** Separate old log into different dir by regionserver name **/ @@ -89,6 +100,7 @@ public abstract class AbstractFSWALProvider> implemen * we synchronized on walCreateLock to prevent wal recreation in different threads */ private final Object walCreateLock = new Object(); + private Path walRootDir; /** * @param factory factory that made us, identity used for FS layout. 
may not be null @@ -104,6 +116,8 @@ public abstract class AbstractFSWALProvider> implemen this.factory = factory; this.conf = conf; this.providerId = providerId; + walRootDir = FSUtils.getWALRootDir(conf); + this.oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); // get log prefix StringBuilder sb = new StringBuilder().append(factory.factoryId); if (providerId != null) { @@ -540,4 +554,27 @@ public abstract class AbstractFSWALProvider> implemen public static long getWALStartTimeFromWALName(String name) { return Long.parseLong(getWALNameGroupFromWALName(name, 2)); } + + @Override + public WALEntryStream getWalStream(PriorityBlockingQueue logQueue, Configuration conf, + long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName, + MetricsSource metrics) throws IOException { + return new FSWALEntryStream(CommonFSUtils.getWALFileSystem(conf), logQueue, conf, startPosition, + walFileLengthProvider, serverName, metrics); + } + + @Override + public WALMetaDataProvider getWalMetaDataProvider() throws IOException { + return new FSWALMetaDataProvider(CommonFSUtils.getWALFileSystem(conf)); + } + + @Override + public WALInfo createWalInfo(String wal) { + return new FSWALInfo(wal); + } + + @Override + public RecoveredReplicationSource getRecoveredReplicationSource() { + return new FSRecoveredReplicationSource(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index 75439fe..eb14f4b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.wal; +import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALDirectoryName; + import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -24,6 +26,7 @@ import java.util.Map; import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; @@ -31,10 +34,18 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.PrivateCellUtil; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; +import org.apache.hadoop.hbase.replication.regionserver.FSRecoveredReplicationSource; +import org.apache.hadoop.hbase.replication.regionserver.FSWALEntryStream; +import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; +import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource; +import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream; +import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -55,6 +66,10 @@ class DisabledWALProvider implements WALProvider { WAL disabled; + private Path oldLogDir; + + private Path walRootDir; + 
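With the overrides above, replication code no longer builds a WALEntryStream by hand; it asks the WALProvider, exactly as the ReplicationSourceWALReader.run() hunk earlier in this patch now does. A sketch of that call path, assuming walProvider, logQueue, conf, startPosition and source are in scope:

    // Sketch of the provider-mediated call path (mirrors the
    // ReplicationSourceWALReader.run() change earlier in this patch).
    try (WALEntryStream entryStream = walProvider.getWalStream(logQueue, conf,
        startPosition, source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
        source.getSourceMetrics())) {
      while (entryStream.hasNext()) {
        Entry entry = entryStream.next();
        // ship the entry; getPosition() is the recoverable replication offset
        long checkpoint = entryStream.getPosition();
      }
    }
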
@Override public void init(WALFactory factory, Configuration conf, String providerId) throws IOException { if (null != disabled) { @@ -64,6 +79,8 @@ class DisabledWALProvider implements WALProvider { providerId = "defaultDisabled"; } disabled = new DisabledWAL(new Path(FSUtils.getWALRootDir(conf), providerId), conf, null); + walRootDir = FSUtils.getWALRootDir(conf); + this.oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); } @Override @@ -116,21 +133,21 @@ class DisabledWALProvider implements WALProvider { } @Override - public byte[][] rollWriter() { + public byte[][] rollWriter(boolean force) { if (!listeners.isEmpty()) { for (WALActionsListener listener : listeners) { listener.logRollRequested(false); } for (WALActionsListener listener : listeners) { try { - listener.preLogRoll(path, path); + listener.preLogRoll(new FSWALInfo(path), new FSWALInfo(path)); } catch (IOException exception) { LOG.debug("Ignoring exception from listener.", exception); } } for (WALActionsListener listener : listeners) { try { - listener.postLogRoll(path, path); + listener.postLogRoll(new FSWALInfo(path), new FSWALInfo(path)); } catch (IOException exception) { LOG.debug("Ignoring exception from listener.", exception); } @@ -140,11 +157,6 @@ class DisabledWALProvider implements WALProvider { } @Override - public byte[][] rollWriter(boolean force) { - return rollWriter(); - } - - @Override public void shutdown() { if(closed.compareAndSet(false, true)) { if (!this.listeners.isEmpty()) { @@ -243,7 +255,7 @@ class DisabledWALProvider implements WALProvider { } @Override - public OptionalLong getLogFileSizeIfBeingWritten(Path path) { + public OptionalLong getLogFileSizeIfBeingWritten(WALInfo path) { return OptionalLong.empty(); } } @@ -262,4 +274,27 @@ class DisabledWALProvider implements WALProvider { public void addWALActionsListener(WALActionsListener listener) { disabled.registerWALActionsListener(listener); } + + @Override + public WALEntryStream getWalStream(PriorityBlockingQueue logQueue, Configuration conf, + long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName, + MetricsSource metrics) throws IOException { + return new FSWALEntryStream(CommonFSUtils.getWALFileSystem(conf), logQueue, conf, startPosition, + walFileLengthProvider, serverName, metrics); + } + + @Override + public WALMetaDataProvider getWalMetaDataProvider() throws IOException { + return null; + } + + @Override + public WALInfo createWalInfo(String wal) { + return new FSWALInfo(wal); + } + + @Override + public RecoveredReplicationSource getRecoveredReplicationSource() { + return new FSRecoveredReplicationSource(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSWALInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSWALInfo.java new file mode 100644 index 0000000..a5f0bfc --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSWALInfo.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import java.io.IOException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.hadoop.fs.Path; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class FSWALInfo implements WALInfo { + private static final Pattern WAL_FILE_NAME_PATTERN = + Pattern.compile("(.+)\\.(\\d+)(\\.[0-9A-Za-z]+)?"); + private String name; + private Path path; + + public FSWALInfo(String name) { + this.path = new Path(name); + if (path != null) { + this.name = path.getName(); + } + } + + public FSWALInfo(Path path) { + this.path = path; + if (path != null) { + this.name = path.getName(); + } + } + + @Override + public String getName() { + return name; + } + + @Override + public long getWalStartTime() { + return Long.parseLong(getWALNameGroupFromWALName(name, 2)); + } + + private static String getWALNameGroupFromWALName(String name, int group) { + Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(name); + if (matcher.matches()) { + return matcher.group(group); + } else { + throw new IllegalArgumentException(name + " is not a valid wal file name"); + } + } + + @Override + public long getSize() throws IOException { + // TODO Implement WALInfo.getSize + return -1; + } + + /** + * @return {@link Path} object of the name encapsulated in WALInfo + */ + public Path getPath() { + return path; + } + + @Override + public int compareTo(WALInfo o) { + FSWALInfo that = (FSWALInfo) o; + return this.path.compareTo(that.getPath()); + } + + @Override + public String toString() { + return this.path.toString(); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof FSWALInfo)) { + return false; + } + FSWALInfo that = (FSWALInfo) obj; + return this.path.equals(that.getPath()); + } + + @Override + public int hashCode() { + return this.path.hashCode(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSWALMetaDataProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSWALMetaDataProvider.java new file mode 100644 index 0000000..29a76a5 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSWALMetaDataProvider.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.wal; + +import java.io.IOException; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class FSWALMetaDataProvider implements WALMetaDataProvider { + + private FileSystem fs; + + public FSWALMetaDataProvider(FileSystem walFileSystem) { + this.fs = walFileSystem; + } + + @Override + public boolean exists(String logLocation) throws IOException { + return fs.exists(new Path(logLocation)); + } + + @Override + public WALInfo[] list(WALInfo logDir) throws IOException { + FileStatus[] listStatus = fs.listStatus(((FSWALInfo) logDir).getPath()); + WALInfo[] walInfos = new FSWALInfo[listStatus.length]; + for (int i = 0; i < listStatus.length; i++) { + walInfos[i] = new FSWALInfo(listStatus[i].getPath()); + } + return walInfos; + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java index 0b7b8da..93e5ab1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java @@ -20,21 +20,33 @@ package org.apache.hadoop.hbase.wal; import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID; import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; +import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALDirectoryName; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.locks.Lock; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; // imports for classes still in regionserver.wal import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.replication.regionserver.FSRecoveredReplicationSource; +import org.apache.hadoop.hbase.replication.regionserver.FSWALEntryStream; +import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; +import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource; +import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream; +import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.KeyLocker; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -88,6 +100,9 @@ public class RegionGroupingProvider implements WALProvider { } } + private Path oldLogDir; + private Path walRootDir; + /** * instantiate a strategy from a config property. * requires conf to have already been set (as well as anything the provider might need to read). 
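FSWALMetaDataProvider above is the filesystem binding of the new WALMetaDataProvider hook, reached through WALProvider.getWalMetaDataProvider() rather than a raw FileSystem. A minimal sketch of a caller; the directory path is illustrative only, and the WAL file names are assumed to be well formed:

    // Sketch: enumerating WALs through the metadata abstraction instead of FileSystem.
    WALMetaDataProvider metaData = walProvider.getWalMetaDataProvider();
    WALInfo logDir = walProvider.createWalInfo("/hbase/WALs/rs1,16020,1"); // illustrative path
    if (metaData.exists(logDir.toString())) {
      for (WALInfo wal : metaData.list(logDir)) {
        // getWalStartTime() parses the timestamp group out of a well-formed WAL name
        System.out.println(wal.getName() + " started at " + wal.getWalStartTime());
      }
    }
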
@@ -107,6 +122,8 @@ public class RegionGroupingProvider implements WALProvider { try { final RegionGroupingStrategy result = clazz.getDeclaredConstructor().newInstance(); result.init(conf, providerId); + this.walRootDir = FSUtils.getWALRootDir(conf); + this.oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); return result; } catch (Exception e) { LOG.error("couldn't set up region grouping strategy, check config key " + @@ -121,6 +138,8 @@ public class RegionGroupingProvider implements WALProvider { /** delegate provider for WAL creation/roll/close */ public static final String DELEGATE_PROVIDER = "hbase.wal.regiongrouping.delegate.provider"; + public static final String DELEGATE_PROVIDER_CLASS = + "hbase.wal.regiongrouping.delegate.provider.class"; public static final String DEFAULT_DELEGATE_PROVIDER = WALFactory.Providers.defaultProvider .name(); @@ -155,7 +174,8 @@ public class RegionGroupingProvider implements WALProvider { } this.providerId = sb.toString(); this.strategy = getStrategy(conf, REGION_GROUPING_STRATEGY, DEFAULT_REGION_GROUPING_STRATEGY); - this.providerClass = factory.getProviderClass(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER); + this.providerClass = factory.getProviderClass(DELEGATE_PROVIDER_CLASS, DELEGATE_PROVIDER, + DEFAULT_DELEGATE_PROVIDER); } private WALProvider createProvider(String group) throws IOException { @@ -285,4 +305,28 @@ public class RegionGroupingProvider implements WALProvider { // extra code actually works, then we will have other big problems. So leave it as is. listeners.add(listener); } + + @Override + public WALEntryStream getWalStream(PriorityBlockingQueue logQueue, Configuration conf, + long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName, + MetricsSource metrics) throws IOException { + return new FSWALEntryStream(CommonFSUtils.getWALFileSystem(conf), logQueue, conf, startPosition, + walFileLengthProvider, serverName, metrics); + } + + @Override + public WALMetaDataProvider getWalMetaDataProvider() throws IOException { + return new FSWALMetaDataProvider(CommonFSUtils.getWALFileSystem(conf)); + } + + @Override + public WALInfo createWalInfo(String wal) { + return new FSWALInfo(wal); + } + + @Override + public RecoveredReplicationSource getRecoveredReplicationSource() { + return new FSRecoveredReplicationSource(); + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java index 9859c20..dd8c393 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.function.BiPredicate; @@ -36,16 +37,26 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL; import 
org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; +import org.apache.hadoop.hbase.replication.regionserver.FSRecoveredReplicationSource; +import org.apache.hadoop.hbase.replication.regionserver.FSWALEntryStream; +import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener; +import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource; import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider; +import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream; +import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.KeyLocker; import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; @@ -99,6 +110,10 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen private final KeyLocker createLock = new KeyLocker<>(); + private Path oldLogDir; + + private Path walRootDir; + SyncReplicationWALProvider(WALProvider provider) { this.provider = provider; } @@ -115,6 +130,8 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen provider.init(factory, conf, providerId); this.conf = conf; this.factory = factory; + walRootDir = FSUtils.getWALRootDir(conf); + this.oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); Pair> eventLoopGroupAndChannelClass = NettyAsyncFSWALConfigHelper.getEventLoopConfig(conf); eventLoopGroup = eventLoopGroupAndChannelClass.getFirst(); @@ -349,4 +366,36 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen return provider; } + @Override + public WALEntryStream getWalStream(PriorityBlockingQueue logQueue, Configuration conf, + long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName, + MetricsSource metrics) throws IOException { + return new FSWALEntryStream(CommonFSUtils.getWALFileSystem(conf), logQueue, conf, startPosition, + walFileLengthProvider, serverName, metrics); + } + + @Override + public WALMetaDataProvider getWalMetaDataProvider() throws IOException { + return new FSWALMetaDataProvider(CommonFSUtils.getWALFileSystem(conf)); + } + + @Override + public WALInfo createWalInfo(String wal) { + return new FSWALInfo(wal); + } + + @Override + public RecoveredReplicationSource getRecoveredReplicationSource() { + return new FSRecoveredReplicationSource(); + } + + public WALInfo getWalFromArchivePath(String wal) { + return new FSWALInfo(new Path(oldLogDir, wal)); + } + + public WALInfo getFullPath(ServerName serverName, String wal) { + Path walWithServerName = new Path(getWALDirectoryName(serverName.toString()), wal); + return new FSWALInfo(new Path(walRootDir,walWithServerName)); + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index cf367cd..8be162b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -60,19 +60,6 @@ public interface WAL extends Closeable, WALFileLengthProvider { * The implementation is synchronized in order to 
make sure there's one rollWriter * running at any given time. * - * @return If lots of logs, flush the returned regions so next time through we - * can clean logs. Returns null if nothing to flush. Names are actual - * region names as returned by {@link RegionInfo#getEncodedName()} - */ - byte[][] rollWriter() throws FailedLogCloseException, IOException; - - /** - * Roll the log writer. That is, start writing log messages to a new file. - * - *

- * The implementation is synchronized in order to make sure there's one rollWriter - * running at any given time. - * * @param force * If true, force creation of a new writer even if no entries have * been written to the current writer diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java index 24ebe68..2d4cdda 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java @@ -80,8 +80,11 @@ public class WALFactory { public static final String WAL_PROVIDER = "hbase.wal.provider"; static final String DEFAULT_WAL_PROVIDER = Providers.defaultProvider.name(); + public static final String WAL_PROVIDER_CLASS = "hbase.wal.provider.class"; + static final Class<? extends WALProvider> DEFAULT_WAL_PROVIDER_CLASS = AsyncFSWALProvider.class; public static final String META_WAL_PROVIDER = "hbase.wal.meta_provider"; + public static final String META_WAL_PROVIDER_CLASS = "hbase.wal.meta_provider.class"; final String factoryId; private final WALProvider provider; @@ -120,7 +123,25 @@ public class WALFactory { } @VisibleForTesting - public Class<? extends WALProvider> getProviderClass(String key, String defaultValue) { + /* + * @param clsKey config key for provider classname + * @param key config key for provider enum + * @param defaultValue default value for provider enum + * @return Class which extends WALProvider + */ + public Class<? extends WALProvider> getProviderClass(String clsKey, String key, + String defaultValue) { + String clsName = conf.get(clsKey); + if (clsName == null || clsName.isEmpty()) { + clsName = conf.get(key, defaultValue); + } + if (clsName != null && !clsName.isEmpty()) { + try { + return (Class<? extends WALProvider>) Class.forName(clsName); + } catch (ClassNotFoundException exception) { + // try with enum key next + } + } try { Providers provider = Providers.valueOf(conf.get(key, defaultValue)); if (provider != Providers.defaultProvider) { @@ -139,7 +160,7 @@ public class WALFactory { // Fall back to them specifying a class name // Note that the passed default class shouldn't actually be used, since the above only fails // when there is a config value present. - return conf.getClass(key, Providers.defaultProvider.clazz, WALProvider.class); + return conf.getClass(key, AsyncFSWALProvider.class, WALProvider.class); } } @@ -186,7 +207,8 @@ public class WALFactory { this.factoryId = factoryId; // end required early initialization if (conf.getBoolean("hbase.regionserver.hlog.enabled", true)) { - WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER)); + WALProvider provider = createProvider( + getProviderClass(WAL_PROVIDER_CLASS, WAL_PROVIDER, DEFAULT_WAL_PROVIDER)); if (enableSyncReplicationWALProvider) { provider = new SyncReplicationWALProvider(provider); } @@ -250,7 +272,7 @@ public class WALFactory { if (provider != null) { return provider; } - provider = createProvider(getProviderClass(META_WAL_PROVIDER, + provider = createProvider(getProviderClass(META_WAL_PROVIDER_CLASS, META_WAL_PROVIDER, conf.get(WAL_PROVIDER, DEFAULT_WAL_PROVIDER))); provider.init(this, conf, AbstractFSWALProvider.META_WAL_PROVIDER_ID); provider.addWALActionsListener(new MetricsWAL()); @@ -356,29 +378,6 @@ } } - /** - * Create a writer for the WAL. - * Uses defaults. - *

- * Should be package-private. public only for tests and - * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor} - * @return A WAL writer. Close when done with it. - */ - public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException { - return FSHLogProvider.createWriter(conf, fs, path, false); - } - - /** - * Should be package-private, visible for recovery testing. - * Uses defaults. - * @return an overwritable writer for recovered edits. caller should close. - */ - @VisibleForTesting - public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path) - throws IOException { - return FSHLogProvider.createWriter(conf, fs, path, true); - } - // These static methods are currently used where it's impractical to // untangle the reliance on state in the filesystem. They rely on singleton // WALFactory that just provides Reader / Writers. @@ -437,29 +436,6 @@ public class WALFactory { return getInstance(configuration).createReader(fs, path, null, false); } - /** - * If you already have a WALFactory, you should favor the instance method. - * Uses defaults. - * @return a Writer that will overwrite files. Caller must close. - */ - static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path, - final Configuration configuration) - throws IOException { - return FSHLogProvider.createWriter(configuration, fs, path, true); - } - - /** - * If you already have a WALFactory, you should favor the instance method. - * Uses defaults. - * @return a writer that won't overwrite files. Caller must close. - */ - @VisibleForTesting - public static Writer createWALWriter(final FileSystem fs, final Path path, - final Configuration configuration) - throws IOException { - return FSHLogProvider.createWriter(configuration, fs, path, false); - } - public final WALProvider getWALProvider() { return this.provider; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALInfo.java new file mode 100644 index 0000000..1d62ca2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALInfo.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.wal; + +import java.io.IOException; +import org.apache.hadoop.fs.Path; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +/** + * Identity and basic metadata of a WAL: its name, start time and size. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface WALInfo extends Comparable<WALInfo> { + + WALInfo UNKNOWN = new WALInfo() { + + @Override + public long getWalStartTime() { + return 0; + } + + @Override + public long getSize() throws IOException { + return 0; + } + + @Override + public String getName() { + return "UNKNOWN"; + } + + @Override + public int compareTo(WALInfo o) { + return 0; + } + }; + + /** + * For an FS based WAL this is the file name portion of the full path; + * for a stream based WAL it is the name of the stream + * @return name of the wal + */ + String getName(); + + /** + * Start time of the wal, which helps in sorting it against other wals + * @return start time of the wal + */ + long getWalStartTime(); + + /** + * Gets the size of the wal + * @return size of the log stream or file + * @throws IOException if the size cannot be determined + */ + long getSize() throws IOException; + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALMetaDataProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALMetaDataProvider.java new file mode 100644 index 0000000..9f64c15 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALMetaDataProvider.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase.wal; + +import java.io.IOException; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +/** + * Metadata provider for the given WAL implementation + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface WALMetaDataProvider { + + /** + * @param log complete name of the log to check if it exists or not + * @return true if the wal exists + * @throws IOException if the existence check fails + */ + boolean exists(String log) throws IOException; + + /** + * @param walInfo a namespace for a stream based WAL, or a directory/path for FS based storage + * @return the wals found under the given location + * @throws IOException if the listing fails + */ + WALInfo[] list(WALInfo walInfo) throws IOException; + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java index 244a636..6804b9f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java @@ -23,9 +23,14 @@ import java.io.IOException; import java.util.List; import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.PriorityBlockingQueue; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.replication.regionserver.MetricsSource; +import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource; +import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream; import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; import org.apache.yetus.audience.InterfaceAudience; @@ -113,4 +118,38 @@ public interface WALProvider { return path -> getWALs().stream().map(w -> w.getLogFileSizeIfBeingWritten(path)) .filter(o -> o.isPresent()).findAny().orElse(OptionalLong.empty()); } + + /** + * Streaming implementation to retrieve WAL entries from a given set of WALs.
The stream works through the given queue of WALs. + * @param logQueue Queue of wals + * @param conf configuration + * @param startPosition start position for the first wal in the queue + * @param walFileLengthProvider provider of the current length of any wal still being written + * @param serverName name of the server + * @param metrics metric source + * @return a WALEntryStream over the queued wals + * @throws IOException if the stream cannot be created + */ + WALEntryStream getWalStream(PriorityBlockingQueue<WALInfo> logQueue, Configuration conf, + long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName, + MetricsSource metrics) throws IOException; + + /** + * @return metadata provider for the given WAL implementation + * @throws IOException if the provider cannot be created + */ + WALMetaDataProvider getWalMetaDataProvider() throws IOException; + + /** + * Creates a WALInfo for the given wal path/name + * @param wal the wal path or name + * @return the corresponding WALInfo + */ + WALInfo createWalInfo(String wal); + + /** + * Replication source used to replicate the edits of a dead regionserver + * @return the recovered replication source + */ + RecoveredReplicationSource getRecoveredReplicationSource(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index 65d5fb7..36123e9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -802,7 +802,7 @@ public class WALSplitter { */ protected Writer createWriter(Path logfile) throws IOException { - return walFactory.createRecoveredEditsWriter(fs, logfile); + return FSHLogProvider.createWriter(conf, fs, logfile, true); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java index 1da31da..8f1acfb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALCoprocessor.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALInfo; import org.apache.hadoop.hbase.wal.WALKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -148,13 +149,13 @@ public class SampleRegionWALCoprocessor implements WALCoprocessor, RegionCoproce @Override public void preWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx, - Path oldPath, Path newPath) throws IOException { + WALInfo oldPath, WALInfo newPath) throws IOException { preWALRollCalled = true; } @Override public void postWALRoll(ObserverContext<? extends WALCoprocessorEnvironment> ctx, - Path oldPath, Path newPath) throws IOException { + WALInfo oldPath, WALInfo newPath) throws IOException { postWALRollCalled = true; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorderMultiBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorderMultiBlocks.java index 937b5dd..542c38e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorderMultiBlocks.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorderMultiBlocks.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.WALInfo; import org.apache.hadoop.hdfs.DFSClient; import
org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; @@ -155,7 +156,7 @@ public class TestBlockReorderMultiBlocks { // listen for successful log rolls final WALActionsListener listener = new WALActionsListener() { @Override - public void postLogRoll(final Path oldPath, final Path newPath) throws IOException { + public void postLogRoll(final WALInfo oldPath, final WALInfo newPath) throws IOException { latch.countDown(); } }; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index da07c7b..91528c8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -151,6 +151,7 @@ import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.FSHLogProvider; import org.apache.hadoop.hbase.wal.FaultyFSLog; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; @@ -654,7 +655,7 @@ public class TestHRegion { for (long i = minSeqId; i <= maxSeqId; i += 10) { Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i)); fs.create(recoveredEdits); - WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits); + WALProvider.Writer writer = FSHLogProvider.createWriter(CONF, fs, recoveredEdits, true); long time = System.nanoTime(); WALEdit edit = new WALEdit(); @@ -705,7 +706,7 @@ public class TestHRegion { for (long i = minSeqId; i <= maxSeqId; i += 10) { Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i)); fs.create(recoveredEdits); - WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits); + WALProvider.Writer writer = FSHLogProvider.createWriter(CONF, fs, recoveredEdits, true); long time = System.nanoTime(); WALEdit edit = new WALEdit(); @@ -796,7 +797,7 @@ public class TestHRegion { for (long i = minSeqId; i <= maxSeqId; i += 10) { Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i)); fs.create(recoveredEdits); - WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits); + WALProvider.Writer writer = FSHLogProvider.createWriter(CONF, fs, recoveredEdits, true); long time = System.nanoTime(); WALEdit edit = null; @@ -908,7 +909,7 @@ public class TestHRegion { Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000)); fs.create(recoveredEdits); - WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits); + WALProvider.Writer writer = FSHLogProvider.createWriter(CONF, fs, recoveredEdits, true); long time = System.nanoTime(); @@ -1033,7 +1034,7 @@ public class TestHRegion { Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000)); fs.create(recoveredEdits); - WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits); + WALProvider.Writer writer = FSHLogProvider.createWriter(walConf, fs, recoveredEdits, true); for (WAL.Entry entry : flushDescriptors) { writer.append(entry); @@ -6457,7 +6458,7 @@ public class TestHRegion { } for (int i = 0; i < testCount; i++) { - region.getWAL().rollWriter(); + region.getWAL().rollWriter(false); Thread.yield(); } } finally { diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java index 9bbce09..9f1b114 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java @@ -490,7 +490,7 @@ public class TestPerColumnFamilyFlush { } // Roll the WAL. The log file count is less than maxLogs so no flush is triggered. int currentNumRolledLogFiles = getNumRolledLogFiles(desiredRegion); - assertNull(getWAL(desiredRegion).rollWriter()); + assertNull(getWAL(desiredRegion).rollWriter(false)); while (getNumRolledLogFiles(desiredRegion) <= currentNumRolledLogFiles) { Thread.sleep(100); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java index 599260b..7af9c68 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALMonotonicallyIncreasingSeqId.java @@ -223,7 +223,7 @@ public class TestWALMonotonicallyIncreasingSeqId { incThread.join(); Path logPath = ((AbstractFSWAL<?>) region.getWAL()).getCurrentFileName(); - region.getWAL().rollWriter(); + region.getWAL().rollWriter(false); Thread.sleep(10); Path hbaseDir = new Path(walConf.get(HConstants.HBASE_DIR)); Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java index 3e0bc55..482334e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java @@ -236,7 +236,7 @@ } /** - * On rolling a wal after reaching the threshold, {@link WAL#rollWriter()} returns the list of + * On rolling a wal after reaching the threshold, {@link WAL#rollWriter(boolean)} returns the list of * regions which should be flushed in order to archive the oldest wal file. *

* This method tests this behavior by inserting edits and rolling the wal enough times to reach @@ -269,10 +269,10 @@ public abstract class AbstractTestFSWAL { } try { addEdits(wal, hri1, t1, 2, mvcc, scopes1); - wal.rollWriter(); + wal.rollWriter(false); // add some more edits and roll the wal. This would reach the log number threshold addEdits(wal, hri1, t1, 2, mvcc, scopes1); - wal.rollWriter(); + wal.rollWriter(false); // with above rollWriter call, the max logs limit is reached. assertTrue(wal.getNumRolledLogFiles() == 2); @@ -290,7 +290,7 @@ public abstract class AbstractTestFSWAL { // flush region 1, and roll the wal file. Only last wal which has entries for region1 should // remain. flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames()); - wal.rollWriter(); + wal.rollWriter(false); // only one wal should remain now (that is for the second region). assertEquals(1, wal.getNumRolledLogFiles()); // flush the second region @@ -301,11 +301,11 @@ public abstract class AbstractTestFSWAL { // add edits both to region 1 and region 2, and roll. addEdits(wal, hri1, t1, 2, mvcc, scopes1); addEdits(wal, hri2, t2, 2, mvcc, scopes2); - wal.rollWriter(); + wal.rollWriter(false); // add edits and roll the writer, to reach the max logs limit. assertEquals(1, wal.getNumRolledLogFiles()); addEdits(wal, hri1, t1, 2, mvcc, scopes1); - wal.rollWriter(); + wal.rollWriter(false); // it should return two regions to flush, as the oldest wal file has entries // for both regions. regionsToFlush = wal.findRegionsToForceFlush(); @@ -319,7 +319,7 @@ public abstract class AbstractTestFSWAL { addEdits(wal, hri1, t1, 2, mvcc, scopes1); // tests partial flush: roll on a partial flush, and ensure that wal is not archived. wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames()); - wal.rollWriter(); + wal.rollWriter(false); wal.completeCacheFlush(hri1.getEncodedNameAsBytes()); assertEquals(1, wal.getNumRolledLogFiles()); } finally { @@ -480,6 +480,6 @@ public abstract class AbstractTestFSWAL { AbstractFSWAL wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), testName, CONF, null, true, null, null); wal.close(); - wal.rollWriter(); + wal.rollWriter(false); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRollPeriod.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRollPeriod.java index 9322c5e..09be6e8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRollPeriod.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRollPeriod.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALInfo; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -130,10 +131,10 @@ public abstract class AbstractTestLogRollPeriod { private void checkMinLogRolls(final WAL log, final int minRolls) throws Exception { - final List paths = new ArrayList<>(); + final List paths = new ArrayList<>(); log.registerWALActionsListener(new WALActionsListener() { @Override - public void postLogRoll(Path oldFile, Path newFile) { + public void postLogRoll(WALInfo oldFile, WALInfo newFile) { LOG.debug("postLogRoll: oldFile="+oldFile+" newFile="+newFile); paths.add(newFile); } diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java index 610af61..918b666 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java @@ -200,7 +200,7 @@ public abstract class AbstractTestLogRolling { } // Now roll the log - log.rollWriter(); + log.rollWriter(false); int count = AbstractFSWALProvider.getNumRolledLogFiles(log); LOG.info("after flushing all regions and rolling logs there are " + count + " log files"); @@ -266,7 +266,7 @@ public abstract class AbstractTestLogRolling { assertEquals(2, s.getStorefilesCount()); // Roll the log and compact table, to have compaction record in the 2nd WAL. - log.rollWriter(); + log.rollWriter(false); assertEquals("Should have WAL; one table is not flushed", 1, AbstractFSWALProvider.getNumRolledLogFiles(log)); admin.flush(table.getName()); @@ -280,14 +280,14 @@ public abstract class AbstractTestLogRolling { // Write some value to the table so the WAL cannot be deleted until table is flushed. doPut(table, 0); // Now 2nd WAL will have both compaction and put record for table. - log.rollWriter(); // 1st WAL deleted, 2nd not deleted yet. + log.rollWriter(false); // 1st WAL deleted, 2nd not deleted yet. assertEquals("Should have WAL; one table is not flushed", 1, AbstractFSWALProvider.getNumRolledLogFiles(log)); // Flush table to make latest WAL obsolete; write another record, and roll again. admin.flush(table.getName()); doPut(table, 1); - log.rollWriter(); // Now 2nd WAL is deleted and 3rd is added. + log.rollWriter(false); // Now 2nd WAL is deleted and 3rd is added. 
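The churn in these log-rolling tests is mechanical: the no-argument rollWriter() overload was removed from the WAL interface, so every caller now passes the force flag explicitly. A minimal sketch of the resulting call pattern, assuming only the rollWriter(boolean) contract shown in this patch; the helper class is invented for illustration:

```java
import java.io.IOException;
import org.apache.hadoop.hbase.wal.WAL;

// Illustrative helper, not part of the patch. rollWriter(false) rolls only if
// entries were written to the current writer; rollWriter(true) forces a new
// writer regardless. A non-null result lists encoded region names that should
// be flushed so the oldest wal files can be archived.
final class RollWriterMigration {
  static byte[][] rollQuietly(WAL wal) throws IOException {
    return wal.rollWriter(false);
  }
}
```
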
assertEquals("Should have 1 WALs at the end", 1, AbstractFSWALProvider.getNumRolledLogFiles(log)); } finally { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java index 3eed137..6201eea 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java @@ -229,7 +229,7 @@ public class TestLogRollAbort { LOG.debug("Trying to roll the WAL."); try { - log.rollWriter(); + log.rollWriter(false); Assert.fail("rollWriter() did not throw any exception."); } catch (IOException ioe) { if (ioe.getCause() instanceof FileNotFoundException) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java index e19361e..470cdd0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java @@ -52,8 +52,10 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.FSWALInfo; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.junit.BeforeClass; @@ -251,20 +253,20 @@ public class TestLogRolling extends AbstractTestLogRolling { server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName()); RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo(); final WAL log = server.getWAL(region); - final List paths = new ArrayList<>(1); + final List paths = new ArrayList<>(1); final List preLogRolledCalled = new ArrayList<>(); - paths.add(AbstractFSWALProvider.getCurrentFileName(log)); + paths.add(new FSWALInfo(AbstractFSWALProvider.getCurrentFileName(log))); log.registerWALActionsListener(new WALActionsListener() { @Override - public void preLogRoll(Path oldFile, Path newFile) { + public void preLogRoll(WALInfo oldFile, WALInfo newFile) { LOG.debug("preLogRoll: oldFile=" + oldFile + " newFile=" + newFile); preLogRolledCalled.add(new Integer(1)); } @Override - public void postLogRoll(Path oldFile, Path newFile) { + public void postLogRoll(WALInfo oldFile, WALInfo newFile) { paths.add(newFile); } }); @@ -315,15 +317,16 @@ public class TestLogRolling extends AbstractTestLogRolling { // read back the data written Set loggedRows = new HashSet<>(); FSUtils fsUtils = FSUtils.getInstance(fs, TEST_UTIL.getConfiguration()); - for (Path p : paths) { + for (WALInfo wi : paths) { + FSWALInfo p = (FSWALInfo)wi; LOG.debug("recovering lease for " + p); - fsUtils.recoverFileLease(((HFileSystem) fs).getBackingFs(), p, TEST_UTIL.getConfiguration(), + fsUtils.recoverFileLease(((HFileSystem) fs).getBackingFs(), p.getPath(), TEST_UTIL.getConfiguration(), null); - LOG.debug("Reading WAL " + FSUtils.getPath(p)); + LOG.debug("Reading WAL " + FSUtils.getPath(p.getPath())); WAL.Reader reader = null; try { - reader = WALFactory.createReader(fs, p, TEST_UTIL.getConfiguration()); + reader = WALFactory.createReader(fs, 
p.getPath(), TEST_UTIL.getConfiguration()); WAL.Entry entry; while ((entry = reader.next()) != null) { LOG.debug("#" + entry.getKey().getSequenceId() + ": " + entry.getEdit().getCells()); @@ -333,7 +336,7 @@ public class TestLogRolling extends AbstractTestLogRolling { } } } catch (EOFException e) { - LOG.debug("EOF reading file " + FSUtils.getPath(p)); + LOG.debug("EOF reading file " + FSUtils.getPath(p.getPath())); } finally { if (reader != null) reader.close(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java index 819df67..d95fe80 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java @@ -155,7 +155,7 @@ public class TestLogRollingNoCluster { long now = System.currentTimeMillis(); // Roll every ten edits if (i % 10 == 0) { - this.wal.rollWriter(); + this.wal.rollWriter(false); } WALEdit edit = new WALEdit(); byte[] bytes = Bytes.toBytes(i); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java index 0967a75..5b7d592 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALInfo; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.junit.After; import org.junit.Before; @@ -119,7 +120,7 @@ public class TestWALActionsListener { wal.registerWALActionsListener(laterobserver); } if (i % 2 == 0) { - wal.rollWriter(); + wal.rollWriter(false); } } @@ -142,12 +143,12 @@ public class TestWALActionsListener { public int closedCount = 0; @Override - public void preLogRoll(Path oldFile, Path newFile) { + public void preLogRoll(WALInfo oldFile, WALInfo newFile) { preLogRollCounter++; } @Override - public void postLogRoll(Path oldFile, Path newFile) { + public void postLogRoll(WALInfo oldFile, WALInfo newFile) { postLogRollCounter++; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java index 67f793d..89738e1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java @@ -22,7 +22,6 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; @@ -33,6 +32,8 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; +import 
org.apache.hadoop.hbase.wal.WALInfo; +import org.apache.hadoop.hbase.wal.WALProvider; /** * Source that does nothing at all, helpful to test ReplicationSourceManager @@ -42,15 +43,15 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { private ReplicationSourceManager manager; private ReplicationPeer replicationPeer; private String peerClusterId; - private Path currentPath; + private WALInfo currentWALInfo; private MetricsSource metrics; private WALFileLengthProvider walFileLengthProvider; private AtomicBoolean startup = new AtomicBoolean(false); @Override - public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, + public void init(Configuration conf, ReplicationSourceManager manager, ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId, - UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) + UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics, WALProvider walProvider) throws IOException { this.manager = manager; this.peerClusterId = peerClusterId; @@ -60,14 +61,14 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface { } @Override - public void enqueueLog(Path log) { - this.currentPath = log; + public void enqueueLog(WALInfo log) { + this.currentWALInfo = log; metrics.incrSizeOfLogQueue(); } @Override - public Path getCurrentPath() { - return this.currentPath; + public WALInfo getCurrentWALInfo() { + return this.currentWALInfo; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java index 1b98518..65d6a83 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SerialReplicationTestBase.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hadoop.hbase.wal.FSHLogProvider; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALFactory; @@ -184,7 +185,7 @@ public class SerialReplicationTestBase { protected final void setupWALWriter() throws IOException { logPath = new Path(LOG_DIR, name.getMethodName()); - WRITER = WALFactory.createWALWriter(FS, logPath, UTIL.getConfiguration()); + WRITER = FSHLogProvider.createWriter(UTIL.getConfiguration(), FS, logPath, false); } protected final void waitUntilReplicationDone(int expectedEntries) throws Exception { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java index 37ca7dc..1b97009 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java @@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.HFileTestUtil; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALInfo; import 
org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; @@ -695,7 +696,7 @@ public class TestMasterReplication { // listen for successful log rolls final WALActionsListener listener = new WALActionsListener() { @Override - public void postLogRoll(final Path oldPath, final Path newPath) throws IOException { + public void postLogRoll(final WALInfo oldPath, final WALInfo newPath) throws IOException { latch.countDown(); } }; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java index 225ca7f..13d35d3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WALInfo; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.junit.BeforeClass; @@ -219,7 +220,7 @@ public class TestMultiSlaveReplication { // listen for successful log rolls final WALActionsListener listener = new WALActionsListener() { @Override - public void postLogRoll(final Path oldPath, final Path newPath) throws IOException { + public void postLogRoll(final WALInfo oldPath, final WALInfo newPath) throws IOException { latch.countDown(); } }; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java index 4effe41..2f70389 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication; import java.io.IOException; import java.util.ArrayList; import java.util.List; + import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.Waiter; @@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterfa import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.FSWALInfo; import org.apache.hadoop.hbase.wal.WAL; import org.junit.Before; import org.junit.ClassRule; @@ -69,7 +71,8 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase { for (ReplicationSourceInterface rsi : replicationService.getReplicationManager() .getSources()) { ReplicationSource source = (ReplicationSource) rsi; - if (!currentFile.equals(source.getCurrentPath())) { + FSWALInfo wi = (FSWALInfo) source.getCurrentWALInfo(); + if (!currentFile.equals(wi.getPath())) { return false; } } @@ -104,8 +107,8 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase { for (int i = 0; i < numRs; i++) { HRegionServer hrs = utility1.getHBaseCluster().getRegionServer(i); 
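Since WALActionsListener callbacks now traffic in WALInfo rather than Path, a listener that used to inspect the rolled file's Path has to go through the new accessors instead. A sketch modeled on the latch-based test listeners above; the class name is invented, and it relies on the interface's default no-op methods for the callbacks it does not override:

```java
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.wal.WALInfo;

// Counts successful log rolls. WALInfo exposes getName(), getWalStartTime()
// and getSize() rather than a filesystem Path.
final class LogRollLatchListener implements WALActionsListener {
  private final CountDownLatch latch;

  LogRollLatchListener(CountDownLatch latch) {
    this.latch = latch;
  }

  @Override
  public void postLogRoll(WALInfo oldWal, WALInfo newWal) throws IOException {
    latch.countDown();
  }
}
```
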
Replication replicationService = (Replication) hrs.getReplicationSourceService(); - replicationService.getReplicationManager().preLogRoll(emptyWalPaths.get(i)); - replicationService.getReplicationManager().postLogRoll(emptyWalPaths.get(i)); + replicationService.getReplicationManager().preLogRoll(new FSWALInfo(emptyWalPaths.get(i))); + replicationService.getReplicationManager().postLogRoll(new FSWALInfo(emptyWalPaths.get(i))); RegionInfo regionInfo = utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); WAL wal = hrs.getWAL(regionInfo); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java index 8ff4d84..4763c05 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WALInfo; import org.junit.Assert; import org.junit.ClassRule; import org.junit.Test; @@ -74,7 +75,7 @@ public class TestReplicationMetricsforUI extends TestReplicationBase { } rs = utility1.getRSForFirstRegionInTable(tableName); metrics = rs.getWalGroupsReplicationStatus(); - Path lastPath = null; + WALInfo lastPath = null; for (Map.Entry metric : metrics.entrySet()) { lastPath = metric.getValue().getCurrentPath(); Assert.assertEquals("peerId", PEER_ID2, metric.getValue().getPeerId()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java index bd800a8..4cb78bf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hadoop.hbase.wal.FSHLogProvider; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALFactory; @@ -138,7 +139,7 @@ public class TestRaceWhenCreatingReplicationSource { Path dir = UTIL.getDataTestDirOnTestFS(); FS = UTIL.getTestFileSystem(); LOG_PATH = new Path(dir, "replicated"); - WRITER = WALFactory.createWALWriter(FS, LOG_PATH, UTIL.getConfiguration()); + WRITER = FSHLogProvider.createWriter(UTIL.getConfiguration(), FS, LOG_PATH, false); UTIL.getAdmin().addReplicationPeer(PEER_ID, ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase") .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build(), diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java 
index 274ccab..48f883b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java @@ -57,9 +57,12 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.FSHLogProvider; +import org.apache.hadoop.hbase.wal.FSWALInfo; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALInfo; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALProvider; import org.junit.AfterClass; @@ -118,8 +121,8 @@ public class TestReplicationSource { Path logPath = new Path(logDir, "log"); if (!FS.exists(logDir)) FS.mkdirs(logDir); if (!FS.exists(oldLogDir)) FS.mkdirs(oldLogDir); - WALProvider.Writer writer = WALFactory.createWALWriter(FS, logPath, - TEST_UTIL.getConfiguration()); + WALProvider.Writer writer = FSHLogProvider.createWriter(TEST_UTIL.getConfiguration(), FS, + logPath, false); for(int i = 0; i < 3; i++) { byte[] b = Bytes.toBytes(Integer.toString(i)); KeyValue kv = new KeyValue(b,b,b); @@ -174,8 +177,8 @@ public class TestReplicationSource { testConf.setInt("replication.source.maxretriesmultiplier", 1); ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); - source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null, - p -> OptionalLong.empty(), null); + source.init(testConf, manager, null, mockPeer, null, "testPeer", null, + p -> OptionalLong.empty(), null, null); ExecutorService executor = Executors.newSingleThreadExecutor(); Future future = executor.submit(new Runnable() { @@ -301,8 +304,8 @@ public class TestReplicationSource { String walGroupId = "fake-wal-group-id"; ServerName serverName = ServerName.valueOf("www.example.com", 12006, 1524679704418L); ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L); - PriorityBlockingQueue queue = new PriorityBlockingQueue<>(); - queue.put(new Path("/www/html/test")); + PriorityBlockingQueue queue = new PriorityBlockingQueue<>(); + queue.put(new FSWALInfo(new Path("/www/html/test"))); RecoveredReplicationSource source = Mockito.mock(RecoveredReplicationSource.class); Server server = Mockito.mock(Server.class); Mockito.when(server.getServerName()).thenReturn(serverName); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index febe764..a86e268 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -84,10 +84,13 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.FSWALInfo; import org.apache.hadoop.hbase.wal.WAL; import 
org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALInfo; import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -195,7 +198,10 @@ public abstract class TestReplicationSourceManager { logDir = utility.getDataTestDir(HConstants.HREGION_LOGDIR_NAME); remoteLogDir = utility.getDataTestDir(ReplicationUtils.REMOTE_WAL_DIR_NAME); replication = new Replication(); - replication.initialize(new DummyServer(), fs, logDir, oldLogDir, null); + DummyServer dummyServer = new DummyServer(); + WALFactory factory = + new WALFactory(conf, dummyServer.getServerName().toString()); + replication.initialize(dummyServer, factory.getWALProvider()); managerOfCluster = getManagerFromCluster(); if (managerOfCluster != null) { // After replication procedure, we need to add peer by hand (other than by receiving @@ -298,7 +304,7 @@ public abstract class TestReplicationSourceManager { // Testing normal log rolling every 20 for(long i = 1; i < 101; i++) { if(i > 1 && i % 20 == 0) { - wal.rollWriter(); + wal.rollWriter(false); } LOG.info(Long.toString(i)); final long txid = wal.append( @@ -330,14 +336,14 @@ public abstract class TestReplicationSourceManager { } assertEquals(6, logNumber); - wal.rollWriter(); + wal.rollWriter(false); ReplicationSourceInterface source = mock(ReplicationSourceInterface.class); when(source.getQueueId()).thenReturn("1"); when(source.isRecovered()).thenReturn(false); when(source.isSyncReplication()).thenReturn(false); manager.logPositionAndCleanOldLogs(source, - new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath())); + new WALEntryBatch(0, manager.getSources().get(0).getCurrentWALInfo())); wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), @@ -572,7 +578,7 @@ public abstract class TestReplicationSourceManager { assertNotNull(source); final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue(); // Enqueue log and check if metrics updated - source.enqueueLog(new Path("abc")); + source.enqueueLog(new FSWALInfo(new Path("abc"))); assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue()); assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); @@ -618,7 +624,7 @@ public abstract class TestReplicationSourceManager { // make sure that we can deal with files which does not exist String walNameNotExists = "remoteWAL-12345-" + slaveId + ".12345" + ReplicationUtils.SYNC_WAL_SUFFIX; - Path wal = new Path(logDir, walNameNotExists); + WALInfo wal = new FSWALInfo(new Path(logDir, walNameNotExists)); manager.preLogRoll(wal); manager.postLogRoll(wal); @@ -629,7 +635,7 @@ public abstract class TestReplicationSourceManager { Path remoteWAL = new Path(remoteLogDirForPeer, walName).makeQualified(fs.getUri(), fs.getWorkingDirectory()); fs.create(remoteWAL).close(); - wal = new Path(logDir, walName); + wal = new FSWALInfo(new Path(logDir, walName)); manager.preLogRoll(wal); manager.postLogRoll(wal); @@ -803,9 +809,9 @@ public abstract class TestReplicationSourceManager { static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy { @Override - public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, + public void 
init(Configuration conf, ReplicationSourceManager manager, ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId, - UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) + UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics, WALProvider provider) throws IOException { throw new IOException("Failing deliberately"); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index fac6f74..5e72986 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -56,10 +56,12 @@ import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.FSWALInfo; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALInfo; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.After; @@ -98,13 +100,15 @@ public class TestWALEntryStream { } private WAL log; - PriorityBlockingQueue walQueue; + PriorityBlockingQueue walQueue; private PathWatcher pathWatcher; @Rule public TestName tn = new TestName(); private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); + private WALFactory wals; + @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL = new HBaseTestingUtility(); @@ -124,7 +128,7 @@ public class TestWALEntryStream { public void setUp() throws Exception { walQueue = new PriorityBlockingQueue<>(); pathWatcher = new PathWatcher(); - final WALFactory wals = new WALFactory(CONF, tn.getMethodName()); + wals = new WALFactory(CONF, tn.getMethodName()); wals.getWALProvider().addWALActionsListener(pathWatcher); log = wals.getWAL(info); } @@ -153,10 +157,10 @@ public class TestWALEntryStream { appendToLogAndSync(walEditKVs); } - log.rollWriter(); + log.rollWriter(false); try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { + new FSWALEntryStream(fs, walQueue, CONF, 0, log, null, new MetricsSource("1"))) { int i = 0; while (entryStream.hasNext()) { assertNotNull(entryStream.next()); @@ -183,7 +187,7 @@ public class TestWALEntryStream { appendToLogAndSync(); long oldPos; try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { + new FSWALEntryStream(fs, walQueue, CONF, 0, log, null, new MetricsSource("1"))) { // There's one edit in the log, read it. 
Reading past it needs to throw exception assertTrue(entryStream.hasNext()); WAL.Entry entry = entryStream.peek(); @@ -197,7 +201,7 @@ public class TestWALEntryStream { appendToLogAndSync(); - try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, oldPos, + try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, oldPos, log, null, new MetricsSource("1"))) { // Read the newly added entry, make sure we made progress WAL.Entry entry = entryStream.next(); @@ -208,10 +212,10 @@ public class TestWALEntryStream { // We rolled but we still should see the end of the first log and get that item appendToLogAndSync(); - log.rollWriter(); + log.rollWriter(false); appendToLogAndSync(); - try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, oldPos, + try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, oldPos, log, null, new MetricsSource("1"))) { WAL.Entry entry = entryStream.next(); assertNotEquals(oldPos, entryStream.getPosition()); @@ -237,11 +241,11 @@ public class TestWALEntryStream { appendToLog("1"); appendToLog("2");// 2 try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { + new FSWALEntryStream(fs, walQueue, CONF, 0, log, null, new MetricsSource("1"))) { assertEquals("1", getRow(entryStream.next())); appendToLog("3"); // 3 - comes in after reader opened - log.rollWriter(); // log roll happening while we're reading + log.rollWriter(false); // log roll happening while we're reading appendToLog("4"); // 4 - this append is in the rolled log assertEquals("2", getRow(entryStream.next())); @@ -262,7 +266,7 @@ public class TestWALEntryStream { public void testNewEntriesWhileStreaming() throws Exception { appendToLog("1"); try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { + new FSWALEntryStream(fs, walQueue, CONF, 0, log, null, new MetricsSource("1"))) { entryStream.next(); // we've hit the end of the stream at this point // some new entries come in while we're streaming @@ -285,7 +289,7 @@ public class TestWALEntryStream { long lastPosition = 0; appendToLog("1"); try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) { + new FSWALEntryStream(fs, walQueue, CONF, 0, log, null, new MetricsSource("1"))) { entryStream.next(); // we've hit the end of the stream at this point appendToLog("2"); appendToLog("3"); @@ -293,7 +297,7 @@ public class TestWALEntryStream { } // next stream should picks up where we left off try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, CONF, lastPosition, log, null, new MetricsSource("1"))) { + new FSWALEntryStream(fs, walQueue, CONF, lastPosition, log, null, new MetricsSource("1"))) { assertEquals("2", getRow(entryStream.next())); assertEquals("3", getRow(entryStream.next())); assertFalse(entryStream.hasNext()); // done @@ -310,14 +314,14 @@ public class TestWALEntryStream { long lastPosition = 0; appendEntriesToLogAndSync(3); // read only one element - try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, lastPosition, + try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, lastPosition, log, null, new MetricsSource("1"))) { entryStream.next(); lastPosition = entryStream.getPosition(); } // there should still be two more entries from where we left off try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, CONF, lastPosition, log, null, new 
-        new WALEntryStream(walQueue, fs, CONF, lastPosition, log, null, new MetricsSource("1"))) {
+        new FSWALEntryStream(fs, walQueue, CONF, lastPosition, log, null, new MetricsSource("1"))) {
       assertNotNull(entryStream.next());
       assertNotNull(entryStream.next());
       assertFalse(entryStream.hasNext());
@@ -328,7 +332,7 @@ public class TestWALEntryStream {
   @Test
   public void testEmptyStream() throws Exception {
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
+        new FSWALEntryStream(fs, walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
       assertFalse(entryStream.hasNext());
     }
   }
@@ -343,6 +347,7 @@ public class TestWALEntryStream {
     when(source.getWALFileLengthProvider()).thenReturn(log);
     when(source.getServer()).thenReturn(mockServer);
     when(source.isRecovered()).thenReturn(recovered);
+    when(source.getWalProvider()).thenReturn(wals.getWALProvider());
     return source;
   }
 
@@ -350,7 +355,7 @@ public class TestWALEntryStream {
     ReplicationSource source = mockReplicationSource(recovered, conf);
     when(source.isPeerEnabled()).thenReturn(true);
     ReplicationSourceWALReader reader =
-        new ReplicationSourceWALReader(fs, conf, walQueue, 0, getDummyFilter(), source);
+        new ReplicationSourceWALReader(conf, walQueue, 0, getDummyFilter(), source);
     reader.start();
     return reader;
   }
@@ -361,7 +366,7 @@ public class TestWALEntryStream {
     // get ending position
     long position;
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
+        new FSWALEntryStream(fs, walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
       entryStream.next();
       entryStream.next();
       entryStream.next();
@@ -369,7 +374,7 @@ public class TestWALEntryStream {
     }
 
     // start up a reader
-    Path walPath = walQueue.peek();
+    WALInfo walPath = walQueue.peek();
     ReplicationSourceWALReader reader = createReader(false, CONF);
     WALEntryBatch entryBatch = reader.take();
@@ -389,8 +394,8 @@ public class TestWALEntryStream {
   @Test
   public void testReplicationSourceWALReaderRecovered() throws Exception {
     appendEntriesToLogAndSync(10);
-    Path walPath = walQueue.peek();
-    log.rollWriter();
+    WALInfo walPath = walQueue.peek();
+    log.rollWriter(false);
     appendEntriesToLogAndSync(5);
     log.shutdown();
 
@@ -422,14 +427,14 @@ public class TestWALEntryStream {
   @Test
   public void testReplicationSourceWALReaderWrongPosition() throws Exception {
     appendEntriesToLogAndSync(1);
-    Path walPath = walQueue.peek();
-    log.rollWriter();
+    WALInfo walPath = walQueue.peek();
+    log.rollWriter(false);
     appendEntriesToLogAndSync(20);
     TEST_UTIL.waitFor(5000, new ExplainingPredicate<Exception>() {
 
       @Override
       public boolean evaluate() throws Exception {
-        return fs.getFileStatus(walPath).getLen() > 0;
+        return fs.getFileStatus(((FSWALInfo) walPath).getPath()).getLen() > 0;
       }
 
       @Override
@@ -438,7 +443,7 @@ public class TestWALEntryStream {
       }
     });
-    long walLength = fs.getFileStatus(walPath).getLen();
+    long walLength = fs.getFileStatus(((FSWALInfo) walPath).getPath()).getLen();
 
     ReplicationSourceWALReader reader = createReader(false, CONF);
 
@@ -449,20 +454,20 @@ public class TestWALEntryStream {
     assertEquals(1, entryBatch.getNbEntries());
     assertTrue(entryBatch.isEndOfFile());
 
-    Path walPath2 = walQueue.peek();
+    WALInfo walPath2 = walQueue.peek();
     entryBatch = reader.take();
     assertEquals(walPath2, entryBatch.getLastWalPath());
     assertEquals(20, entryBatch.getNbEntries());
     assertFalse(entryBatch.isEndOfFile());
 
-    log.rollWriter();
+    log.rollWriter(false);
     appendEntriesToLogAndSync(10);
     entryBatch = reader.take();
     assertEquals(walPath2, entryBatch.getLastWalPath());
     assertEquals(0, entryBatch.getNbEntries());
     assertTrue(entryBatch.isEndOfFile());
 
-    Path walPath3 = walQueue.peek();
+    WALInfo walPath3 = walQueue.peek();
     entryBatch = reader.take();
     assertEquals(walPath3, entryBatch.getLastWalPath());
     assertEquals(10, entryBatch.getNbEntries());
@@ -476,7 +481,7 @@ public class TestWALEntryStream {
     // get ending position
     long position;
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
+        new FSWALEntryStream(fs, walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
       entryStream.next();
       entryStream.next();
       entryStream.next();
@@ -484,7 +489,7 @@ public class TestWALEntryStream {
     }
 
     // start up a reader
-    Path walPath = walQueue.peek();
+    WALInfo walPath = walQueue.peek();
     ReplicationSource source = mockReplicationSource(false, CONF);
     AtomicInteger invokeCount = new AtomicInteger(0);
     AtomicBoolean enabled = new AtomicBoolean(false);
@@ -494,7 +499,7 @@ public class TestWALEntryStream {
     });
 
     ReplicationSourceWALReader reader =
-        new ReplicationSourceWALReader(fs, CONF, walQueue, 0, getDummyFilter(), source);
+        new ReplicationSourceWALReader(CONF, walQueue, 0, getDummyFilter(), source);
     reader.start();
     Future<WALEntryBatch> future = ForkJoinPool.commonPool().submit(() -> {
       return reader.take();
     });
@@ -577,10 +582,10 @@ public class TestWALEntryStream {
 
   class PathWatcher implements WALActionsListener {
 
-    Path currentPath;
+    WALInfo currentPath;
 
     @Override
-    public void preLogRoll(Path oldPath, Path newPath) throws IOException {
+    public void preLogRoll(WALInfo oldPath, WALInfo newPath) throws IOException {
       walQueue.add(newPath);
       currentPath = newPath;
     }
@@ -592,7 +597,7 @@ public class TestWALEntryStream {
     appendToLog("2");
     long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong();
     AtomicLong fileLength = new AtomicLong(size - 1);
-    try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, CONF, 0,
+    try (WALEntryStream entryStream = new FSWALEntryStream(fs, walQueue, CONF, 0,
         p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"))) {
       assertTrue(entryStream.hasNext());
       assertNotNull(entryStream.next());
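The TestWALEntryStream changes above all follow one pattern: the stream is now built as an FSWALEntryStream over a PriorityBlockingQueue of WALInfo handles rather than raw Paths, with the FileSystem promoted to the first constructor argument. The following is a minimal sketch of that usage, not part of the patch; it assumes the patched classes are on the classpath and that the WAL path passed on the command line exists.

import java.util.OptionalLong;
import java.util.concurrent.PriorityBlockingQueue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.replication.regionserver.FSWALEntryStream;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.wal.FSWALInfo;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALInfo;

public class WalEntryStreamSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = CommonFSUtils.getWALFileSystem(conf);
    // The stream drains this queue oldest-first, exactly as the tests above do.
    PriorityBlockingQueue<WALInfo> queue = new PriorityBlockingQueue<>();
    queue.add(new FSWALInfo(new Path(args[0]))); // hypothetical WAL file
    // An empty OptionalLong tells the stream the file is not currently being written.
    try (WALEntryStream stream = new FSWALEntryStream(fs, queue, conf, 0L,
        p -> OptionalLong.empty(), null, new MetricsSource("1"))) {
      while (stream.hasNext()) {
        WAL.Entry entry = stream.next();      // next replication-visible edit
        long resumeAt = stream.getPosition(); // position to persist for restart
        System.out.println(entry + " @ " + resumeAt);
      }
    }
  }
}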
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
index 453b742..574f5c9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
@@ -21,22 +21,32 @@ package org.apache.hadoop.hbase.wal;
 import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.DEFAULT_PROVIDER_ID;
 import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.META_WAL_PROVIDER_ID;
 import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER;
+import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALDirectoryName;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.PriorityBlockingQueue;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 // imports for things that haven't moved from regionserver.wal yet.
 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.replication.regionserver.FSRecoveredReplicationSource;
+import org.apache.hadoop.hbase.replication.regionserver.FSWALEntryStream;
+import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
+import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
+import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream;
+import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -89,6 +99,10 @@ public class IOTestProvider implements WALProvider {
   private String providerId;
 
   private List<WALActionsListener> listeners = new ArrayList<>();
+
+  private Path oldLogDir;
+
+  private Path walRootDir;
 
   /**
    * @param factory factory that made us, identity used for FS layout. may not be null
    * @param conf may not be null
@@ -103,6 +117,8 @@ public class IOTestProvider implements WALProvider {
     this.factory = factory;
     this.conf = conf;
     this.providerId = providerId != null ? providerId : DEFAULT_PROVIDER_ID;
+    this.walRootDir = FSUtils.getWALRootDir(conf);
+    this.oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
   }
 
   @Override
@@ -285,4 +301,27 @@ public class IOTestProvider implements WALProvider {
     // TODO Implement WALProvider.addWALActionLister
   }
+
+  @Override
+  public WALEntryStream getWalStream(PriorityBlockingQueue<WALInfo> logQueue, Configuration conf,
+      long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
+      MetricsSource metrics) throws IOException {
+    return new FSWALEntryStream(CommonFSUtils.getWALFileSystem(conf), logQueue, conf,
+        startPosition, walFileLengthProvider, serverName, metrics);
+  }
+
+  @Override
+  public WALMetaDataProvider getWalMetaDataProvider() throws IOException {
+    return new FSWALMetaDataProvider(CommonFSUtils.getWALFileSystem(conf));
+  }
+
+  @Override
+  public WALInfo createWalInfo(String wal) {
+    return new FSWALInfo(new Path(wal));
+  }
+
+  @Override
+  public RecoveredReplicationSource getRecoveredReplicationSource() {
+    return new FSRecoveredReplicationSource();
+  }
 }
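With these additions, IOTestProvider shows the factory surface a WALProvider now carries: createWalInfo, getWalStream, getWalMetaDataProvider, and getRecoveredReplicationSource. A caller can therefore stay storage-agnostic. The sketch below is illustrative only; the lambda passed as the WALFileLengthProvider is an assumption based on how the tests above use it, and the helper name is hypothetical.

import java.io.IOException;
import java.util.OptionalLong;
import java.util.concurrent.PriorityBlockingQueue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.WALEntryStream;
import org.apache.hadoop.hbase.wal.WALInfo;
import org.apache.hadoop.hbase.wal.WALProvider;

public class ProviderStreamSketch {
  /**
   * Opens a stream over a single WAL without naming any FS-specific class;
   * the provider hands back matching WALInfo and WALEntryStream implementations.
   */
  static WALEntryStream openStream(WALProvider provider, Configuration conf, String walName)
      throws IOException {
    PriorityBlockingQueue<WALInfo> queue = new PriorityBlockingQueue<>();
    queue.add(provider.createWalInfo(walName));
    // null ServerName and an "empty" length provider, mirroring the test usage above.
    return provider.getWalStream(queue, conf, 0L, p -> OptionalLong.empty(), null,
        new MetricsSource("sketch"));
  }
}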
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java
index 3205d73..7b56412 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java
@@ -210,12 +210,12 @@ public class TestFSHLogProvider {
       // Add a single edit and make sure that rolling won't remove the file
       // Before HBASE-3198 it used to delete it
       addEdits(log, hri, htd, 1, scopes1);
-      log.rollWriter();
+      log.rollWriter(false);
       assertEquals(1, AbstractFSWALProvider.getNumRolledLogFiles(log));
 
       // See if there's anything wrong with more than 1 edit
       addEdits(log, hri, htd, 2, scopes1);
-      log.rollWriter();
+      log.rollWriter(false);
       assertEquals(2, FSHLogProvider.getNumRolledLogFiles(log));
 
       // Now mix edits from 2 regions, still no flushing
@@ -223,7 +223,7 @@ public class TestFSHLogProvider {
       addEdits(log, hri, htd, 1, scopes1);
       addEdits(log, hri2, htd2, 1, scopes2);
       addEdits(log, hri, htd, 1, scopes1);
       addEdits(log, hri2, htd2, 1, scopes2);
-      log.rollWriter();
+      log.rollWriter(false);
       assertEquals(3, AbstractFSWALProvider.getNumRolledLogFiles(log));
 
       // Flush the first region, we expect to see the first two files getting
@@ -231,7 +231,7 @@ public class TestFSHLogProvider {
       addEdits(log, hri2, htd2, 1, scopes2);
       log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
       log.completeCacheFlush(hri.getEncodedNameAsBytes());
-      log.rollWriter();
+      log.rollWriter(false);
       int count = AbstractFSWALProvider.getNumRolledLogFiles(log);
       assertEquals(2, count);
 
@@ -241,7 +241,7 @@ public class TestFSHLogProvider {
       addEdits(log, hri2, htd2, 1, scopes2);
       log.startCacheFlush(hri2.getEncodedNameAsBytes(), htd2.getColumnFamilyNames());
       log.completeCacheFlush(hri2.getEncodedNameAsBytes());
-      log.rollWriter();
+      log.rollWriter(false);
       assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(log));
     } finally {
       if (wals != null) {
@@ -290,27 +290,27 @@ public class TestFSHLogProvider {
       // variables to mock region sequenceIds.
       // start with the testing logic: insert a waledit, and roll writer
       addEdits(wal, hri1, table1, 1, scopes1);
-      wal.rollWriter();
+      wal.rollWriter(false);
       // assert that the wal is rolled
       assertEquals(1, AbstractFSWALProvider.getNumRolledLogFiles(wal));
       // add edits in the second wal file, and roll writer.
       addEdits(wal, hri1, table1, 1, scopes1);
-      wal.rollWriter();
+      wal.rollWriter(false);
       // assert that the wal is rolled
       assertEquals(2, AbstractFSWALProvider.getNumRolledLogFiles(wal));
       // add a waledit to table1, and flush the region.
       addEdits(wal, hri1, table1, 3, scopes1);
       flushRegion(wal, hri1.getEncodedNameAsBytes(), table1.getColumnFamilyNames());
       // roll log; all old logs should be archived.
-      wal.rollWriter();
+      wal.rollWriter(false);
       assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(wal));
       // add an edit to table2, and roll writer
       addEdits(wal, hri2, table2, 1, scopes2);
-      wal.rollWriter();
+      wal.rollWriter(false);
       assertEquals(1, AbstractFSWALProvider.getNumRolledLogFiles(wal));
       // add edits for table1, and roll writer
       addEdits(wal, hri1, table1, 2, scopes1);
-      wal.rollWriter();
+      wal.rollWriter(false);
       assertEquals(2, AbstractFSWALProvider.getNumRolledLogFiles(wal));
       // add edits for table2, and flush hri1.
       addEdits(wal, hri2, table2, 2, scopes2);
@@ -320,12 +320,12 @@ public class TestFSHLogProvider {
       // log2: region1 (flushed)
       // log3: region2 (unflushed)
       // roll the writer; log2 should be archived.
-      wal.rollWriter();
+      wal.rollWriter(false);
       assertEquals(2, AbstractFSWALProvider.getNumRolledLogFiles(wal));
       // flush region2, and all logs should be archived.
       addEdits(wal, hri2, table2, 2, scopes2);
       flushRegion(wal, hri2.getEncodedNameAsBytes(), table2.getColumnFamilyNames());
-      wal.rollWriter();
+      wal.rollWriter(false);
       assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(wal));
     } finally {
       if (wals != null) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
index 8189cef..3b410ed 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSyncReplicationWALProvider.java
@@ -124,7 +124,7 @@ public class TestSyncReplicationWALProvider {
       ProtobufLogTestHelper.doRead(reader, false, REGION, TABLE, columnCount, recordCount,
         row, timestamp);
     }
-    wal.rollWriter();
+    wal.rollWriter(false);
     DistributedFileSystem dfs = (DistributedFileSystem) UTIL.getDFSCluster().getFileSystem();
     UTIL.waitFor(5000, new ExplainingPredicate<Exception>() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index 216407a..d7ec621 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -679,6 +679,7 @@ public class TestWALFactory {
   @Test
   public void testWALProviders() throws IOException {
     Configuration conf = new Configuration();
+    conf.set(HConstants.HBASE_DIR, TestWALFactory.conf.get(HConstants.HBASE_DIR));
     // if providers are not set but enable SyncReplicationWALProvider by default for master node
     // with not only system tables
     WALFactory walFactory = new WALFactory(conf, this.currentServername.toString());
@@ -695,28 +696,30 @@ public class TestWALFactory {
   @Test
   public void testOnlySetWALProvider() throws IOException {
     Configuration conf = new Configuration();
-    conf.set(WAL_PROVIDER, WALFactory.Providers.multiwal.name());
+    conf.set(WAL_PROVIDER, RegionGroupingProvider.class.getName());
+    conf.set(HConstants.HBASE_DIR, TestWALFactory.conf.get(HConstants.HBASE_DIR));
     WALFactory walFactory = new WALFactory(conf, this.currentServername.toString());
     WALProvider wrappedWALProvider = ((SyncReplicationWALProvider) walFactory.getWALProvider())
         .getWrappedProvider();
 
     assertEquals(SyncReplicationWALProvider.class, walFactory.getWALProvider().getClass());
     // class of WALProvider and metaWALProvider are the same when metaWALProvider is not set
-    assertEquals(WALFactory.Providers.multiwal.clazz, wrappedWALProvider.getClass());
-    assertEquals(WALFactory.Providers.multiwal.clazz, walFactory.getMetaProvider().getClass());
+    assertEquals(RegionGroupingProvider.class, wrappedWALProvider.getClass());
+    assertEquals(RegionGroupingProvider.class, walFactory.getMetaProvider().getClass());
   }
 
   @Test
   public void testOnlySetMetaWALProvider() throws IOException {
     Configuration conf = new Configuration();
-    conf.set(META_WAL_PROVIDER, WALFactory.Providers.asyncfs.name());
+    conf.set(META_WAL_PROVIDER, AsyncFSWALProvider.class.getName());
+    conf.set(HConstants.HBASE_DIR, TestWALFactory.conf.get(HConstants.HBASE_DIR));
     WALFactory walFactory = new WALFactory(conf, this.currentServername.toString());
     WALProvider wrappedWALProvider = ((SyncReplicationWALProvider) walFactory.getWALProvider())
         .getWrappedProvider();
 
     assertEquals(SyncReplicationWALProvider.class, walFactory.getWALProvider().getClass());
-    assertEquals(WALFactory.Providers.defaultProvider.clazz, wrappedWALProvider.getClass());
-    assertEquals(WALFactory.Providers.asyncfs.clazz, walFactory.getMetaProvider().getClass());
+    assertEquals(AsyncFSWALProvider.class, wrappedWALProvider.getClass());
+    assertEquals(AsyncFSWALProvider.class, walFactory.getMetaProvider().getClass());
   }
 }
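The TestWALFactory changes reflect that the provider configuration keys now take fully qualified class names instead of the old WALFactory.Providers enum shorthand ("multiwal", "asyncfs"). A small sketch of the new style, assuming the public WAL_PROVIDER and META_WAL_PROVIDER constants on WALFactory keep their "hbase.wal.provider" / "hbase.wal.meta_provider" values:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.RegionGroupingProvider;
import org.apache.hadoop.hbase.wal.WALFactory;

public class ProviderConfigSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Class names replace the old enum shorthand used before this patch.
    conf.set(WALFactory.WAL_PROVIDER, RegionGroupingProvider.class.getName());
    conf.set(WALFactory.META_WAL_PROVIDER, AsyncFSWALProvider.class.getName());
    WALFactory factory = new WALFactory(conf, "sketch-server");
    System.out.println(factory.getWALProvider().getClass().getName());
    factory.close();
  }
}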
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALRootDir.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALRootDir.java
index 40fad6a..b0bd9e5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALRootDir.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALRootDir.java
@@ -103,7 +103,7 @@ public class TestWALRootDir {
     log.sync(txid);
     assertEquals("Expect 1 log have been created", 1,
         getWALFiles(walFs, walRootDir).size());
-    log.rollWriter();
+    log.rollWriter(false);
     //Create 1 more WAL
     assertEquals(2, getWALFiles(walFs,
         new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME)).size());
@@ -112,7 +112,7 @@ public class TestWALRootDir {
     txid = log.append(regionInfo, getWalKey(System.currentTimeMillis(), regionInfo, 1),
         edit, true);
     log.sync(txid);
-    log.rollWriter();
+    log.rollWriter(false);
     log.shutdown();
     assertEquals("Expect 3 logs in WALs dir", 3, getWALFiles(walFs,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
index 0d5aa0d..8582424 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
@@ -420,7 +420,7 @@ public class TestWALSplit {
         FILENAME_BEING_SPLIT, conf);
     String parentOfParent = p.getParent().getParent().getName();
     assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
-    WALFactory.createRecoveredEditsWriter(fs, p, conf).close();
+    FSHLogProvider.createWriter(conf, fs, p, true).close();
   }
 
   private void useDifferentDFSClient() throws IOException {
@@ -1148,7 +1148,7 @@ public class TestWALSplit {
     @Override
     protected Writer createWriter(Path logfile) throws IOException {
-      Writer writer = wals.createRecoveredEditsWriter(this.fs, logfile);
+      Writer writer = FSHLogProvider.createWriter(conf, this.fs, logfile, true);
       // After creating writer, simulate region's
       // replayRecoveredEditsIfAny() which gets SplitEditFiles of this
       // region and delete them, excluding files with '.temp' suffix.
@@ -1207,7 +1207,7 @@ public class TestWALSplit {
     int seq = 0;
     int numRegionEventsAdded = 0;
     for (int i = 0; i < writers; i++) {
-      ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i));
+      ws[i] = FSHLogProvider.createWriter(conf, fs, new Path(WALDIR, WAL_FILE_PREFIX + i), false);
       for (int j = 0; j < entries; j++) {
         int prefix = 0;
         for (String region : REGIONS) {
@@ -1407,7 +1407,7 @@ public class TestWALSplit {
   private void injectEmptyFile(String suffix, boolean closeFile) throws IOException {
     Writer writer =
-        WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf);
+        FSHLogProvider.createWriter(conf, fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), false);
     if (closeFile) {
       writer.close();
     }
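Across TestWALSplit, writer creation moves from the WALFactory convenience methods to FSHLogProvider.createWriter(conf, fs, path, overwritable), with the boolean passed as true for recovered-edits writers and false for plain WAL files. A self-contained sketch of the new call follows; the target path is purely hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WALProvider;

public class WalWriterSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = CommonFSUtils.getWALFileSystem(conf);
    Path target = new Path(args[0]); // hypothetical destination file
    // false: create a fresh WAL file; the tests above pass true when the
    // destination is a recovered-edits file that may be overwritten.
    WALProvider.Writer writer = FSHLogProvider.createWriter(conf, fs, target, false);
    try {
      // append WAL.Entry objects and sync here as needed
    } finally {
      writer.close();
    }
  }
}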