diff --git src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 1c9202a..15d382b 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -43,6 +43,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Matcher; @@ -129,7 +130,7 @@ public class HLog implements Syncable { private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+"); static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp"; - + private final FileSystem fs; private final Path dir; private final Configuration conf; @@ -143,7 +144,7 @@ public class HLog implements Syncable { private volatile long syncedTillHere = 0; private long lastDeferredTxid; private final Path oldLogDir; - private boolean logRollRunning; + private volatile boolean logRollRunning; private static Class logWriterClass; private static Class logReaderClass; @@ -155,7 +156,7 @@ public class HLog implements Syncable { } private FSDataOutputStream hdfs_out; // FSDataOutputStream associated with the current SequenceFile.writer - // Minimum tolerable replicas, if the actual value is lower than it, + // Minimum tolerable replicas, if the actual value is lower than it, // rollWriter will be triggered private int minTolerableReplication; private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas @@ -181,7 +182,7 @@ public class HLog implements Syncable { /* * Current log file. */ - Writer writer; + final AtomicReference writerRef = new AtomicReference(); /* * Map of all log files but the current one. @@ -301,7 +302,7 @@ public class HLog implements Syncable { //For measuring slow HLog appends private static AtomicLong slowHLogAppendCount = new AtomicLong(); private static Metric slowHLogAppendTime = new Metric(); - + public static Metric getWriteTime() { return writeTime.get(); } @@ -317,11 +318,11 @@ public class HLog implements Syncable { public static long getSyncBatchSize() { return syncBatchSize.getAndSet(0); } - + public static long getSlowAppendCount() { return slowHLogAppendCount.get(); } - + public static Metric getSlowAppendTime() { return slowHLogAppendTime.get(); } @@ -577,7 +578,7 @@ public class HLog implements Syncable { public byte [][] rollWriter(boolean force) throws FailedLogCloseException, IOException { // Return if nothing to flush. - if (!force && this.writer != null && this.numEntries.get() <= 0) { + if (!force && this.writerRef.get() != null && this.numEntries.get() <= 0) { return null; } byte [][] regionsToFlush = null; @@ -622,7 +623,7 @@ public class HLog implements Syncable { synchronized (updateLock) { // Clean up current writer. Path oldFile = cleanupCurrentWriter(currentFilenum); - this.writer = nextWriter; + this.writerRef.set(nextWriter); this.hdfs_out = nextHdfsOut; LOG.info((oldFile != null? @@ -658,7 +659,7 @@ public class HLog implements Syncable { /** * This method allows subclasses to inject different writers without having to * extend other methods like rollWriter(). - * + * * @param fs * @param path * @param conf @@ -830,7 +831,7 @@ public class HLog implements Syncable { */ Path cleanupCurrentWriter(final long currentfilenum) throws IOException { Path oldFile = null; - if (this.writer != null) { + if (this.writerRef.get() != null) { // Close the current writer, get a new one. try { // Wait till all current transactions are written to the hlog. @@ -842,8 +843,8 @@ public class HLog implements Syncable { " synced till here " + syncedTillHere); sync(); } - this.writer.close(); - this.writer = null; + this.writerRef.get().close(); + this.writerRef.set(null); closeErrorCount.set(0); } catch (IOException e) { LOG.error("Failed close of HLog writer", e); @@ -980,8 +981,8 @@ public class HLog implements Syncable { if (LOG.isDebugEnabled()) { LOG.debug("closing hlog writer in " + this.dir.toString()); } - if (this.writer != null) { - this.writer.close(); + if (this.writerRef.get() != null) { + this.writerRef.get().close(); } } } finally { @@ -1118,7 +1119,7 @@ public class HLog implements Syncable { } // Sync if catalog region, and if not then check if that table supports // deferred log flushing - if (doSync && + if (doSync && (info.isMetaRegion() || !htd.isDeferredLogFlush())) { // sync txn to file system @@ -1140,7 +1141,7 @@ public class HLog implements Syncable { * @return txid of this transaction * @throws IOException */ - public long appendNoSync(HRegionInfo info, byte [] tableName, WALEdit edits, + public long appendNoSync(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId, final long now, HTableDescriptor htd) throws IOException { return append(info, tableName, edits, clusterId, now, htd, false); @@ -1159,7 +1160,7 @@ public class HLog implements Syncable { * @return txid of this transaction * @throws IOException */ - public long append(HRegionInfo info, byte [] tableName, WALEdit edits, + public long append(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId, final long now, HTableDescriptor htd) throws IOException { return append(info, tableName, edits, clusterId, now, htd, true); @@ -1172,13 +1173,13 @@ public class HLog implements Syncable { class LogSyncer extends HasThread { private final long optionalFlushInterval; - + private boolean closeLogSyncer = false; // List of pending writes to the HLog. There corresponds to transactions // that have not yet returned to the client. We keep them cached here - // instead of writing them to HDFS piecemeal, because the HDFS write - // method is pretty heavyweight as far as locking is concerned. The + // instead of writing them to HDFS piecemeal, because the HDFS write + // method is pretty heavyweight as far as locking is concerned. The // goal is to increase the batchsize for writing-to-hdfs as well as // sync-to-hdfs, so that we can get better system throughput. private List pendingWrites = new LinkedList(); @@ -1227,17 +1228,33 @@ public class HLog implements Syncable { } // writes out pending entries to the HLog - void hlogFlush(Writer writer) throws IOException { + void hlogFlush(AtomicReference writerRef) throws IOException { // Atomically fetch all existing pending writes. New writes // will start accumulating in a new list. List pending = getPendingWrites(); + if (pending.isEmpty()) { + return; + } - // write out all accumulated Entries to hdfs. - for (Entry e : pending) { - writer.append(e); + try { + Writer writer = writerRef.get(); + // write out all accumulated Entries to hdfs. + for (Entry e : pending) { + writer.append(e); + } + } catch (Exception ex) { + // ensure that we hold the updateLock, not to collide with logRoll. + // HBASE-5623 + synchronized (updateLock) { + Writer writer = writerRef.get(); + // write out all accumulated Entries to hdfs. + for (Entry e : pending) { + writer.append(e); + } + } } } - + void close(){ closeLogSyncer = true; } @@ -1250,10 +1267,9 @@ public class HLog implements Syncable { // sync all transactions upto the specified txid private void syncer(long txid) throws IOException { - synchronized (this.updateLock) { - if (this.closed) return; - } - // if the transaction that we are interested in is already + if (this.closed) return; + + // if the transaction that we are interested in is already // synced, then return immediately. if (txid <= this.syncedTillHere) { return; @@ -1264,21 +1280,24 @@ public class HLog implements Syncable { // Done in parallel for all writer threads, thanks to HDFS-895 boolean syncSuccessful = true; try { - // First flush all the pending writes to HDFS. Then + // First flush all the pending writes to HDFS. Then // issue the sync to HDFS. If sync is successful, then update // syncedTillHere to indicate that transactions till this // number has been successfully synced. - logSyncerThread.hlogFlush(this.writer); - this.writer.sync(); + logSyncerThread.hlogFlush(writerRef); + writerRef.get().sync(); syncBatchSize.addAndGet(doneUpto - this.syncedTillHere); this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto); } catch(IOException io) { syncSuccessful = false; + } catch(NullPointerException npe) { + //HBASE-5623. When writer is rolling, the writer might have already been closed, and fields set to null + syncSuccessful = false; } if (!syncSuccessful) { synchronized (this.updateLock) { // HBASE-4387, retry with updateLock held - this.writer.sync(); + writerRef.get().sync(); syncBatchSize.addAndGet(doneUpto - this.syncedTillHere); this.syncedTillHere = doneUpto; } @@ -1288,8 +1307,16 @@ public class HLog implements Syncable { syncTime.inc(System.currentTimeMillis() - now); if (!this.logRollRunning) { checkLowReplication(); - if (this.writer.getLength() > this.logrollsize) { - requestLogRoll(); + try { + if (writerRef.get().getLength() > this.logrollsize) { + requestLogRoll(); + } + } catch (Exception ex) { + // not really a problem + // if writer was just closed we're fine anyway + // other rolling condition will be checked again + // next time + LOG.debug("Log roll failed", ex); } } } catch (IOException e) { @@ -1609,7 +1636,7 @@ public class HLog implements Syncable { /** * Get LowReplication-Roller status - * + * * @return lowReplicationRollEnabled */ public boolean isLowReplicationRollEnabled() { @@ -1713,13 +1740,13 @@ public class HLog implements Syncable { /** * Get the directory we are making logs in. - * + * * @return dir */ protected Path getDir() { return dir; } - + public static boolean validateHLogFilename(String filename) { return pattern.matcher(filename).matches(); }