Index: src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java	(revision 0)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java	(revision 0)
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.assertFalse;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test many concurrent appenders to an {@link HLog} while rolling the log.
+ */
+@Category(MediumTests.class)
+public class TestLogRollingNoCluster {
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private final static byte [] EMPTY_1K_ARRAY = new byte[1024];
+  private static final int THREAD_COUNT = 100; // Spin up this many threads
+
+  /**
+   * Spin up a bunch of threads and have them all append to a WAL. Roll the
+   * WAL frequently to try to trigger an NPE.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testContendedLogRolling() throws IOException, InterruptedException {
+    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
+    Path dir = TEST_UTIL.getDataTestDir();
+    HLog wal = new HLog(fs, new Path(dir, "logs"), new Path(dir, "oldlogs"),
+      TEST_UTIL.getConfiguration());
+    Appender [] appenders = null;
+
+    final int count = THREAD_COUNT;
+    appenders = new Appender[count];
+    try {
+      for (int i = 0; i < count; i++) {
+        // Have each appending thread write 'count' entries
+        appenders[i] = new Appender(wal, i, count);
+      }
+      for (int i = 0; i < count; i++) {
+        appenders[i].start();
+      }
+      for (int i = 0; i < count; i++) {
+        // Ensure that all threads are joined before closing the wal
+        appenders[i].join();
+      }
+    } finally {
+      wal.close();
+    }
+    for (int i = 0; i < count; i++) {
+      assertFalse(appenders[i].isException());
+    }
+  }
+
+  /**
+   * Appender thread. Appends to the passed wal file.
+   */
+  static class Appender extends Thread {
+    private final Log log;
+    private final HLog wal;
+    private final int count;
+    private Exception e = null;
+
+    Appender(final HLog wal, final int index, final int count) {
+      super("" + index);
+      this.wal = wal;
+      this.count = count;
+      this.log = LogFactory.getLog("Appender:" + getName());
+    }
+
+    /**
+     * @return True if an exception was thrown. Call when the thread is done.
+     */
+    boolean isException() {
+      return !isAlive() && this.e != null;
+    }
+
+    Exception getException() {
+      return this.e;
+    }
+
+    @Override
+    public void run() {
+      this.log.info(getName() + " started");
+      try {
+        for (int i = 0; i < this.count; i++) {
+          long now = System.currentTimeMillis();
+          // Roll every ten edits if the log has anything in it.
+          if (i % 10 == 0 && this.wal.getNumEntries() > 0) {
+            this.wal.rollWriter();
+          }
+          WALEdit edit = new WALEdit();
+          byte[] bytes = Bytes.toBytes(i);
+          edit.add(new KeyValue(bytes, bytes, bytes, now, EMPTY_1K_ARRAY));
+
+          this.wal.append(HRegionInfo.FIRST_META_REGIONINFO,
+            HTableDescriptor.META_TABLEDESC.getName(),
+            edit, now, HTableDescriptor.META_TABLEDESC);
+        }
+        String msg = getName() + " finished";
+        if (isException())
+          this.log.info(msg, getException());
+        else
+          this.log.info(msg);
+      } catch (Exception e) {
+        this.e = e;
+        log.info("Caught exception from Appender:" + getName(), e);
+      }
+    }
+  }
+
+  @org.junit.Rule
+  public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
+    new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
+}
\ No newline at end of file
Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java	(revision 1304523)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java	(working copy)
@@ -311,7 +311,12 @@
 
   @Override
   public long getLength() throws IOException {
-    return this.writer.getLength();
+    try {
+      return this.writer.getLength();
+    } catch (NullPointerException x) {
+      // A concurrent close can cause this.
+      throw new IOException(x);
+    }
   }
 
   /**
Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java	(revision 1304523)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java	(working copy)
@@ -1231,10 +1231,20 @@
       // Atomically fetch all existing pending writes. New writes
       // will start accumulating in a new list.
       List<Entry> pending = getPendingWrites();
-
-      // write out all accumulated Entries to hdfs.
-      for (Entry e : pending) {
-        writer.append(e);
+      try {
+        // write out all accumulated Entries to hdfs.
+        for (Entry e : pending) {
+          writer.append(e);
+        }
+      } catch (Exception x) {
+        // Try again with the lock held (guards against a concurrent close
+        // of the writer). Writing some entries twice should be OK, as the
+        // logged operations are idempotent.
+        synchronized (HLog.this.updateLock) {
+          for (Entry e : pending) {
+            HLog.this.writer.append(e);
+          }
+        }
       }
     }
 
@@ -1250,8 +1260,10 @@
 
   // sync all transactions upto the specified txid
   private void syncer(long txid) throws IOException {
+    Writer tempWriter;
     synchronized (this.updateLock) {
       if (this.closed) return;
+      tempWriter = this.writer;
     }
     // if the transaction that we are interested in is already
     // synced, then return immediately.
@@ -1268,8 +1280,8 @@
       // issue the sync to HDFS. If sync is successful, then update
      // syncedTillHere to indicate that transactions till this
      // number has been successfully synced.
-      logSyncerThread.hlogFlush(this.writer);
-      this.writer.sync();
+      logSyncerThread.hlogFlush(tempWriter);
+      tempWriter.sync();
       syncBatchSize.addAndGet(doneUpto - this.syncedTillHere);
       this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
     } catch(IOException io) {
@@ -1288,8 +1300,14 @@
     syncTime.inc(System.currentTimeMillis() - now);
     if (!this.logRollRunning) {
       checkLowReplication();
-      if (this.writer.getLength() > this.logrollsize) {
-        requestLogRoll();
+      try {
+        if (this.writer.getLength() > this.logrollsize) {
+          requestLogRoll();
+        }
+      } catch (IOException io) {
+        // Ignore.
+        // If the writer was closed concurrently, it was rolled already.
+        // Otherwise the rolling condition will be checked again next time.
       }
     }
   } catch (IOException e) {
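
Note on the pattern in the syncer() hunks: the change snapshots the shared
writer reference while updateLock is held, then flushes and syncs through the
local copy, so the slow HDFS sync runs outside the lock yet can never
dereference a writer that a concurrent rollWriter()/close() has swapped out or
nulled. Below is a minimal, self-contained sketch of that snapshot-under-lock
pattern; all names here (WriterSnapshotSketch, FakeWriter, rollWriter) are
hypothetical stand-ins, not HBase code.

// Minimal sketch of the snapshot-under-lock pattern used by syncer() above.
// Hypothetical names throughout; not HBase code.
import java.io.IOException;

public class WriterSnapshotSketch {

  /** Hypothetical stand-in for the WAL writer. */
  interface FakeWriter {
    void sync() throws IOException;
  }

  private final Object updateLock = new Object();
  private boolean closed = false;
  private FakeWriter writer; // replaced by a concurrent log roll

  void syncer() throws IOException {
    FakeWriter tempWriter;
    synchronized (this.updateLock) {
      if (this.closed) return;
      // Snapshot the reference; the field may be swapped or nulled as soon
      // as the lock is released.
      tempWriter = this.writer;
    }
    // The sync runs outside the lock, but on a stable local reference that
    // a concurrent rollWriter() cannot invalidate.
    tempWriter.sync();
  }

  void rollWriter(FakeWriter next) {
    synchronized (this.updateLock) {
      this.writer = next;
    }
  }
}

The trade-off is that a sync may still reach a writer that was just rolled
away, which is why the patch also makes getLength() wrap a concurrent-close
NPE in an IOException and makes hlogFlush() retry under updateLock.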