Index: src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java (revision 0) @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import static org.junit.Assert.assertFalse; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test many concurrent appenders to an {@link HLog} while rolling the log. 
+ */ +@Category(MediumTests.class) +public class TestLogRollingNoCluster { + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final static byte [] EMPTY_1K_ARRAY = new byte[1024]; + private static final int THREAD_COUNT = 100; // Spin up this many threads + + /** + * Spin up a bunch of threads and have them all append to a WAL. Roll the + * WAL frequently to try and trigger NPE. + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testContendedLogRolling() throws IOException, InterruptedException { + FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration()); + Path dir = TEST_UTIL.getDataTestDir(); + HLog wal = new HLog(fs, new Path(dir, "logs"), new Path(dir, "oldlogs"), + TEST_UTIL.getConfiguration()); + Appender [] appenders = null; + + final int count = THREAD_COUNT; + appenders = new Appender[count]; + try { + for (int i = 0; i < count; i++) { + // Have each appending thread write 'count' entries + appenders[i] = new Appender(wal, i, count); + } + for (int i = 0; i < count; i++) { + appenders[i].start(); + } + for (int i = 0; i < count; i++) { + //ensure that all threads are joined before closing the wal + appenders[i].join(); + } + } finally { + wal.close(); + } + for (int i = 0; i < count; i++) { + assertFalse(appenders[i].isException()); + } + } + + /** + * Appender thread. Appends to passed wal file. + */ + static class Appender extends Thread { + private final Log log; + private final HLog wal; + private final int count; + private Exception e = null; + + Appender(final HLog wal, final int index, final int count) { + super("" + index); + this.wal = wal; + this.count = count; + this.log = LogFactory.getLog("Appender:" + getName()); + } + + /** + * @return True if this thread has finished and caught an exception; call when the thread is done. 
+ */ + boolean isException() { + return !isAlive() && this.e != null; + } + + Exception getException() { + return this.e; + } + + @Override + public void run() { + this.log.info(getName() +" started"); + try { + for (int i = 0; i < this.count; i++) { + long now = System.currentTimeMillis(); + // Roll every ten edits if the log has anything in it. + if (i % 10 == 0 && this.wal.getNumEntries() > 0) { + this.wal.rollWriter(); + } + WALEdit edit = new WALEdit(); + byte[] bytes = Bytes.toBytes(i); + edit.add(new KeyValue(bytes, bytes, bytes, now, EMPTY_1K_ARRAY)); + + this.wal.append(HRegionInfo.FIRST_META_REGIONINFO, + HTableDescriptor.META_TABLEDESC.getName(), + edit, now, HTableDescriptor.META_TABLEDESC); + } + String msg = getName() + " finished"; + if (isException()) + this.log.info(msg, getException()); + else + this.log.info(msg); + } catch (Exception e) { + this.e = e; + log.info("Caught exception from Appender:" + getName(), e); + } + } + } + + @org.junit.Rule + public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = + new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java (revision 1304634) +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java (working copy) @@ -276,7 +276,12 @@ @Override public void append(HLog.Entry entry) throws IOException { entry.setCompressionContext(compressionContext); - this.writer.append(entry.getKey(), entry.getEdit()); + try { + this.writer.append(entry.getKey(), entry.getEdit()); + } catch (NullPointerException npe) { + // Concurrent close... 
+ throw new IOException(npe); + } } @Override @@ -311,7 +316,12 @@ @Override public long getLength() throws IOException { - return this.writer.getLength(); + try { + return this.writer.getLength(); + } catch (NullPointerException npe) { + // Concurrent close... + throw new IOException(npe); + } } /** Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (revision 1304634) +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (working copy) @@ -143,7 +143,7 @@ private volatile long syncedTillHere = 0; private long lastDeferredTxid; private final Path oldLogDir; - private boolean logRollRunning; + private volatile boolean logRollRunning; private static Class logWriterClass; private static Class logReaderClass; @@ -1227,10 +1227,8 @@ } // writes out pending entries to the HLog - void hlogFlush(Writer writer) throws IOException { - // Atomically fetch all existing pending writes. New writes - // will start accumulating in a new list. - List pending = getPendingWrites(); + void hlogFlush(Writer writer, List pending) throws IOException { + if (pending == null) return; // write out all accumulated Entries to hdfs. for (Entry e : pending) { @@ -1250,8 +1248,10 @@ // sync all transactions upto the specified txid private void syncer(long txid) throws IOException { + Writer tempWriter; synchronized (this.updateLock) { if (this.closed) return; + tempWriter = this.writer; // guaranteed non-null } // if the transaction that we are interested in is already // synced, then return immediately. @@ -1262,23 +1262,23 @@ long doneUpto = this.unflushedEntries.get(); long now = System.currentTimeMillis(); // Done in parallel for all writer threads, thanks to HDFS-895 - boolean syncSuccessful = true; + List pending = logSyncerThread.getPendingWrites(); try { // First flush all the pending writes to HDFS. 
Then // issue the sync to HDFS. If sync is successful, then update // syncedTillHere to indicate that transactions till this // number has been successfully synced. - logSyncerThread.hlogFlush(this.writer); - this.writer.sync(); + logSyncerThread.hlogFlush(tempWriter, pending); + pending = null; + tempWriter.sync(); syncBatchSize.addAndGet(doneUpto - this.syncedTillHere); this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto); } catch(IOException io) { - syncSuccessful = false; - } - if (!syncSuccessful) { synchronized (this.updateLock) { - // HBASE-4387, retry with updateLock held - this.writer.sync(); + // HBASE-4387, HBASE-5623, retry with updateLock held + tempWriter = this.writer; + logSyncerThread.hlogFlush(tempWriter, pending); + tempWriter.sync(); syncBatchSize.addAndGet(doneUpto - this.syncedTillHere); this.syncedTillHere = doneUpto; } @@ -1288,8 +1288,15 @@ syncTime.inc(System.currentTimeMillis() - now); if (!this.logRollRunning) { checkLowReplication(); - if (this.writer.getLength() > this.logrollsize) { - requestLogRoll(); + try { + if (tempWriter.getLength() > this.logrollsize) { + requestLogRoll(); + } + } catch (IOException x) { + // Ignore. + // Writer might have been closed. + // In any case, we either don't have to do anything, + // or the log will be rolled the next time. } } } catch (IOException e) {