Index: src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java (revision 1386889)
+++ src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java (working copy)
@@ -23,6 +23,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
@@ -40,8 +41,8 @@
@Override
public void init(Configuration conf, FileSystem fs,
- ReplicationSourceManager manager, Stoppable stopper,
- AtomicBoolean replicating, String peerClusterId)
+ ReplicationSourceManager manager, RegionServerServices services,
+ Stoppable stopper, AtomicBoolean replicating, String peerClusterId)
throws IOException {
this.manager = manager;
this.peerClusterId = peerClusterId;
Index: src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (revision 1386889)
+++ src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (working copy)
@@ -109,7 +109,7 @@
ZKUtil.createWithParents(zkw, "/hbase/replication/state");
ZKUtil.setData(zkw, "/hbase/replication/state", Bytes.toBytes("true"));
- replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
+ replication = new Replication(new DummyServer(), fs, logDir, oldLogDir, null);
manager = replication.getReplicationManager();
fs = FileSystem.get(conf);
oldLogDir = new Path(utility.getDataTestDir(),
@@ -195,7 +195,7 @@
hlog.rollWriter();
manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
- "1", 0, false);
+ "1", 0, false, false);
HLogKey key = new HLogKey(hri.getRegionName(), test, seq++,
System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
Index: src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java (revision 1386889)
+++ src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java (working copy)
@@ -74,7 +74,7 @@
public void stop(String why) {}
@Override
public boolean isStopped() {return false;}
- }, FileSystem.get(conf), replicating, logDir, oldLogDir);
+ }, FileSystem.get(conf), replicating, logDir, oldLogDir, null);
}
/**
Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (revision 1386889)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (working copy)
@@ -196,6 +196,7 @@
// The timestamp (in ms) when the log file was created.
private volatile long filenum = -1;
+ // Raw URI path of the log file currently open for writing; refreshed on every roll.
+ private volatile String currentFilePath;
//number of transactions in the current Hlog.
private final AtomicInteger numEntries = new AtomicInteger(0);
@@ -585,12 +586,13 @@
byte [][] regionsToFlush = null;
this.cacheFlushLock.lock();
this.logRollRunning = true;
+ Path newPath = null;
try {
// Do all the preparation outside of the updateLock to block
// as less as possible the incoming writes
long currentFilenum = this.filenum;
this.filenum = System.currentTimeMillis();
- Path newPath = computeFilename();
+ newPath = computeFilename();
HLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
// Can we get at the dfsclient outputstream? If an instance of
// SFLW, it'll have done the necessary reflection to get at the
@@ -642,6 +644,7 @@
}
} finally {
this.logRollRunning = false;
+ if (newPath != null) {
+ this.currentFilePath = newPath.toUri().getRawPath();
+ }
this.cacheFlushLock.unlock();
}
return regionsToFlush;
@@ -1679,6 +1682,14 @@
return lastDeferredSeq;
}
+ /** Is the given file currently open for writing by this HLog? If the
+ * filename doesn't match the currentFilePath field value, the HLog
+ * instance is assumed to have closed or aborted the file.
+ */
+ public boolean isFileInUse(String filename) {
+ String current = this.currentFilePath;
+ return current != null && current.equals(filename);
+ }
+
/**
* Pass one or more log file names and it will either dump out a text version
* on stdout or split the specified log files.
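
The isFileInUse() check above reduces to comparing the requested path against a volatile field that is refreshed at the end of every roll. A minimal, self-contained sketch of that pattern follows; WalFileTracker and its methods are illustrative stand-ins, not HBase classes:

```java
// Illustrative stand-in for the volatile currentFilePath bookkeeping added to HLog.
public class WalFileTracker {

  // Raw path of the log file currently open for writing; refreshed on every roll.
  private volatile String currentFilePath;

  /** Mirrors the finally block of rollWriter(): remember the new file's path. */
  public void onRoll(String newPath) {
    this.currentFilePath = newPath;
  }

  /** True only while the given file is the one being written to. */
  public boolean isFileInUse(String filename) {
    String current = this.currentFilePath; // single volatile read
    return current != null && current.equals(filename);
  }

  public static void main(String[] args) {
    WalFileTracker tracker = new WalFileTracker();
    tracker.onRoll("/hbase/.logs/rs1/1386889000000");
    System.out.println(tracker.isFileInUse("/hbase/.logs/rs1/1386889000000")); // true
    tracker.onRoll("/hbase/.logs/rs1/1386889060000");
    System.out.println(tracker.isFileInUse("/hbase/.logs/rs1/1386889000000")); // false
  }
}
```

Readers such as ReplicationSource only need a point-in-time answer, so a plain volatile field suffices and no lock is taken on the read path.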
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1386889)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
@@ -1201,7 +1201,7 @@
// log directories.
try {
this.replicationHandler = Replication.isReplication(this.conf)?
- new Replication(this, this.fs, logdir, oldLogDir): null;
+ new Replication(this, this.fs, logdir, oldLogDir, this): null;
} catch (KeeperException e) {
throw new IOException("Failed replication handler create", e);
}
Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (revision 1386889)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (working copy)
@@ -42,6 +42,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -87,6 +88,8 @@
private final long sleepBeforeFailover;
// Homemade executer service for replication
private final ThreadPoolExecutor executor;
+ // the region server services
+ private final RegionServerServices services;
/**
* Creates a replication manager and sets the watch on all the other
@@ -98,6 +101,7 @@
* @param replicating the status of the replication on this cluster
* @param logDir the directory that contains all hlog directories of live RSs
* @param oldLogDir the directory where old logs are archived
+ * @param services the region server services
*/
public ReplicationSourceManager(final ReplicationZookeeper zkHelper,
final Configuration conf,
@@ -105,7 +109,8 @@
final FileSystem fs,
final AtomicBoolean replicating,
final Path logDir,
- final Path oldLogDir) {
+ final Path oldLogDir,
+ final RegionServerServices services) {
this.sources = new ArrayList<ReplicationSourceInterface>();
this.replicating = replicating;
this.zkHelper = zkHelper;
@@ -133,6 +138,7 @@
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
tfb.setNameFormat("ReplicationExecutor-%d");
this.executor.setThreadFactory(tfb.build());
+ this.services = services;
}
/**
@@ -144,11 +150,16 @@
* @param id id of the peer cluster
* @param position current location in the log
* @param queueRecovered indicates if this queue comes from another region server
+ * @param fileInUse whether the log file is currently open for writing
*/
- public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered) {
+ public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered,
+ boolean fileInUse) {
String key = log.getName();
LOG.info("Going to report log #" + key + " for position " + position + " in " + log);
this.zkHelper.writeReplicationStatus(key, id, position);
+ if (fileInUse) {
+ // The file is still being written to, so it cannot be an old log yet.
+ return;
+ }
synchronized (this.hlogsById) {
SortedSet<String> hlogs = this.hlogsById.get(id);
if (!queueRecovered && hlogs.first() != key) {
@@ -198,7 +209,7 @@
*/
public ReplicationSourceInterface addSource(String id) throws IOException {
ReplicationSourceInterface src =
- getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
+ getReplicationSource(this.conf, this.fs, this, services, stopper, replicating, id);
// TODO set it to what's in ZK
src.setSourceEnabled(true);
synchronized (this.hlogsById) {
@@ -297,6 +308,7 @@
* @param conf the configuration to use
* @param fs the file system to use
* @param manager the manager to use
+ * @param services the region server services
* @param stopper the stopper object for this region server
* @param replicating the status of the replication on this cluster
* @param peerId the id of the peer cluster
@@ -307,6 +319,7 @@
final Configuration conf,
final FileSystem fs,
final ReplicationSourceManager manager,
+ final RegionServerServices services,
final Stoppable stopper,
final AtomicBoolean replicating,
final String peerId) throws IOException {
@@ -322,7 +335,7 @@
src = new ReplicationSource();
}
- src.init(conf, fs, manager, stopper, replicating, peerId);
+ src.init(conf, fs, manager, services, stopper, replicating, peerId);
return src;
}
@@ -584,7 +597,7 @@
String peerId = entry.getKey();
try {
ReplicationSourceInterface src = getReplicationSource(conf,
- fs, ReplicationSourceManager.this, stopper, replicating, peerId);
+ fs, ReplicationSourceManager.this, services, stopper, replicating, peerId);
if (!zkHelper.getPeerClusters().containsKey(src.getPeerClusterId())) {
src.terminate("Recovered queue doesn't belong to any current peer");
break;
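
The net effect of the new fileInUse flag in logPositionAndCleanOldLogs() is that the position is still reported to ZooKeeper, but old-log cleanup is skipped while the file is still being written. A simplified, self-contained sketch of that control flow; the ZooKeeper write is replaced by a log line, the types are trimmed down, and the first-log comparison uses equals() for clarity:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical model of the early return added above; the real method writes the
// position through ReplicationZookeeper and prunes the hlogsById bookkeeping.
public class LogPositionSketch {

  private final Map<String, SortedSet<String>> hlogsById =
      new HashMap<String, SortedSet<String>>();

  public void logPositionAndCleanOldLogs(String logName, String peerId, long position,
      boolean queueRecovered, boolean fileInUse) {
    System.out.println("Reporting " + logName + " at position " + position + " for " + peerId);
    // (real code: zkHelper.writeReplicationStatus(logName, peerId, position))
    if (fileInUse) {
      // The file is still open for writing, so it cannot be an old log yet.
      return;
    }
    SortedSet<String> hlogs = hlogsById.get(peerId);
    if (hlogs != null && !queueRecovered && !hlogs.first().equals(logName)) {
      // Drop every log strictly older than the one just reported.
      hlogs.headSet(logName).clear();
    }
  }

  public static void main(String[] args) {
    LogPositionSketch sketch = new LogPositionSketch();
    sketch.hlogsById.put("1", new TreeSet<String>(Arrays.asList("log.1", "log.2", "log.3")));
    sketch.logPositionAndCleanOldLogs("log.3", "1", 1024L, false, true);  // keeps log.1, log.2
    sketch.logPositionAndCleanOldLogs("log.3", "1", 2048L, false, false); // cleans log.1, log.2
    System.out.println(sketch.hlogsById.get("1")); // [log.3]
  }
}
```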
Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (revision 1386889)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (working copy)
@@ -31,6 +31,7 @@
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -67,7 +68,7 @@
* @throws KeeperException
*/
public Replication(final Server server, final FileSystem fs,
- final Path logDir, final Path oldLogDir)
+ final Path logDir, final Path oldLogDir, final RegionServerServices services)
throws IOException, KeeperException {
this.server = server;
this.conf = this.server.getConfiguration();
@@ -75,7 +76,7 @@
if (replication) {
this.zkHelper = new ReplicationZookeeper(server, this.replicating);
this.replicationManager = new ReplicationSourceManager(zkHelper, conf,
- this.server, fs, this.replicating, logDir, oldLogDir) ;
+ this.server, fs, this.replicating, logDir, oldLogDir, services) ;
} else {
this.replicationManager = null;
this.zkHelper = null;
Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (revision 1386889)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (working copy)
@@ -50,6 +50,7 @@
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -93,6 +94,8 @@
private String peerId;
// The manager of all sources to which we ping back our progress
private ReplicationSourceManager manager;
+ // The reference to the services provided by the RS
+ private RegionServerServices services;
// Should we stop everything?
private Stoppable stopper;
// List of chosen sinks (region servers)
@@ -154,6 +157,7 @@
public void init(final Configuration conf,
final FileSystem fs,
final ReplicationSourceManager manager,
+ final RegionServerServices services,
final Stoppable stopper,
final AtomicBoolean replicating,
final String peerClusterZnode)
@@ -182,6 +186,7 @@
this.random = new Random();
this.replicating = replicating;
this.manager = manager;
+ this.services = services;
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000);
this.fs = fs;
@@ -289,6 +294,12 @@
}
continue;
}
+ // Take a snapshot, before opening the reader, of whether the file is
+ // still open for writing.
+ boolean fileInUse = false;
+ if (services != null && services.getWAL().isFileInUse(getCurrentPath().toUri().getRawPath())) {
+ LOG.info("File " + getCurrentPath() + " is still in use");
+ fileInUse = true;
+ }
// Open a reader on it
if (!openReader(sleepMultiplier)) {
// Reset the sleep multiplier, else it'd be reused for the next file
@@ -307,7 +318,7 @@
boolean gotIOE = false;
currentNbEntries = 0;
try {
- if(readAllEntriesToReplicateOrNextFile()) {
+ if(readAllEntriesToReplicateOrNextFile(fileInUse)) {
continue;
}
} catch (IOException ioe) {
@@ -360,7 +371,7 @@
if (this.isActive() && (gotIOE || currentNbEntries == 0)) {
if (this.lastLoggedPosition != this.position) {
this.manager.logPositionAndCleanOldLogs(this.currentPath,
- this.peerClusterZnode, this.position, queueRecovered);
+ this.peerClusterZnode, this.position, queueRecovered, fileInUse);
this.lastLoggedPosition = this.position;
}
if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
@@ -369,7 +380,7 @@
continue;
}
sleepMultiplier = 1;
- shipEdits();
+ shipEdits(fileInUse);
}
if (this.conn != null) {
@@ -385,11 +396,13 @@
/**
* Read all the entries from the current log files and retain those
* that need to be replicated. Else, process the end of the current file.
+ * @param fileInUse whether the current log file is still open for writing
* @return true if we got nothing and went to the next file, false if we got
* entries
* @throws IOException
*/
- protected boolean readAllEntriesToReplicateOrNextFile() throws IOException{
+ protected boolean readAllEntriesToReplicateOrNextFile(boolean fileInUse)
+ throws IOException {
long seenEntries = 0;
if (this.position != 0) {
this.reader.seek(this.position);
@@ -438,6 +451,9 @@
LOG.debug("currentNbOperations:" + currentNbOperations +
" and seenEntries:" + seenEntries +
" and size: " + (this.reader.getPosition() - this.position));
+ if (fileInUse) {
+ // The file is still open for writing; do not treat this as end of file.
+ return false;
+ }
// If we didn't get anything and the queue has an object, it means we
// hit the end of the file for sure
return seenEntries == 0 && processEndOfFile();
@@ -601,7 +617,7 @@
/**
* Do the shipping logic
+ * @param fileInUse whether the current log file is still open for writing
*/
- protected void shipEdits() {
+ protected void shipEdits(boolean fileInUse) {
int sleepMultiplier = 1;
if (this.currentNbEntries == 0) {
LOG.warn("Was given 0 edits to ship");
@@ -614,7 +630,7 @@
rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries));
if (this.lastLoggedPosition != this.position) {
this.manager.logPositionAndCleanOldLogs(this.currentPath,
- this.peerClusterZnode, this.position, queueRecovered);
+ this.peerClusterZnode, this.position, queueRecovered, fileInUse);
this.lastLoggedPosition = this.position;
}
this.totalReplicatedEdits += currentNbEntries;
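
The ordering in run() matters: the fileInUse snapshot is taken before the reader is opened, so a roll that happens mid-read can at worst delay end-of-file processing by one pass, never cause an active file to be treated as finished. A self-contained sketch of that ordering; TailLoopSketch and its nested interfaces are illustrative only:

```java
// Illustrative model of the snapshot-then-read ordering used in run() above;
// these types are stand-ins, not HBase classes.
public class TailLoopSketch {

  interface Wal { boolean isFileInUse(String path); }
  interface LogReader { int readAvailableEntries(); }

  private final Wal wal;
  private final LogReader reader;
  private final String currentPath;

  public TailLoopSketch(Wal wal, LogReader reader, String currentPath) {
    this.wal = wal;
    this.reader = reader;
    this.currentPath = currentPath;
  }

  /** One pass of the read loop; returns true if we may advance to the next log. */
  public boolean readOnce() {
    // Snapshot first: if the writer rolls right after this, we read a little
    // less this pass, but we never mistake an active file for a finished one.
    boolean fileInUse = wal.isFileInUse(currentPath);
    int seenEntries = reader.readAvailableEntries();
    if (fileInUse) {
      return false; // mirrors readAllEntriesToReplicateOrNextFile(fileInUse)
    }
    return seenEntries == 0; // mirrors: seenEntries == 0 && processEndOfFile()
  }
}
```

The same snapshot is then handed to shipEdits() and logPositionAndCleanOldLogs(), so one consistent answer drives the whole pass.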
Index: src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java (revision 1386889)
+++ src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java (working copy)
@@ -26,6 +26,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
/**
* Interface that defines a replication source
@@ -37,6 +38,7 @@
* @param conf the configuration to use
* @param fs the file system to use
* @param manager the manager to use
+ * @param services the region server services
* @param stopper the stopper object for this region server
* @param replicating the status of the replication on this cluster
* @param peerClusterId the id of the peer cluster
@@ -45,6 +47,7 @@
public void init(final Configuration conf,
final FileSystem fs,
final ReplicationSourceManager manager,
+ final RegionServerServices services,
final Stoppable stopper,
final AtomicBoolean replicating,
final String peerClusterId) throws IOException;
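
Note that the test setups above (TestReplicationSourceManager, TestReplicationAdmin) pass null for the new RegionServerServices argument, so an implementation that consults the WAL through it should tolerate a missing service. A hedged, self-contained sketch of a null-tolerant lookup; the *Like interfaces are stand-ins so the snippet compiles on its own and are not part of the patch:

```java
// Illustrative null-safe variant of the services.getWAL().isFileInUse(...) call
// made in ReplicationSource.run(); not part of the patch itself.
public final class GuardedWalCheck {

  /** Minimal stand-ins so this sketch is self-contained. */
  public interface WalLike { boolean isFileInUse(String path); }
  public interface RegionServerServicesLike { WalLike getWAL(); }

  private GuardedWalCheck() {}

  /** Treat a missing service (as in the unit tests) the same as "not in use". */
  public static boolean isFileInUse(RegionServerServicesLike services, String path) {
    if (services == null || services.getWAL() == null) {
      return false; // falls back to the pre-patch behaviour
    }
    return services.getWAL().isFileInUse(path);
  }
}
```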