Index: hbase-protocol/src/main/protobuf/ZooKeeper.proto
===================================================================
--- hbase-protocol/src/main/protobuf/ZooKeeper.proto (revision 1548338)
+++ hbase-protocol/src/main/protobuf/ZooKeeper.proto (working copy)
@@ -87,6 +87,7 @@
}
required State state = 1;
required ServerName server_name = 2;
+ optional uint64 ts = 3;
}
/**
Index: hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
===================================================================
--- hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java (revision 1548338)
+++ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java (working copy)
@@ -3230,6 +3230,16 @@
* required .ServerName server_name = 2;
*/
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getServerNameOrBuilder();
+
+ // optional uint64 ts = 3;
+ /**
+ * optional uint64 ts = 3;
+ */
+ boolean hasTs();
+ /**
+ * optional uint64 ts = 3;
+ */
+ long getTs();
}
/**
* Protobuf type {@code SplitLogTask}
@@ -3312,6 +3322,11 @@
bitField0_ |= 0x00000002;
break;
}
+ case 24: {
+ bitField0_ |= 0x00000004;
+ ts_ = input.readUInt64();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -3499,9 +3514,26 @@
return serverName_;
}
+ // optional uint64 ts = 3;
+ public static final int TS_FIELD_NUMBER = 3;
+ private long ts_;
+ /**
+ * optional uint64 ts = 3;
+ */
+ public boolean hasTs() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ /**
+ * optional uint64 ts = 3;
+ */
+ public long getTs() {
+ return ts_;
+ }
+
private void initFields() {
state_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.State.UNASSIGNED;
serverName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
+ ts_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -3533,6 +3565,9 @@
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeMessage(2, serverName_);
}
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ output.writeUInt64(3, ts_);
+ }
getUnknownFields().writeTo(output);
}
@@ -3550,6 +3585,10 @@
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(2, serverName_);
}
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeUInt64Size(3, ts_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -3583,6 +3622,11 @@
result = result && getServerName()
.equals(other.getServerName());
}
+ result = result && (hasTs() == other.hasTs());
+ if (hasTs()) {
+ result = result && (getTs()
+ == other.getTs());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -3604,6 +3648,10 @@
hash = (37 * hash) + SERVER_NAME_FIELD_NUMBER;
hash = (53 * hash) + getServerName().hashCode();
}
+ if (hasTs()) {
+ hash = (37 * hash) + TS_FIELD_NUMBER;
+ hash = (53 * hash) + hashLong(getTs());
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -3728,6 +3776,8 @@
serverNameBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000002);
+ ts_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000004);
return this;
}
@@ -3768,6 +3818,10 @@
} else {
result.serverName_ = serverNameBuilder_.build();
}
+ if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+ to_bitField0_ |= 0x00000004;
+ }
+ result.ts_ = ts_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -3790,6 +3844,9 @@
if (other.hasServerName()) {
mergeServerName(other.getServerName());
}
+ if (other.hasTs()) {
+ setTs(other.getTs());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -3982,6 +4039,39 @@
return serverNameBuilder_;
}
+ // optional uint64 ts = 3;
+ private long ts_ ;
+ /**
+ * optional uint64 ts = 3;
+ */
+ public boolean hasTs() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ /**
+ * optional uint64 ts = 3;
+ */
+ public long getTs() {
+ return ts_;
+ }
+ /**
+ * optional uint64 ts = 3;
+ */
+ public Builder setTs(long value) {
+ bitField0_ |= 0x00000004;
+ ts_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * optional uint64 ts = 3;
+ */
+ public Builder clearTs() {
+ bitField0_ = (bitField0_ & ~0x00000004);
+ ts_ = 0L;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:SplitLogTask)
}
@@ -9399,29 +9489,29 @@
"gionTransition\022\027\n\017event_type_code\030\001 \002(\r\022" +
"\023\n\013region_name\030\002 \002(\014\022\023\n\013create_time\030\003 \002(" +
"\004\022 \n\013server_name\030\004 \002(\0132\013.ServerName\022\017\n\007p" +
- "ayload\030\005 \001(\014\"\231\001\n\014SplitLogTask\022\"\n\005state\030\001" +
+ "ayload\030\005 \001(\014\"\245\001\n\014SplitLogTask\022\"\n\005state\030\001" +
" \002(\0162\023.SplitLogTask.State\022 \n\013server_name",
- "\030\002 \002(\0132\013.ServerName\"C\n\005State\022\016\n\nUNASSIGN" +
- "ED\020\000\022\t\n\005OWNED\020\001\022\014\n\010RESIGNED\020\002\022\010\n\004DONE\020\003\022" +
- "\007\n\003ERR\020\004\"n\n\005Table\022$\n\005state\030\001 \002(\0162\014.Table" +
- ".State:\007ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n" +
- "\010DISABLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003" +
- "\"%\n\017ReplicationPeer\022\022\n\nclusterkey\030\001 \002(\t\"" +
- "^\n\020ReplicationState\022&\n\005state\030\001 \002(\0162\027.Rep" +
- "licationState.State\"\"\n\005State\022\013\n\007ENABLED\020" +
- "\000\022\014\n\010DISABLED\020\001\"+\n\027ReplicationHLogPositi" +
- "on\022\020\n\010position\030\001 \002(\003\"%\n\017ReplicationLock\022",
- "\022\n\nlock_owner\030\001 \002(\t\"\230\001\n\tTableLock\022\036\n\ntab" +
- "le_name\030\001 \001(\0132\n.TableName\022\037\n\nlock_owner\030" +
- "\002 \001(\0132\013.ServerName\022\021\n\tthread_id\030\003 \001(\003\022\021\n" +
- "\tis_shared\030\004 \001(\010\022\017\n\007purpose\030\005 \001(\t\022\023\n\013cre" +
- "ate_time\030\006 \001(\003\";\n\017StoreSequenceId\022\023\n\013fam" +
- "ily_name\030\001 \002(\014\022\023\n\013sequence_id\030\002 \002(\004\"g\n\026R" +
- "egionStoreSequenceIds\022 \n\030last_flushed_se" +
- "quence_id\030\001 \002(\004\022+\n\021store_sequence_id\030\002 \003" +
- "(\0132\020.StoreSequenceIdBE\n*org.apache.hadoo" +
- "p.hbase.protobuf.generatedB\017ZooKeeperPro",
- "tosH\001\210\001\001\240\001\001"
+ "\030\002 \002(\0132\013.ServerName\022\n\n\002ts\030\003 \001(\004\"C\n\005State" +
+ "\022\016\n\nUNASSIGNED\020\000\022\t\n\005OWNED\020\001\022\014\n\010RESIGNED\020" +
+ "\002\022\010\n\004DONE\020\003\022\007\n\003ERR\020\004\"n\n\005Table\022$\n\005state\030\001" +
+ " \002(\0162\014.Table.State:\007ENABLED\"?\n\005State\022\013\n\007" +
+ "ENABLED\020\000\022\014\n\010DISABLED\020\001\022\r\n\tDISABLING\020\002\022\014" +
+ "\n\010ENABLING\020\003\"%\n\017ReplicationPeer\022\022\n\nclust" +
+ "erkey\030\001 \002(\t\"^\n\020ReplicationState\022&\n\005state" +
+ "\030\001 \002(\0162\027.ReplicationState.State\"\"\n\005State" +
+ "\022\013\n\007ENABLED\020\000\022\014\n\010DISABLED\020\001\"+\n\027Replicati" +
+ "onHLogPosition\022\020\n\010position\030\001 \002(\003\"%\n\017Repl",
+ "icationLock\022\022\n\nlock_owner\030\001 \002(\t\"\230\001\n\tTabl" +
+ "eLock\022\036\n\ntable_name\030\001 \001(\0132\n.TableName\022\037\n" +
+ "\nlock_owner\030\002 \001(\0132\013.ServerName\022\021\n\tthread" +
+ "_id\030\003 \001(\003\022\021\n\tis_shared\030\004 \001(\010\022\017\n\007purpose\030" +
+ "\005 \001(\t\022\023\n\013create_time\030\006 \001(\003\";\n\017StoreSeque" +
+ "nceId\022\023\n\013family_name\030\001 \002(\014\022\023\n\013sequence_i" +
+ "d\030\002 \002(\004\"g\n\026RegionStoreSequenceIds\022 \n\030las" +
+ "t_flushed_sequence_id\030\001 \002(\004\022+\n\021store_seq" +
+ "uence_id\030\002 \003(\0132\020.StoreSequenceIdBE\n*org." +
+ "apache.hadoop.hbase.protobuf.generatedB\017",
+ "ZooKeeperProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -9457,7 +9547,7 @@
internal_static_SplitLogTask_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_SplitLogTask_descriptor,
- new java.lang.String[] { "State", "ServerName", });
+ new java.lang.String[] { "State", "ServerName", "Ts", });
internal_static_Table_descriptor =
getDescriptor().getMessageTypes().get(5);
internal_static_Table_fieldAccessorTable = new
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java (revision 1548338)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java (working copy)
@@ -77,7 +77,8 @@
@Test
public void testSplitLogTask() throws DeserializationException {
- SplitLogTask slt = new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"));
+ SplitLogTask slt = new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"),
+ HConstants.LEASE_RECOVERY_UNREQUESTED);
byte [] bytes = slt.toByteArray();
SplitLogTask sltDeserialized = SplitLogTask.parseFrom(bytes);
assertTrue(slt.equals(sltDeserialized));
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java (revision 1548338)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java (working copy)
@@ -24,6 +24,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.junit.Before;
@@ -64,7 +65,8 @@
// Fail four times and pass on the fifth.
Mockito.when(dfs.recoverLease(FILE)).
thenReturn(false).thenReturn(false).thenReturn(false).thenReturn(false).thenReturn(true);
- assertTrue(this.fsHDFSUtils.recoverDFSFileLease(dfs, FILE, HTU.getConfiguration(), reporter));
+ assertTrue(this.fsHDFSUtils.recoverDFSFileLease(dfs, FILE, HTU.getConfiguration(), reporter,
+ HConstants.LEASE_RECOVERY_UNREQUESTED));
Mockito.verify(dfs, Mockito.times(5)).recoverLease(FILE);
// Make sure we waited at least hbase.lease.recovery.dfs.timeout * 3 (the first two
// invocations will happen pretty fast... the we fall into the longer wait loop).
@@ -89,8 +91,8 @@
Mockito.when(dfs.recoverLease(FILE)).
thenReturn(false).thenReturn(false).thenReturn(true);
Mockito.when(dfs.isFileClosed(FILE)).thenReturn(true);
- assertTrue(this.fsHDFSUtils.recoverDFSFileLease(dfs, FILE, HTU.getConfiguration(), reporter));
- Mockito.verify(dfs, Mockito.times(2)).recoverLease(FILE);
+ assertTrue(this.fsHDFSUtils.recoverDFSFileLease(dfs, FILE, HTU.getConfiguration(), reporter,
+ HConstants.LEASE_RECOVERY_UNREQUESTED));
Mockito.verify(dfs, Mockito.times(1)).isFileClosed(FILE);
}
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (revision 1548338)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (working copy)
@@ -837,7 +837,7 @@
t.setDaemon(true);
t.start();
try {
- logSplitter.splitLogFile(logfiles[0], null);
+ logSplitter.splitLogFile(logfiles[0], null, HConstants.LEASE_RECOVERY_UNREQUESTED);
fail("Didn't throw!");
} catch (IOException ioe) {
assertTrue(ioe.toString().contains("Injected"));
@@ -1056,7 +1056,7 @@
}
};
- logSplitter.splitLogFile(fs.getFileStatus(logPath), null);
+ logSplitter.splitLogFile(fs.getFileStatus(logPath), null, HConstants.LEASE_RECOVERY_UNREQUESTED);
// Verify number of written edits per region
Map outputCounts = logSplitter.outputSink.getOutputCounts();
@@ -1308,7 +1308,7 @@
}
};
try{
- logSplitter.splitLogFile(logfiles[0], null);
+ logSplitter.splitLogFile(logfiles[0], null, HConstants.LEASE_RECOVERY_UNREQUESTED);
} catch (IOException e) {
LOG.info(e);
Assert.fail("Throws IOException when spliting "
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java (revision 1548338)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java (working copy)
@@ -524,7 +524,8 @@
FSUtils fsUtils = FSUtils.getInstance(fs, TEST_UTIL.getConfiguration());
for (Path p : paths) {
LOG.debug("recovering lease for " + p);
- fsUtils.recoverFileLease(((HFileSystem)fs).getBackingFs(), p, TEST_UTIL.getConfiguration(), null);
+ fsUtils.recoverFileLease(((HFileSystem)fs).getBackingFs(), p, TEST_UTIL.getConfiguration(),
+ null, HConstants.LEASE_RECOVERY_UNREQUESTED);
LOG.debug("Reading HLog "+FSUtils.getPath(p));
HLog.Reader reader = null;
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (revision 1548338)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (working copy)
@@ -437,8 +437,8 @@
public Exception exception = null;
public void run() {
try {
- FSUtils.getInstance(fs, rlConf)
- .recoverFileLease(recoveredFs, walPath, rlConf, null);
+ FSUtils.getInstance(fs, rlConf).recoverFileLease(recoveredFs, walPath, rlConf, null,
+ HConstants.LEASE_RECOVERY_UNREQUESTED);
} catch (IOException e) {
exception = e;
}
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java (revision 1548338)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java (working copy)
@@ -31,6 +31,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
@@ -125,7 +126,7 @@
new SplitLogWorker.TaskExecutor() {
@Override
- public Status exec(String name, CancelableProgressable p) {
+ public Status exec(String name, CancelableProgressable p, long ts) {
while (true) {
try {
Thread.sleep(1000);
@@ -148,8 +149,9 @@
final ServerName RS = ServerName.valueOf("rs,1,1");
RegionServerServices mockedRS = getRegionServer(RS);
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS),
- new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(), Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"),
+ HConstants.LEASE_RECOVERY_UNREQUESTED).toByteArray(),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
SplitLogWorker slw =
new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask);
@@ -183,8 +185,8 @@
final ServerName SVR1 = ServerName.valueOf("svr1,1,1");
final ServerName SVR2 = ServerName.valueOf("svr2,1,1");
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT),
- new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ new SplitLogTask.Unassigned(MANAGER, HConstants.LEASE_RECOVERY_UNREQUESTED).toByteArray(),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
RegionServerServices mockedRS1 = getRegionServer(SVR1);
RegionServerServices mockedRS2 = getRegionServer(SVR2);
SplitLogWorker slw1 =
@@ -225,15 +227,15 @@
// this time create a task node after starting the splitLogWorker
zkw.getRecoverableZooKeeper().create(PATH,
- new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ new SplitLogTask.Unassigned(MANAGER, HConstants.LEASE_RECOVERY_UNREQUESTED).toByteArray(),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 8000);
assertEquals(1, slw.taskReadySeq);
byte [] bytes = ZKUtil.getData(zkw, PATH);
SplitLogTask slt = SplitLogTask.parseFrom(bytes);
assertTrue(slt.isOwned(SRV));
- slt = new SplitLogTask.Unassigned(MANAGER);
+ slt = new SplitLogTask.Unassigned(MANAGER, HConstants.LEASE_RECOVERY_UNREQUESTED);
ZKUtil.setData(zkw, PATH, slt.toByteArray());
waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, 1500);
} finally {
@@ -256,7 +258,8 @@
Thread.sleep(100);
waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, 1500);
- SplitLogTask unassignedManager = new SplitLogTask.Unassigned(MANAGER);
+ SplitLogTask unassignedManager = new SplitLogTask.Unassigned(MANAGER,
+ HConstants.LEASE_RECOVERY_UNREQUESTED);
zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
@@ -296,7 +299,7 @@
Thread.sleep(100);
String task = ZKSplitLog.getEncodedNodeName(zkw, "task");
- SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER);
+ SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER, HConstants.LEASE_RECOVERY_UNREQUESTED);
zkw.getRecoverableZooKeeper().create(task,slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
@@ -349,8 +352,9 @@
for (int i = 0; i < maxTasks; i++) {
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
- new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
- Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"),
+ HConstants.LEASE_RECOVERY_UNREQUESTED).toByteArray(),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask);
@@ -392,9 +396,9 @@
for (int i = 0; i < maxTasks; i++) {
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i),
- new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(),
- Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"),
+ HConstants.LEASE_RECOVERY_UNREQUESTED).toByteArray(), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
}
SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask);
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java (revision 1548338)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java (working copy)
@@ -32,6 +32,7 @@
import static org.junit.Assert.fail;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
@@ -965,7 +966,8 @@
// slm.splitLogDistributed(logDir);
FileStatus[] logfiles = fs.listStatus(logDir);
TaskBatch batch = new TaskBatch();
- slm.enqueueSplitTask(logfiles[0].getPath().toString(), batch);
+ slm.enqueueSplitTask(logfiles[0].getPath().toString(), batch,
+ HConstants.LEASE_RECOVERY_UNREQUESTED);
//waitForCounter but for one of the 2 counters
long curt = System.currentTimeMillis();
long waitTime = 80000;
@@ -1079,7 +1081,7 @@
// is set to true, until it is interrupted.
slm.splitLogDistributed(logDir);
} catch (IOException e) {
- assertTrue(Thread.currentThread().isInterrupted());
+ assertTrue(e instanceof InterruptedIOException);
return;
}
fail("did not get the expected IOException from the 2nd call");
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java (revision 1548338)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java (working copy)
@@ -178,7 +178,7 @@
zkw.registerListener(listener);
ZKUtil.watchAndCheckExists(zkw, tasknode);
- slm.enqueueSplitTask(name, batch);
+ slm.enqueueSplitTask(name, batch, HConstants.LEASE_RECOVERY_UNREQUESTED);
assertEquals(1, batch.installed);
assertTrue(slm.findOrCreateOrphanTask(tasknode).batch == batch);
assertEquals(1L, tot_mgr_node_create_queued.get());
@@ -238,7 +238,8 @@
" startup");
String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash");
//create an unassigned orphan task
- SplitLogTask slt = new SplitLogTask.Unassigned(DUMMY_MASTER);
+ SplitLogTask slt = new SplitLogTask.Unassigned(DUMMY_MASTER,
+ HConstants.LEASE_RECOVERY_UNREQUESTED);
zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
int version = ZKUtil.checkExists(zkw, tasknode);
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java (revision 1548338)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java (working copy)
@@ -36,10 +36,16 @@
public class SplitLogTask {
private final ServerName originServer;
private final ZooKeeperProtos.SplitLogTask.State state;
+ // if non-zero, the time when lease recovery was submitted
+ private final long ts;
+ /**
+ * @param originServer
+ * @param ts timestamp of lease recovery attempt
+ */
public static class Unassigned extends SplitLogTask {
- public Unassigned(final ServerName originServer) {
- super(originServer, ZooKeeperProtos.SplitLogTask.State.UNASSIGNED);
+ public Unassigned(final ServerName originServer, final long ts) {
+ super(originServer, ZooKeeperProtos.SplitLogTask.State.UNASSIGNED, ts);
}
}
@@ -68,17 +74,34 @@
}
SplitLogTask(final ZooKeeperProtos.SplitLogTask slt) {
- this(ProtobufUtil.toServerName(slt.getServerName()), slt.getState());
+ this(ProtobufUtil.toServerName(slt.getServerName()), slt.getState(), slt.getTs());
}
SplitLogTask(final ServerName originServer, final ZooKeeperProtos.SplitLogTask.State state) {
this.originServer = originServer;
this.state = state;
+ this.ts = HConstants.LEASE_RECOVERY_UNREQUESTED;
}
+ /**
+ * @param originServer
+ * @param state
+ * @param ts timestamp of lease recovery attempt
+ */
+ SplitLogTask(final ServerName originServer, final ZooKeeperProtos.SplitLogTask.State state,
+ final long ts) {
+ this.originServer = originServer;
+ this.state = state;
+ this.ts = ts;
+ }
+
public ServerName getServerName() {
return this.originServer;
}
+
+ public long getTs() {
+ return this.ts;
+ }
public boolean isUnassigned(final ServerName sn) {
return this.originServer.equals(sn) && isUnassigned();
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java (revision 1548338)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java (working copy)
@@ -55,11 +55,12 @@
private final AtomicInteger inProgressTasks;
private final MutableInt curTaskZKVersion;
private final TaskExecutor splitTaskExecutor;
+ private final long ts;
public HLogSplitterHandler(final Server server, String curTask,
final MutableInt curTaskZKVersion,
CancelableProgressable reporter,
- AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor) {
+ AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor, long ts) {
super(server, EventType.RS_LOG_REPLAY);
this.curTask = curTask;
this.wal = ZKSplitLog.getFileName(curTask);
@@ -70,13 +71,14 @@
this.zkw = server.getZooKeeper();
this.curTaskZKVersion = curTaskZKVersion;
this.splitTaskExecutor = splitTaskExecutor;
+ this.ts = ts;
}
@Override
public void process() throws IOException {
long startTime = System.currentTimeMillis();
try {
- Status status = this.splitTaskExecutor.exec(wal, reporter);
+ Status status = this.splitTaskExecutor.exec(wal, reporter, ts);
switch (status) {
case DONE:
endTask(zkw, new SplitLogTask.Done(this.serverName),
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (revision 1548338)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (working copy)
@@ -214,9 +214,29 @@
Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
ZooKeeperWatcher zkw) throws IOException {
HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, zkw);
- return s.splitLogFile(logfile, reporter);
+ return s.splitLogFile(logfile, reporter, HConstants.LEASE_RECOVERY_UNREQUESTED);
}
+ /**
+ * @param rootDir
+ * @param logfile
+ * @param fs
+ * @param conf
+ * @param reporter
+ * @param idChecker
+ * @param zkw ZooKeeperWatcher; if it is null, we will fall back to the old-style log splitting
+ * where we dump out recovered.edits files for regions to replay on.
+ * @param ts timestamp of lease recovery attempt
+ * @return false if it is interrupted by the progress-able.
+ * @throws IOException
+ */
+ public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
+ Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
+ ZooKeeperWatcher zkw, long ts) throws IOException {
+ HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, zkw);
+ return s.splitLogFile(logfile, reporter, ts);
+ }
+
// A wrapper to split one log folder using the method used by distributed
// log splitting. Used by tools and unit tests. It should be package private.
// It is public only because TestWALObserver is in a different package,
@@ -228,7 +248,7 @@
if (logfiles != null && logfiles.length > 0) {
for (FileStatus logfile: logfiles) {
HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null);
- if (s.splitLogFile(logfile, null)) {
+ if (s.splitLogFile(logfile, null, HConstants.LEASE_RECOVERY_UNREQUESTED)) {
finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
if (s.outputSink.splits != null) {
splits.addAll(s.outputSink.splits);
@@ -244,7 +264,7 @@
// The real log splitter. It just splits one log file.
boolean splitLogFile(FileStatus logfile,
- CancelableProgressable reporter) throws IOException {
+ CancelableProgressable reporter, long ts) throws IOException {
boolean isCorrupted = false;
Preconditions.checkState(status == null);
boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
@@ -270,7 +290,7 @@
}
Reader in = null;
try {
- in = getReader(fs, logfile, conf, skipErrors, reporter);
+ in = getReader(fs, logfile, conf, skipErrors, reporter, ts);
} catch (CorruptedLogFileException e) {
LOG.warn("Could not get reader, corrupted log file " + logPath, e);
ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
@@ -535,7 +555,7 @@
* @throws CorruptedLogFileException
*/
protected Reader getReader(FileSystem fs, FileStatus file, Configuration conf,
- boolean skipErrors, CancelableProgressable reporter)
+ boolean skipErrors, CancelableProgressable reporter, long ts)
throws IOException, CorruptedLogFileException {
Path path = file.getPath();
long length = file.getLen();
@@ -549,7 +569,7 @@
}
try {
- FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter);
+ FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter, ts);
try {
in = getReader(fs, path, conf, reporter);
} catch (EOFException e) {
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (revision 1548338)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (working copy)
@@ -125,7 +125,8 @@
RegionServerServices server, final LastSequenceId sequenceIdChecker) {
this(watcher, conf, server, new TaskExecutor() {
@Override
- public Status exec(String filename, CancelableProgressable p) {
+ public Status exec(String filename, CancelableProgressable p, long ts) {
+ // TODO: utilize ts
Path rootdir;
FileSystem fs;
try {
@@ -140,7 +141,7 @@
// encountered a bad non-retry-able persistent error.
try {
if (!HLogSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)),
- fs, conf, p, sequenceIdChecker, watcher)) {
+ fs, conf, p, sequenceIdChecker, watcher, ts)) {
return Status.PREEMPTED;
}
} catch (InterruptedIOException iioe) {
@@ -364,7 +365,7 @@
SplitLogCounters.tot_wkr_task_acquired.incrementAndGet();
getDataSetWatchAsync();
- submitTask(path, currentVersion, this.report_period);
+ submitTask(path, currentVersion, this.report_period, slt.getTs());
// after a successful submit, sleep a little bit to allow other RSs to grab the rest tasks
try {
@@ -459,8 +460,10 @@
* Submit a log split task to executor service
* @param curTask
* @param curTaskZKVersion
+ * @param ts
*/
- void submitTask(final String curTask, final int curTaskZKVersion, final int reportPeriod) {
+ void submitTask(final String curTask, final int curTaskZKVersion, final int reportPeriod,
+ final long ts) {
final MutableInt zkVersion = new MutableInt(curTaskZKVersion);
CancelableProgressable reporter = new CancelableProgressable() {
@@ -485,7 +488,7 @@
HLogSplitterHandler hsh =
new HLogSplitterHandler(this.server, curTask, zkVersion, reporter, this.tasksInProgress,
- this.splitTaskExecutor);
+ this.splitTaskExecutor, ts);
this.executorService.submit(hsh);
}
@@ -660,6 +663,6 @@
RESIGNED(),
PREEMPTED()
}
- Status exec(String name, CancelableProgressable p);
+ Status exec(String name, CancelableProgressable p, long ts);
}
}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (revision 1548338)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (working copy)
@@ -25,6 +25,7 @@
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -33,7 +34,12 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
@@ -61,12 +67,14 @@
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.FSHDFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
@@ -121,6 +129,7 @@
private long zkretries;
private long resubmit_threshold;
+ private int maxThreads; // max number of threads for submitting lease recovery
private long timeout;
private long unassignedTimeout;
private long lastNodeCreateTime = Long.MAX_VALUE;
@@ -220,6 +229,7 @@
this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES);
this.resubmit_threshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT);
this.timeout = conf.getInt("hbase.splitlog.manager.timeout", DEFAULT_TIMEOUT);
+ this.maxThreads = conf.getInt("hbase.splitlog.manager.max.threads", 6);
this.unassignedTimeout =
conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
this.distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
@@ -328,63 +338,111 @@
SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet();
LOG.info("started splitting " + logfiles.length + " logs in " + logDirs);
long t = EnvironmentEdgeManager.currentTimeMillis();
- long totalSize = 0;
- TaskBatch batch = new TaskBatch();
+ final AtomicLong totalSize = new AtomicLong(0);
+ final AtomicInteger count = new AtomicInteger(0);
+ final TaskBatch batch = new TaskBatch();
Boolean isMetaRecovery = (filter == null) ? null : false;
- for (FileStatus lf : logfiles) {
- // TODO If the log file is still being written to - which is most likely
- // the case for the last log file - then its length will show up here
- // as zero. The size of such a file can only be retrieved after
- // recover-lease is done. totalSize will be under in most cases and the
- // metrics that it drives will also be under-reported.
- totalSize += lf.getLen();
- String pathToLog = FSUtils.removeRootPath(lf.getPath(), conf);
- if (!enqueueSplitTask(pathToLog, batch)) {
- throw new IOException("duplicate log split scheduled for " + lf.getPath());
+ ExecutorService service = null;
+ List results = null;
+ final List exceptions = new ArrayList();
+ if (logfiles.length > 0) {
+ service = Executors.newFixedThreadPool(this.maxThreads);
+ results = new ArrayList(logfiles.length);
+ }
+ try {
+ for (final FileStatus lf : logfiles) {
+ // TODO If the log file is still being written to - which is most likely
+ // the case for the last log file - then its length will show up here
+ // as zero. The size of such a file can only be retrieved after
+ // recover-lease is done. totalSize will be under in most cases and the
+ // metrics that it drives will also be under-reported.
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ count.getAndIncrement();
+ totalSize.getAndAdd(lf.getLen());
+ try {
+ FileSystem fs = lf.getPath().getFileSystem(conf);
+ long ts = HConstants.LEASE_RECOVERY_SUCCEEDED;
+ if (fs instanceof DistributedFileSystem) {
+ boolean b = ((FSHDFSUtils)FSUtils.getInstance(fs, conf))
+ .recoverLease((DistributedFileSystem)fs, 0, lf.getPath(), 0L);
+ // lease not yet recovered: record submission time so the worker can resume waiting
+ if (!b) ts = EnvironmentEdgeManager.currentTimeMillis();
+ }
+ String pathToLog = FSUtils.removeRootPath(lf.getPath(), conf);
+ if (!enqueueSplitTask(pathToLog, batch, ts)) {
+ exceptions.add(new IOException("duplicate log split scheduled for " +lf.getPath()));
+ }
+ } catch (IOException ioe) {
+ exceptions.add(ioe);
+ }
+ }
+ };
+ results.add(service.submit(runnable));
}
- }
- waitForSplittingCompletion(batch, status);
- // remove recovering regions from ZK
- if (filter == MasterFileSystem.META_FILTER /* reference comparison */) {
- // we split meta regions and user regions separately therefore logfiles are either all for
- // meta or user regions but won't for both( we could have mixed situations in tests)
- isMetaRecovery = true;
- }
- this.removeRecoveringRegionsFromZK(serverNames, isMetaRecovery);
+ if (results != null) {
+ for (Future f : results) {
+ try {
+ f.get();
+ } catch (ExecutionException ee) {
+ throw new IOException(ee.getCause());
+ } catch (InterruptedException ie) {
+ String s = "Interrupted while waiting for log splits submission";
+ LOG.warn(s);
+ throw new InterruptedIOException(s);
+ }
+ }
+ }
+ if (!exceptions.isEmpty()) {
+ LOG.debug("Encountered " + exceptions.size() + " exceptions");
+ throw exceptions.get(0);
+ }
+ waitForSplittingCompletion(batch, status);
+ // remove recovering regions from ZK
+ if (filter == MasterFileSystem.META_FILTER /* reference comparison */) {
+ // we split meta regions and user regions separately therefore logfiles are either all for
+ // meta or user regions but won't for both( we could have mixed situations in tests)
+ isMetaRecovery = true;
+ }
+ this.removeRecoveringRegionsFromZK(serverNames, isMetaRecovery);
- if (batch.done != batch.installed) {
- batch.isDead = true;
- SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet();
- LOG.warn("error while splitting logs in " + logDirs +
- " installed = " + batch.installed + " but only " + batch.done + " done");
- String msg = "error or interrupted while splitting logs in "
- + logDirs + " Task = " + batch;
- status.abort(msg);
- throw new IOException(msg);
- }
- for(Path logDir: logDirs){
- status.setStatus("Cleaning up log directory...");
- try {
- if (fs.exists(logDir) && !fs.delete(logDir, false)) {
- LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
+ if (batch.done != batch.installed) {
+ batch.isDead = true;
+ SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet();
+ LOG.warn("error while splitting logs in " + logDirs +
+ " installed = " + batch.installed + " but only " + batch.done + " done");
+ String msg = "error or interrupted while splitting logs in "
+ + logDirs + " Task = " + batch;
+ status.abort(msg);
+ throw new IOException(msg);
+ }
+ for (Path logDir: logDirs){
+ status.setStatus("Cleaning up log directory...");
+ try {
+ if (fs.exists(logDir) && !fs.delete(logDir, false)) {
+ LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
+ }
+ } catch (IOException ioe) {
+ FileStatus[] files = fs.listStatus(logDir);
+ if (files != null && files.length > 0) {
+ LOG.warn("returning success without actually splitting and " +
+ "deleting all the log files in path " + logDir);
+ } else {
+ LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
+ }
}
- } catch (IOException ioe) {
- FileStatus[] files = fs.listStatus(logDir);
- if (files != null && files.length > 0) {
- LOG.warn("returning success without actually splitting and " +
- "deleting all the log files in path " + logDir);
- } else {
- LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
- }
+ SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet();
}
- SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet();
+ String msg = "finished splitting (more than or equal to) " + totalSize +
+ " bytes in " + batch.installed + " log files in " + logDirs + " in " +
+ (EnvironmentEdgeManager.currentTimeMillis() - t) + "ms";
+ status.markComplete(msg);
+ LOG.info(msg);
+ } finally {
+ if (service != null) service.shutdown();
}
- String msg = "finished splitting (more than or equal to) " + totalSize +
- " bytes in " + batch.installed + " log files in " + logDirs + " in " +
- (EnvironmentEdgeManager.currentTimeMillis() - t) + "ms";
- status.markComplete(msg);
- LOG.info(msg);
- return totalSize;
+ return totalSize.longValue();
}
/**
@@ -392,9 +450,10 @@
*
* @param taskname the path of the log to be split
* @param batch the batch this task belongs to
+ * @param ts the time when lease recovery was submitted
* @return true if a new entry is created, false if it is already there.
*/
- boolean enqueueSplitTask(String taskname, TaskBatch batch) {
+ boolean enqueueSplitTask(String taskname, TaskBatch batch, long ts) {
SplitLogCounters.tot_mgr_log_split_start.incrementAndGet();
// This is a znode path under the splitlog dir with the rest of the path made up of an
// url encoding of the passed in log to split.
@@ -402,7 +461,7 @@
Task oldtask = createTaskIfAbsent(path, batch);
if (oldtask == null) {
// publish the task in zk
- createNode(path, zkretries);
+ createNode(path, zkretries, ts);
return true;
}
return false;
@@ -699,8 +758,8 @@
return;
}
- private void createNode(String path, Long retry_count) {
- SplitLogTask slt = new SplitLogTask.Unassigned(serverName);
+ private void createNode(String path, Long retry_count, long ts) {
+ SplitLogTask slt = new SplitLogTask.Unassigned(serverName, ts);
ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(), retry_count);
SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet();
return;
@@ -876,7 +935,8 @@
task.incarnation++;
try {
// blocking zk call but this is done from the timeout thread
- SplitLogTask slt = new SplitLogTask.Unassigned(this.serverName);
+ SplitLogTask slt = new SplitLogTask.Unassigned(this.serverName,
+ HConstants.LEASE_RECOVERY_UNREQUESTED);
if (ZKUtil.setData(this.watcher, path, slt.toByteArray(), version) == false) {
LOG.debug("failed to resubmit task " + path +
" version changed");
@@ -1503,7 +1563,7 @@
createNodeFailure(path);
} else {
SplitLogCounters.tot_mgr_node_create_retry.incrementAndGet();
- createNode(path, retry_count - 1);
+ createNode(path, retry_count - 1, 0);
}
return;
}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (revision 1548338)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (working copy)
@@ -1303,10 +1303,11 @@
* @param fs FileSystem handle
* @param p Path of file to recover lease
* @param conf Configuration handle
+ * @param ts time when lease recovery was submitted, or a LEASE_RECOVERY_* status constant
* @throws IOException
*/
public abstract void recoverFileLease(final FileSystem fs, final Path p,
- Configuration conf, CancelableProgressable reporter) throws IOException;
+ Configuration conf, CancelableProgressable reporter, long ts) throws IOException;
public static List getTableDirs(final FileSystem fs, final Path rootdir)
throws IOException {
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSMapRUtils.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSMapRUtils.java (revision 1548338)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSMapRUtils.java (working copy)
@@ -36,7 +36,7 @@
private static final Log LOG = LogFactory.getLog(FSMapRUtils.class);
public void recoverFileLease(final FileSystem fs, final Path p,
- Configuration conf, CancelableProgressable reporter) throws IOException {
+ Configuration conf, CancelableProgressable reporter, long ts) throws IOException {
LOG.info("Recovering file " + p.toString() +
" by changing permission to readonly");
FsPermission roPerm = new FsPermission((short) 0444);
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java (revision 1548338)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java (working copy)
@@ -47,6 +47,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -765,7 +766,8 @@
} else {
LOG.debug("_balancedSplit file found. Replay log to restore state...");
FSUtils.getInstance(fs, table.getConfiguration())
- .recoverFileLease(fs, splitFile, table.getConfiguration(), null);
+ .recoverFileLease(fs, splitFile, table.getConfiguration(), null,
+ HConstants.LEASE_RECOVERY_UNREQUESTED);
// parse split file and process remaining splits
FSDataInputStream tmpIn = fs.open(splitFile);
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java (revision 1548338)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java (working copy)
@@ -30,6 +30,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
@@ -47,11 +48,11 @@
*/
@Override
public void recoverFileLease(final FileSystem fs, final Path p,
- Configuration conf, CancelableProgressable reporter)
+ Configuration conf, CancelableProgressable reporter, long ts)
throws IOException {
// lease recovery not needed for local file system case.
if (!(fs instanceof DistributedFileSystem)) return;
- recoverDFSFileLease((DistributedFileSystem)fs, p, conf, reporter);
+ recoverDFSFileLease((DistributedFileSystem)fs, p, conf, reporter, ts);
}
/*
@@ -81,14 +82,10 @@
* If HDFS-4525 is available, call it every second and we might be able to exit early.
*/
boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p,
- final Configuration conf, final CancelableProgressable reporter)
+ final Configuration conf, final CancelableProgressable reporter, long ts)
throws IOException {
+ if (ts == HConstants.LEASE_RECOVERY_SUCCEEDED) return true;
LOG.info("Recovering lease on dfs file " + p);
- long startWaiting = EnvironmentEdgeManager.currentTimeMillis();
- // Default is 15 minutes. It's huge, but the idea is that if we have a major issue, HDFS
- // usually needs 10 minutes before marking the nodes as dead. So we're putting ourselves
- // beyond that limit 'to be safe'.
- long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 900000) + startWaiting;
// This setting should be a little bit above what the cluster dfs heartbeat is set to.
long firstPause = conf.getInt("hbase.lease.recovery.first.pause", 4000);
// This should be set to how long it'll take for us to timeout against primary datanode if it
@@ -100,9 +97,34 @@
// whether we need to look for isFileClosed method
boolean findIsFileClosedMeth = true;
boolean recovered = false;
+ if (findIsFileClosedMeth) {
+ try {
+ isFileClosedMeth = dfs.getClass().getMethod("isFileClosed",
+ new Class[]{ Path.class });
+ } catch (NoSuchMethodException nsme) {
+ LOG.debug("isFileClosed not available");
+ } finally {
+ findIsFileClosedMeth = false;
+ }
+ }
+ if (isFileClosedMeth != null && isFileClosed(dfs, isFileClosedMeth, p)) {
+ return true;
+ }
+ long startWaiting;
+ // Default is 15 minutes. It's huge, but the idea is that if we have a major issue, HDFS
+ // usually needs 10 minutes before marking the nodes as dead. So we're putting ourselves
+ // beyond that limit 'to be safe'.
+ long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 900000) +
+ EnvironmentEdgeManager.currentTimeMillis();
// We break the loop if we succeed the lease recovery, timeout, or we throw an exception.
for (int nbAttempt = 0; !recovered; nbAttempt++) {
- recovered = recoverLease(dfs, nbAttempt, p, startWaiting);
+ // If ts is set, the very first recoverLease attempt was already made by SplitLogManager
+ if (ts == HConstants.LEASE_RECOVERY_UNREQUESTED) {
+ startWaiting = EnvironmentEdgeManager.currentTimeMillis();
+ recovered = recoverLease(dfs, nbAttempt, p, startWaiting);
+ } else {
+ startWaiting = ts;
+ }
if (recovered) break;
checkIfCancelled(reporter);
if (checkIfTimedout(conf, recoveryTimeout, nbAttempt, p, startWaiting)) break;
@@ -117,16 +139,6 @@
while ((EnvironmentEdgeManager.currentTimeMillis() - localStartWaiting) <
subsequentPause) {
Thread.sleep(conf.getInt("hbase.lease.recovery.pause", 1000));
- if (findIsFileClosedMeth) {
- try {
- isFileClosedMeth = dfs.getClass().getMethod("isFileClosed",
- new Class[]{ Path.class });
- } catch (NoSuchMethodException nsme) {
- LOG.debug("isFileClosed not available");
- } finally {
- findIsFileClosedMeth = false;
- }
- }
if (isFileClosedMeth != null && isFileClosed(dfs, isFileClosedMeth, p)) {
recovered = true;
break;
@@ -164,7 +176,7 @@
* @return True if dfs#recoverLease came by true.
* @throws FileNotFoundException
*/
- boolean recoverLease(final DistributedFileSystem dfs, final int nbAttempt, final Path p,
+ public boolean recoverLease(final DistributedFileSystem dfs, final int nbAttempt, final Path p,
final long startWaiting)
throws FileNotFoundException {
boolean recovered = false;
Index: hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
===================================================================
--- hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (revision 1548338)
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (working copy)
@@ -792,6 +792,13 @@
public static final String LOCALHOST_IP = "127.0.0.1";
+ /*
+ * The following two constants are for passing status of lease recovery attempt from
+ * SplitLogManager to SplitLogWorker
+ */
+ public static final int LEASE_RECOVERY_UNREQUESTED = 0;
+ public static final int LEASE_RECOVERY_SUCCEEDED = 1;
+
/** Conf key that enables unflushed WAL edits directly being replayed to region servers */
public static final String DISTRIBUTED_LOG_REPLAY_KEY = "hbase.master.distributed.log.replay";
public static final boolean DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG = false;