diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 1c9202a..271b94a 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -143,7 +143,7 @@ public class HLog implements Syncable { private volatile long syncedTillHere = 0; private long lastDeferredTxid; private final Path oldLogDir; - private boolean logRollRunning; + private volatile boolean logRollRunning; private static Class logWriterClass; private static Class logReaderClass; @@ -1268,7 +1268,9 @@ public class HLog implements Syncable { // issue the sync to HDFS. If sync is successful, then update // syncedTillHere to indicate that transactions till this // number has been successfully synced. - logSyncerThread.hlogFlush(this.writer); + synchronized (this.updateLock) { + logSyncerThread.hlogFlush(this.writer); + } this.writer.sync(); syncBatchSize.addAndGet(doneUpto - this.syncedTillHere); this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto); diff --git src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java new file mode 100644 index 0000000..97e018b --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import static org.junit.Assert.assertFalse; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test many concurrent appenders to an {@link HLog} while rolling the log. + */ +@Category(MediumTests.class) +public class TestLogRollingNoCluster { + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final static byte [] EMPTY_1K_ARRAY = new byte[1024]; + + /** + * Spin up a bunch of threads and have them all append to a WAL. Roll the + * WAL frequently to try and trigger NPE. 
+ * @throws IOException + * @throws InterruptedException + */ + @Test + public void testContendedLogRolling() throws IOException, InterruptedException { + FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration()); + Path dir = TEST_UTIL.getDataTestDir(); + HLog wal = new HLog(fs, new Path(dir, "logs"), new Path(dir, "oldlogs"), + TEST_UTIL.getConfiguration()); + try { + // Spin up this many threads + final int count = 100; + Appender [] appenders = new Appender[count]; + for (int i = 0; i < count; i++) { + // Have each appending thread write 'count' entries + appenders[i] = new Appender(wal, i, count); + appenders[i].start(); + } + for (int i = 0; i < count; i++) { + appenders[i].join(); + assertFalse(appenders[i].isException()); + } + } finally { + wal.close(); + } + } + + /** + * Appender thread. Appends to passed wal file. + */ + static class Appender extends Thread { + private final Log log; + private final HLog wal; + private final int count; + private Exception e = null; + + Appender(final HLog wal, final int index, final int count) { + super("" + index); + this.wal = wal; + this.count = count; + this.log = LogFactory.getLog("Appender:" + getName()); + } + + /** + * @return True if the thread is done running and it terminated with an exception. + */ + boolean isException() { + return !isAlive() && this.e != null; + } + + Exception getException() { + return this.e; + } + + @Override + public void run() { + this.log.info(getName() +" started"); + for (int i = 0; i < this.count; i++) { + long now = System.currentTimeMillis(); + // Roll every ten edits if the log has anything in it. 
+ if (i % 10 == 0 && this.wal.getNumEntries() > 0) { + try { + this.wal.rollWriter(); + } catch (FailedLogCloseException e) { + this.e = e; + break; + } catch (IOException e) { + this.e = e; + break; + } + } + WALEdit edit = new WALEdit(); + byte [] bytes = Bytes.toBytes(i); + edit.add(new KeyValue(bytes, bytes, bytes, now, EMPTY_1K_ARRAY)); + try { + this.wal.append(HRegionInfo.FIRST_META_REGIONINFO, + HTableDescriptor.META_TABLEDESC.getName(), edit, now, + HTableDescriptor.META_TABLEDESC); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + String msg = getName() + " finished"; + if (isException()) this.log.info(msg, getException()); + else this.log.info(msg); + } + } + + @org.junit.Rule + public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = + new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); +} \ No newline at end of file