diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 1c9202a..e1d4da6 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -43,6 +43,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Matcher; @@ -129,7 +130,7 @@ public class HLog implements Syncable { private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+"); static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp"; - + private final FileSystem fs; private final Path dir; private final Configuration conf; @@ -143,7 +144,7 @@ public class HLog implements Syncable { private volatile long syncedTillHere = 0; private long lastDeferredTxid; private final Path oldLogDir; - private boolean logRollRunning; + private volatile boolean logRollRunning; private static Class logWriterClass; private static Class logReaderClass; @@ -155,7 +156,7 @@ public class HLog implements Syncable { } private FSDataOutputStream hdfs_out; // FSDataOutputStream associated with the current SequenceFile.writer - // Minimum tolerable replicas, if the actual value is lower than it, + // Minimum tolerable replicas, if the actual value is lower than it, // rollWriter will be triggered private int minTolerableReplication; private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas @@ -181,7 +182,7 @@ public class HLog implements Syncable { /* * Current log file. */ - Writer writer; + final AtomicReference writerRef = new AtomicReference(); /* * Map of all log files but the current one. @@ -301,7 +302,7 @@ public class HLog implements Syncable { //For measuring slow HLog appends private static AtomicLong slowHLogAppendCount = new AtomicLong(); private static Metric slowHLogAppendTime = new Metric(); - + public static Metric getWriteTime() { return writeTime.get(); } @@ -317,11 +318,11 @@ public class HLog implements Syncable { public static long getSyncBatchSize() { return syncBatchSize.getAndSet(0); } - + public static long getSlowAppendCount() { return slowHLogAppendCount.get(); } - + public static Metric getSlowAppendTime() { return slowHLogAppendTime.get(); } @@ -577,7 +578,7 @@ public class HLog implements Syncable { public byte [][] rollWriter(boolean force) throws FailedLogCloseException, IOException { // Return if nothing to flush. - if (!force && this.writer != null && this.numEntries.get() <= 0) { + if (!force && this.writerRef.get() != null && this.numEntries.get() <= 0) { return null; } byte [][] regionsToFlush = null; @@ -622,7 +623,7 @@ public class HLog implements Syncable { synchronized (updateLock) { // Clean up current writer. Path oldFile = cleanupCurrentWriter(currentFilenum); - this.writer = nextWriter; + this.writerRef.set(nextWriter); this.hdfs_out = nextHdfsOut; LOG.info((oldFile != null? @@ -658,7 +659,7 @@ public class HLog implements Syncable { /** * This method allows subclasses to inject different writers without having to * extend other methods like rollWriter(). - * + * * @param fs * @param path * @param conf @@ -830,7 +831,7 @@ public class HLog implements Syncable { */ Path cleanupCurrentWriter(final long currentfilenum) throws IOException { Path oldFile = null; - if (this.writer != null) { + if (this.writerRef.get() != null) { // Close the current writer, get a new one. try { // Wait till all current transactions are written to the hlog. @@ -842,8 +843,8 @@ public class HLog implements Syncable { " synced till here " + syncedTillHere); sync(); } - this.writer.close(); - this.writer = null; + this.writerRef.get().close(); + this.writerRef.set(null); closeErrorCount.set(0); } catch (IOException e) { LOG.error("Failed close of HLog writer", e); @@ -980,8 +981,9 @@ public class HLog implements Syncable { if (LOG.isDebugEnabled()) { LOG.debug("closing hlog writer in " + this.dir.toString()); } - if (this.writer != null) { - this.writer.close(); + Writer writer = writerRef.get(); + if (writer != null) { + writer.close(); } } } finally { @@ -1118,7 +1120,7 @@ public class HLog implements Syncable { } // Sync if catalog region, and if not then check if that table supports // deferred log flushing - if (doSync && + if (doSync && (info.isMetaRegion() || !htd.isDeferredLogFlush())) { // sync txn to file system @@ -1140,7 +1142,7 @@ public class HLog implements Syncable { * @return txid of this transaction * @throws IOException */ - public long appendNoSync(HRegionInfo info, byte [] tableName, WALEdit edits, + public long appendNoSync(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId, final long now, HTableDescriptor htd) throws IOException { return append(info, tableName, edits, clusterId, now, htd, false); @@ -1159,7 +1161,7 @@ public class HLog implements Syncable { * @return txid of this transaction * @throws IOException */ - public long append(HRegionInfo info, byte [] tableName, WALEdit edits, + public long append(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId, final long now, HTableDescriptor htd) throws IOException { return append(info, tableName, edits, clusterId, now, htd, true); @@ -1172,13 +1174,13 @@ public class HLog implements Syncable { class LogSyncer extends HasThread { private final long optionalFlushInterval; - + private boolean closeLogSyncer = false; // List of pending writes to the HLog. There corresponds to transactions // that have not yet returned to the client. We keep them cached here - // instead of writing them to HDFS piecemeal, because the HDFS write - // method is pretty heavyweight as far as locking is concerned. The + // instead of writing them to HDFS piecemeal, because the HDFS write + // method is pretty heavyweight as far as locking is concerned. The // goal is to increase the batchsize for writing-to-hdfs as well as // sync-to-hdfs, so that we can get better system throughput. private List pendingWrites = new LinkedList(); @@ -1227,17 +1229,33 @@ public class HLog implements Syncable { } // writes out pending entries to the HLog - void hlogFlush(Writer writer) throws IOException { + void hlogFlush(AtomicReference writerRef) throws IOException { // Atomically fetch all existing pending writes. New writes // will start accumulating in a new list. List pending = getPendingWrites(); + if (pending.isEmpty()) { + return; + } - // write out all accumulated Entries to hdfs. - for (Entry e : pending) { - writer.append(e); + try { + Writer writer = writerRef.get(); + // write out all accumulated Entries to hdfs. + for (Entry e : pending) { + writer.append(e); + } + } catch (Exception ex) { + // ensure that we hold the updateLock, not to collide with logRoll. + // HBASE-5623 + synchronized (updateLock) { + Writer writer = writerRef.get(); + // write out all accumulated Entries to hdfs. + for (Entry e : pending) { + writer.append(e); + } + } } } - + void close(){ closeLogSyncer = true; } @@ -1250,10 +1268,9 @@ public class HLog implements Syncable { // sync all transactions upto the specified txid private void syncer(long txid) throws IOException { - synchronized (this.updateLock) { - if (this.closed) return; - } - // if the transaction that we are interested in is already + if (this.closed) return; + + // if the transaction that we are interested in is already // synced, then return immediately. if (txid <= this.syncedTillHere) { return; @@ -1264,21 +1281,24 @@ public class HLog implements Syncable { // Done in parallel for all writer threads, thanks to HDFS-895 boolean syncSuccessful = true; try { - // First flush all the pending writes to HDFS. Then + // First flush all the pending writes to HDFS. Then // issue the sync to HDFS. If sync is successful, then update // syncedTillHere to indicate that transactions till this // number has been successfully synced. - logSyncerThread.hlogFlush(this.writer); - this.writer.sync(); + logSyncerThread.hlogFlush(writerRef); + writerRef.get().sync(); syncBatchSize.addAndGet(doneUpto - this.syncedTillHere); this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto); } catch(IOException io) { syncSuccessful = false; + } catch(NullPointerException npe) { + //HBASE-5623. When writer is rolling, the writer might have already been closed, and fields set to null + syncSuccessful = false; } if (!syncSuccessful) { synchronized (this.updateLock) { // HBASE-4387, retry with updateLock held - this.writer.sync(); + writerRef.get().sync(); syncBatchSize.addAndGet(doneUpto - this.syncedTillHere); this.syncedTillHere = doneUpto; } @@ -1288,8 +1308,16 @@ public class HLog implements Syncable { syncTime.inc(System.currentTimeMillis() - now); if (!this.logRollRunning) { checkLowReplication(); - if (this.writer.getLength() > this.logrollsize) { - requestLogRoll(); + try { + if (writerRef.get().getLength() > this.logrollsize) { + requestLogRoll(); + } + } catch (Exception ex) { + // not really a problem + // if writer was just closed we're fine anyway + // other rolling condition will be checked again + // next time + LOG.debug("Log roll failed", ex); } } } catch (IOException e) { @@ -1609,7 +1637,7 @@ public class HLog implements Syncable { /** * Get LowReplication-Roller status - * + * * @return lowReplicationRollEnabled */ public boolean isLowReplicationRollEnabled() { @@ -1713,13 +1741,13 @@ public class HLog implements Syncable { /** * Get the directory we are making logs in. - * + * * @return dir */ protected Path getDir() { return dir; } - + public static boolean validateHLogFilename(String filename) { return pattern.matcher(filename).matches(); } diff --git src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java new file mode 100644 index 0000000..3f46736 --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import static org.junit.Assert.assertFalse; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test many concurrent appenders to an {@link #HLog} while rolling the log. + */ +@Category(MediumTests.class) +public class TestLogRollingNoCluster { + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final static byte [] EMPTY_1K_ARRAY = new byte[1024]; + private static final int THREAD_COUNT = 100; // Spin up this many threads + + /** + * Spin up a bunch of threads and have them all append to a WAL. Roll the + * WAL frequently to try and trigger NPE. + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testContendedLogRolling() throws IOException, InterruptedException { + FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration()); + Path dir = TEST_UTIL.getDataTestDir(); + HLog wal = new HLog(fs, new Path(dir, "logs"), new Path(dir, "oldlogs"), + TEST_UTIL.getConfiguration()); + Appender [] appenders = null; + + final int count = THREAD_COUNT; + appenders = new Appender[count]; + try { + for (int i = 0; i < count; i++) { + // Have each appending thread write 'count' entries + appenders[i] = new Appender(wal, i, count); + } + for (int i = 0; i < count; i++) { + appenders[i].start(); + } + for (int i = 0; i < count; i++) { + //ensure that all threads are joined before closing the wal + appenders[i].join(); + } + } finally { + wal.close(); + } + for (int i = 0; i < count; i++) { + assertFalse(appenders[i].isException()); + } + } + + /** + * Appender thread. Appends to passed wal file. + */ + static class Appender extends Thread { + private final Log log; + private final HLog wal; + private final int count; + private Exception e = null; + + Appender(final HLog wal, final int index, final int count) { + super("" + index); + this.wal = wal; + this.count = count; + this.log = LogFactory.getLog("Appender:" + getName()); + } + + /** + * @return Call when the thread is done. + */ + boolean isException() { + return !isAlive() && this.e != null; + } + + Exception getException() { + return this.e; + } + + @Override + public void run() { + this.log.info(getName() +" started"); + try { + for (int i = 0; i < this.count; i++) { + long now = System.currentTimeMillis(); + // Roll every ten edits if the log has anything in it. + if (i % 10 == 0 && this.wal.getNumEntries() > 0) { + this.wal.rollWriter(); + } + WALEdit edit = new WALEdit(); + byte[] bytes = Bytes.toBytes(i); + edit.add(new KeyValue(bytes, bytes, bytes, now, EMPTY_1K_ARRAY)); + + this.wal.append(HRegionInfo.FIRST_META_REGIONINFO, + HTableDescriptor.META_TABLEDESC.getName(), + edit, now, HTableDescriptor.META_TABLEDESC); + } + String msg = getName() + " finished"; + if (isException()) + this.log.info(msg, getException()); + else + this.log.info(msg); + } catch (Exception e) { + this.e = e; + log.info("Caught exception from Appender:" + getName(), e); + } + } + } + + @org.junit.Rule + public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = + new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); +} \ No newline at end of file