diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java index fbe95f5..596f7ad 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java @@ -64,6 +64,9 @@ String LOW_REPLICA_ROLL_REQUESTED = "lowReplicaRollRequest"; String LOW_REPLICA_ROLL_REQUESTED_DESC = "How many times a log roll was requested due to too few DN's in the write pipeline."; + String SYNC_FAILED_ROLL_REQUESTED = "syncFailedRollRequest"; + String SYNC_FAILED_ROLL_REQUESTED_DESC = + "How many times a log roll was requested due to sync failed."; String WRITTEN_BYTES = "writtenBytes"; String WRITTEN_BYTES_DESC = "Size (in bytes) of the data written to the WAL."; @@ -96,6 +99,8 @@ void incrementLowReplicationLogRoll(); + void incrementSyncFailedLogRoll(); + long getSlowAppendCount(); void incrementWrittenBytes(long val); diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java index 2f35d4c..7095f99 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java @@ -39,6 +39,7 @@ private final MutableFastCounter slowAppendCount; private final MutableFastCounter logRollRequested; private final MutableFastCounter lowReplicationLogRollRequested; + private final MutableFastCounter syncFailedLogRollRequested; private final MutableFastCounter writtenBytes; public MetricsWALSourceImpl() { @@ -62,6 +63,8 @@ this.getMetricsRegistry().newCounter(ROLL_REQUESTED, ROLL_REQUESTED_DESC, 0L); 
lowReplicationLogRollRequested = this.getMetricsRegistry() .newCounter(LOW_REPLICA_ROLL_REQUESTED, LOW_REPLICA_ROLL_REQUESTED_DESC, 0L); + syncFailedLogRollRequested = this.getMetricsRegistry() + .newCounter(SYNC_FAILED_ROLL_REQUESTED, SYNC_FAILED_ROLL_REQUESTED_DESC, 0L); writtenBytes = this.getMetricsRegistry().newCounter(WRITTEN_BYTES, WRITTEN_BYTES_DESC, 0L); } @@ -101,6 +104,11 @@ } @Override + public void incrementSyncFailedLogRoll() { + syncFailedLogRollRequested.incr(); + } + + @Override public long getSlowAppendCount() { return slowAppendCount.value(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java index d1645f8..48ffcb4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java @@ -43,12 +43,12 @@ * Create {@link FanOutOneBlockAsyncDFSOutput} for {@link DistributedFileSystem}, and a simple * implementation for other {@link FileSystem} which wraps around a {@link FSDataOutputStream}. 
*/ - public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite, + public static AsyncFSOutput createOutput(FileSystem fs, Path f, Path oldPath, boolean overwrite, boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup, Class channelClass) throws IOException, CommonFSUtils.StreamLacksCapabilityException { if (fs instanceof DistributedFileSystem) { - return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f, + return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f, oldPath, overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass); } final FSDataOutputStream out; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index ea9a0d8..131e119 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -18,11 +18,9 @@ package org.apache.hadoop.hbase.io.asyncfs; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO; -import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.READ_TIMEOUT; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE; import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.WRITER_IDLE; @@ -44,6 +42,7 @@ import 
org.apache.hadoop.crypto.Encryptor; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose; +import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hdfs.DFSClient; @@ -346,7 +345,7 @@ this.alloc = alloc; this.buf = alloc.directBuffer(sendBufSizePRedictor.initialSize()); this.state = State.STREAMING; - setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT)); + setupReceiver(conf.getInt(AbstractFSWAL.WAL_SYNC_TIMEOUT_MS, AbstractFSWAL.DEFAULT_WAL_SYNC_TIMEOUT_MS)); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java index 0e5cf81..8aada6e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java @@ -48,6 +48,7 @@ import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hbase.client.ConnectionUtils; +import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hdfs.DFSClient; @@ -682,7 +683,7 @@ DatanodeInfo[] datanodeInfos = locatedBlock.getLocations(); boolean connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); - int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT); + int timeoutMs = conf.getInt(AbstractFSWAL.WAL_SYNC_TIMEOUT_MS, AbstractFSWAL.DEFAULT_WAL_SYNC_TIMEOUT_MS); ExtendedBlock blockCopy = new 
ExtendedBlock(locatedBlock.getBlock()); blockCopy.setNumBytes(locatedBlock.getBlockSize()); ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder() @@ -740,7 +741,7 @@ } } - private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src, + private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src, Path oldPath, boolean overwrite, boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup, Class channelClass) throws IOException { Configuration conf = dfs.getConf(); @@ -751,6 +752,24 @@ int createMaxRetries = conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES, DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES); DatanodeInfo[] excludesNodes = EMPTY_DN_ARRAY; + if (oldPath != null) { + String oldPathStr = oldPath.toUri().getPath(); + long len = namenode.getFileInfo(oldPathStr).getLen(); + for(LocatedBlock block : namenode.getBlockLocations(oldPathStr, Math.max(0, len - 1), len) + .getLocatedBlocks()) { + for(DatanodeInfo dn : block.getLocations()) { + excludesNodes = ArrayUtils.add(excludesNodes, dn); + } + } + if (LOG.isDebugEnabled()) { + StringBuilder sb = new StringBuilder("create new output because old wal sync failed, old path is: "); + sb.append(oldPathStr).append(", newPath excludesNodes are :"); + for(DatanodeInfo info : excludesNodes) { + sb.append(info.getInfoAddr()).append(";"); + } + LOG.debug(sb.toString()); + } + } for (int retry = 0;; retry++) { HdfsFileStatus stat; try { @@ -837,7 +856,7 @@ * Create a {@link FanOutOneBlockAsyncDFSOutput}. The method maybe blocked so do not call it * inside an {@link EventLoop}. 
*/ - public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f, + public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f, Path oldPath, boolean overwrite, boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup, Class channelClass) throws IOException { return new FileSystemLinkResolver() { @@ -845,7 +864,7 @@ @Override public FanOutOneBlockAsyncDFSOutput doCall(Path p) throws IOException, UnresolvedLinkException { - return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication, + return createOutput(dfs, p.toUri().getPath(), oldPath, overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java index 05a8fdf..6cb6df0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALClosedException; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.HasThread; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.ipc.RemoteException; import org.apache.yetus.audience.InterfaceAudience; @@ -57,7 +58,8 @@ private static final Logger LOG = LoggerFactory.getLogger(LogRoller.class); private final ReentrantLock rollLock = new ReentrantLock(); private final AtomicBoolean rollLog = new AtomicBoolean(false); - private final ConcurrentHashMap walNeedsRoll = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> walNeedsRoll = + new ConcurrentHashMap>(); private final Server server; protected final RegionServerServices services; private volatile long lastrolltime = System.currentTimeMillis(); @@ -70,11 
+72,15 @@ private volatile boolean running = true; public void addWAL(final WAL wal) { - if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) { + if (null == walNeedsRoll.putIfAbsent(wal, new Pair(Boolean.FALSE, Boolean.FALSE))) { wal.registerWALActionsListener(new WALActionsListener() { @Override - public void logRollRequested(boolean lowReplicas) { - walNeedsRoll.put(wal, Boolean.TRUE); + public void logRollRequested(boolean lowReplicas, boolean syncFaild) { + Pair walInfo = walNeedsRoll.get(wal); + walInfo.setFirst(Boolean.TRUE); + if (syncFaild) { + walInfo.setSecond(Boolean.TRUE); + } // TODO logs will contend with each other here, replace with e.g. DelayedQueue synchronized(rollLog) { rollLog.set(true); @@ -87,7 +93,7 @@ public void requestRollAll() { for (WAL wal : walNeedsRoll.keySet()) { - walNeedsRoll.put(wal, Boolean.TRUE); + walNeedsRoll.put(wal, new Pair(Boolean.TRUE, Boolean.FALSE)); } synchronized(rollLog) { rollLog.set(true); @@ -122,9 +128,9 @@ */ void checkLowReplication(long now) { try { - for (Entry entry : walNeedsRoll.entrySet()) { + for (Entry> entry : walNeedsRoll.entrySet()) { WAL wal = entry.getKey(); - boolean needRollAlready = entry.getValue(); + boolean needRollAlready = entry.getValue().getFirst(); if (needRollAlready || !(wal instanceof AbstractFSWAL)) { continue; } @@ -180,16 +186,18 @@ rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH try { this.lastrolltime = now; - for (Iterator> iter = walNeedsRoll.entrySet().iterator(); iter + for (Iterator>> iter = walNeedsRoll.entrySet().iterator(); iter .hasNext();) { - Entry entry = iter.next(); + Entry> entry = iter.next(); final WAL wal = entry.getKey(); + Pair walInfo = entry.getValue(); + boolean syncFailed = walInfo.getSecond().booleanValue(); // Force the roll if the logroll.period is elapsed or if a roll was requested. // The returned value is an array of actual region names. 
try { - final byte[][] regionsToFlush = - wal.rollWriter(periodic || entry.getValue().booleanValue()); - walNeedsRoll.put(wal, Boolean.FALSE); + final byte[][] regionsToFlush = wal.rollWriter(periodic || + walInfo.getFirst().booleanValue() || syncFailed, syncFailed); + walInfo.setFirst(Boolean.FALSE); if (regionsToFlush != null) { for (byte[] r : regionsToFlush) { scheduleFlush(r); @@ -247,8 +255,8 @@ * @return true if all WAL roll finished */ public boolean walRollFinished() { - for (boolean needRoll : walNeedsRoll.values()) { - if (needRoll) { + for (Pair walInfo : walNeedsRoll.values()) { + if (walInfo.getFirst().booleanValue()) { return false; } } @@ -271,7 +279,7 @@ } @VisibleForTesting - Map getWalNeedsRoll() { + Map> getWalNeedsRoll() { return this.walNeedsRoll; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 6c8cbfa..eb25aee 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -122,8 +122,8 @@ private static final String ROLL_ON_SYNC_TIME_MS = "hbase.regionserver.hlog.roll.on.sync.ms"; protected static final int DEFAULT_ROLL_ON_SYNC_TIME_MS = 10000; // in ms - private static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.hlog.sync.timeout"; - private static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min + public static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.hlog.sync.timeout"; + public static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 60 * 1000; // in ms, 1min /** * file system instance @@ -516,7 +516,7 @@ @Override public byte[][] rollWriter() throws FailedLogCloseException, IOException { - return rollWriter(false); + return rollWriter(false, false); } /** @@ -795,22 +795,25 @@ } @Override - public byte[][] 
rollWriter(boolean force) throws FailedLogCloseException, IOException { + public byte[][] rollWriter(boolean force, boolean syncFailed) throws FailedLogCloseException, IOException { rollWriterLock.lock(); try { if (this.closed) { throw new WALClosedException("WAL has been closed"); } // Return if nothing to flush. - if (!force && this.writer != null && this.numEntries.get() <= 0) { + if (!force && !syncFailed && (this.writer != null && this.numEntries.get() <= 0)) { return null; } byte[][] regionsToFlush = null; try (TraceScope scope = TraceUtil.createTrace("FSHLog.rollWriter")) { Path oldPath = getOldPath(); Path newPath = getNewPath(); + if (syncFailed){ + LOG.info(oldPath + " rolled because of syncFailed!"); + } // Any exception from here on is catastrophic, non-recoverable so we currently abort. - W nextWriter = this.createWriterInstance(newPath); + W nextWriter = this.createWriterInstance(newPath, syncFailed ? oldPath : null); tellListenersAboutPreLogRoll(oldPath, newPath); // NewPath could be equal to oldPath if replaceWriter fails. 
newPath = replaceWriter(oldPath, newPath, nextWriter); @@ -841,11 +844,6 @@ /** @return the size of log files in use */ public long getLogFileSize() { return this.totalLogSize.get(); - } - - // public only until class moves to o.a.h.h.wal - public void requestLogRoll() { - requestLogRoll(false); } /** @@ -925,10 +923,11 @@ return cachedSyncFutures.get().reset(sequence); } - protected final void requestLogRoll(boolean tooFewReplicas) { + // public only until class moves to o.a.h.h.wal + public final void requestLogRoll(boolean tooFewReplicas, boolean syncFaild) { if (!this.listeners.isEmpty()) { for (WALActionsListener i : this.listeners) { - i.logRollRequested(tooFewReplicas); + i.logRollRequested(tooFewReplicas, syncFaild); } } } @@ -1083,7 +1082,7 @@ protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException; - protected abstract W createWriterInstance(Path path) + protected abstract W createWriterInstance(Path path, Path oldPath) throws IOException, CommonFSUtils.StreamLacksCapabilityException; protected abstract void doReplaceWriter(Path oldPath, Path newPath, W nextWriter) @@ -1105,7 +1104,7 @@ try { lastTimeCheckLowReplication = now; if (doCheckLogLowReplication()) { - requestLogRoll(true); + requestLogRoll(true, false); } } finally { rollWriterLock.unlock(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java index ff2864d..7378993 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java @@ -154,8 +154,8 @@ return doCompress; } - public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable, - long blocksize) throws IOException, StreamLacksCapabilityException { + public void init(FileSystem fs, Path 
path, Path oldPath, Configuration conf, + boolean overwritable, long blocksize) throws IOException, StreamLacksCapabilityException { this.conf = conf; boolean doCompress = initializeCompressionContext(conf, path); this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE); @@ -163,7 +163,7 @@ short replication = (short) conf.getInt("hbase.regionserver.hlog.replication", FSUtils.getDefaultReplication(fs, path)); - initOutput(fs, path, overwritable, bufferSize, replication, blocksize); + initOutput(fs, path, oldPath, overwritable, bufferSize, replication, blocksize); boolean doTagCompress = doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true); @@ -239,8 +239,9 @@ } } - protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, - short replication, long blockSize) throws IOException, StreamLacksCapabilityException; + protected abstract void initOutput(FileSystem fs, Path path, Path oldPath, + boolean overwritable, int bufferSize, short replication, long blockSize) + throws IOException, StreamLacksCapabilityException; /** * return the file length after written. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index 553ff3d..2f5def1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -317,7 +317,7 @@ highestUnsyncedTxid = highestSyncedTxid.get(); if (shouldRequestLogRoll) { // request a roll. 
- requestLogRoll(); + requestLogRoll(false, true); } } @@ -341,7 +341,7 @@ return; } rollRequested = true; - requestLogRoll(); + requestLogRoll(false, false); } private void sync(AsyncWriter writer) { @@ -659,14 +659,15 @@ } } - protected final AsyncWriter createAsyncWriter(FileSystem fs, Path path) throws IOException { - return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, this.blocksize, + protected final AsyncWriter createAsyncWriter(FileSystem fs, Path path, Path oldPath) + throws IOException { + return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, oldPath, false, this.blocksize, eventLoopGroup, channelClass); } @Override - protected AsyncWriter createWriterInstance(Path path) throws IOException { - return createAsyncWriter(fs, path); + protected AsyncWriter createWriterInstance(Path path, Path oldPath) throws IOException { + return createAsyncWriter(fs, path, oldPath); } private void waitForSafePoint() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index 37c6f00..aecab2c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -164,9 +164,9 @@ } @Override - protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, + protected void initOutput(FileSystem fs, Path path, Path oldPath, boolean overwritable, int bufferSize, short replication, long blockSize) throws IOException, StreamLacksCapabilityException { - this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication, + this.output = AsyncFSOutputHelper.createOutput(fs, path, oldPath, overwritable, false, replication, blockSize, eventLoopGroup, channelClass); this.asyncOutputWrapper = new OutputStreamWrapper(output); 
} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java index bf5b96d..38b7e48 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java @@ -68,8 +68,8 @@ } @Override - protected AsyncWriter createWriterInstance(Path path) throws IOException { - AsyncWriter localWriter = super.createWriterInstance(path); + protected AsyncWriter createWriterInstance(Path path, Path oldPath) throws IOException { + AsyncWriter localWriter = super.createWriterInstance(path, oldPath); // retry forever if we can not create the remote writer to prevent aborting the RS due to log // rolling error, unless the skipRemoteWal is set to true. // TODO: since for now we only have one thread doing log rolling, this may block the rolling for @@ -81,7 +81,7 @@ } AsyncWriter remoteWriter; try { - remoteWriter = createAsyncWriter(remoteFs, remoteWAL); + remoteWriter = createAsyncWriter(remoteFs, remoteWAL, null); } catch (IOException e) { LOG.warn("create remote writer {} failed, retry = {}", remoteWAL, retry, e); try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 68cd338..4dc543b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -273,8 +273,8 @@ * @return Writer instance */ @Override - protected Writer createWriterInstance(final Path path) throws IOException { - Writer writer = FSHLogProvider.createWriter(conf, fs, path, false, this.blocksize); + protected Writer createWriterInstance(final Path path, final Path oldPath) throws IOException { + Writer writer = 
FSHLogProvider.createWriter(conf, fs, path, oldPath, false, this.blocksize); if (writer instanceof ProtobufLogWriter) { preemptiveSync((ProtobufLogWriter) writer); } @@ -601,14 +601,14 @@ syncCount += releaseSyncFutures(currentSequence, lastException); if (lastException != null) { wasRollRequested = true; - requestLogRoll(); + requestLogRoll(false, true); } else { wasRollRequested = checkLogRoll(); } } boolean doRequestRoll = postSync(System.nanoTime() - start, syncCount); if (!wasRollRequested && doRequestRoll) { - requestLogRoll(); + requestLogRoll(false, false); } } catch (InterruptedException e) { // Presume legit interrupt. @@ -635,7 +635,7 @@ rollWriterLock.unlock(); } if (lowReplication || (writer != null && writer.getLength() > logrollsize)) { - requestLogRoll(lowReplication); + requestLogRoll(lowReplication, false); return true; } return false; @@ -1023,7 +1023,7 @@ this.syncFuturesCount.get()); } catch (Exception e) { // Should NEVER get here. - requestLogRoll(); + requestLogRoll(false, false); this.exception = new DamagedWALException("Failed offering sync", e); } } @@ -1090,7 +1090,7 @@ String msg = "Append sequenceId=" + entry.getKey().getSequenceId() + ", requesting roll of WAL"; LOG.warn(msg, e); - requestLogRoll(); + requestLogRoll(false, false); throw new DamagedWALException(msg, e); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java index 900e55f..523e619 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java @@ -73,10 +73,13 @@ } @Override - public void logRollRequested(boolean underReplicated) { + public void logRollRequested(boolean underReplicated, boolean syncFailed) { source.incrementLogRollRequested(); if (underReplicated) { source.incrementLowReplicationLogRoll(); } + if 
(syncFailed) { + source.incrementSyncFailedLogRoll(); + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index 5c8e0d2..d3f411c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -92,7 +92,7 @@ @SuppressWarnings("deprecation") @Override - protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, + protected void initOutput(FileSystem fs, Path path, Path oldPath, boolean overwritable, int bufferSize, short replication, long blockSize) throws IOException, StreamLacksCapabilityException { this.output = CommonFSUtils.createForWal(fs, path, overwritable, bufferSize, replication, blockSize, false); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java index 13ffac7..3508b87 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java @@ -65,7 +65,7 @@ /** * A request was made that the WAL be rolled. */ - default void logRollRequested(boolean tooFewReplicas) {} + default void logRollRequested(boolean tooFewReplicas, boolean syncFaild) {} /** * The WAL is about to close. 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index 8741c1c..85060e8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -236,7 +236,7 @@ */ @VisibleForTesting static void requestLogRoll(final WAL wal) { - ((AbstractFSWAL) wal).requestLogRoll(); + ((AbstractFSWAL) wal).requestLogRoll(false, false); } // should be package private; more visible for use in AbstractFSWAL diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java index 062b368..be7de0a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java @@ -55,8 +55,8 @@ * @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that * meet the needs of the given Writer implementation. */ - void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long blocksize) - throws IOException, CommonFSUtils.StreamLacksCapabilityException; + void init(FileSystem fs, Path path, Path oldPath, Configuration c, boolean overwritable, + long blocksize) throws IOException, CommonFSUtils.StreamLacksCapabilityException; } private EventLoopGroup eventLoopGroup; @@ -84,17 +84,17 @@ * Public because of AsyncFSWAL. 
Should be package-private */ public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path, - boolean overwritable, EventLoopGroup eventLoopGroup, + Path oldPath, boolean overwritable, EventLoopGroup eventLoopGroup, Class channelClass) throws IOException { - return createAsyncWriter(conf, fs, path, overwritable, WALUtil.getWALBlockSize(conf, fs, path), - eventLoopGroup, channelClass); + return createAsyncWriter(conf, fs, path, oldPath, overwritable, + WALUtil.getWALBlockSize(conf, fs, path), eventLoopGroup, channelClass); } /** * Public because of AsyncFSWAL. Should be package-private */ public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path, - boolean overwritable, long blocksize, EventLoopGroup eventLoopGroup, + Path oldPath, boolean overwritable, long blocksize, EventLoopGroup eventLoopGroup, Class channelClass) throws IOException { // Configuration already does caching for the Class lookup. Class logWriterClass = conf.getClass( @@ -102,7 +102,7 @@ try { AsyncWriter writer = logWriterClass.getConstructor(EventLoopGroup.class, Class.class) .newInstance(eventLoopGroup, channelClass); - writer.init(fs, path, conf, overwritable, blocksize); + writer.init(fs, path, oldPath, conf, overwritable, blocksize); return writer; } catch (Exception e) { if (e instanceof CommonFSUtils.StreamLacksCapabilityException) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index 75439fe..51d50f9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -119,7 +119,7 @@ public byte[][] rollWriter() { if (!listeners.isEmpty()) { for (WALActionsListener listener : listeners) { - listener.logRollRequested(false); + listener.logRollRequested(false, false); } for (WALActionsListener 
listener : listeners) { try { @@ -140,7 +140,7 @@ } @Override - public byte[][] rollWriter(boolean force) { + public byte[][] rollWriter(boolean force, boolean syncFailed) { return rollWriter(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java index 3b91c24..e90b4ad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java @@ -48,8 +48,8 @@ * @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that * meet the needs of the given Writer implementation. */ - void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long blocksize) - throws IOException, CommonFSUtils.StreamLacksCapabilityException; + void init(FileSystem fs, Path path, Path oldPath, Configuration c, boolean overwritable, + long blocksize) throws IOException, CommonFSUtils.StreamLacksCapabilityException; } /** @@ -58,8 +58,8 @@ * for WAL it is false. Thus we can distinguish WAL and recovered edits by this. */ public static Writer createWriter(final Configuration conf, final FileSystem fs, final Path path, - final boolean overwritable) throws IOException { - return createWriter(conf, fs, path, overwritable, + final Path oldPath, final boolean overwritable) throws IOException { + return createWriter(conf, fs, path, oldPath, overwritable, WALUtil.getWALBlockSize(conf, fs, path, overwritable)); } @@ -67,7 +67,7 @@ * Public because of FSHLog. Should be package-private */ public static Writer createWriter(final Configuration conf, final FileSystem fs, final Path path, - final boolean overwritable, long blocksize) throws IOException { + final Path oldPath, final boolean overwritable, long blocksize) throws IOException { // Configuration already does caching for the Class lookup. 
Class logWriterClass = conf.getClass("hbase.regionserver.hlog.writer.impl", ProtobufLogWriter.class, @@ -76,7 +76,7 @@ try { writer = logWriterClass.getDeclaredConstructor().newInstance(); FileSystem rootFs = FileSystem.get(path.toUri(), conf); - writer.init(rootFs, path, conf, overwritable, blocksize); + writer.init(rootFs, path, oldPath, conf, overwritable, blocksize); return writer; } catch (Exception e) { if (e instanceof CommonFSUtils.StreamLacksCapabilityException) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index cf367cd..1956c13 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -80,7 +80,8 @@ * can clean logs. Returns null if nothing to flush. Names are actual * region names as returned by {@link RegionInfo#getEncodedName()} */ - byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException; + byte[][] rollWriter(boolean force, boolean syncFailed) + throws FailedLogCloseException, IOException; /** * Stop accepting new writes. If we have unsynced writes still in buffer, sync them. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java index 8bde6d2..7790236 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java @@ -385,7 +385,7 @@ * @return A WAL writer. Close when done with it. 
*/ public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException { - return FSHLogProvider.createWriter(conf, fs, path, false); + return FSHLogProvider.createWriter(conf, fs, path, null, false); } /** @@ -396,7 +396,7 @@ @VisibleForTesting public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path) throws IOException { - return FSHLogProvider.createWriter(conf, fs, path, true); + return FSHLogProvider.createWriter(conf, fs, path, null, true); } // These static methods are currently used where it's impractical to @@ -465,7 +465,7 @@ static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path, final Configuration configuration) throws IOException { - return FSHLogProvider.createWriter(configuration, fs, path, true); + return FSHLogProvider.createWriter(configuration, fs, path, null, true); } /** @@ -477,7 +477,7 @@ public static Writer createWALWriter(final FileSystem fs, final Path path, final Configuration configuration) throws IOException { - return FSHLogProvider.createWriter(configuration, fs, path, false); + return FSHLogProvider.createWriter(configuration, fs, path, null, false); } public final WALProvider getWALProvider() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java index dde020d..e2a22b7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java @@ -393,7 +393,7 @@ assertFalse(cp.isPreWALRollCalled()); assertFalse(cp.isPostWALRollCalled()); - wal.rollWriter(true); + wal.rollWriter(true, false); assertTrue(cp.isPreWALRollCalled()); assertTrue(cp.isPostWALRollCalled()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java index 6be44e9..1cf901e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java @@ -40,6 +40,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -87,6 +88,7 @@ @BeforeClass public static void setUp() throws Exception { TEST_UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS); + TEST_UTIL.getConfiguration().setInt(AbstractFSWAL.WAL_SYNC_TIMEOUT_MS, READ_TIMEOUT_MS); TEST_UTIL.startMiniDFSCluster(3); FS = TEST_UTIL.getDFSCluster().getFileSystem(); EVENT_LOOP_GROUP = new NioEventLoopGroup(); @@ -135,8 +137,8 @@ public void test() throws IOException, InterruptedException, ExecutionException { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); - FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, - false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); + FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, + null, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); writeAndVerify(FS, f, out); } @@ -144,8 +146,8 @@ public void testRecover() throws IOException, InterruptedException, ExecutionException { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); - FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, - 
false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); + FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, + null, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); byte[] b = new byte[10]; ThreadLocalRandom.current().nextBytes(b); out.write(b, 0, b.length); @@ -173,8 +175,8 @@ public void testHeartbeat() throws IOException, InterruptedException, ExecutionException { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); - FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, - false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); + FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, + null, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); Thread.sleep(READ_TIMEOUT_MS * 2); // the connection to datanode should still alive. writeAndVerify(FS, f, out); @@ -188,7 +190,7 @@ Path f = new Path("/" + name.getMethodName() + "/test"); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); try { - FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, + FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, null, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); fail("should fail with parent does not exist"); } catch (RemoteException e) { @@ -212,7 +214,7 @@ Path f = new Path("/test"); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, - f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS)) { + f, null, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS)) { // should exclude the dead dn when retry so here we only have 2 DNs in pipeline assertEquals(2, output.getPipeline().length); } finally { @@ -224,8 +226,8 @@ public void testWriteLargeChunk() throws 
IOException, InterruptedException, ExecutionException { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); - FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, - false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS); + FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, null, + true, false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS); byte[] b = new byte[50 * 1024 * 1024]; ThreadLocalRandom.current().nextBytes(b); out.write(b); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java index 406af17..661f99d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java @@ -60,7 +60,7 @@ FSUtils.StreamLacksCapabilityException { Path f = new Path(TEST_UTIL.getDataTestDir(), "test"); FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration()); - AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true, + AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, null, false, true, fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS); TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(fs, f, out); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java index cf0ffa2..4190dc2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java @@ -213,7 +213,7 @@ private void test(Path file) throws 
IOException, InterruptedException, ExecutionException { EventLoop eventLoop = EVENT_LOOP_GROUP.next(); FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file, - true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); + null, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java index 1490653..6c1eb28 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java @@ -114,15 +114,15 @@ } @Override - public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException { - byte[][] regions = super.rollWriter(force); + public byte[][] rollWriter(boolean force, boolean syncFailed) throws FailedLogCloseException, IOException { + byte[][] regions = super.rollWriter(force, syncFailed); rolls.getAndIncrement(); return regions; } @Override - protected Writer createWriterInstance(Path path) throws IOException { - final Writer w = super.createWriterInstance(path); + protected Writer createWriterInstance(Path path, Path oldPath) throws IOException { + final Writer w = super.createWriterInstance(path, oldPath); return new Writer() { @Override public void close() throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index a2664ce..67f5df6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -1157,8 +1157,8 @@ } 
@Override - protected Writer createWriterInstance(Path path) throws IOException { - final Writer w = super.createWriterInstance(path); + protected Writer createWriterInstance(Path newPath, Path oldPath) throws IOException { + final Writer w = super.createWriterInstance(newPath, oldPath); return new Writer() { @Override public void close() throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java index 0e7c019..a19a03c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java @@ -521,7 +521,7 @@ assertEquals(MutableSegment.DEEP_OVERHEAD, desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize()); // let WAL cleanOldLogs - assertNull(getWAL(desiredRegion).rollWriter(true)); + assertNull(getWAL(desiredRegion).rollWriter(true, false)); assertTrue(getNumRolledLogFiles(desiredRegion) < maxLogs); } finally { TEST_UTIL.shutdownMiniCluster(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java index 0e20252..4ad0173 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java @@ -162,8 +162,8 @@ } @Override - protected Writer createWriterInstance(Path path) throws IOException { - final Writer w = super.createWriterInstance(path); + protected Writer createWriterInstance(Path newPath, Path oldPath) throws IOException { + final Writer w = super.createWriterInstance(newPath, oldPath); return new Writer() { @Override public void close() throws IOException { @@ -353,8 +353,8 @@ } @Override - protected Writer 
createWriterInstance(Path path) throws IOException { - final Writer w = super.createWriterInstance(path); + protected Writer createWriterInstance(Path path, Path oldPath) throws IOException { + final Writer w = super.createWriterInstance(path, oldPath); return new Writer() { @Override public void close() throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java index 68eebc1..e7dcf4c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java @@ -297,7 +297,7 @@ assertEquals(1, wal.getNumRolledLogFiles()); // flush the second region flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames()); - wal.rollWriter(true); + wal.rollWriter(true, false); // no wal should remain now. assertEquals(0, wal.getNumRolledLogFiles()); // add edits both to region 1 and region 2, and roll. @@ -315,7 +315,7 @@ // flush both regions flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames()); flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames()); - wal.rollWriter(true); + wal.rollWriter(true, false); assertEquals(0, wal.getNumRolledLogFiles()); // Add an edit to region1, and roll the wal. 
addEdits(wal, hri1, t1, 2, mvcc, scopes1); @@ -339,12 +339,12 @@ HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null); long filenum = System.currentTimeMillis(); Path path = wal.computeFilename(filenum); - wal.createWriterInstance(path); + wal.createWriterInstance(path, null); Path parent = path.getParent(); path = wal.computeFilename(filenum + 1); Path newPath = new Path(parent.getParent(), parent.getName() + "-splitting"); FS.rename(parent, newPath); - wal.createWriterInstance(path); + wal.createWriterInstance(path, null); fail("It should fail to create the new WAL"); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java index 4c19aa0..651f779 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java @@ -169,7 +169,7 @@ final WAL newLog = wals.getWAL(null); try { // Now roll the log before we write anything. 
- newLog.rollWriter(true); + newLog.rollWriter(true, false); } finally { wals.close(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java index f2fd591..20a0cd7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java @@ -1230,7 +1230,7 @@ StreamLacksCapabilityException { fs.mkdirs(file.getParent()); ProtobufLogWriter writer = new ProtobufLogWriter(); - writer.init(fs, file, conf, true, WALUtil.getWALBlockSize(conf, fs, file)); + writer.init(fs, file, null, conf, true, WALUtil.getWALBlockSize(conf, fs, file)); for (FSWALEntry entry : entries) { writer.append(entry); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java index 8afae06..7f4aade 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java @@ -20,6 +20,10 @@ import static org.junit.Assert.assertEquals; import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.client.RegionInfo; @@ -53,7 +57,7 @@ @Test public void testLogRollOnDatanodeDeath() throws IOException, InterruptedException { - dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 3, true, null, null); + dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 2, true, null, null); tableName = getName(); Table table = createTestTable(tableName); 
TEST_UTIL.waitUntilAllRegionsAssigned(table.getName()); @@ -67,5 +71,13 @@ TEST_UTIL.getDFSCluster().restartDataNode(dnProp); doPut(table, 2); assertEquals(numRolledLogFiles + 1, AsyncFSWALProvider.getNumRolledLogFiles(wal)); + + // Test HBASE-20902 + DatanodeInfo[] newDNs = wal.getPipeline(); + Set dns = new HashSet<>(); + Collections.addAll(dns, dnInfos); + Collections.addAll(dns, newDNs); + // when syncfailed choose diff dns + assertEquals(dns.size(), dnInfos.length + newDNs.length); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java index 7626dcf..fb0c720 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java @@ -61,6 +61,7 @@ @Override protected Writer createWriter(Path path) throws IOException { return new WriterOverAsyncWriter(AsyncFSWALProvider.createAsyncWriter( - TEST_UTIL.getConfiguration(), fs, path, false, EVENT_LOOP_GROUP.next(), CHANNEL_CLASS)); + TEST_UTIL.getConfiguration(), fs, path, null, false, + EVENT_LOOP_GROUP.next(), CHANNEL_CLASS)); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java index f73b4f1..a8ebdfb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java @@ -105,9 +105,9 @@ FileSystem fs = UTIL.getTestFileSystem(); Configuration conf = UTIL.getConfiguration(); try ( - AsyncWriter writer1 = AsyncFSWALProvider.createAsyncWriter(conf, fs, path1, false, + AsyncWriter writer1 = AsyncFSWALProvider.createAsyncWriter(conf, fs, 
path1, null, false, EVENT_LOOP_GROUP.next(), CHANNEL_CLASS); - AsyncWriter writer2 = AsyncFSWALProvider.createAsyncWriter(conf, fs, path2, false, + AsyncWriter writer2 = AsyncFSWALProvider.createAsyncWriter(conf, fs, path2, null, false, EVENT_LOOP_GROUP.next(), CHANNEL_CLASS); CombinedAsyncWriter writer = CombinedAsyncWriter.create(writer1, writer2)) { ProtobufLogTestHelper.doWrite(new WriterOverAsyncWriter(writer), withTrailer, tableName, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java index 3eed137..1d10d03 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java @@ -167,7 +167,7 @@ LOG.info("Restarted datanodes"); try { - log.rollWriter(true); + log.rollWriter(true, false); } catch (FailedLogCloseException flce) { // Expected exception. We used to expect that there would be unsynced appends but this // not reliable now that sync plays a roll in wall rolling. 
The above puts also now call diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java index e19361e..3640655 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java @@ -144,12 +144,16 @@ RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo(); final FSHLog log = (FSHLog) server.getWAL(region); final AtomicBoolean lowReplicationHookCalled = new AtomicBoolean(false); + final AtomicBoolean syncFailedHookCalled = new AtomicBoolean(false); log.registerWALActionsListener(new WALActionsListener() { @Override - public void logRollRequested(boolean lowReplication) { + public void logRollRequested(boolean lowReplication, boolean syncFailed) { if (lowReplication) { lowReplicationHookCalled.lazySet(true); + } + if (syncFailed) { + syncFailedHookCalled.lazySet(true); } } }); @@ -218,7 +222,7 @@ // Force roll writer. The new log file will have the default replications, // and the LowReplication Roller will be enabled. 
- log.rollWriter(true); + log.rollWriter(true, false); batchWriteAndWait(table, log, 13, true, 10000); replication = log.getLogReplication(); assertTrue("New log file should have the default replication instead of " + replication, @@ -308,7 +312,7 @@ writeData(table, 1005); // force a log roll to read back and verify previously written logs - log.rollWriter(true); + log.rollWriter(true, false); assertTrue("preLogRolledCalled has size of " + preLogRolledCalled.size(), preLogRolledCalled.size() >= 1); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java index c0d3416..4042b41 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java @@ -41,8 +41,8 @@ public void testLogRollRequested() throws Exception { MetricsWALSource source = mock(MetricsWALSourceImpl.class); MetricsWAL metricsWAL = new MetricsWAL(source); - metricsWAL.logRollRequested(false); - metricsWAL.logRollRequested(true); + metricsWAL.logRollRequested(false, false); + metricsWAL.logRollRequested(true, false); // Log roll was requested twice verify(source, times(2)).incrementLogRollRequested(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java index d429a01..bc484fa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java @@ -36,6 +36,6 @@ @Override protected Writer createWriter(Path path) throws IOException { - return FSHLogProvider.createWriter(TEST_UTIL.getConfiguration(), fs, path, false); + return FSHLogProvider.createWriter(TEST_UTIL.getConfiguration(), 
fs, path, null, false); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java index 62000b4..9ee576d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java @@ -112,7 +112,7 @@ } @Override - protected AsyncWriter createWriterInstance(Path path) throws IOException { + protected AsyncWriter createWriterInstance(Path path, Path oldPath) throws IOException { if (arrive != null) { arrive.countDown(); try { @@ -123,7 +123,7 @@ if (localBroken || remoteBroken) { throw new IOException("WAL broken"); } - return super.createWriterInstance(path); + return super.createWriterInstance(path, oldPath); } public void setLocalBroken() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java index 4effe41..f86b74b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java @@ -109,7 +109,7 @@ RegionInfo regionInfo = utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); WAL wal = hrs.getWAL(regionInfo); - wal.rollWriter(true); + wal.rollWriter(true, false); } // ReplicationSource should advance past the empty wal, or else the test will fail diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java index d01a0ac..5e37c1b 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java @@ -160,7 +160,8 @@ for (int i = 0; i < WAL_NUMBER; i++) { try (ProtobufLogWriter writer = new ProtobufLogWriter()) { Path wal = new Path(peerRemoteWALDir, "srv1,8888." + i + ".syncrep"); - writer.init(fs, wal, conf, true, WALUtil.getWALBlockSize(conf, fs, peerRemoteWALDir)); + writer.init(fs, wal, null, conf, true, + WALUtil.getWALBlockSize(conf, fs, peerRemoteWALDir)); List entries = setupWALEntries(i * ROW_COUNT, (i + 1) * ROW_COUNT); for (Entry entry : entries) { writer.append(entry); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java index d062c77..869d9bd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java @@ -202,7 +202,7 @@ // creatWriterInstance is where the new pipeline is set up for doing file rolls // if we are skipping it, just keep returning the same writer. @Override - protected Writer createWriterInstance(final Path path) throws IOException { + protected Writer createWriterInstance(final Path path, final Path oldPath) throws IOException { // we get called from the FSHLog constructor (!); always roll in this case since // we don't know yet if we're supposed to generally roll and // we need an initial file in the case of doing appends but no rolls. 
@@ -210,7 +210,7 @@ LOG.info("creating new writer instance."); final ProtobufLogWriter writer = new IOTestWriter(); try { - writer.init(fs, path, conf, false, this.blocksize); + writer.init(fs, path, oldPath, conf, false, this.blocksize); } catch (CommonFSUtils.StreamLacksCapabilityException exception) { throw new IOException("Can't create writer instance because underlying FileSystem " + "doesn't support needed stream capabilities.", exception); @@ -237,8 +237,9 @@ private boolean doSyncs; @Override - public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable, - long blocksize) throws IOException, CommonFSUtils.StreamLacksCapabilityException { + public void init(FileSystem fs, Path path, Path oldPath, Configuration conf, + boolean overwritable, long blocksize) + throws IOException, CommonFSUtils.StreamLacksCapabilityException { Collection operations = conf.getStringCollection(ALLOWED_OPERATIONS); if (operations.isEmpty() || operations.contains(AllowedOperations.all.name())) { doAppends = doSyncs = true; @@ -250,7 +251,7 @@ } LOG.info("IOTestWriter initialized with appends " + (doAppends ? "enabled" : "disabled") + " and syncs " + (doSyncs ? "enabled" : "disabled")); - super.init(fs, path, conf, overwritable, blocksize); + super.init(fs, path, oldPath, conf, overwritable, blocksize); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java index 8fbe09d..8bf1c4d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java @@ -212,7 +212,7 @@ walKey.getWriteEntry(); } log.sync(); - log.rollWriter(true); + log.rollWriter(true, false); } } wals.shutdown();