diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 9593286..a9f5d34 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -136,11 +136,11 @@ public class HLog implements Syncable { private final Path dir; private final Configuration conf; private final LogRollListener listener; + private boolean logRollRequested; private final long optionalFlushInterval; private final long blocksize; private final int flushlogentries; private final String prefix; - private final AtomicInteger unflushedEntries = new AtomicInteger(0); private final Path oldLogDir; private final List actionListeners = Collections.synchronizedList(new ArrayList()); @@ -211,6 +211,7 @@ public class HLog implements Syncable { // We synchronize on updateLock to prevent updates and to prevent a log roll // during an update + // locked during appends private final Object updateLock = new Object(); private final boolean enabled; @@ -222,11 +223,6 @@ public class HLog implements Syncable { */ private final int maxLogs; - /** - * Thread that handles group commit - */ - private final LogSyncer logSyncerThread; - private final List logEntryVisitors = new CopyOnWriteArrayList(); @@ -371,10 +367,6 @@ public class HLog implements Syncable { } else { LOG.info("getNumCurrentReplicas--HDFS-826 not available" ); } - - logSyncerThread = new LogSyncer(this.optionalFlushInterval); - Threads.setDaemonThreadRunning(logSyncerThread, - Thread.currentThread().getName() + ".logSyncer"); } /** @@ -488,6 +480,7 @@ public class HLog implements Syncable { } } this.numEntries.set(0); + this.logRollRequested = false; } } finally { this.cacheFlushLock.unlock(); @@ -709,14 +702,6 @@ public class HLog implements Syncable { * @throws IOException */ public void close() throws IOException { - try { - logSyncerThread.interrupt(); - // Make sure we synced everything - logSyncerThread.join(this.optionalFlushInterval*2); - } catch (InterruptedException e) { - LOG.error("Exception while waiting for syncer thread to die", e); - } - cacheFlushLock.lock(); try { synchronized (updateLock) { @@ -782,12 +767,11 @@ public class HLog implements Syncable { // is greater than or equal to the value in lastSeqWritten. this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum)); doWrite(regionInfo, logKey, logEdit); - this.unflushedEntries.incrementAndGet(); this.numEntries.incrementAndGet(); } // sync txn to file system - this.sync(regionInfo.isMetaRegion()); + this.sync(); } /** @@ -834,119 +818,13 @@ public class HLog implements Syncable { doWrite(info, logKey, edits); this.numEntries.incrementAndGet(); - // Only count 1 row as an unflushed entry. - this.unflushedEntries.incrementAndGet(); } // sync txn to file system - this.sync(info.isMetaRegion()); + this.sync(); } - /** - * This thread is responsible to call syncFs and buffer up the writers while - * it happens. - */ - class LogSyncer extends Thread { - - // Using fairness to make sure locks are given in order - private final ReentrantLock lock = new ReentrantLock(true); - - // Condition used to wait until we have something to sync - private final Condition queueEmpty = lock.newCondition(); - - // Condition used to signal that the sync is done - private final Condition syncDone = lock.newCondition(); - - private final long optionalFlushInterval; - - private boolean syncerShuttingDown = false; - - LogSyncer(long optionalFlushInterval) { - this.optionalFlushInterval = optionalFlushInterval; - } - - @Override - public void run() { - try { - lock.lock(); - // awaiting with a timeout doesn't always - // throw exceptions on interrupt - while(!this.isInterrupted()) { - - // Wait until something has to be hflushed or do it if we waited - // enough time (useful if something appends but does not hflush). - // 0 or less means that it timed out and maybe waited a bit more. - if (!(queueEmpty.awaitNanos( - this.optionalFlushInterval*1000000) <= 0)) { - forceSync = true; - } - - // We got the signal, let's hflush. We currently own the lock so new - // writes are waiting to acquire it in addToSyncQueue while the ones - // we hflush are waiting on await() - hflush(); - - // Release all the clients waiting on the hflush. Notice that we still - // own the lock until we get back to await at which point all the - // other threads waiting will first acquire and release locks - syncDone.signalAll(); - } - } catch (IOException e) { - LOG.error("Error while syncing, requesting close of hlog ", e); - requestLogRoll(); - } catch (InterruptedException e) { - LOG.debug(getName() + "interrupted while waiting for sync requests"); - } finally { - syncerShuttingDown = true; - syncDone.signalAll(); - lock.unlock(); - LOG.info(getName() + " exiting"); - } - } - - /** - * This method first signals the thread that there's a sync needed - * and then waits for it to happen before returning. - */ - public void addToSyncQueue(boolean force) { - - // Don't bother if somehow our append was already hflushed - if (unflushedEntries.get() == 0) { - return; - } - lock.lock(); - try { - if (syncerShuttingDown) { - LOG.warn(getName() + " was shut down while waiting for sync"); - return; - } - if(force) { - forceSync = true; - } - // Wake the thread - queueEmpty.signal(); - - // Wait for it to hflush - syncDone.await(); - } catch (InterruptedException e) { - LOG.debug(getName() + " was interrupted while waiting for sync", e); - } - finally { - lock.unlock(); - } - } - } - - public void sync(){ - sync(false); - } - - /** - * This method calls the LogSyncer in order to group commit the sync - * with other threads. - * @param force For catalog regions, force the sync to happen - */ - public void sync(boolean force) { - logSyncerThread.addToSyncQueue(force); + public void sync() throws IOException { + hflush(); } public void hflush() throws IOException { @@ -954,44 +832,47 @@ public class HLog implements Syncable { if (this.closed) { return; } - boolean logRollRequested = false; - if (this.forceSync || - this.unflushedEntries.get() >= this.flushlogentries) { - try { - long now = System.currentTimeMillis(); - this.writer.sync(); - syncTime += System.currentTimeMillis() - now; - syncOps++; - this.forceSync = false; - this.unflushedEntries.set(0); - - // if the number of replicas in HDFS has fallen below the initial - // value, then roll logs. - try { - int numCurrentReplicas = getLogReplication(); - if (numCurrentReplicas != 0 && - numCurrentReplicas < this.initialReplication) { - LOG.warn("HDFS pipeline error detected. " + - "Found " + numCurrentReplicas + " replicas but expecting " + - this.initialReplication + " replicas. " + - " Requesting close of hlog."); - requestLogRoll(); - logRollRequested = true; - } - } catch (Exception e) { - LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e + - " still proceeding ahead..."); + } + try { + long now = System.currentTimeMillis(); + this.writer.sync(); + synchronized (this) { + this.syncTime += System.currentTimeMillis() - now; + this.syncOps++; + } + + synchronized (this.updateLock) { + if (!logRollRequested) { + checkLowReplication(); + if (this.writer.getLength() > this.logrollsize) { + requestLogRoll(); } - } catch (IOException e) { - LOG.fatal("Could not append. Requesting close of hlog", e); - requestLogRoll(); - throw e; } } + } catch (IOException e) { + LOG.fatal("Could not append. Requesting close of hlog", e); + requestLogRoll(); + throw e; + } + } - if (!logRollRequested && (this.writer.getLength() > this.logrollsize)) { + private void checkLowReplication() { + // if the number of replicas in HDFS has fallen below the initial + // value, then roll logs. + try { + int numCurrentReplicas = getLogReplication(); + if (numCurrentReplicas != 0 && + numCurrentReplicas < this.initialReplication) { + LOG.warn("HDFS pipeline error detected. " + + "Found " + numCurrentReplicas + " replicas but expecting " + + this.initialReplication + " replicas. " + + " Requesting close of hlog."); requestLogRoll(); + logRollRequested = true; } + } catch (Exception e) { + LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e + + " still proceeding ahead..."); } } @@ -1023,8 +904,8 @@ public class HLog implements Syncable { hflush(); } - private void requestLogRoll() { - if (this.listener != null) { + private synchronized void requestLogRoll() { + if (this.listener != null && !logRollRequested) { this.listener.logRollRequested(); } } @@ -1124,7 +1005,7 @@ public class HLog implements Syncable { } } // sync txn to file system - this.sync(isMetaRegion); + this.sync(); } finally { this.cacheFlushLock.unlock(); diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java index ad8f9e5..688a42a 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java @@ -334,8 +334,8 @@ public class TestHLog { wal.append(regioninfo, tableName, kvs, System.currentTimeMillis()); } // Now call sync to send the data to HDFS datanodes - wal.sync(true); - int namenodePort = cluster.getNameNodePort(); + wal.sync(); + int namenodePort = cluster.getNameNodePort(); final Path walPath = wal.computeFilename();