diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HLog.java b/src/java/org/apache/hadoop/hbase/regionserver/HLog.java index c7c4422..caa1e12 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HLog.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HLog.java @@ -349,7 +349,6 @@ public class HLog implements HConstants, Syncable { } this.numEntries.set(0); this.editsSize.set(0); - updateLock.notifyAll(); } } finally { this.cacheFlushLock.unlock(); @@ -596,7 +595,6 @@ public class HLog implements HConstants, Syncable { LOG.debug("closing hlog writer in " + this.dir.toString()); } this.writer.close(); - updateLock.notifyAll(); } } finally { cacheFlushLock.unlock(); @@ -653,8 +651,9 @@ public class HLog implements HConstants, Syncable { this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum)); boolean sync = regionInfo.isMetaRegion() || regionInfo.isRootRegion(); doWrite(logKey, logEdit, sync, logKey.getWriteTime()); + + this.unflushedEntries.incrementAndGet(); this.numEntries.incrementAndGet(); - updateLock.notifyAll(); } if (this.editsSize.get() > this.logrollsize) { if (listener != null) { @@ -706,7 +705,9 @@ public class HLog implements HConstants, Syncable { doWrite(logKey, kv, sync, now); this.numEntries.incrementAndGet(); } - updateLock.notifyAll(); + + // Only count 1 row as an unflushed entry. + this.unflushedEntries.incrementAndGet(); } if (this.editsSize.get() > this.logrollsize) { requestLogRoll(); @@ -714,20 +715,45 @@ public class HLog implements HConstants, Syncable { } public void sync() throws IOException { - lastLogFlushTime = System.currentTimeMillis(); - if (this.append && syncfs != null) { - try { - this.syncfs.invoke(this.writer, NO_ARGS); - } catch (Exception e) { - throw new IOException("Reflection", e); + sync(false); + } + + /** + * Multiple threads will call sync() at the same time, only the winner + * will actually flush if there is any race or build up. + * + * @param force sync regardless (for meta updates) if there is data + * @throws IOException + */ + public void sync(boolean force) throws IOException { + synchronized (this.updateLock) { + if (this.unflushedEntries.get() == 0) { + LOG.debug("Sync has nothing to do (someone else got here first?)"); + return; // win + } + + if (force || this.unflushedEntries.get() > this.flushlogentries) { + try { + lastLogFlushTime = System.currentTimeMillis(); + if (this.append && syncfs != null) { + try { + this.syncfs.invoke(this.writer, NO_ARGS); + } catch (Exception e) { + throw new IOException("Reflection", e); + } + } else { + this.writer.sync(); + if (this.writer_out != null) + this.writer_out.sync(); + } + this.unflushedEntries.set(0); + } catch (IOException e) { + LOG.fatal("Could not append. Requesting close of hlog", e); + requestLogRoll(); + throw e; + } } - } else { - this.writer.sync(); - // Above is sequencefile.writer sync. It doesn't actually synce the - // backing stream. Need to do the below to do that. - if (this.writer_out != null) this.writer_out.sync(); } - this.unflushedEntries.set(0); } void optionalSync() { @@ -766,9 +792,6 @@ public class HLog implements HConstants, Syncable { try { this.editsSize.addAndGet(logKey.heapSize() + logEdit.heapSize()); this.writer.append(logKey, logEdit); - if (sync || this.unflushedEntries.incrementAndGet() >= flushlogentries) { - sync(); - } long took = System.currentTimeMillis() - now; if (took > 1000) { LOG.warn(Thread.currentThread().getName() + " took " + took + @@ -854,7 +877,6 @@ public class HLog implements HConstants, Syncable { if (seq != null && logSeqId >= seq.longValue()) { this.lastSeqWritten.remove(regionName); } - updateLock.notifyAll(); } } finally { this.cacheFlushLock.unlock(); diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 268b845..87de560 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1755,6 +1755,8 @@ public class HRegionServer implements HConstants, HRegionInterface, try { cacheFlusher.reclaimMemStoreMemory(); region.put(put, getLockFromId(put.getLockId())); + + this.hlog.sync(region.getRegionInfo().isMetaRegion()); } catch (Throwable t) { throw convertThrowableToIOE(cleanup(t)); } @@ -1765,8 +1767,10 @@ public class HRegionServer implements HConstants, HRegionInterface, // Count of Puts processed. int i = 0; checkOpen(); + boolean isMetaRegion = false; try { HRegion region = getRegion(regionName); + isMetaRegion = region.getRegionInfo().isMetaRegion(); this.cacheFlusher.reclaimMemStoreMemory(); Integer[] locks = new Integer[puts.length]; for (i = 0; i < puts.length; i++) { @@ -1774,16 +1778,22 @@ public class HRegionServer implements HConstants, HRegionInterface, locks[i] = getLockFromId(puts[i].getLockId()); region.put(puts[i], locks[i]); } + } catch (WrongRegionException ex) { LOG.debug("Batch puts: " + i, ex); - return i; } catch (NotServingRegionException ex) { - return i; } catch (Throwable t) { throw convertThrowableToIOE(cleanup(t)); } // All have been processed successfully. - return -1; + + this.hlog.sync(isMetaRegion); + + if (i == puts.length) { + return -1; + } else { + return i; + } } /** @@ -1809,8 +1819,11 @@ public class HRegionServer implements HConstants, HRegionInterface, HRegion region = getRegion(regionName); try { cacheFlusher.reclaimMemStoreMemory(); - return region.checkAndPut(row, family, qualifier, value, put, + boolean retval = region.checkAndPut(row, family, qualifier, value, put, getLockFromId(put.getLockId()), true); + + this.hlog.sync(region.getRegionInfo().isMetaRegion()); + return retval; } catch (Throwable t) { throw convertThrowableToIOE(cleanup(t)); } @@ -1964,7 +1977,9 @@ public class HRegionServer implements HConstants, HRegionInterface, Integer lid = getLockFromId(delete.getLockId()); HRegion region = getRegion(regionName); region.delete(delete, lid, writeToWAL); - } catch(WrongRegionException ex) { + + this.hlog.sync(region.getRegionInfo().isMetaRegion()); + } catch (WrongRegionException ex) { } catch (NotServingRegionException ex) { } catch (Throwable t) { throw convertThrowableToIOE(cleanup(t)); @@ -2449,8 +2464,12 @@ public class HRegionServer implements HConstants, HRegionInterface, requestCount.incrementAndGet(); try { HRegion region = getRegion(regionName); - return region.incrementColumnValue(row, family, qualifier, amount, + long retval = region.incrementColumnValue(row, family, qualifier, amount, writeToWAL); + + this.hlog.sync(region.getRegionInfo().isMetaRegion()); + + return retval; } catch (IOException e) { checkFileSystem(); throw e;