Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -162,6 +162,8 @@ * Current log file. */ Writer writer; + List waledits = Collections.synchronizedList(new ArrayList()); + List flushEdits = new ArrayList(); /* * Map of all log files but the current one. @@ -853,18 +855,25 @@ if (this.closed) { throw new IOException("Cannot append; log is closed"); } + waledits.add(logEdit); synchronized (updateLock) { - long seqNum = obtainSeqNum(); - logKey.setLogSeqNum(seqNum); - // The 'lastSeqWritten' map holds the sequence number of the oldest - // write for each region (i.e. the first edit added to the particular - // memstore). When the cache is flushed, the entry for the - // region being flushed is removed if the sequence number of the flush - // is greater than or equal to the value in lastSeqWritten. - this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(), - Long.valueOf(seqNum)); - doWrite(regionInfo, logKey, logEdit, htd); - this.numEntries.incrementAndGet(); + if(waledits.isEmpty()) + return; + changeToFlush(); + for(WALEdit edit: flushEdits){ + long seqNum = obtainSeqNum(); + logKey.setLogSeqNum(seqNum); + // The 'lastSeqWritten' map holds the sequence number of the oldest + // write for each region (i.e. the first edit added to the particular + // memstore). When the cache is flushed, the entry for the + // region being flushed is removed if the sequence number of the flush + // is greater than or equal to the value in lastSeqWritten. + this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(), + Long.valueOf(seqNum)); + doWrite(regionInfo, logKey, edit, htd); + this.numEntries.incrementAndGet(); + } + flushEdits.clear(); } // Sync if catalog region, and if not then check if that table supports @@ -876,6 +885,12 @@ } } + private void changeToFlush(){ + synchronized(waledits){ + flushEdits.addAll(waledits); + waledits.clear(); + } + } /** * Append a set of edits to the log. Log edits are keyed by (encoded) * regionName, rowname, and log-sequence-id. @@ -906,20 +921,27 @@ if (this.closed) { throw new IOException("Cannot append; log is closed"); } + waledits.add(edits); + if(waledits.isEmpty()) + return; + changeToFlush(); synchronized (this.updateLock) { - long seqNum = obtainSeqNum(); - // The 'lastSeqWritten' map holds the sequence number of the oldest - // write for each region (i.e. the first edit added to the particular - // memstore). . When the cache is flushed, the entry for the - // region being flushed is removed if the sequence number of the flush - // is greater than or equal to the value in lastSeqWritten. - // Use encoded name. Its shorter, guaranteed unique and a subset of - // actual name. - byte [] hriKey = info.getEncodedNameAsBytes(); - this.lastSeqWritten.putIfAbsent(hriKey, seqNum); - HLogKey logKey = makeKey(hriKey, tableName, seqNum, now); - doWrite(info, logKey, edits, htd); - this.numEntries.incrementAndGet(); + for(WALEdit edit: flushEdits){ + long seqNum = obtainSeqNum(); + // The 'lastSeqWritten' map holds the sequence number of the oldest + // write for each region (i.e. the first edit added to the particular + // memstore). . When the cache is flushed, the entry for the + // region being flushed is removed if the sequence number of the flush + // is greater than or equal to the value in lastSeqWritten. + // Use encoded name. Its shorter, guaranteed unique and a subset of + // actual name. + byte [] hriKey = info.getEncodedNameAsBytes(); + this.lastSeqWritten.putIfAbsent(hriKey, seqNum); + HLogKey logKey = makeKey(hriKey, tableName, seqNum, now); + doWrite(info, logKey, edit, htd); + this.numEntries.incrementAndGet(); + } + flushEdits.clear(); } // Sync if catalog region, and if not then check if that table supports // deferred log flushing