diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index b118ecd..d813e7f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.net.URLEncoder; import java.util.ArrayList; import java.util.Arrays; @@ -79,6 +78,8 @@ import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALPrettyPrinter; import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.apache.hadoop.hbase.wal.WALSplitter; +import org.apache.hadoop.hbase.wal.FSWALRoller; +import org.apache.hadoop.hbase.util.HDFSPipeline; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.util.StringUtils; import org.apache.htrace.NullScope; @@ -131,7 +132,7 @@ public class FSHLog implements WAL { // Calls to append now also wait until the append has been done on the consumer side of the // disruptor. We used to not wait but it makes the implemenation easier to grok if we have // the region edit/sequence id after the append returns. - // + // // TODO: Handlers need to coordinate appending AND syncing. Can we have the threads contend // once only? Probably hard given syncs take way longer than an append. // @@ -222,7 +223,7 @@ public class FSHLog implements WAL { private final String logFilePrefix; /** - * Suffix included on generated wal file names + * Suffix included on generated wal file names */ private final String logFileSuffix; @@ -245,7 +246,7 @@ public class FSHLog implements WAL { public void registerWALActionsListener(final WALActionsListener listener) { this.listeners.add(listener); } - + @Override public boolean unregisterWALActionsListener(final WALActionsListener listener) { return this.listeners.remove(listener); @@ -257,36 +258,6 @@ public class FSHLog implements WAL { } /** - * FSDataOutputStream associated with the current SequenceFile.writer - */ - private FSDataOutputStream hdfs_out; - - // All about log rolling if not enough replicas outstanding. - - // Minimum tolerable replicas, if the actual value is lower than it, rollWriter will be triggered - private final int minTolerableReplication; - - // DFSOutputStream.getNumCurrentReplicas method instance gotten via reflection. - private final Method getNumCurrentReplicas; - private final Method getPipeLine; // refers to DFSOutputStream.getPipeLine - private final int slowSyncNs; - - private final static Object [] NO_ARGS = new Object []{}; - - // If live datanode count is lower than the default replicas value, - // RollWriter will be triggered in each sync(So the RollWriter will be - // triggered one by one in a short time). Using it as a workaround to slow - // down the roll frequency triggered by checkLowReplication(). - private final AtomicInteger consecutiveLogRolls = new AtomicInteger(0); - - private final int lowReplicationRollLimit; - - // If consecutiveLogRolls is larger than lowReplicationRollLimit, - // then disable the rolling in checkLowReplication(). - // Enable it if the replications recover. - private volatile boolean lowReplicationRollEnabled = true; - - /** * Class that does accounting of sequenceids in WAL subsystem. 
Holds oldest outstanding * sequence id as yet not flushed as well as the most recent edit sequence id appended to the * WAL. Has facility for answering questions such as "Is it safe to GC a WAL?". @@ -301,15 +272,6 @@ public class FSHLog implements WAL { /** The barrier used to ensure that close() waits for all log rolls and flushes to finish. */ private final DrainBarrier closeBarrier = new DrainBarrier(); - /** - * This lock makes sure only one log roll runs at a time. Should not be taken while any other - * lock is held. We don't just use synchronized because that results in bogus and tedious - * findbugs warning when it thinks synchronized controls writer thread safety. It is held when - * we are actually rolling the log. It is checked when we are looking to see if we should roll - * the log or not. - */ - private final ReentrantLock rollWriterLock = new ReentrantLock(true); - private volatile boolean closed = false; private final AtomicBoolean shutdown = new AtomicBoolean(false); @@ -319,9 +281,6 @@ public class FSHLog implements WAL { // Number of transactions in the current Wal. private final AtomicInteger numEntries = new AtomicInteger(0); - // If > than this size, roll the log. - private final long logrollsize; - /** * The total size of wal */ @@ -385,6 +344,36 @@ public class FSHLog implements WAL { } } + private final FSWALRoller walRoller; + + public static class FSHLogRollable implements FSWALRoller.Rollable { + private final FSHLog log; + + public FSHLogRollable(FSHLog log) { + this.log = log; + } + + @Override + public int getNumEntries() { + return log.numEntries.get(); + } + + @Override + public long getWriterLength() throws IOException { + return log.writer.getLength(); + } + + @Override + public byte[][] rollWriter(HDFSPipeline hdfsPipeline, boolean force) throws IOException { + return log.rollWriter(hdfsPipeline, force); + } + + @Override + public void requestLogRoll(boolean tooFewReplicas) { + log.requestLogRoll(tooFewReplicas); + } + } + /** * Constructor. * @@ -497,32 +486,30 @@ public class FSHLog implements WAL { // (it costs a little x'ing bocks) final long blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize", FSUtils.getDefaultBlockSize(this.fs, this.fullPathLogDir)); - this.logrollsize = + long logRollSize = (long)(blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f)); this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32); - this.minTolerableReplication = conf.getInt("hbase.regionserver.hlog.tolerable.lowreplication", - FSUtils.getDefaultReplication(fs, this.fullPathLogDir)); - this.lowReplicationRollLimit = + int minTolerableReplication = conf.getInt("hbase.regionserver.hlog.tolerable.lowreplication", + FSUtils.getDefaultReplication(fs, fullPathLogDir)); + int lowReplicationRollLimit = conf.getInt("hbase.regionserver.hlog.lowreplication.rolllimit", 5); this.closeErrorsTolerated = conf.getInt("hbase.regionserver.logroll.errors.tolerated", 0); int maxHandlersCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200); + int slowSyncNs = + 1000000 * conf.getInt("hbase.regionserver.hlog.slowsync.ms", + DEFAULT_SLOW_SYNC_TIME_MS); + LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + - ", rollsize=" + StringUtils.byteDesc(this.logrollsize) + + ", rollsize=" + StringUtils.byteDesc(logRollSize) + ", prefix=" + this.logFilePrefix + ", suffix=" + logFileSuffix + ", logDir=" + this.fullPathLogDir + ", archiveDir=" + this.fullPathArchiveDir); // rollWriter sets this.hdfs_out if it can. 
- rollWriter(); - - this.slowSyncNs = - 1000000 * conf.getInt("hbase.regionserver.hlog.slowsync.ms", - DEFAULT_SLOW_SYNC_TIME_MS); - // handle the reflection necessary to call getNumCurrentReplicas(). TODO: Replace with - // HdfsDataOutputStream#getCurrentBlockReplication() and go without reflection. - this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out); - this.getPipeLine = getGetPipeline(this.hdfs_out); + walRoller = new FSWALRoller(new FSHLogRollable(this), + logRollSize, minTolerableReplication, lowReplicationRollLimit, slowSyncNs); + walRoller.rollWriter(); // This is the 'writer' -- a single threaded executor. This single thread 'consumes' what is // put on the ring buffer. @@ -572,7 +559,7 @@ public class FSHLog implements WAL { */ @VisibleForTesting OutputStream getOutputStream() { - return this.hdfs_out.getWrappedStream(); + return this.walRoller.getOutputStream(); } @Override @@ -606,7 +593,7 @@ public class FSHLog implements WAL { /** * Tell listeners about pre log roll. - * @throws IOException + * @throws IOException */ private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath) throws IOException { @@ -619,7 +606,7 @@ public class FSHLog implements WAL { /** * Tell listeners about post log roll. - * @throws IOException + * @throws IOException */ private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath) throws IOException { @@ -647,51 +634,51 @@ public class FSHLog implements WAL { } @Override - public byte [][] rollWriter(boolean force) throws FailedLogCloseException, IOException { - rollWriterLock.lock(); + public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException { + return walRoller.rollWriter(force); + } + + public byte[][] rollWriter(HDFSPipeline hdfsPipeline, boolean force) + throws FailedLogCloseException, IOException { + // Return if nothing to flush. + if (!force && (this.writer != null && this.numEntries.get() <= 0)) return null; + byte [][] regionsToFlush = null; + if (this.closed) { + LOG.debug("WAL closed. Skipping rolling of writer"); + return regionsToFlush; + } + if (!closeBarrier.beginOp()) { + LOG.debug("WAL closing. Skipping rolling of writer"); + return regionsToFlush; + } + TraceScope scope = Trace.startSpan("FSHLog.rollWriter"); try { - // Return if nothing to flush. - if (!force && (this.writer != null && this.numEntries.get() <= 0)) return null; - byte [][] regionsToFlush = null; - if (this.closed) { - LOG.debug("WAL closed. Skipping rolling of writer"); - return regionsToFlush; - } - if (!closeBarrier.beginOp()) { - LOG.debug("WAL closing. Skipping rolling of writer"); - return regionsToFlush; + Path oldPath = getOldPath(); + Path newPath = getNewPath(); + // Any exception from here on is catastrophic, non-recoverable so we currently abort. + Writer nextWriter = this.createWriterInstance(newPath); + FSDataOutputStream nextHdfsOut = null; + if (nextWriter instanceof ProtobufLogWriter) { + nextHdfsOut = ((ProtobufLogWriter)nextWriter).getStream(); + // If a ProtobufLogWriter, go ahead and try and sync to force setup of pipeline. + // If this fails, we just keep going.... it is an optimization, not the end of the world. + preemptiveSync((ProtobufLogWriter)nextWriter); } - TraceScope scope = Trace.startSpan("FSHLog.rollWriter"); - try { - Path oldPath = getOldPath(); - Path newPath = getNewPath(); - // Any exception from here on is catastrophic, non-recoverable so we currently abort. 
- Writer nextWriter = this.createWriterInstance(newPath); - FSDataOutputStream nextHdfsOut = null; - if (nextWriter instanceof ProtobufLogWriter) { - nextHdfsOut = ((ProtobufLogWriter)nextWriter).getStream(); - // If a ProtobufLogWriter, go ahead and try and sync to force setup of pipeline. - // If this fails, we just keep going.... it is an optimization, not the end of the world. - preemptiveSync((ProtobufLogWriter)nextWriter); - } - tellListenersAboutPreLogRoll(oldPath, newPath); - // NewPath could be equal to oldPath if replaceWriter fails. - newPath = replaceWriter(oldPath, newPath, nextWriter, nextHdfsOut); - tellListenersAboutPostLogRoll(oldPath, newPath); - // Can we delete any of the old log files? - if (getNumRolledLogFiles() > 0) { - cleanOldLogs(); - regionsToFlush = findRegionsToForceFlush(); - } - } finally { - closeBarrier.endOp(); - assert scope == NullScope.INSTANCE || !scope.isDetached(); - scope.close(); + tellListenersAboutPreLogRoll(oldPath, newPath); + // NewPath could be equal to oldPath if replaceWriter fails. + newPath = replaceWriter(oldPath, newPath, hdfsPipeline, nextWriter, nextHdfsOut); + tellListenersAboutPostLogRoll(oldPath, newPath); + // Can we delete any of the old log files? + if (getNumRolledLogFiles() > 0) { + cleanOldLogs(); + regionsToFlush = findRegionsToForceFlush(); } - return regionsToFlush; } finally { - rollWriterLock.unlock(); + closeBarrier.endOp(); + assert scope == NullScope.INSTANCE || !scope.isDetached(); + scope.close(); } + return regionsToFlush; } /** @@ -775,9 +762,9 @@ public class FSHLog implements WAL { * @return the passed in newPath * @throws IOException if there is a problem flushing or closing the underlying FS */ - Path replaceWriter(final Path oldPath, final Path newPath, Writer nextWriter, - final FSDataOutputStream nextHdfsOut) - throws IOException { + Path replaceWriter(final Path oldPath, final Path newPath, + final HDFSPipeline hdfsPipeline, Writer nextWriter, final FSDataOutputStream nextHdfsOut) + throws IOException { // Ask the ring buffer writer to pause at a safe point. Once we do this, the writer // thread will eventually pause. An error hereafter needs to release the writer thread // regardless -- hence the finally block below. Note, this method is called from the FSHLog @@ -825,7 +812,7 @@ public class FSHLog implements WAL { } } this.writer = nextWriter; - this.hdfs_out = nextHdfsOut; + hdfsPipeline.reset(nextHdfsOut); int oldNumEntries = this.numEntries.get(); this.numEntries.set(0); final String newPathString = (null == newPath ? null : FSUtils.getPath(newPath)); @@ -1035,12 +1022,12 @@ public class FSHLog implements WAL { // we use HLogKey here instead of WALKey directly to support legacy coprocessors. return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterIds, nonceGroup, nonce); } - + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION", justification="Will never be null") @Override public long append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key, - final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore, + final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore, final List memstoreCells) throws IOException { if (this.closed) throw new IOException("Cannot append; log is closed"); // Make a trace scope for the append. 
It is closed on other side of the ring buffer by the @@ -1083,9 +1070,9 @@ public class FSHLog implements WAL { private class SyncRunner extends HasThread { private volatile long sequence; private final BlockingQueue syncFutures; - + /** - * UPDATE! + * UPDATE! * @param syncs the batch of calls to sync that arrived as this thread was starting; when done, * we will put the result of the actual hdfs sync call as the result. * @param sequence The sequence number on the ring buffer when this thread was set running. @@ -1133,7 +1120,7 @@ public class FSHLog implements WAL { // This function releases one sync future only. return 1; } - + /** * Release all SyncFutures whose sequence is <= currentSequence. * @param currentSequence @@ -1221,7 +1208,9 @@ public class FSHLog implements WAL { syncCount += releaseSyncFutures(currentSequence, t); if (t != null) { requestLogRoll(); - } else checkLogRoll(); + } else { + walRoller.checkLogRoll(); + } } postSync(System.nanoTime() - start, syncCount); } catch (InterruptedException e) { @@ -1234,77 +1223,6 @@ public class FSHLog implements WAL { } } - /** - * Schedule a log roll if needed. - */ - void checkLogRoll() { - // Will return immediately if we are in the middle of a WAL log roll currently. - if (!rollWriterLock.tryLock()) return; - boolean lowReplication; - try { - lowReplication = checkLowReplication(); - } finally { - rollWriterLock.unlock(); - } - try { - if (lowReplication || writer != null && writer.getLength() > logrollsize) { - requestLogRoll(lowReplication); - } - } catch (IOException e) { - LOG.warn("Writer.getLength() failed; continuing", e); - } - } - - /* - * @return true if number of replicas for the WAL is lower than threshold - */ - private boolean checkLowReplication() { - boolean logRollNeeded = false; - // if the number of replicas in HDFS has fallen below the configured - // value, then roll logs. - try { - int numCurrentReplicas = getLogReplication(); - if (numCurrentReplicas != 0 && numCurrentReplicas < this.minTolerableReplication) { - if (this.lowReplicationRollEnabled) { - if (this.consecutiveLogRolls.get() < this.lowReplicationRollLimit) { - LOG.warn("HDFS pipeline error detected. " + "Found " - + numCurrentReplicas + " replicas but expecting no less than " - + this.minTolerableReplication + " replicas. " - + " Requesting close of wal. current pipeline: " - + Arrays.toString(getPipeLine())); - logRollNeeded = true; - // If rollWriter is requested, increase consecutiveLogRolls. Once it - // is larger than lowReplicationRollLimit, disable the - // LowReplication-Roller - this.consecutiveLogRolls.getAndIncrement(); - } else { - LOG.warn("Too many consecutive RollWriter requests, it's a sign of " - + "the total number of live datanodes is lower than the tolerable replicas."); - this.consecutiveLogRolls.set(0); - this.lowReplicationRollEnabled = false; - } - } - } else if (numCurrentReplicas >= this.minTolerableReplication) { - if (!this.lowReplicationRollEnabled) { - // The new writer's log replicas is always the default value. - // So we should not enable LowReplication-Roller. If numEntries - // is lower than or equals 1, we consider it as a new writer. - if (this.numEntries.get() <= 1) { - return logRollNeeded; - } - // Once the live datanode number and the replicas return to normal, - // enable the LowReplication-Roller. 
- this.lowReplicationRollEnabled = true; - LOG.info("LowReplication-Roller was enabled."); - } - } - } catch (Exception e) { - LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e + - " still proceeding ahead..."); - } - return logRollNeeded; - } - private SyncFuture publishSyncOnRingBuffer() { return publishSyncOnRingBuffer(null); } @@ -1356,7 +1274,7 @@ public class FSHLog implements WAL { } private void postSync(final long timeInNanos, final int handlerSyncs) { - if (timeInNanos > this.slowSyncNs) { + if (timeInNanos > this.walRoller.getSlowSyncNs()) { String msg = new StringBuilder().append("Slow sync cost: ") .append(timeInNanos / 1000000).append(" ms, current pipeline: ") @@ -1385,35 +1303,6 @@ public class FSHLog implements WAL { } /** - * Find the 'getNumCurrentReplicas' on the passed os stream. - * This is used for getting current replicas of a file being written. - * @return Method or null. - */ - private Method getGetNumCurrentReplicas(final FSDataOutputStream os) { - // TODO: Remove all this and use the now publically available - // HdfsDataOutputStream#getCurrentBlockReplication() - Method m = null; - if (os != null) { - Class wrappedStreamClass = os.getWrappedStream().getClass(); - try { - m = wrappedStreamClass.getDeclaredMethod("getNumCurrentReplicas", new Class[] {}); - m.setAccessible(true); - } catch (NoSuchMethodException e) { - LOG.info("FileSystem's output stream doesn't support getNumCurrentReplicas; " + - "HDFS-826 not available; fsOut=" + wrappedStreamClass.getName()); - } catch (SecurityException e) { - LOG.info("No access to getNumCurrentReplicas on FileSystems's output stream; HDFS-826 " + - "not available; fsOut=" + wrappedStreamClass.getName(), e); - m = null; // could happen on setAccessible() - } - } - if (m != null) { - if (LOG.isTraceEnabled()) LOG.trace("Using getNumCurrentReplicas"); - } - return m; - } - - /** * This method gets the datanode replication count for the current WAL. * * If the pipeline isn't started yet or is empty, you will get the default @@ -1428,14 +1317,7 @@ public class FSHLog implements WAL { @VisibleForTesting int getLogReplication() throws IllegalArgumentException, IllegalAccessException, InvocationTargetException { - final OutputStream stream = getOutputStream(); - if (this.getNumCurrentReplicas != null && stream != null) { - Object repl = this.getNumCurrentReplicas.invoke(stream, NO_ARGS); - if (repl instanceof Integer) { - return ((Integer)repl).intValue(); - } - } - return 0; + return walRoller.getLogReplication(); } @Override @@ -1519,7 +1401,7 @@ public class FSHLog implements WAL { @VisibleForTesting boolean isLowReplicationRollEnabled() { - return lowReplicationRollEnabled; + return walRoller.isLowReplicationRollEnabled(); } public static final long FIXED_OVERHEAD = ClassSize.align( @@ -1566,7 +1448,7 @@ public class FSHLog implements WAL { * 'safe point' while the orchestrating thread does some work that requires the first thread * paused: e.g. holding the WAL writer while its WAL is swapped out from under it by another * thread. - * + * *

Thread A signals Thread B to hold when it gets to a 'safe point'. Thread A wait until * Thread B gets there. When the 'safe point' has been attained, Thread B signals Thread A. * Thread B then holds at the 'safe point'. Thread A on notification that Thread B is paused, @@ -1574,7 +1456,7 @@ public class FSHLog implements WAL { * it flags B and then Thread A and Thread B continue along on their merry way. Pause and * signalling 'zigzags' between the two participating threads. We use two latches -- one the * inverse of the other -- pausing and signaling when states are achieved. - * + * *

To start up the drama, Thread A creates an instance of this class each time it would do * this zigzag dance and passes it to Thread B (these classes use Latches so it is one shot * only). Thread B notices the new instance (via reading a volatile reference or how ever) and it @@ -1596,7 +1478,7 @@ public class FSHLog implements WAL { * Latch to wait on. Will be released when we can proceed. */ private volatile CountDownLatch safePointReleasedLatch = new CountDownLatch(1); - + /** * For Thread A to call when it is ready to wait on the 'safe point' to be attained. * Thread A will be held in here until Thread B calls {@link #safePointAttained()} @@ -1605,7 +1487,7 @@ public class FSHLog implements WAL { * @throws InterruptedException * @throws ExecutionException * @return The passed syncFuture - * @throws FailedSyncBeforeLogCloseException + * @throws FailedSyncBeforeLogCloseException */ SyncFuture waitSafePoint(final SyncFuture syncFuture) throws InterruptedException, FailedSyncBeforeLogCloseException { @@ -1617,7 +1499,7 @@ public class FSHLog implements WAL { } return syncFuture; } - + /** * Called by Thread B when it attains the 'safe point'. In this method, Thread B signals * Thread A it can proceed. Thread B will be held in here until {@link #releaseSafePoint()} @@ -1812,8 +1694,8 @@ public class FSHLog implements WAL { // so region sequenceids will also be in order. regionSequenceId = entry.stampRegionSequenceId(); - // Edits are empty, there is nothing to append. Maybe empty when we are looking for a - // region sequence id only, a region edit/sequence id that is not associated with an actual + // Edits are empty, there is nothing to append. Maybe empty when we are looking for a + // region sequence id only, a region edit/sequence id that is not associated with an actual // edit. It has to go through all the rigmarole to be sure we have the right ordering. if (entry.getEdit().isEmpty()) { return; @@ -1922,51 +1804,17 @@ public class FSHLog implements WAL { System.exit(-1); } } - - /** - * Find the 'getPipeline' on the passed os stream. - * @return Method or null. - */ - private Method getGetPipeline(final FSDataOutputStream os) { - Method m = null; - if (os != null) { - Class wrappedStreamClass = os.getWrappedStream() - .getClass(); - try { - m = wrappedStreamClass.getDeclaredMethod("getPipeline", - new Class[] {}); - m.setAccessible(true); - } catch (NoSuchMethodException e) { - LOG.info("FileSystem's output stream doesn't support" - + " getPipeline; not available; fsOut=" - + wrappedStreamClass.getName()); - } catch (SecurityException e) { - LOG.info( - "Doesn't have access to getPipeline on " - + "FileSystems's output stream ; fsOut=" - + wrappedStreamClass.getName(), e); - m = null; // could happen on setAccessible() - } - } - return m; - } /** * This method gets the pipeline for the current WAL. 
*/ @VisibleForTesting DatanodeInfo[] getPipeLine() { - if (this.getPipeLine != null && this.hdfs_out != null) { - Object repl; - try { - repl = this.getPipeLine.invoke(getOutputStream(), NO_ARGS); - if (repl instanceof DatanodeInfo[]) { - return ((DatanodeInfo[]) repl); - } - } catch (Exception e) { - LOG.info("Get pipeline failed", e); - } - } - return new DatanodeInfo[0]; + return walRoller.getPipeLine(); + } + + @VisibleForTesting + HDFSPipeline getHDFSPipeline() { + return walRoller.getHDFSPipeline(); } } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HDFSPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HDFSPipeline.java new file mode 100644 index 0000000..2709bfc --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HDFSPipeline.java @@ -0,0 +1,165 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.io.OutputStream; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; + +@InterfaceAudience.Private +public class HDFSPipeline { + private static final Log LOG = LogFactory.getLog(HDFSPipeline.class); + + private final static Object [] NO_ARGS = new Object []{}; + + private Method getNumCurrentReplicas; + private Method getPipeLine; // refers to DFSOutputStream.getPipeLine + + private FSDataOutputStream hdfsOutputStream; + + public HDFSPipeline() { + } + + public void reset(final FSDataOutputStream os) { + final boolean sameClass = hdfsOutputStream == null ? false : + os.getWrappedStream().getClass().equals(hdfsOutputStream.getWrappedStream().getClass()); + + this.hdfsOutputStream = os; + + // handle the reflection necessary to call getNumCurrentReplicas(). TODO: Replace with + // HdfsDataOutputStream#getCurrentBlockReplication() and go without reflection. + if (!sameClass) { + this.getNumCurrentReplicas = getGetNumCurrentReplicasMethod(os); + this.getPipeLine = getGetPipelineMethod(os); + } + } + + /** + * Find the 'getNumCurrentReplicas' on the passed os stream. + * This is used for getting current replicas of a file being written. + * @return Method or null. 
+ */ + private Method getGetNumCurrentReplicasMethod(final FSDataOutputStream os) { + // TODO: Remove all this and use the now publically available + // HdfsDataOutputStream#getCurrentBlockReplication() + Method m = null; + if (os != null) { + Class wrappedStreamClass = os.getWrappedStream().getClass(); + try { + m = wrappedStreamClass.getDeclaredMethod("getNumCurrentReplicas", new Class[] {}); + m.setAccessible(true); + } catch (NoSuchMethodException e) { + LOG.info("FileSystem's output stream doesn't support getNumCurrentReplicas; " + + "HDFS-826 not available; fsOut=" + wrappedStreamClass.getName()); + } catch (SecurityException e) { + LOG.info("No access to getNumCurrentReplicas on FileSystems's output stream; HDFS-826 " + + "not available; fsOut=" + wrappedStreamClass.getName(), e); + m = null; // could happen on setAccessible() + } + } + if (m != null) { + if (LOG.isTraceEnabled()) LOG.trace("Using getNumCurrentReplicas"); + } + return m; + } + + /** + * Find the 'getPipeline' on the passed os stream. + * @return Method or null. + */ + private Method getGetPipelineMethod(final FSDataOutputStream os) { + Method m = null; + if (os != null) { + Class wrappedStreamClass = os.getWrappedStream().getClass(); + try { + m = wrappedStreamClass.getDeclaredMethod("getPipeline", + new Class[] {}); + m.setAccessible(true); + } catch (NoSuchMethodException e) { + LOG.info("FileSystem's output stream doesn't support" + + " getPipeline; not available; fsOut=" + + wrappedStreamClass.getName()); + } catch (SecurityException e) { + LOG.info( + "Doesn't have access to getPipeline on " + + "FileSystems's output stream ; fsOut=" + + wrappedStreamClass.getName(), e); + m = null; // could happen on setAccessible() + } + } + return m; + } + + public FSDataOutputStream getStream() { + return this.hdfsOutputStream; + } + + public OutputStream getWrappedStream() { + return this.hdfsOutputStream.getWrappedStream(); + } + + /** + * This method gets the datanode replication count for the current FSDataOutputStream. + * + * If the pipeline isn't started yet or is empty, you will get the default + * replication factor. Therefore, if this function returns 0, it means you + * are not properly running with the HDFS-826 patch. + * @throws InvocationTargetException + * @throws IllegalAccessException + * @throws IllegalArgumentException + * @throws Exception + */ + public int getDataNodeReplication() + throws IllegalArgumentException, IllegalAccessException, InvocationTargetException { + final OutputStream stream = getWrappedStream(); + if (this.getNumCurrentReplicas != null && stream != null) { + Object repl = this.getNumCurrentReplicas.invoke(stream, NO_ARGS); + if (repl instanceof Integer) { + return ((Integer)repl).intValue(); + } + } + return 0; + } + + /** + * This method gets the pipeline for the current FSDataOutputStream. 
+ */ + public DatanodeInfo[] getPipeLine() { + if (this.getPipeLine != null && hdfsOutputStream != null) { + Object repl; + try { + repl = this.getPipeLine.invoke(getWrappedStream(), NO_ARGS); + if (repl instanceof DatanodeInfo[]) { + return ((DatanodeInfo[]) repl); + } + } catch (Exception e) { + LOG.info("Get pipeline failed", e); + } + } + return new DatanodeInfo[0]; + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSWALRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSWALRoller.java new file mode 100644 index 0000000..4a39558 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSWALRoller.java @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import java.io.IOException; +import java.io.OutputStream; +import java.lang.reflect.InvocationTargetException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; +import java.util.Arrays; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.HDFSPipeline; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; + +import com.google.common.annotations.VisibleForTesting; + +@InterfaceAudience.Private +public class FSWALRoller { + private static final Log LOG = LogFactory.getLog(FSWALRoller.class); + + public interface Rollable { + int getNumEntries(); + long getWriterLength() throws IOException; + void requestLogRoll(boolean tooFewReplicas); + TRoll rollWriter(HDFSPipeline hdfsPipeline, boolean force) throws IOException; + } + + private final Rollable rollable; + + // All about log rolling if not enough replicas outstanding. + + // Minimum tolerable replicas, if the actual value is lower than it, rollWriter will be triggered + private final int minTolerableReplication; + + // DFSOutputStream.getNumCurrentReplicas method instance gotten via reflection. + private final HDFSPipeline hdfsPipeline = new HDFSPipeline(); + private final int slowSyncNs; + + // If live datanode count is lower than the default replicas value, + // RollWriter will be triggered in each sync(So the RollWriter will be + // triggered one by one in a short time). Using it as a workaround to slow + // down the roll frequency triggered by checkLowReplication(). + private final AtomicInteger consecutiveLogRolls = new AtomicInteger(0); + + private final int lowReplicationRollLimit; + + // If consecutiveLogRolls is larger than lowReplicationRollLimit, + // then disable the rolling in checkLowReplication(). + // Enable it if the replications recover. 
+ private volatile boolean lowReplicationRollEnabled = true; + + /** + * This lock makes sure only one log roll runs at a time. Should not be taken while any other + * lock is held. We don't just use synchronized because that results in bogus and tedious + * findbugs warning when it thinks synchronized controls writer thread safety. It is held when + * we are actually rolling the log. It is checked when we are looking to see if we should roll + * the log or not. + */ + private final ReentrantLock rollWriterLock = new ReentrantLock(true); + + // If > than this size, roll the log. + private final long logRollSize; + + public FSWALRoller(Rollable rollable, long logRollSize, int minTolerableReplication, + int lowReplicationRollLimit, int slowSyncNs) { + this.rollable = rollable; + + this.logRollSize = logRollSize; + this.minTolerableReplication = minTolerableReplication; + this.lowReplicationRollLimit = lowReplicationRollLimit; + this.slowSyncNs = slowSyncNs; + } + + public TRoll rollWriter() throws IOException { + return rollWriter(false); + } + + public TRoll rollWriter(boolean force) throws IOException { + rollWriterLock.lock(); + try { + return rollable.rollWriter(hdfsPipeline, force); + } finally { + rollWriterLock.unlock(); + } + } + + /** + * Currently, we need to expose the writer's OutputStream to tests so that they can manipulate + * the default behavior (such as setting the maxRecoveryErrorCount value for example (see + * {@link TestWALReplay#testReplayEditsWrittenIntoWAL()}). This is done using reflection on the + * underlying HDFS OutputStream. + * NOTE: This could be removed once Hadoop1 support is removed. + * @return null if underlying stream is not ready. + */ + @VisibleForTesting + public OutputStream getOutputStream() { + return this.hdfsPipeline.getWrappedStream(); + } + + /** + * Schedule a log roll if needed. + */ + public void checkLogRoll() { + // Will return immediately if we are in the middle of a WAL log roll currently. + if (!rollWriterLock.tryLock()) return; + boolean lowReplication; + try { + lowReplication = checkLowReplication(); + } finally { + rollWriterLock.unlock(); + } + try { + if (lowReplication || rollable.getWriterLength() > logRollSize) { + requestLogRoll(lowReplication); + } + } catch (IOException e) { + LOG.warn("Writer.getLength() failed; continuing", e); + } + } + + /* + * @return true if number of replicas for the WAL is lower than threshold + */ + private boolean checkLowReplication() { + boolean logRollNeeded = false; + // if the number of replicas in HDFS has fallen below the configured + // value, then roll logs. + try { + int numCurrentReplicas = getLogReplication(); + if (numCurrentReplicas != 0 && numCurrentReplicas < this.minTolerableReplication) { + if (this.lowReplicationRollEnabled) { + if (this.consecutiveLogRolls.get() < this.lowReplicationRollLimit) { + LOG.warn("HDFS pipeline error detected. " + "Found " + + numCurrentReplicas + " replicas but expecting no less than " + + this.minTolerableReplication + " replicas. " + + " Requesting close of wal. current pipeline: " + + Arrays.toString(getPipeLine())); + logRollNeeded = true; + // If rollWriter is requested, increase consecutiveLogRolls. 
Once it + // is larger than lowReplicationRollLimit, disable the + // LowReplication-Roller + this.consecutiveLogRolls.getAndIncrement(); + } else { + LOG.warn("Too many consecutive RollWriter requests, it's a sign of " + + "the total number of live datanodes is lower than the tolerable replicas."); + this.consecutiveLogRolls.set(0); + this.lowReplicationRollEnabled = false; + } + } + } else if (numCurrentReplicas >= this.minTolerableReplication) { + if (!this.lowReplicationRollEnabled) { + // The new writer's log replicas is always the default value. + // So we should not enable LowReplication-Roller. If numEntries + // is lower than or equals 1, we consider it as a new writer. + if (rollable.getNumEntries() <= 1) { + return logRollNeeded; + } + // Once the live datanode number and the replicas return to normal, + // enable the LowReplication-Roller. + this.lowReplicationRollEnabled = true; + LOG.info("LowReplication-Roller was enabled."); + } + } + } catch (Exception e) { + LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e + + " still proceeding ahead..."); + } + return logRollNeeded; + } + + public void requestLogRoll() { + requestLogRoll(false); + } + + private void requestLogRoll(boolean lowReplication) { + rollable.requestLogRoll(lowReplication); + } + + public long getLogRollSize() { + return logRollSize; + } + + public int getSlowSyncNs() { + return slowSyncNs; + } + + @VisibleForTesting + public boolean isLowReplicationRollEnabled() { + return lowReplicationRollEnabled; + } + + /** + * This method gets the datanode replication count for the current WAL. + * + * If the pipeline isn't started yet or is empty, you will get the default + * replication factor. Therefore, if this function returns 0, it means you + * are not properly running with the HDFS-826 patch. + * @throws InvocationTargetException + * @throws IllegalAccessException + * @throws IllegalArgumentException + * + * @throws Exception + */ + @VisibleForTesting + public int getLogReplication() + throws IllegalArgumentException, IllegalAccessException, InvocationTargetException { + return hdfsPipeline.getDataNodeReplication(); + } + + /** + * This method gets the pipeline for the current WAL. 
+ */ + @VisibleForTesting + public DatanodeInfo[] getPipeLine() { + return hdfsPipeline.getPipeLine(); + } + + @VisibleForTesting + public HDFSPipeline getHDFSPipeline() { + return hdfsPipeline; + } +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java index 4e07040..a61b21b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.OutputStream; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -31,6 +32,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -57,6 +59,9 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.Mockito; +import static org.mockito.Mockito.when; + /** * Tests for conditions that should trigger RegionServer aborts when * rolling the current WAL fails. @@ -191,7 +196,7 @@ public class TestLogRollAbort { HRegionInfo regioninfo = new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); final WAL log = wals.getWAL(regioninfo.getEncodedNameAsBytes()); - + final AtomicLong sequenceId = new AtomicLong(1); final int total = 20; @@ -205,7 +210,11 @@ public class TestLogRollAbort { } // Send the data to HDFS datanodes and close the HDFS writer log.sync(); - ((FSHLog) log).replaceWriter(((FSHLog)log).getOldPath(), null, null, null); + + FSDataOutputStream nextHdfsOut = Mockito.mock(FSDataOutputStream.class); + when(nextHdfsOut.getWrappedStream()).thenReturn(Mockito.mock(FSDataOutputStream.class)); + ((FSHLog) log).replaceWriter(((FSHLog)log).getOldPath(), null, + ((FSHLog)log).getHDFSPipeline(), null, nextHdfsOut); /* code taken from MasterFileSystem.getLogDirs(), which is called from MasterFileSystem.splitLog() * handles RS shutdowns (as observed by the splitting process)
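
Illustrative usage (not part of the patch): the sketch below drives the size-based roll policy that FSWALRoller now owns through a stand-in Rollable, with no WAL or HDFS behind it. It assumes the generic parameter the class appears to carry (Rollable.rollWriter() is declared to return a TRoll, so FSWALRoller<byte[][]> is assumed for the FSHLog case); FakeRollable and the threshold values are invented for the example. The low-replication probe is inert here because the roller's HDFSPipeline is never reset() onto a real output stream, so checkLogRoll() falls through to the writer-length check only.

// Minimal sketch: exercises FSWALRoller's size-based roll check via the Rollable seam.
// Assumes FSWALRoller<TRoll>/Rollable<TRoll> generics dropped by the flattened diff.
import java.io.IOException;

import org.apache.hadoop.hbase.util.HDFSPipeline;
import org.apache.hadoop.hbase.wal.FSWALRoller;

public class FSWALRollerPolicyExample {

  /** Stand-in Rollable that only reports a writer length; no real WAL behind it. */
  static class FakeRollable implements FSWALRoller.Rollable<byte[][]> {
    volatile long reportedLength;
    volatile boolean rollRequested;

    @Override
    public int getNumEntries() {
      return 10; // pretend some edits are outstanding
    }

    @Override
    public long getWriterLength() throws IOException {
      return reportedLength;
    }

    @Override
    public byte[][] rollWriter(HDFSPipeline hdfsPipeline, boolean force) throws IOException {
      return null; // nothing to roll in this sketch
    }

    @Override
    public void requestLogRoll(boolean tooFewReplicas) {
      rollRequested = true; // FSHLog would enqueue a real roll request here
    }
  }

  public static void main(String[] args) {
    FakeRollable rollable = new FakeRollable();
    // logRollSize=64MB, minTolerableReplication=3, lowReplicationRollLimit=5, slowSyncNs=100ms
    FSWALRoller<byte[][]> roller =
        new FSWALRoller<byte[][]>(rollable, 64L * 1024 * 1024, 3, 5, 100 * 1000000);

    rollable.reportedLength = 1024;               // well under the roll size
    roller.checkLogRoll();                        // length check does not trip
    System.out.println("small writer -> roll requested? " + rollable.rollRequested);

    rollable.reportedLength = 128L * 1024 * 1024; // past the roll size
    roller.checkLogRoll();                        // triggers rollable.requestLogRoll(false)
    System.out.println("large writer -> roll requested? " + rollable.rollRequested);
  }
}

The same seam is what lets FSHLog keep only the thin FSHLogRollable adapter while the backoff bookkeeping (consecutiveLogRolls, lowReplicationRollEnabled, rollWriterLock) lives entirely in FSWALRoller.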
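
On the TODO kept in HDFSPipeline ("Replace with HdfsDataOutputStream#getCurrentBlockReplication() and go without reflection"): a hypothetical, non-authoritative sketch of the non-reflective replica probe, assuming a Hadoop 2 client where HdfsDataOutputStream exposes getCurrentBlockReplication(). getPipeline() has no public equivalent, so that half of HDFSPipeline would still need reflection; ReplicationProbe and its method name are invented for illustration.

// Hypothetical non-reflective variant of HDFSPipeline.getDataNodeReplication().
import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;

final class ReplicationProbe {
  private ReplicationProbe() {}

  /** Live replica count of the block being written, or 0 if it cannot be determined. */
  static int currentBlockReplication(FSDataOutputStream os) throws IOException {
    if (os instanceof HdfsDataOutputStream) {
      return ((HdfsDataOutputStream) os).getCurrentBlockReplication();
    }
    return 0; // non-HDFS stream (e.g. local fs in tests); callers treat 0 as "unknown"
  }
}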