Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java	(revision 1399881)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java	(working copy)
@@ -634,12 +634,6 @@
       if (nextWriter instanceof SequenceFileLogWriter) {
         nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream();
       }
-      // Tell our listeners that a new log was created
-      if (!this.listeners.isEmpty()) {
-        for (WALActionsListener i : this.listeners) {
-          i.postLogRoll(oldPath, newPath);
-        }
-      }
 
       synchronized (updateLock) {
         // Clean up current writer.
@@ -655,6 +649,13 @@
           " for " + FSUtils.getPath(newPath));
         this.numEntries.set(0);
       }
+      // Tell our listeners that a new log was created
+      if (!this.listeners.isEmpty()) {
+        for (WALActionsListener i : this.listeners) {
+          i.postLogRoll(oldPath, newPath);
+        }
+      }
+
       // Can we delete any of the old log files?
       if (this.outputfiles.size() > 0) {
         if (this.lastSeqWritten.isEmpty()) {
Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java	(revision 1399881)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java	(working copy)
@@ -188,12 +188,12 @@
 
   @Override
   public void preLogRoll(Path oldPath, Path newPath) throws IOException {
-    // Not interested
+    getReplicationManager().preLogRoll(newPath);
   }
 
   @Override
   public void postLogRoll(Path oldPath, Path newPath) throws IOException {
-    getReplicationManager().logRolled(newPath);
+    getReplicationManager().postLogRoll(newPath);
   }
 
   @Override
Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java	(revision 1399881)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java	(working copy)
@@ -286,6 +286,18 @@
         }
         continue;
       }
+      boolean currentWALisBeingWrittenTo = false;
+      //For WAL files we own (rather than recovered), take a snapshot of whether the
+      //current WAL file (this.currentPath) is in use (for writing) NOW!
+      //Since the new WAL paths are enqueued only after the prev WAL file
+      //is 'closed', presence of an element in the queue means that
+      //the previous WAL file was closed, else the file is in use (currentPath)
+      //We take the snapshot now so that we are protected against races
+      //where a new file gets enqueued while the current file is being processed
+      //(and where we just finished reading the current file).
+      if (!this.queueRecovered && queue.size() == 0) {
+        currentWALisBeingWrittenTo = true;
+      }
       // Open a reader on it
       if (!openReader(sleepMultiplier)) {
         // Reset the sleep multiplier, else it'd be reused for the next file
@@ -304,7 +316,7 @@
       boolean gotIOE = false;
       currentNbEntries = 0;
       try {
-        if(readAllEntriesToReplicateOrNextFile()) {
+        if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo)) {
           continue;
         }
       } catch (IOException ioe) {
@@ -353,7 +365,7 @@
       if (this.isActive() && (gotIOE || currentNbEntries == 0)) {
         if (this.lastLoggedPosition != this.position) {
           this.manager.logPositionAndCleanOldLogs(this.currentPath,
-              this.peerClusterZnode, this.position, queueRecovered);
+              this.peerClusterZnode, this.position, queueRecovered, currentWALisBeingWrittenTo);
           this.lastLoggedPosition = this.position;
         }
         if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
@@ -362,7 +374,7 @@
         continue;
       }
       sleepMultiplier = 1;
-      shipEdits();
+      shipEdits(currentWALisBeingWrittenTo);
     }
 
     if (this.conn != null) {
@@ -378,11 +390,13 @@
   /**
    * Read all the entries from the current log files and retain those
    * that need to be replicated. Else, process the end of the current file.
+   * @param currentWALisBeingWrittenTo is the current WAL being written to
    * @return true if we got nothing and went to the next file, false if we got
    * entries
    * @throws IOException
   */
-  protected boolean readAllEntriesToReplicateOrNextFile() throws IOException{
+  protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo)
+      throws IOException{
     long seenEntries = 0;
     if (this.position != 0) {
       this.reader.seek(this.position);
@@ -432,6 +446,9 @@
     LOG.debug("currentNbOperations:" + currentNbOperations +
         " and seenEntries:" + seenEntries +
         " and size: " + (this.reader.getPosition() - startPosition));
+    if (currentWALisBeingWrittenTo) {
+      return false;
+    }
     // If we didn't get anything and the queue has an object, it means we
     // hit the end of the file for sure
     return seenEntries == 0 && processEndOfFile();
@@ -604,8 +621,10 @@
 
   /**
    * Do the shipping logic
+   * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
+   * written to when this method was called
    */
-  protected void shipEdits() {
+  protected void shipEdits(boolean currentWALisBeingWrittenTo) {
     int sleepMultiplier = 1;
     if (this.currentNbEntries == 0) {
       LOG.warn("Was given 0 edits to ship");
@@ -624,7 +643,7 @@
       rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries));
       if (this.lastLoggedPosition != this.position) {
         this.manager.logPositionAndCleanOldLogs(this.currentPath,
-            this.peerClusterZnode, this.position, queueRecovered);
+            this.peerClusterZnode, this.position, queueRecovered, currentWALisBeingWrittenTo);
         this.lastLoggedPosition = this.position;
       }
       this.totalReplicatedEdits += currentNbEntries;
Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java	(revision 1399881)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java	(working copy)
@@ -145,11 +145,16 @@
    * @param id id of the peer cluster
    * @param position current location in the log
    * @param queueRecovered indicates if this queue comes from another region server
+   * @param holdLogInZK if true then the log is retained in ZK
    */
-  public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered) {
+  public void logPositionAndCleanOldLogs(Path log, String id, long position,
+      boolean queueRecovered, boolean holdLogInZK) {
     String key = log.getName();
     LOG.info("Going to report log #" + key + " for position " + position + " in " + log);
     this.zkHelper.writeReplicationStatus(key, id, position);
+    if (holdLogInZK) {
+      return;
+    }
     synchronized (this.hlogsById) {
       SortedSet<String> hlogs = this.hlogsById.get(id);
       if (!queueRecovered && hlogs.first() != key) {
@@ -251,7 +256,7 @@
     return this.sources;
   }
 
-  void logRolled(Path newLog) throws IOException {
+  void preLogRoll(Path newLog) throws IOException {
     if (!this.replicating.get()) {
       LOG.warn("Replication stopped, won't add new log");
       return;
@@ -277,6 +282,14 @@
     }
 
     this.latestPath = newLog;
+  }
+
+  void postLogRoll(Path newLog) throws IOException {
+    if (!this.replicating.get()) {
+      LOG.warn("Replication stopped, won't add new log");
+      return;
+    }
+
     // This only updates the sources we own, not the recovered ones
     for (ReplicationSourceInterface source : this.sources) {
       source.enqueueLog(newLog);
Index: src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java	(revision 1399881)
+++ src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java	(working copy)
@@ -202,7 +202,7 @@
     hlog.rollWriter();
 
     manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
-        "1", 0, false);
+        "1", 0, false, false);
 
     HLogKey key = new HLogKey(hri.getRegionName(), test, seq++, System.currentTimeMillis(),
         HConstants.DEFAULT_CLUSTER_ID);
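
For reference, here is a minimal standalone sketch (not part of the patch above) that paraphrases the snapshot ReplicationSource#run() now takes before opening a reader. Only the condition "!queueRecovered && queue.size() == 0" mirrors the patched code; the class and member names below are hypothetical stand-ins, not HBase APIs.

// Illustrative sketch only. Because a rolled WAL path is enqueued only after the
// previous file is closed, an empty queue on a non-recovered source means the
// file being read is still the live WAL, so EOF must not be treated as
// "file finished" and the log must be held in ZK. All names are stand-ins.
import java.util.concurrent.PriorityBlockingQueue;

public class CurrentWalSnapshotSketch {
  // Rolled WAL file names still waiting to be replicated.
  private final PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<String>();
  // True when this source drains logs recovered from a dead region server.
  private final boolean queueRecovered;

  public CurrentWalSnapshotSketch(boolean queueRecovered) {
    this.queueRecovered = queueRecovered;
  }

  // Called by the log-roll listener after the previous WAL has been closed.
  public void enqueueRolledLog(String walName) {
    queue.add(walName);
  }

  // Snapshot taken once at the start of a read pass; the result is passed down
  // to the read/ship/cleanup steps so they all agree on it for that pass.
  public boolean currentWalIsBeingWrittenTo() {
    return !queueRecovered && queue.size() == 0;
  }

  public static void main(String[] args) {
    CurrentWalSnapshotSketch source = new CurrentWalSnapshotSketch(false);
    System.out.println(source.currentWalIsBeingWrittenTo()); // true: still reading the live WAL
    source.enqueueRolledLog("hlog.1350000000001");
    System.out.println(source.currentWalIsBeingWrittenTo()); // false: previous WAL was closed
  }
}

Under that reading of the patch, a true snapshot makes readAllEntriesToReplicateOrNextFile return early instead of calling processEndOfFile, and logPositionAndCleanOldLogs is invoked with holdLogInZK set, so the znode for the still-open WAL is never cleaned up.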