Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java (revision 1537818) +++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java (working copy) @@ -84,7 +84,7 @@ private static final String PREFIX_CLUSTER_KEY = "."; private final int VERSION_2 = -1; - private final ArrayList kvs = new ArrayList(); + private final ArrayList kvs = new ArrayList(1); /** * This variable contains the information of the column family replication settings and contains @@ -218,6 +218,7 @@ } // read in all the key values + kvs.ensureCapacity(length); for(int i=0; i< length && decoder.advance(); i++) { kvs.add(decoder.current()); } Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java (revision 1537818) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationHLogReaderManager.java (working copy) @@ -80,14 +80,11 @@ /** * Get the next entry, returned and also added in the array - * @param entriesArray - * @param currentNbEntries * @return a new entry or null * @throws IOException */ - public HLog.Entry readNextAndSetPosition(HLog.Entry[] entriesArray, - int currentNbEntries) throws IOException { - HLog.Entry entry = this.reader.next(entriesArray[currentNbEntries]); + public HLog.Entry readNextAndSetPosition() throws IOException { + HLog.Entry entry = this.reader.next(); // Store the position so that in the future the reader can start // reading from here. 
If the above call to next() throws an // exception, the position won't be changed and retry will happen Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (revision 1537818) +++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (working copy) @@ -25,7 +25,6 @@ import java.net.ConnectException; import java.net.SocketTimeoutException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.HashSet; @@ -79,8 +78,6 @@ private static final Log LOG = LogFactory.getLog(ReplicationSource.class); // Queue of logs to process private PriorityBlockingQueue queue; - // container of entries to replicate - private HLog.Entry[] entriesArray; private HConnection conn; // Helper class for zookeeper private ReplicationZookeeper zkHelper; @@ -127,8 +124,6 @@ private int maxRetriesMultiplier; // Socket timeouts require even bolder actions since we don't want to DDOS private int socketTimeoutMultiplier; - // Current number of entries that we need to replicate - private int currentNbEntries = 0; // Current number of operations (Put/Delete) that we need to replicate private int currentNbOperations = 0; // Current size of data we need to replicate @@ -165,10 +160,6 @@ this.conf.getLong("replication.source.size.capacity", 1024*1024*64); this.replicationQueueNbCapacity = this.conf.getInt("replication.source.nb.capacity", 25000); - this.entriesArray = new HLog.Entry[this.replicationQueueNbCapacity]; - for (int i = 0; i < this.replicationQueueNbCapacity; i++) { - this.entriesArray[i] = new HLog.Entry(); - } this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 10); this.socketTimeoutMultiplier = 
this.conf.getInt("replication.source.socketTimeoutMultiplier", maxRetriesMultiplier * maxRetriesMultiplier); @@ -382,10 +373,11 @@ boolean gotIOE = false; currentNbOperations = 0; - currentNbEntries = 0; + List entries = null; currentSize = 0; try { - if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo)) { + entries = readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo); + if (entries == null) { continue; } } catch (IOException ioe) { @@ -404,11 +396,10 @@ } catch (IOException e) { LOG.warn(peerClusterZnode + " Got while getting file size: ", e); } - } else if (currentNbEntries != 0) { + } else if (entries != null && !entries.isEmpty()) { LOG.warn(peerClusterZnode + " Got EOF while reading, " + "looks like this file is broken? " + currentPath); considerDumping = true; - currentNbEntries = 0; } if (considerDumping && @@ -430,7 +421,7 @@ // If we didn't get anything to replicate, or if we hit a IOE, // wait a bit and retry. // But if we need to stop, don't bother sleeping - if (this.isActive() && (gotIOE || currentNbEntries == 0)) { + if (this.isActive() && (gotIOE || entries.isEmpty())) { if (this.lastLoggedPosition != this.repLogReader.getPosition()) { this.manager.logPositionAndCleanOldLogs(this.currentPath, this.peerClusterZnode, this.repLogReader.getPosition(), queueRecovered, currentWALisBeingWrittenTo); @@ -442,8 +433,7 @@ continue; } sleepMultiplier = 1; - shipEdits(currentWALisBeingWrittenTo); - + shipEdits(currentWALisBeingWrittenTo, entries); } if (this.conn != null) { try { @@ -459,16 +449,17 @@ * Read all the entries from the current log files and retain those * that need to be replicated. Else, process the end of the current file. 
* @param currentWALisBeingWrittenTo is the current WAL being written to - * @return true if we got nothing and went to the next file, false if we got - * entries + * @return null if we got nothing and went to the next file, the list of entries + * to replicate if we got any * @throws IOException */ - protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo) + protected List readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo) throws IOException{ long seenEntries = 0; this.repLogReader.seek(); + List entries = new ArrayList(1); HLog.Entry entry = - this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries); + this.repLogReader.readNextAndSetPosition(); while (entry != null) { WALEdit edit = entry.getEdit(); this.metrics.logEditsReadRate.inc(1); @@ -499,7 +490,7 @@ edit.addClusterId(clusterId); } currentNbOperations += countDistinctRowKeys(edit); - currentNbEntries++; + entries.add(entry); currentSize += entry.getEdit().heapSize(); } else { this.metrics.logEditsFilteredRate.inc(1); @@ -507,11 +498,11 @@ } // Stop if too many entries or too big if (currentSize >= this.replicationQueueSizeCapacity || - currentNbEntries >= this.replicationQueueNbCapacity) { + entries.size() >= this.replicationQueueNbCapacity) { break; } try { - entry = this.repLogReader.readNextAndSetPosition(this.entriesArray, this.currentNbEntries); + entry = this.repLogReader.readNextAndSetPosition(); } catch (IOException ie) { LOG.debug("Break on IOE: " + ie.getMessage()); break; @@ -521,11 +512,11 @@ " and seenEntries:" + seenEntries + " and size: " + this.currentSize); if (currentWALisBeingWrittenTo) { - return false; + return entries; } // If we didn't get anything and the queue has an object, it means we // hit the end of the file for sure - return seenEntries == 0 && processEndOfFile(); + return seenEntries == 0 && processEndOfFile() ? 
null : entries; } private void connectToPeers() { @@ -704,9 +695,9 @@ * @param currentWALisBeingWrittenTo was the current WAL being (seemingly) * written to when this method was called */ - protected void shipEdits(boolean currentWALisBeingWrittenTo) { + protected void shipEdits(boolean currentWALisBeingWrittenTo, List entries) { int sleepMultiplier = 1; - if (this.currentNbEntries == 0) { + if (entries.isEmpty()) { LOG.warn("Was given 0 edits to ship"); return; } @@ -719,19 +710,20 @@ } try { HRegionInterface rrs = getRS(); - LOG.debug("Replicating " + currentNbEntries); - rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries)); + LOG.debug("Replicating " + entries.size()); + // can't avoid the copy here, the replicateLogEntries RPC requires an HLog.Entry[] + rrs.replicateLogEntries(entries.toArray(new HLog.Entry[entries.size()])); if (this.lastLoggedPosition != this.repLogReader.getPosition()) { this.manager.logPositionAndCleanOldLogs(this.currentPath, this.peerClusterZnode, this.repLogReader.getPosition(), queueRecovered, currentWALisBeingWrittenTo); this.lastLoggedPosition = this.repLogReader.getPosition(); } - this.totalReplicatedEdits += currentNbEntries; + this.totalReplicatedEdits += entries.size(); this.metrics.shippedBatchesRate.inc(1); this.metrics.shippedOpsRate.inc( this.currentNbOperations); this.metrics.setAgeOfLastShippedOp( - this.entriesArray[currentNbEntries-1].getKey().getWriteTime()); + entries.get(entries.size()-1).getKey().getWriteTime()); LOG.debug("Replicated in total: " + this.totalReplicatedEdits); break;