diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java index fbe95f5..596f7ad 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSource.java @@ -64,6 +64,9 @@ String LOW_REPLICA_ROLL_REQUESTED = "lowReplicaRollRequest"; String LOW_REPLICA_ROLL_REQUESTED_DESC = "How many times a log roll was requested due to too few DN's in the write pipeline."; + String SYNC_FAILED_ROLL_REQUESTED = "syncFailedRollRequest"; + String SYNC_FAILED_ROLL_REQUESTED_DESC = + "How many times a log roll was requested due to sync failed."; String WRITTEN_BYTES = "writtenBytes"; String WRITTEN_BYTES_DESC = "Size (in bytes) of the data written to the WAL."; @@ -96,6 +99,8 @@ void incrementLowReplicationLogRoll(); + void incrementSyncFailedLogRoll(); + long getSlowAppendCount(); void incrementWrittenBytes(long val); diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java index 2f35d4c..7095f99 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWALSourceImpl.java @@ -39,6 +39,7 @@ private final MutableFastCounter slowAppendCount; private final MutableFastCounter logRollRequested; private final MutableFastCounter lowReplicationLogRollRequested; + private final MutableFastCounter syncFailedLogRollRequested; private final MutableFastCounter writtenBytes; public MetricsWALSourceImpl() { @@ -62,6 +63,8 @@ this.getMetricsRegistry().newCounter(ROLL_REQUESTED, ROLL_REQUESTED_DESC, 0L); 
lowReplicationLogRollRequested = this.getMetricsRegistry() .newCounter(LOW_REPLICA_ROLL_REQUESTED, LOW_REPLICA_ROLL_REQUESTED_DESC, 0L); + syncFailedLogRollRequested = this.getMetricsRegistry() + .newCounter(SYNC_FAILED_ROLL_REQUESTED, SYNC_FAILED_ROLL_REQUESTED_DESC, 0L); writtenBytes = this.getMetricsRegistry().newCounter(WRITTEN_BYTES, WRITTEN_BYTES_DESC, 0L); } @@ -101,6 +104,11 @@ } @Override + public void incrementSyncFailedLogRoll() { + syncFailedLogRollRequested.incr(); + } + + @Override public long getSlowAppendCount() { return slowAppendCount.value(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java index f9d04b9..2f5e053 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java @@ -43,12 +43,12 @@ * Create {@link FanOutOneBlockAsyncDFSOutput} for {@link DistributedFileSystem}, and a simple * implementation for other {@link FileSystem} which wraps around a {@link FSDataOutputStream}. 
*/ - public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite, + public static AsyncFSOutput createOutput(FileSystem fs, Path f, Path oldPath, boolean overwrite, boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup, Class channelClass) throws IOException, CommonFSUtils.StreamLacksCapabilityException { if (fs instanceof DistributedFileSystem) { - return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f, + return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f, oldPath, overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass); } final FSDataOutputStream out; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index 7ffd3da..d203686 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -44,6 +44,7 @@ import org.apache.hadoop.crypto.Encryptor; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose; +import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hdfs.DFSClient; @@ -346,7 +347,7 @@ this.alloc = alloc; this.buf = alloc.directBuffer(sendBufSizePRedictor.initialSize()); this.state = State.STREAMING; - setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT)); + setupReceiver(conf.getInt(AbstractFSWAL.WAL_SYNC_TIMEOUT, AbstractFSWAL.DEFAULT_WAL_SYNC_TIMEOUT_MS)); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java index 0e5cf81..29085d1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java @@ -741,7 +741,7 @@ } private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src, - boolean overwrite, boolean createParent, short replication, long blockSize, + Path oldPath, boolean overwrite, boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup, Class channelClass) throws IOException { Configuration conf = dfs.getConf(); FSUtils fsUtils = FSUtils.getInstance(dfs, conf); @@ -751,6 +751,13 @@ int createMaxRetries = conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES, DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES); DatanodeInfo[] excludesNodes = EMPTY_DN_ARRAY; + if (oldPath != null) { //sync failed has happened + for(LocatedBlock block : namenode.getBlockLocations(oldPath.toString(), 0, 0).getLocatedBlocks()) { + for(DatanodeInfo dn : block.getLocations()) { + excludesNodes = ArrayUtils.add(excludesNodes, dn); + } + } + } for (int retry = 0;; retry++) { HdfsFileStatus stat; try { @@ -838,14 +845,14 @@ * inside an {@link EventLoop}. 
*/ public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f, - boolean overwrite, boolean createParent, short replication, long blockSize, + Path oldPath, boolean overwrite, boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup, Class channelClass) throws IOException { return new FileSystemLinkResolver() { @Override public FanOutOneBlockAsyncDFSOutput doCall(Path p) throws IOException, UnresolvedLinkException { - return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication, + return createOutput(dfs, p.toUri().getPath(), oldPath, overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java index 05a8fdf..2c2884c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALClosedException; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.HasThread; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.ipc.RemoteException; import org.apache.yetus.audience.InterfaceAudience; @@ -57,7 +58,8 @@ private static final Logger LOG = LoggerFactory.getLogger(LogRoller.class); private final ReentrantLock rollLock = new ReentrantLock(); private final AtomicBoolean rollLog = new AtomicBoolean(false); - private final ConcurrentHashMap walNeedsRoll = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> walNeedsRoll = + new ConcurrentHashMap>(); private final Server server; protected final RegionServerServices services; private volatile long lastrolltime = System.currentTimeMillis(); @@ -70,11 +72,16 @@ private volatile boolean 
running = true; public void addWAL(final WAL wal) { - if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) { + if (null == walNeedsRoll.putIfAbsent(wal, new Pair(Boolean.FALSE, + Boolean.FALSE))) { wal.registerWALActionsListener(new WALActionsListener() { @Override - public void logRollRequested(boolean lowReplicas) { - walNeedsRoll.put(wal, Boolean.TRUE); + public void logRollRequested(boolean lowReplicas, boolean syncFaild) { + Pair walInfo = walNeedsRoll.get(wal); + walInfo.setFirst(Boolean.TRUE); + if (syncFaild) { + walInfo.setSecond(Boolean.TRUE); + } // TODO logs will contend with each other here, replace with e.g. DelayedQueue synchronized(rollLog) { rollLog.set(true); @@ -87,7 +94,7 @@ public void requestRollAll() { for (WAL wal : walNeedsRoll.keySet()) { - walNeedsRoll.put(wal, Boolean.TRUE); + walNeedsRoll.put(wal, new Pair(Boolean.TRUE, Boolean.FALSE)); } synchronized(rollLog) { rollLog.set(true); @@ -122,9 +129,9 @@ */ void checkLowReplication(long now) { try { - for (Entry entry : walNeedsRoll.entrySet()) { + for (Entry> entry : walNeedsRoll.entrySet()) { WAL wal = entry.getKey(); - boolean needRollAlready = entry.getValue(); + boolean needRollAlready = entry.getValue().getFirst(); if (needRollAlready || !(wal instanceof AbstractFSWAL)) { continue; } @@ -180,16 +187,18 @@ rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH try { this.lastrolltime = now; - for (Iterator> iter = walNeedsRoll.entrySet().iterator(); iter - .hasNext();) { - Entry entry = iter.next(); + for (Iterator>> iter = + walNeedsRoll.entrySet().iterator(); iter.hasNext();) { + Entry> entry = iter.next(); final WAL wal = entry.getKey(); // Force the roll if the logroll.period is elapsed or if a roll was requested. // The returned value is an array of actual region names. 
+ Pair walInfo = entry.getValue(); + boolean syncFailed = walInfo.getSecond().booleanValue(); try { - final byte[][] regionsToFlush = - wal.rollWriter(periodic || entry.getValue().booleanValue()); - walNeedsRoll.put(wal, Boolean.FALSE); + final byte[][] regionsToFlush = wal.rollWriter(periodic || + walInfo.getFirst().booleanValue() || syncFailed, syncFailed); + walInfo.setFirst(Boolean.FALSE); if (regionsToFlush != null) { for (byte[] r : regionsToFlush) { scheduleFlush(r); @@ -247,8 +256,8 @@ * @return true if all WAL roll finished */ public boolean walRollFinished() { - for (boolean needRoll : walNeedsRoll.values()) { - if (needRoll) { + for (Pair walInfo : walNeedsRoll.values()) { + if (walInfo.getFirst().booleanValue()) { return false; } } @@ -271,7 +280,7 @@ } @VisibleForTesting - Map getWalNeedsRoll() { + Map> getWalNeedsRoll() { return this.walNeedsRoll; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 2b45a04..c06a505 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -117,7 +117,8 @@ protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms - private static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min + public static final String WAL_SYNC_TIMEOUT = "hbase.regionserver.hlog.sync.timeout"; + public static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 10 * 1000; // 10s /** * file system instance @@ -428,7 +429,7 @@ this.slowSyncNs = TimeUnit.MILLISECONDS .toNanos(conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS)); this.walSyncTimeoutNs = TimeUnit.MILLISECONDS - .toNanos(conf.getLong("hbase.regionserver.hlog.sync.timeout", DEFAULT_WAL_SYNC_TIMEOUT_MS)); + .toNanos(conf.getLong(WAL_SYNC_TIMEOUT, 
DEFAULT_WAL_SYNC_TIMEOUT_MS)); int maxHandlersCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200); // Presize our map of SyncFutures by handler objects. this.syncFuturesByHandler = new ConcurrentHashMap<>(maxHandlersCount); @@ -498,7 +499,7 @@ @Override public byte[][] rollWriter() throws FailedLogCloseException, IOException { - return rollWriter(false); + return rollWriter(false, false); } /** @@ -752,14 +753,15 @@ } @Override - public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException { + public byte[][] rollWriter(boolean force, boolean syncFailed) + throws FailedLogCloseException, IOException { rollWriterLock.lock(); try { if (this.closed) { throw new WALClosedException("WAL has been closed"); } // Return if nothing to flush. - if (!force && this.writer != null && this.numEntries.get() <= 0) { + if (!force && !syncFailed && this.writer != null && this.numEntries.get() <= 0) { return null; } byte[][] regionsToFlush = null; @@ -767,7 +769,7 @@ Path oldPath = getOldPath(); Path newPath = getNewPath(); // Any exception from here on is catastrophic, non-recoverable so we currently abort. - W nextWriter = this.createWriterInstance(newPath); + W nextWriter = this.createWriterInstance(newPath, syncFailed ? oldPath : null); tellListenersAboutPreLogRoll(oldPath, newPath); // NewPath could be equal to oldPath if replaceWriter fails. 
newPath = replaceWriter(oldPath, newPath, nextWriter); @@ -798,11 +800,6 @@ /** @return the size of log files in use */ public long getLogFileSize() { return this.totalLogSize.get(); - } - - // public only until class moves to o.a.h.h.wal - public void requestLogRoll() { - requestLogRoll(false); } /** @@ -884,10 +881,11 @@ .reset(sequence); } - protected final void requestLogRoll(boolean tooFewReplicas) { + // public only until class moves to o.a.h.h.wal + public final void requestLogRoll(boolean tooFewReplicas, boolean syncFaild) { if (!this.listeners.isEmpty()) { for (WALActionsListener i : this.listeners) { - i.logRollRequested(tooFewReplicas); + i.logRollRequested(tooFewReplicas, syncFaild); } } } @@ -1037,7 +1035,7 @@ protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException; - protected abstract W createWriterInstance(Path path) + protected abstract W createWriterInstance(Path path, Path oldPath) throws IOException, CommonFSUtils.StreamLacksCapabilityException; protected abstract void doReplaceWriter(Path oldPath, Path newPath, W nextWriter) @@ -1059,7 +1057,7 @@ try { lastTimeCheckLowReplication = now; if (doCheckLogLowReplication()) { - requestLogRoll(true); + requestLogRoll(true, false); } } finally { rollWriterLock.unlock(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java index ae084a4..3ff4f0d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java @@ -153,8 +153,8 @@ return doCompress; } - public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable, - long blocksize) throws IOException, StreamLacksCapabilityException { + public void init(FileSystem fs, Path path, Path oldPath, 
Configuration conf, + boolean overwritable, long blocksize) throws IOException, StreamLacksCapabilityException { this.conf = conf; boolean doCompress = initializeCompressionContext(conf, path); this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE); @@ -162,7 +162,7 @@ short replication = (short) conf.getInt("hbase.regionserver.hlog.replication", FSUtils.getDefaultReplication(fs, path)); - initOutput(fs, path, overwritable, bufferSize, replication, blocksize); + initOutput(fs, path, oldPath, overwritable, bufferSize, replication, blocksize); boolean doTagCompress = doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true); @@ -238,8 +238,9 @@ } } - protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, - short replication, long blockSize) throws IOException, StreamLacksCapabilityException; + protected abstract void initOutput(FileSystem fs, Path path, Path oldPath, boolean overwritable, + int bufferSize, short replication, long blockSize) + throws IOException, StreamLacksCapabilityException; /** * return the file length after written. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index b0c2549..edadb18 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -315,7 +315,7 @@ highestUnsyncedTxid = highestSyncedTxid.get(); if (shouldRequestLogRoll) { // request a roll. 
- requestLogRoll(); + requestLogRoll(false, true); } } @@ -338,7 +338,7 @@ return; } rollRequested = true; - requestLogRoll(); + requestLogRoll(false, false); } private void sync(AsyncWriter writer) { @@ -656,14 +656,15 @@ } } - protected final AsyncWriter createAsyncWriter(FileSystem fs, Path path) throws IOException { - return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, this.blocksize, + protected final AsyncWriter createAsyncWriter(FileSystem fs, Path path, Path oldPath) + throws IOException { + return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, oldPath, false, this.blocksize, eventLoopGroup, channelClass); } @Override - protected AsyncWriter createWriterInstance(Path path) throws IOException { - return createAsyncWriter(fs, path); + protected AsyncWriter createWriterInstance(Path path, Path oldPath) throws IOException { + return createAsyncWriter(fs, path, oldPath); } private void waitForSafePoint() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index 6368fb7..29dfdb1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -162,10 +162,11 @@ } @Override - protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, - short replication, long blockSize) throws IOException, StreamLacksCapabilityException { - this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication, - blockSize, eventLoopGroup, channelClass); + protected void initOutput(FileSystem fs, Path path, Path oldPath, boolean overwritable, + int bufferSize, short replication, long blockSize) + throws IOException, StreamLacksCapabilityException { + this.output = AsyncFSOutputHelper.createOutput(fs, path, 
oldPath, overwritable, false, + replication, blockSize, eventLoopGroup, channelClass); this.asyncOutputWrapper = new OutputStreamWrapper(output); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java index bf5b96d..6952200 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java @@ -68,20 +68,20 @@ } @Override - protected AsyncWriter createWriterInstance(Path path) throws IOException { - AsyncWriter localWriter = super.createWriterInstance(path); + protected AsyncWriter createWriterInstance(Path newPath, Path oldPath) throws IOException { + AsyncWriter localWriter = super.createWriterInstance(newPath, oldPath); // retry forever if we can not create the remote writer to prevent aborting the RS due to log // rolling error, unless the skipRemoteWal is set to true. 
// TODO: since for now we only have one thread doing log rolling, this may block the rolling for // other wals - Path remoteWAL = new Path(remoteWALDir, path.getName()); + Path remoteWAL = new Path(remoteWALDir, newPath.getName()); for (int retry = 0;; retry++) { if (skipRemoteWAL) { return localWriter; } AsyncWriter remoteWriter; try { - remoteWriter = createAsyncWriter(remoteFs, remoteWAL); + remoteWriter = createAsyncWriter(remoteFs, remoteWAL, oldPath); } catch (IOException e) { LOG.warn("create remote writer {} failed, retry = {}", remoteWAL, retry, e); try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index baa87a4..4bbdcab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -270,8 +270,8 @@ * @return Writer instance */ @Override - protected Writer createWriterInstance(final Path path) throws IOException { - Writer writer = FSHLogProvider.createWriter(conf, fs, path, false, this.blocksize); + protected Writer createWriterInstance(final Path path, final Path oldPath) throws IOException { + Writer writer = FSHLogProvider.createWriter(conf, fs, path, oldPath, false, this.blocksize); if (writer instanceof ProtobufLogWriter) { preemptiveSync((ProtobufLogWriter) writer); } @@ -596,7 +596,7 @@ // Can we release other syncs? syncCount += releaseSyncFutures(currentSequence, lastException); if (lastException != null) { - requestLogRoll(); + requestLogRoll(false, true); } else { checkLogRoll(); } @@ -627,7 +627,7 @@ rollWriterLock.unlock(); } if (lowReplication || (writer != null && writer.getLength() > logrollsize)) { - requestLogRoll(lowReplication); + requestLogRoll(lowReplication, false); } } @@ -1013,7 +1013,7 @@ this.syncFuturesCount.get()); } catch (Exception e) { // Should NEVER get here. 
- requestLogRoll(); + requestLogRoll(false, false); this.exception = new DamagedWALException("Failed offering sync", e); } } @@ -1080,7 +1080,7 @@ String msg = "Append sequenceId=" + entry.getKey().getSequenceId() + ", requesting roll of WAL"; LOG.warn(msg, e); - requestLogRoll(); + requestLogRoll(false, false); throw new DamagedWALException(msg, e); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java index 900e55f..523e619 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java @@ -73,10 +73,13 @@ } @Override - public void logRollRequested(boolean underReplicated) { + public void logRollRequested(boolean underReplicated, boolean syncFailed) { source.incrementLogRollRequested(); if (underReplicated) { source.incrementLowReplicationLogRoll(); } + if (syncFailed) { + source.incrementSyncFailedLogRoll(); + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index b4e2cbf..cefcc51 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -88,8 +88,9 @@ @SuppressWarnings("deprecation") @Override - protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, - short replication, long blockSize) throws IOException, StreamLacksCapabilityException { + protected void initOutput(FileSystem fs, Path path, Path oldPath, boolean overwritable, + int bufferSize, short replication, long blockSize) + throws IOException, StreamLacksCapabilityException { this.output = fs.createNonRecursive(path, 
overwritable, bufferSize, replication, blockSize, null); if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true)) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java index 13ffac7..3508b87 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java @@ -65,7 +65,7 @@ /** * A request was made that the WAL be rolled. */ - default void logRollRequested(boolean tooFewReplicas) {} + default void logRollRequested(boolean tooFewReplicas, boolean syncFaild) {} /** * The WAL is about to close. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index ccdc95f..e36e48a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -222,7 +222,7 @@ */ @VisibleForTesting static void requestLogRoll(final WAL wal) { - ((AbstractFSWAL) wal).requestLogRoll(); + ((AbstractFSWAL) wal).requestLogRoll(false, false); } // should be package private; more visible for use in AbstractFSWAL diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java index 062b368..be7de0a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java @@ -55,8 +55,8 @@ * @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that * meet the needs of the given Writer implementation. 
*/ - void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long blocksize) - throws IOException, CommonFSUtils.StreamLacksCapabilityException; + void init(FileSystem fs, Path path, Path oldPath, Configuration c, boolean overwritable, + long blocksize) throws IOException, CommonFSUtils.StreamLacksCapabilityException; } private EventLoopGroup eventLoopGroup; @@ -84,17 +84,17 @@ * Public because of AsyncFSWAL. Should be package-private */ public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path, - boolean overwritable, EventLoopGroup eventLoopGroup, + Path oldPath, boolean overwritable, EventLoopGroup eventLoopGroup, Class channelClass) throws IOException { - return createAsyncWriter(conf, fs, path, overwritable, WALUtil.getWALBlockSize(conf, fs, path), - eventLoopGroup, channelClass); + return createAsyncWriter(conf, fs, path, oldPath, overwritable, + WALUtil.getWALBlockSize(conf, fs, path), eventLoopGroup, channelClass); } /** * Public because of AsyncFSWAL. Should be package-private */ public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path, - boolean overwritable, long blocksize, EventLoopGroup eventLoopGroup, + Path oldPath, boolean overwritable, long blocksize, EventLoopGroup eventLoopGroup, Class channelClass) throws IOException { // Configuration already does caching for the Class lookup. 
Class logWriterClass = conf.getClass( @@ -102,7 +102,7 @@ try { AsyncWriter writer = logWriterClass.getConstructor(EventLoopGroup.class, Class.class) .newInstance(eventLoopGroup, channelClass); - writer.init(fs, path, conf, overwritable, blocksize); + writer.init(fs, path, oldPath, conf, overwritable, blocksize); return writer; } catch (Exception e) { if (e instanceof CommonFSUtils.StreamLacksCapabilityException) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index 75439fe..51d50f9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -119,7 +119,7 @@ public byte[][] rollWriter() { if (!listeners.isEmpty()) { for (WALActionsListener listener : listeners) { - listener.logRollRequested(false); + listener.logRollRequested(false, false); } for (WALActionsListener listener : listeners) { try { @@ -140,7 +140,7 @@ } @Override - public byte[][] rollWriter(boolean force) { + public byte[][] rollWriter(boolean force, boolean syncFailed) { return rollWriter(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java index 44f692d..3803466 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java @@ -49,23 +49,24 @@ * @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that * meet the needs of the given Writer implementation. 
*/ - void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long blocksize) - throws IOException, CommonFSUtils.StreamLacksCapabilityException; + void init(FileSystem fs, Path path, Path oldPath, Configuration c, boolean overwritable, + long blocksize) throws IOException, CommonFSUtils.StreamLacksCapabilityException; } /** * Public because of FSHLog. Should be package-private */ public static Writer createWriter(final Configuration conf, final FileSystem fs, final Path path, - final boolean overwritable) throws IOException { - return createWriter(conf, fs, path, overwritable, WALUtil.getWALBlockSize(conf, fs, path)); + final Path oldPath, final boolean overwritable) throws IOException { + return createWriter(conf, fs, path, oldPath, overwritable, + WALUtil.getWALBlockSize(conf, fs, path)); } /** * Public because of FSHLog. Should be package-private */ public static Writer createWriter(final Configuration conf, final FileSystem fs, final Path path, - final boolean overwritable, long blocksize) throws IOException { + final Path oldPath, final boolean overwritable, long blocksize) throws IOException { // Configuration already does caching for the Class lookup. 
Class logWriterClass = conf.getClass("hbase.regionserver.hlog.writer.impl", ProtobufLogWriter.class, @@ -74,7 +75,7 @@ try { writer = logWriterClass.getDeclaredConstructor().newInstance(); FileSystem rootFs = FileSystem.get(path.toUri(), conf); - writer.init(rootFs, path, conf, overwritable, blocksize); + writer.init(rootFs, path, oldPath, conf, overwritable, blocksize); return writer; } catch (Exception e) { if (e instanceof CommonFSUtils.StreamLacksCapabilityException) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index cf367cd..1956c13 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -80,7 +80,8 @@ * can clean logs. Returns null if nothing to flush. Names are actual * region names as returned by {@link RegionInfo#getEncodedName()} */ - byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException; + byte[][] rollWriter(boolean force, boolean syncFailed) + throws FailedLogCloseException, IOException; /** * Stop accepting new writes. If we have unsynced writes still in buffer, sync them. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java index 8eb1690..51864d4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java @@ -364,7 +364,7 @@ * @return A WAL writer. Close when done with it. 
*/ public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException { - return FSHLogProvider.createWriter(conf, fs, path, false); + return FSHLogProvider.createWriter(conf, fs, path, null, false); } /** @@ -375,7 +375,7 @@ @VisibleForTesting public Writer createRecoveredEditsWriter(final FileSystem fs, final Path path) throws IOException { - return FSHLogProvider.createWriter(conf, fs, path, true); + return FSHLogProvider.createWriter(conf, fs, path, null, true); } // These static methods are currently used where it's impractical to @@ -444,7 +444,7 @@ static Writer createRecoveredEditsWriter(final FileSystem fs, final Path path, final Configuration configuration) throws IOException { - return FSHLogProvider.createWriter(configuration, fs, path, true); + return FSHLogProvider.createWriter(configuration, fs, path, null, true); } /** @@ -456,7 +456,7 @@ public static Writer createWALWriter(final FileSystem fs, final Path path, final Configuration configuration) throws IOException { - return FSHLogProvider.createWriter(configuration, fs, path, false); + return FSHLogProvider.createWriter(configuration, fs, path, null, false); } public final WALProvider getWALProvider() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java index dde020d..e2a22b7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java @@ -393,7 +393,7 @@ assertFalse(cp.isPreWALRollCalled()); assertFalse(cp.isPostWALRollCalled()); - wal.rollWriter(true); + wal.rollWriter(true, false); assertTrue(cp.isPreWALRollCalled()); assertTrue(cp.isPostWALRollCalled()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java index 6be44e9..2cfb856 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java @@ -135,8 +135,8 @@ public void test() throws IOException, InterruptedException, ExecutionException { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); - FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, - false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); + FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, null, + true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); writeAndVerify(FS, f, out); } @@ -144,8 +144,8 @@ public void testRecover() throws IOException, InterruptedException, ExecutionException { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); - FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, - false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); + FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, null, + true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); byte[] b = new byte[10]; ThreadLocalRandom.current().nextBytes(b); out.write(b, 0, b.length); @@ -173,8 +173,8 @@ public void testHeartbeat() throws IOException, InterruptedException, ExecutionException { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); - FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, - false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); + FanOutOneBlockAsyncDFSOutput out = 
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, null, + true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); Thread.sleep(READ_TIMEOUT_MS * 2); // the connection to datanode should still alive. writeAndVerify(FS, f, out); @@ -188,7 +188,7 @@ Path f = new Path("/" + name.getMethodName() + "/test"); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); try { - FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, + FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, null, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); fail("should fail with parent does not exist"); } catch (RemoteException e) { @@ -212,7 +212,7 @@ Path f = new Path("/test"); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, - f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS)) { + f, null, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS)) { // should exclude the dead dn when retry so here we only have 2 DNs in pipeline assertEquals(2, output.getPipeline().length); } finally { @@ -224,8 +224,8 @@ public void testWriteLargeChunk() throws IOException, InterruptedException, ExecutionException { Path f = new Path("/" + name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next(); - FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, - false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS); + FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, null, + true, false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS); byte[] b = new byte[50 * 1024 * 1024]; ThreadLocalRandom.current().nextBytes(b); out.write(b); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java index 406af17..661f99d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java @@ -60,7 +60,7 @@ FSUtils.StreamLacksCapabilityException { Path f = new Path(TEST_UTIL.getDataTestDir(), "test"); FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration()); - AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true, + AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, null, false, true, fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS); TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(fs, f, out); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java index 09b1d56..a9faa9b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java @@ -247,7 +247,7 @@ private void test(Path file) throws IOException, InterruptedException, ExecutionException { EventLoop eventLoop = EVENT_LOOP_GROUP.next(); FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file, - true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); + null, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS); TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java index 1490653..77ba5f8 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java @@ -114,15 +114,15 @@ } @Override - public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException { - byte[][] regions = super.rollWriter(force); + public byte[][] rollWriter(boolean force, boolean syncFailed) throws FailedLogCloseException, IOException { + byte[][] regions = super.rollWriter(force, syncFailed); rolls.getAndIncrement(); return regions; } @Override - protected Writer createWriterInstance(Path path) throws IOException { - final Writer w = super.createWriterInstance(path); + protected Writer createWriterInstance(Path newPath, Path oldPath) throws IOException { + final Writer w = super.createWriterInstance(newPath, oldPath); return new Writer() { @Override public void close() throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index da07c7b..ccb8549 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -1124,8 +1124,8 @@ } @Override - protected Writer createWriterInstance(Path path) throws IOException { - final Writer w = super.createWriterInstance(path); + protected Writer createWriterInstance(Path newPath, Path oldPath) throws IOException { + final Writer w = super.createWriterInstance(newPath, oldPath); return new Writer() { @Override public void close() throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java index 9bbce09..88b694e 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java @@ -528,7 +528,7 @@ assertEquals(MutableSegment.DEEP_OVERHEAD, desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize()); // let WAL cleanOldLogs - assertNull(getWAL(desiredRegion).rollWriter(true)); + assertNull(getWAL(desiredRegion).rollWriter(true, false)); assertTrue(getNumRolledLogFiles(desiredRegion) < maxLogs); } finally { TEST_UTIL.shutdownMiniCluster(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java index 84b8d6c..f1e51b8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java @@ -163,8 +163,8 @@ } @Override - protected Writer createWriterInstance(Path path) throws IOException { - final Writer w = super.createWriterInstance(path); + protected Writer createWriterInstance(Path newPath, Path oldPath) throws IOException { + final Writer w = super.createWriterInstance(newPath, oldPath); return new Writer() { @Override public void close() throws IOException { @@ -354,8 +354,8 @@ } @Override - protected Writer createWriterInstance(Path path) throws IOException { - final Writer w = super.createWriterInstance(path); + protected Writer createWriterInstance(Path newPath, Path oldPath) throws IOException { + final Writer w = super.createWriterInstance(newPath, oldPath); return new Writer() { @Override public void close() throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java index 3e0bc55..12943fa 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java @@ -295,7 +295,7 @@ assertEquals(1, wal.getNumRolledLogFiles()); // flush the second region flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames()); - wal.rollWriter(true); + wal.rollWriter(true, false); // no wal should remain now. assertEquals(0, wal.getNumRolledLogFiles()); // add edits both to region 1 and region 2, and roll. @@ -313,7 +313,7 @@ // flush both regions flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames()); flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames()); - wal.rollWriter(true); + wal.rollWriter(true, false); assertEquals(0, wal.getNumRolledLogFiles()); // Add an edit to region1, and roll the wal. addEdits(wal, hri1, t1, 2, mvcc, scopes1); @@ -337,12 +337,12 @@ HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null); long filenum = System.currentTimeMillis(); Path path = wal.computeFilename(filenum); - wal.createWriterInstance(path); + wal.createWriterInstance(path, null); Path parent = path.getParent(); path = wal.computeFilename(filenum + 1); Path newPath = new Path(parent.getParent(), parent.getName() + "-splitting"); FS.rename(parent, newPath); - wal.createWriterInstance(path); + wal.createWriterInstance(path, null); fail("It should fail to create the new WAL"); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java index 610af61..dc86a0d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java @@ -167,7 +167,7 @@ final WAL newLog = wals.getWAL(null); try { // Now roll the log before 
we write anything. - newLog.rollWriter(true); + newLog.rollWriter(true, false); } finally { wals.close(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java index 93c379c..c495bb0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java @@ -1227,7 +1227,7 @@ StreamLacksCapabilityException { fs.mkdirs(file.getParent()); ProtobufLogWriter writer = new ProtobufLogWriter(); - writer.init(fs, file, conf, true, WALUtil.getWALBlockSize(conf, fs, file)); + writer.init(fs, file, null, conf, true, WALUtil.getWALBlockSize(conf, fs, file)); for (FSWALEntry entry : entries) { writer.append(entry); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java index 7626dcf..29aa123 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java @@ -61,6 +61,6 @@ @Override protected Writer createWriter(Path path) throws IOException { return new WriterOverAsyncWriter(AsyncFSWALProvider.createAsyncWriter( - TEST_UTIL.getConfiguration(), fs, path, false, EVENT_LOOP_GROUP.next(), CHANNEL_CLASS)); + TEST_UTIL.getConfiguration(), fs, path, null, false, EVENT_LOOP_GROUP.next(), CHANNEL_CLASS)); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java index f73b4f1..a8ebdfb 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java @@ -105,9 +105,9 @@ FileSystem fs = UTIL.getTestFileSystem(); Configuration conf = UTIL.getConfiguration(); try ( - AsyncWriter writer1 = AsyncFSWALProvider.createAsyncWriter(conf, fs, path1, false, + AsyncWriter writer1 = AsyncFSWALProvider.createAsyncWriter(conf, fs, path1, null, false, EVENT_LOOP_GROUP.next(), CHANNEL_CLASS); - AsyncWriter writer2 = AsyncFSWALProvider.createAsyncWriter(conf, fs, path2, false, + AsyncWriter writer2 = AsyncFSWALProvider.createAsyncWriter(conf, fs, path2, null, false, EVENT_LOOP_GROUP.next(), CHANNEL_CLASS); CombinedAsyncWriter writer = CombinedAsyncWriter.create(writer1, writer2)) { ProtobufLogTestHelper.doWrite(new WriterOverAsyncWriter(writer), withTrailer, tableName, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java index 3eed137..1d10d03 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java @@ -167,7 +167,7 @@ LOG.info("Restarted datanodes"); try { - log.rollWriter(true); + log.rollWriter(true, false); } catch (FailedLogCloseException flce) { // Expected exception. We used to expect that there would be unsynced appends but this // not reliable now that sync plays a roll in wall rolling. 
The above puts also now call diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java index f3cf2bf..d59b15b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java @@ -145,12 +145,16 @@ RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo(); final FSHLog log = (FSHLog) server.getWAL(region); final AtomicBoolean lowReplicationHookCalled = new AtomicBoolean(false); + final AtomicBoolean syncFailedHookCalled = new AtomicBoolean(false); log.registerWALActionsListener(new WALActionsListener() { @Override - public void logRollRequested(boolean lowReplication) { + public void logRollRequested(boolean lowReplication, boolean syncFailed) { if (lowReplication) { lowReplicationHookCalled.lazySet(true); + } + if (syncFailed) { + syncFailedHookCalled.lazySet(true); } } }); @@ -219,7 +223,7 @@ // Force roll writer. The new log file will have the default replications, // and the LowReplication Roller will be enabled. 
- log.rollWriter(true); + log.rollWriter(true, false); batchWriteAndWait(table, log, 13, true, 10000); replication = log.getLogReplication(); assertTrue("New log file should have the default replication instead of " + replication, @@ -309,7 +313,7 @@ writeData(table, 1005); // force a log roll to read back and verify previously written logs - log.rollWriter(true); + log.rollWriter(true, false); assertTrue("preLogRolledCalled has size of " + preLogRolledCalled.size(), preLogRolledCalled.size() >= 1); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java index c0d3416..4042b41 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java @@ -41,8 +41,8 @@ public void testLogRollRequested() throws Exception { MetricsWALSource source = mock(MetricsWALSourceImpl.class); MetricsWAL metricsWAL = new MetricsWAL(source); - metricsWAL.logRollRequested(false); - metricsWAL.logRollRequested(true); + metricsWAL.logRollRequested(false, false); + metricsWAL.logRollRequested(true, false); // Log roll was requested twice verify(source, times(2)).incrementLogRollRequested(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java index d429a01..bc484fa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java @@ -36,6 +36,6 @@ @Override protected Writer createWriter(Path path) throws IOException { - return FSHLogProvider.createWriter(TEST_UTIL.getConfiguration(), fs, path, false); + return FSHLogProvider.createWriter(TEST_UTIL.getConfiguration(), 
fs, path, null, false); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java index 62000b4..4c5d58d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java @@ -112,7 +112,7 @@ } @Override - protected AsyncWriter createWriterInstance(Path path) throws IOException { + protected AsyncWriter createWriterInstance(Path newPath, Path oldPath) throws IOException { if (arrive != null) { arrive.countDown(); try { @@ -123,7 +123,7 @@ if (localBroken || remoteBroken) { throw new IOException("WAL broken"); } - return super.createWriterInstance(path); + return super.createWriterInstance(newPath, oldPath); } public void setLocalBroken() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java index 4effe41..f86b74b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java @@ -109,7 +109,7 @@ RegionInfo regionInfo = utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); WAL wal = hrs.getWAL(regionInfo); - wal.rollWriter(true); + wal.rollWriter(true, false); } // ReplicationSource should advance past the empty wal, or else the test will fail diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java index d01a0ac..5e37c1b 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestRecoverStandbyProcedure.java @@ -160,7 +160,8 @@ for (int i = 0; i < WAL_NUMBER; i++) { try (ProtobufLogWriter writer = new ProtobufLogWriter()) { Path wal = new Path(peerRemoteWALDir, "srv1,8888." + i + ".syncrep"); - writer.init(fs, wal, conf, true, WALUtil.getWALBlockSize(conf, fs, peerRemoteWALDir)); + writer.init(fs, wal, null, conf, true, + WALUtil.getWALBlockSize(conf, fs, peerRemoteWALDir)); List entries = setupWALEntries(i * ROW_COUNT, (i + 1) * ROW_COUNT); for (Entry entry : entries) { writer.append(entry); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java index 453b742..f9d4553 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java @@ -199,7 +199,7 @@ // creatWriterInstance is where the new pipeline is set up for doing file rolls // if we are skipping it, just keep returning the same writer. @Override - protected Writer createWriterInstance(final Path path) throws IOException { + protected Writer createWriterInstance(final Path newPath, final Path oldPath) throws IOException { // we get called from the FSHLog constructor (!); always roll in this case since // we don't know yet if we're supposed to generally roll and // we need an initial file in the case of doing appends but no rolls. 
@@ -207,7 +207,7 @@ LOG.info("creating new writer instance."); final ProtobufLogWriter writer = new IOTestWriter(); try { - writer.init(fs, path, conf, false, this.blocksize); + writer.init(fs, newPath, oldPath, conf, false, this.blocksize); } catch (CommonFSUtils.StreamLacksCapabilityException exception) { throw new IOException("Can't create writer instance because underlying FileSystem " + "doesn't support needed stream capabilities.", exception); @@ -234,8 +234,9 @@ private boolean doSyncs; @Override - public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable, - long blocksize) throws IOException, CommonFSUtils.StreamLacksCapabilityException { + public void init(FileSystem fs, Path newPath, Path oldPath, Configuration conf, + boolean overwritable, long blocksize) + throws IOException, CommonFSUtils.StreamLacksCapabilityException { Collection operations = conf.getStringCollection(ALLOWED_OPERATIONS); if (operations.isEmpty() || operations.contains(AllowedOperations.all.name())) { doAppends = doSyncs = true; @@ -247,7 +248,7 @@ } LOG.info("IOTestWriter initialized with appends " + (doAppends ? "enabled" : "disabled") + " and syncs " + (doSyncs ? "enabled" : "disabled")); - super.init(fs, path, conf, overwritable, blocksize); + super.init(fs, newPath, oldPath, conf, overwritable, blocksize); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java index fd2b3c4..7176dbf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java @@ -209,7 +209,7 @@ walKey.getWriteEntry(); } log.sync(); - log.rollWriter(true); + log.rollWriter(true, false); } } wals.shutdown();