Index: hbase-protocol/src/main/protobuf/ZooKeeper.proto =================================================================== --- hbase-protocol/src/main/protobuf/ZooKeeper.proto (revision 1549385) +++ hbase-protocol/src/main/protobuf/ZooKeeper.proto (working copy) @@ -87,6 +87,7 @@ } required State state = 1; required ServerName server_name = 2; + optional uint64 ts = 3; } /** Index: hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java =================================================================== --- hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java (revision 1549385) +++ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java (working copy) @@ -3230,6 +3230,16 @@ * required .ServerName server_name = 2; */ org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getServerNameOrBuilder(); + + // optional uint64 ts = 3; + /** + * optional uint64 ts = 3; + */ + boolean hasTs(); + /** + * optional uint64 ts = 3; + */ + long getTs(); } /** * Protobuf type {@code SplitLogTask} @@ -3312,6 +3322,11 @@ bitField0_ |= 0x00000002; break; } + case 24: { + bitField0_ |= 0x00000004; + ts_ = input.readUInt64(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -3499,9 +3514,26 @@ return serverName_; } + // optional uint64 ts = 3; + public static final int TS_FIELD_NUMBER = 3; + private long ts_; + /** + * optional uint64 ts = 3; + */ + public boolean hasTs() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional uint64 ts = 3; + */ + public long getTs() { + return ts_; + } + private void initFields() { state_ = org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.State.UNASSIGNED; serverName_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance(); + ts_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean 
isInitialized() { @@ -3533,6 +3565,9 @@ if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeMessage(2, serverName_); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeUInt64(3, ts_); + } getUnknownFields().writeTo(output); } @@ -3550,6 +3585,10 @@ size += com.google.protobuf.CodedOutputStream .computeMessageSize(2, serverName_); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(3, ts_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -3583,6 +3622,11 @@ result = result && getServerName() .equals(other.getServerName()); } + result = result && (hasTs() == other.hasTs()); + if (hasTs()) { + result = result && (getTs() + == other.getTs()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -3604,6 +3648,10 @@ hash = (37 * hash) + SERVER_NAME_FIELD_NUMBER; hash = (53 * hash) + getServerName().hashCode(); } + if (hasTs()) { + hash = (37 * hash) + TS_FIELD_NUMBER; + hash = (53 * hash) + hashLong(getTs()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -3728,6 +3776,8 @@ serverNameBuilder_.clear(); } bitField0_ = (bitField0_ & ~0x00000002); + ts_ = 0L; + bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -3768,6 +3818,10 @@ } else { result.serverName_ = serverNameBuilder_.build(); } + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.ts_ = ts_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -3790,6 +3844,9 @@ if (other.hasServerName()) { mergeServerName(other.getServerName()); } + if (other.hasTs()) { + setTs(other.getTs()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -3982,6 +4039,39 @@ return serverNameBuilder_; } + // optional uint64 ts = 3; + private long ts_ ; + /** + * optional uint64 ts = 3; + */ + public boolean hasTs() { + 
return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional uint64 ts = 3; + */ + public long getTs() { + return ts_; + } + /** + * optional uint64 ts = 3; + */ + public Builder setTs(long value) { + bitField0_ |= 0x00000004; + ts_ = value; + onChanged(); + return this; + } + /** + * optional uint64 ts = 3; + */ + public Builder clearTs() { + bitField0_ = (bitField0_ & ~0x00000004); + ts_ = 0L; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:SplitLogTask) } @@ -9399,29 +9489,29 @@ "gionTransition\022\027\n\017event_type_code\030\001 \002(\r\022" + "\023\n\013region_name\030\002 \002(\014\022\023\n\013create_time\030\003 \002(" + "\004\022 \n\013server_name\030\004 \002(\0132\013.ServerName\022\017\n\007p" + - "ayload\030\005 \001(\014\"\231\001\n\014SplitLogTask\022\"\n\005state\030\001" + + "ayload\030\005 \001(\014\"\245\001\n\014SplitLogTask\022\"\n\005state\030\001" + " \002(\0162\023.SplitLogTask.State\022 \n\013server_name", - "\030\002 \002(\0132\013.ServerName\"C\n\005State\022\016\n\nUNASSIGN" + - "ED\020\000\022\t\n\005OWNED\020\001\022\014\n\010RESIGNED\020\002\022\010\n\004DONE\020\003\022" + - "\007\n\003ERR\020\004\"n\n\005Table\022$\n\005state\030\001 \002(\0162\014.Table" + - ".State:\007ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n" + - "\010DISABLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003" + - "\"%\n\017ReplicationPeer\022\022\n\nclusterkey\030\001 \002(\t\"" + - "^\n\020ReplicationState\022&\n\005state\030\001 \002(\0162\027.Rep" + - "licationState.State\"\"\n\005State\022\013\n\007ENABLED\020" + - "\000\022\014\n\010DISABLED\020\001\"+\n\027ReplicationHLogPositi" + - "on\022\020\n\010position\030\001 \002(\003\"%\n\017ReplicationLock\022", - "\022\n\nlock_owner\030\001 \002(\t\"\230\001\n\tTableLock\022\036\n\ntab" + - "le_name\030\001 \001(\0132\n.TableName\022\037\n\nlock_owner\030" + - "\002 \001(\0132\013.ServerName\022\021\n\tthread_id\030\003 
\001(\003\022\021\n" + - "\tis_shared\030\004 \001(\010\022\017\n\007purpose\030\005 \001(\t\022\023\n\013cre" + - "ate_time\030\006 \001(\003\";\n\017StoreSequenceId\022\023\n\013fam" + - "ily_name\030\001 \002(\014\022\023\n\013sequence_id\030\002 \002(\004\"g\n\026R" + - "egionStoreSequenceIds\022 \n\030last_flushed_se" + - "quence_id\030\001 \002(\004\022+\n\021store_sequence_id\030\002 \003" + - "(\0132\020.StoreSequenceIdBE\n*org.apache.hadoo" + - "p.hbase.protobuf.generatedB\017ZooKeeperPro", - "tosH\001\210\001\001\240\001\001" + "\030\002 \002(\0132\013.ServerName\022\n\n\002ts\030\003 \001(\004\"C\n\005State" + + "\022\016\n\nUNASSIGNED\020\000\022\t\n\005OWNED\020\001\022\014\n\010RESIGNED\020" + + "\002\022\010\n\004DONE\020\003\022\007\n\003ERR\020\004\"n\n\005Table\022$\n\005state\030\001" + + " \002(\0162\014.Table.State:\007ENABLED\"?\n\005State\022\013\n\007" + + "ENABLED\020\000\022\014\n\010DISABLED\020\001\022\r\n\tDISABLING\020\002\022\014" + + "\n\010ENABLING\020\003\"%\n\017ReplicationPeer\022\022\n\nclust" + + "erkey\030\001 \002(\t\"^\n\020ReplicationState\022&\n\005state" + + "\030\001 \002(\0162\027.ReplicationState.State\"\"\n\005State" + + "\022\013\n\007ENABLED\020\000\022\014\n\010DISABLED\020\001\"+\n\027Replicati" + + "onHLogPosition\022\020\n\010position\030\001 \002(\003\"%\n\017Repl", + "icationLock\022\022\n\nlock_owner\030\001 \002(\t\"\230\001\n\tTabl" + + "eLock\022\036\n\ntable_name\030\001 \001(\0132\n.TableName\022\037\n" + + "\nlock_owner\030\002 \001(\0132\013.ServerName\022\021\n\tthread" + + "_id\030\003 \001(\003\022\021\n\tis_shared\030\004 \001(\010\022\017\n\007purpose\030" + + "\005 \001(\t\022\023\n\013create_time\030\006 \001(\003\";\n\017StoreSeque" + + "nceId\022\023\n\013family_name\030\001 \002(\014\022\023\n\013sequence_i" + + "d\030\002 \002(\004\"g\n\026RegionStoreSequenceIds\022 \n\030las" + + "t_flushed_sequence_id\030\001 \002(\004\022+\n\021store_seq" + + "uence_id\030\002 
\003(\0132\020.StoreSequenceIdBE\n*org." + + "apache.hadoop.hbase.protobuf.generatedB\017", + "ZooKeeperProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -9457,7 +9547,7 @@ internal_static_SplitLogTask_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_SplitLogTask_descriptor, - new java.lang.String[] { "State", "ServerName", }); + new java.lang.String[] { "State", "ServerName", "Ts", }); internal_static_Table_descriptor = getDescriptor().getMessageTypes().get(5); internal_static_Table_fieldAccessorTable = new Index: hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java (revision 1549385) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/TestSerialization.java (working copy) @@ -77,7 +77,8 @@ @Test public void testSplitLogTask() throws DeserializationException { - SplitLogTask slt = new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")); + SplitLogTask slt = new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), + HConstants.LEASE_RECOVERY_UNREQUESTED); byte [] bytes = slt.toByteArray(); SplitLogTask sltDeserialized = SplitLogTask.parseFrom(bytes); assertTrue(slt.equals(sltDeserialized)); Index: hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java (revision 1549385) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSHDFSUtils.java (working copy) @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; +import 
org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.junit.Before; @@ -64,7 +65,8 @@ // Fail four times and pass on the fifth. Mockito.when(dfs.recoverLease(FILE)). thenReturn(false).thenReturn(false).thenReturn(false).thenReturn(false).thenReturn(true); - assertTrue(this.fsHDFSUtils.recoverDFSFileLease(dfs, FILE, HTU.getConfiguration(), reporter)); + assertTrue(this.fsHDFSUtils.recoverDFSFileLease(dfs, FILE, HTU.getConfiguration(), reporter, + HConstants.LEASE_RECOVERY_UNREQUESTED)); Mockito.verify(dfs, Mockito.times(5)).recoverLease(FILE); // Make sure we waited at least hbase.lease.recovery.dfs.timeout * 3 (the first two // invocations will happen pretty fast... the we fall into the longer wait loop). @@ -89,8 +91,8 @@ Mockito.when(dfs.recoverLease(FILE)). thenReturn(false).thenReturn(false).thenReturn(true); Mockito.when(dfs.isFileClosed(FILE)).thenReturn(true); - assertTrue(this.fsHDFSUtils.recoverDFSFileLease(dfs, FILE, HTU.getConfiguration(), reporter)); - Mockito.verify(dfs, Mockito.times(2)).recoverLease(FILE); + assertTrue(this.fsHDFSUtils.recoverDFSFileLease(dfs, FILE, HTU.getConfiguration(), reporter, + HConstants.LEASE_RECOVERY_UNREQUESTED)); Mockito.verify(dfs, Mockito.times(1)).isFileClosed(FILE); } Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (revision 1549385) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (working copy) @@ -837,7 +837,7 @@ t.setDaemon(true); t.start(); try { - logSplitter.splitLogFile(logfiles[0], null); + logSplitter.splitLogFile(logfiles[0], null, HConstants.LEASE_RECOVERY_UNREQUESTED); fail("Didn't throw!"); } catch (IOException ioe) { assertTrue(ioe.toString().contains("Injected")); @@ 
-1056,7 +1056,7 @@ } }; - logSplitter.splitLogFile(fs.getFileStatus(logPath), null); + logSplitter.splitLogFile(fs.getFileStatus(logPath), null,HConstants.LEASE_RECOVERY_UNREQUESTED); // Verify number of written edits per region Map outputCounts = logSplitter.outputSink.getOutputCounts(); @@ -1308,7 +1308,7 @@ } }; try{ - logSplitter.splitLogFile(logfiles[0], null); + logSplitter.splitLogFile(logfiles[0], null, HConstants.LEASE_RECOVERY_UNREQUESTED); } catch (IOException e) { LOG.info(e); Assert.fail("Throws IOException when spliting " Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java (revision 1549385) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java (working copy) @@ -524,7 +524,8 @@ FSUtils fsUtils = FSUtils.getInstance(fs, TEST_UTIL.getConfiguration()); for (Path p : paths) { LOG.debug("recovering lease for " + p); - fsUtils.recoverFileLease(((HFileSystem)fs).getBackingFs(), p, TEST_UTIL.getConfiguration(), null); + fsUtils.recoverFileLease(((HFileSystem)fs).getBackingFs(), p, TEST_UTIL.getConfiguration(), + null, HConstants.LEASE_RECOVERY_UNREQUESTED); LOG.debug("Reading HLog "+FSUtils.getPath(p)); HLog.Reader reader = null; Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (revision 1549385) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (working copy) @@ -437,8 +437,8 @@ public Exception exception = null; public void run() { try { - FSUtils.getInstance(fs, rlConf) - .recoverFileLease(recoveredFs, walPath, rlConf, null); + FSUtils.getInstance(fs, rlConf).recoverFileLease(recoveredFs, 
walPath, rlConf, null, + HConstants.LEASE_RECOVERY_UNREQUESTED); } catch (IOException e) { exception = e; } Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java (revision 1549385) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java (working copy) @@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.SplitLogCounters; @@ -125,7 +126,7 @@ new SplitLogWorker.TaskExecutor() { @Override - public Status exec(String name, CancelableProgressable p) { + public Status exec(String name, CancelableProgressable p, long leaseRecoveryReqTS) { while (true) { try { Thread.sleep(1000); @@ -148,8 +149,9 @@ final ServerName RS = ServerName.valueOf("rs,1,1"); RegionServerServices mockedRS = getRegionServer(RS); zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS), - new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(), Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); + new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), + HConstants.LEASE_RECOVERY_UNREQUESTED).toByteArray(), + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); SplitLogWorker slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), mockedRS, neverEndingTask); @@ -183,8 +185,8 @@ final ServerName SVR1 = ServerName.valueOf("svr1,1,1"); final ServerName SVR2 = ServerName.valueOf("svr2,1,1"); zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TRFT), - new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); + 
new SplitLogTask.Unassigned(MANAGER, HConstants.LEASE_RECOVERY_UNREQUESTED).toByteArray(), + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); RegionServerServices mockedRS1 = getRegionServer(SVR1); RegionServerServices mockedRS2 = getRegionServer(SVR2); SplitLogWorker slw1 = @@ -225,15 +227,15 @@ // this time create a task node after starting the splitLogWorker zkw.getRecoverableZooKeeper().create(PATH, - new SplitLogTask.Unassigned(MANAGER).toByteArray(), Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); + new SplitLogTask.Unassigned(MANAGER, HConstants.LEASE_RECOVERY_UNREQUESTED).toByteArray(), + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, 8000); assertEquals(1, slw.taskReadySeq); byte [] bytes = ZKUtil.getData(zkw, PATH); SplitLogTask slt = SplitLogTask.parseFrom(bytes); assertTrue(slt.isOwned(SRV)); - slt = new SplitLogTask.Unassigned(MANAGER); + slt = new SplitLogTask.Unassigned(MANAGER, HConstants.LEASE_RECOVERY_UNREQUESTED); ZKUtil.setData(zkw, PATH, slt.toByteArray()); waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, 1500); } finally { @@ -256,7 +258,8 @@ Thread.sleep(100); waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, 1500); - SplitLogTask unassignedManager = new SplitLogTask.Unassigned(MANAGER); + SplitLogTask unassignedManager = new SplitLogTask.Unassigned(MANAGER, + HConstants.LEASE_RECOVERY_UNREQUESTED); zkw.getRecoverableZooKeeper().create(PATH1, unassignedManager.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); @@ -296,7 +299,7 @@ Thread.sleep(100); String task = ZKSplitLog.getEncodedNodeName(zkw, "task"); - SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER); + SplitLogTask slt = new SplitLogTask.Unassigned(MANAGER, HConstants.LEASE_RECOVERY_UNREQUESTED); zkw.getRecoverableZooKeeper().create(task,slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); @@ -349,8 +352,9 @@ for (int i = 0; i < maxTasks; i++) { 
zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i), - new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(), - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), + HConstants.LEASE_RECOVERY_UNREQUESTED).toByteArray(), + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask); @@ -392,9 +396,9 @@ for (int i = 0; i < maxTasks; i++) { zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i), - new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(), - Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); + new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), + HConstants.LEASE_RECOVERY_UNREQUESTED).toByteArray(), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); } SplitLogWorker slw = new SplitLogWorker(zkw, testConf, mockedRS, neverEndingTask); Index: hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java (revision 1549385) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java (working copy) @@ -32,6 +32,7 @@ import static org.junit.Assert.fail; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; @@ -965,7 +966,8 @@ // slm.splitLogDistributed(logDir); FileStatus[] logfiles = fs.listStatus(logDir); TaskBatch batch = new TaskBatch(); - slm.enqueueSplitTask(logfiles[0].getPath().toString(), batch); + slm.enqueueSplitTask(logfiles[0].getPath().toString(), batch, + HConstants.LEASE_RECOVERY_UNREQUESTED); //waitForCounter but for one of the 2 counters long curt = System.currentTimeMillis(); long waitTime = 80000; @@ -1079,7 
+1081,7 @@ // is set to true, until it is interrupted. slm.splitLogDistributed(logDir); } catch (IOException e) { - assertTrue(Thread.currentThread().isInterrupted()); + assertTrue(e instanceof InterruptedIOException); return; } fail("did not get the expected IOException from the 2nd call"); Index: hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java (revision 1549385) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java (working copy) @@ -178,7 +178,7 @@ zkw.registerListener(listener); ZKUtil.watchAndCheckExists(zkw, tasknode); - slm.enqueueSplitTask(name, batch); + slm.enqueueSplitTask(name, batch, HConstants.LEASE_RECOVERY_UNREQUESTED); assertEquals(1, batch.installed); assertTrue(slm.findOrCreateOrphanTask(tasknode).batch == batch); assertEquals(1L, tot_mgr_node_create_queued.get()); @@ -238,7 +238,8 @@ " startup"); String tasknode = ZKSplitLog.getEncodedNodeName(zkw, "orphan/test/slash"); //create an unassigned orphan task - SplitLogTask slt = new SplitLogTask.Unassigned(DUMMY_MASTER); + SplitLogTask slt = new SplitLogTask.Unassigned(DUMMY_MASTER, + HConstants.LEASE_RECOVERY_UNREQUESTED); zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); int version = ZKUtil.checkExists(zkw, tasknode); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java (revision 1549385) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java (working copy) @@ -36,10 +36,16 @@ public class SplitLogTask { private final ServerName originServer; private final ZooKeeperProtos.SplitLogTask.State state; + // if non-zero, the time when lease 
recovery was submitted + private final long leaseRecoveryReqTS; + /* + * @param originServer + * @param leaseRecoveryReqTS timestamp of lease recovery attempt + */ public static class Unassigned extends SplitLogTask { - public Unassigned(final ServerName originServer) { - super(originServer, ZooKeeperProtos.SplitLogTask.State.UNASSIGNED); + public Unassigned(final ServerName originServer, final long leaseRecoveryReqTS) { + super(originServer, ZooKeeperProtos.SplitLogTask.State.UNASSIGNED, leaseRecoveryReqTS); } } @@ -68,17 +74,34 @@ } SplitLogTask(final ZooKeeperProtos.SplitLogTask slt) { - this(ProtobufUtil.toServerName(slt.getServerName()), slt.getState()); + this(ProtobufUtil.toServerName(slt.getServerName()), slt.getState(), slt.getTs()); } SplitLogTask(final ServerName originServer, final ZooKeeperProtos.SplitLogTask.State state) { this.originServer = originServer; this.state = state; + this.leaseRecoveryReqTS = HConstants.LEASE_RECOVERY_UNREQUESTED; } + /* + * @param originServer + * @param state + * @param leaseRecoveryReqTS timestamp of lease recovery attempt + */ + SplitLogTask(final ServerName originServer, final ZooKeeperProtos.SplitLogTask.State state, + final long leaseRecoveryReqTS) { + this.originServer = originServer; + this.state = state; + this.leaseRecoveryReqTS = leaseRecoveryReqTS; + } + public ServerName getServerName() { return this.originServer; } + + public long getleaseRecoveryReqTS() { + return this.leaseRecoveryReqTS; + } public boolean isUnassigned(final ServerName sn) { return this.originServer.equals(sn) && isUnassigned(); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java (revision 1549385) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/HLogSplitterHandler.java (working copy) @@ -55,11 
+55,12 @@ private final AtomicInteger inProgressTasks; private final MutableInt curTaskZKVersion; private final TaskExecutor splitTaskExecutor; + private final long leaseRecoveryReqTS; public HLogSplitterHandler(final Server server, String curTask, final MutableInt curTaskZKVersion, CancelableProgressable reporter, - AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor) { + AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor, long leaseRecoveryReqTS) { super(server, EventType.RS_LOG_REPLAY); this.curTask = curTask; this.wal = ZKSplitLog.getFileName(curTask); @@ -70,13 +71,14 @@ this.zkw = server.getZooKeeper(); this.curTaskZKVersion = curTaskZKVersion; this.splitTaskExecutor = splitTaskExecutor; + this.leaseRecoveryReqTS = leaseRecoveryReqTS; } @Override public void process() throws IOException { long startTime = System.currentTimeMillis(); try { - Status status = this.splitTaskExecutor.exec(wal, reporter); + Status status = this.splitTaskExecutor.exec(wal, reporter, leaseRecoveryReqTS); switch (status) { case DONE: endTask(zkw, new SplitLogTask.Done(this.serverName), Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (revision 1549385) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (working copy) @@ -214,9 +214,29 @@ Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker, ZooKeeperWatcher zkw) throws IOException { HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, zkw); - return s.splitLogFile(logfile, reporter); + return s.splitLogFile(logfile, reporter, HConstants.LEASE_RECOVERY_UNREQUESTED); } + /* + * @param rootDir + * @param logfile + * @param fs + * @param conf + * @param reporter + * @param idChecker + * @param zkw ZooKeeperWatcher if it's null, we will back to 
the old-style log splitting where we + * dump out recoved.edits files for regions to replay on. + * @param leaseRecoveryReqTS timestamp of lease recovery attempt + * @return false if it is interrupted by the progress-able. + * @throws IOException + */ + public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs, + Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker, + ZooKeeperWatcher zkw, long leaseRecoveryReqTS) throws IOException { + HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, zkw); + return s.splitLogFile(logfile, reporter, leaseRecoveryReqTS); + } + // A wrapper to split one log folder using the method used by distributed // log splitting. Used by tools and unit tests. It should be package private. // It is public only because TestWALObserver is in a different package, @@ -228,7 +248,7 @@ if (logfiles != null && logfiles.length > 0) { for (FileStatus logfile: logfiles) { HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null); - if (s.splitLogFile(logfile, null)) { + if (s.splitLogFile(logfile, null, HConstants.LEASE_RECOVERY_UNREQUESTED)) { finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf); if (s.outputSink.splits != null) { splits.addAll(s.outputSink.splits); @@ -244,7 +264,7 @@ // The real log splitter. It just splits one log file. 
boolean splitLogFile(FileStatus logfile, - CancelableProgressable reporter) throws IOException { + CancelableProgressable reporter, long leaseRecoveryReqTS) throws IOException { boolean isCorrupted = false; Preconditions.checkState(status == null); boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", @@ -270,7 +290,7 @@ } Reader in = null; try { - in = getReader(fs, logfile, conf, skipErrors, reporter); + in = getReader(fs, logfile, conf, skipErrors, reporter, leaseRecoveryReqTS); } catch (CorruptedLogFileException e) { LOG.warn("Could not get reader, corrupted log file " + logPath, e); ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs); @@ -535,7 +555,7 @@ * @throws CorruptedLogFileException */ protected Reader getReader(FileSystem fs, FileStatus file, Configuration conf, - boolean skipErrors, CancelableProgressable reporter) + boolean skipErrors, CancelableProgressable reporter, long leaseRecoveryReqTS) throws IOException, CorruptedLogFileException { Path path = file.getPath(); long length = file.getLen(); @@ -549,7 +569,7 @@ } try { - FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter); + FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter, leaseRecoveryReqTS); try { in = getReader(fs, path, conf, reporter); } catch (EOFException e) { Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (revision 1549385) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (working copy) @@ -125,7 +125,8 @@ RegionServerServices server, final LastSequenceId sequenceIdChecker) { this(watcher, conf, server, new TaskExecutor() { @Override - public Status exec(String filename, CancelableProgressable p) { + public Status exec(String filename, CancelableProgressable p, long 
leaseRecoveryReqTS) { + // TODO: utilize ts Path rootdir; FileSystem fs; try { @@ -140,7 +141,7 @@ // encountered a bad non-retry-able persistent error. try { if (!HLogSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)), - fs, conf, p, sequenceIdChecker, watcher)) { + fs, conf, p, sequenceIdChecker, watcher, leaseRecoveryReqTS)) { return Status.PREEMPTED; } } catch (InterruptedIOException iioe) { @@ -364,7 +365,7 @@ SplitLogCounters.tot_wkr_task_acquired.incrementAndGet(); getDataSetWatchAsync(); - submitTask(path, currentVersion, this.report_period); + submitTask(path, currentVersion, this.report_period, slt.getleaseRecoveryReqTS()); // after a successful submit, sleep a little bit to allow other RSs to grab the rest tasks try { @@ -459,8 +460,10 @@ * Submit a log split task to executor service * @param curTask * @param curTaskZKVersion + * @param leaseRecoveryReqTS */ - void submitTask(final String curTask, final int curTaskZKVersion, final int reportPeriod) { + void submitTask(final String curTask, final int curTaskZKVersion, final int reportPeriod, + final long leaseRecoveryReqTS) { final MutableInt zkVersion = new MutableInt(curTaskZKVersion); CancelableProgressable reporter = new CancelableProgressable() { @@ -485,7 +488,7 @@ HLogSplitterHandler hsh = new HLogSplitterHandler(this.server, curTask, zkVersion, reporter, this.tasksInProgress, - this.splitTaskExecutor); + this.splitTaskExecutor, leaseRecoveryReqTS); this.executorService.submit(hsh); } @@ -660,6 +663,6 @@ RESIGNED(), PREEMPTED() } - Status exec(String name, CancelableProgressable p); + Status exec(String name, CancelableProgressable p, long leaseRecoveryReqTS); } } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (revision 1549385) +++ 
hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (working copy) @@ -25,6 +25,7 @@ import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -33,7 +34,12 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; @@ -61,12 +67,14 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.FSHDFSUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.util.StringUtils; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; @@ -121,6 +129,7 @@ private long zkretries; private long resubmit_threshold; + private int maxThreads; // max number of threads for submitting lease recovery private long timeout; private long unassignedTimeout; private long lastNodeCreateTime = Long.MAX_VALUE; @@ -220,6 +229,7 @@ this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES); this.resubmit_threshold = conf.getLong("hbase.splitlog.max.resubmit", 
DEFAULT_MAX_RESUBMIT); this.timeout = conf.getInt("hbase.splitlog.manager.timeout", DEFAULT_TIMEOUT); + this.maxThreads = conf.getInt("hbase.splitlog.manager.max.threads", 6); this.unassignedTimeout = conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT); this.distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, @@ -328,63 +338,111 @@ SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet(); LOG.info("started splitting " + logfiles.length + " logs in " + logDirs); long t = EnvironmentEdgeManager.currentTimeMillis(); - long totalSize = 0; - TaskBatch batch = new TaskBatch(); + final AtomicLong totalSize = new AtomicLong(0); + final AtomicInteger count = new AtomicInteger(0); + final TaskBatch batch = new TaskBatch(); Boolean isMetaRecovery = (filter == null) ? null : false; - for (FileStatus lf : logfiles) { - // TODO If the log file is still being written to - which is most likely - // the case for the last log file - then its length will show up here - // as zero. The size of such a file can only be retrieved after - // recover-lease is done. totalSize will be under in most cases and the - // metrics that it drives will also be under-reported. - totalSize += lf.getLen(); - String pathToLog = FSUtils.removeRootPath(lf.getPath(), conf); - if (!enqueueSplitTask(pathToLog, batch)) { - throw new IOException("duplicate log split scheduled for " + lf.getPath()); + ExecutorService service = null; + List results = null; + final List exceptions = Collections.synchronizedList(new ArrayList()); + if (logfiles.length > 0) { + service = Executors.newFixedThreadPool(this.maxThreads); + results = new ArrayList(logfiles.length); + } + try { + for (final FileStatus lf : logfiles) { + // TODO If the log file is still being written to - which is most likely + // the case for the last log file - then its length will show up here + // as zero. The size of such a file can only be retrieved after + // recover-lease is done.
totalSize will be under in most cases and the + // metrics that it drives will also be under-reported. + Runnable runnable = new Runnable() { + @Override + public void run() { + count.getAndIncrement(); + totalSize.getAndAdd(lf.getLen()); + try { + FileSystem fs = lf.getPath().getFileSystem(conf); + long ts = HConstants.LEASE_RECOVERY_SUCCEEDED; + if (fs instanceof DistributedFileSystem) { + boolean b = ((FSHDFSUtils)FSUtils.getInstance(fs, conf)) + .recoverLease((DistributedFileSystem)fs, 0, lf.getPath(), EnvironmentEdgeManager.currentTimeMillis()); + // record the time when lease recovery was submitted + if (!b) ts = EnvironmentEdgeManager.currentTimeMillis(); + } + String pathToLog = FSUtils.removeRootPath(lf.getPath(), conf); + if (!enqueueSplitTask(pathToLog, batch, ts)) { + exceptions.add(new IOException("duplicate log split scheduled for " +lf.getPath())); + } + } catch (IOException ioe) { + exceptions.add(ioe); + } + } + }; + results.add(service.submit(runnable)); } - } - waitForSplittingCompletion(batch, status); - // remove recovering regions from ZK - if (filter == MasterFileSystem.META_FILTER /* reference comparison */) { - // we split meta regions and user regions separately therefore logfiles are either all for - // meta or user regions but won't for both( we could have mixed situations in tests) - isMetaRecovery = true; - } - this.removeRecoveringRegionsFromZK(serverNames, isMetaRecovery); + if (results != null) { + for (Future f : results) { + try { + f.get(); + } catch (ExecutionException ee) { + throw new IOException(ee.getCause()); + } catch (InterruptedException ie) { + String s = "Interrupted while waiting for log splits submission"; + LOG.warn(s); + throw new InterruptedIOException(s); + } + } + } + if (!exceptions.isEmpty()) { + LOG.warn("Encountered " + exceptions.size() + " exceptions"); + throw exceptions.get(0); + } + waitForSplittingCompletion(batch, status); + // remove recovering regions from ZK + if (filter == MasterFileSystem.META_FILTER /* reference comparison */) { + //
we split meta regions and user regions separately therefore logfiles are either all for + // meta or user regions but won't for both( we could have mixed situations in tests) + isMetaRecovery = true; + } + this.removeRecoveringRegionsFromZK(serverNames, isMetaRecovery); - if (batch.done != batch.installed) { - batch.isDead = true; - SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet(); - LOG.warn("error while splitting logs in " + logDirs + - " installed = " + batch.installed + " but only " + batch.done + " done"); - String msg = "error or interrupted while splitting logs in " - + logDirs + " Task = " + batch; - status.abort(msg); - throw new IOException(msg); - } - for(Path logDir: logDirs){ - status.setStatus("Cleaning up log directory..."); - try { - if (fs.exists(logDir) && !fs.delete(logDir, false)) { - LOG.warn("Unable to delete log src dir. Ignoring. " + logDir); + if (batch.done != batch.installed) { + batch.isDead = true; + SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet(); + LOG.warn("error while splitting logs in " + logDirs + + " installed = " + batch.installed + " but only " + batch.done + " done"); + String msg = "error or interrupted while splitting logs in " + + logDirs + " Task = " + batch; + status.abort(msg); + throw new IOException(msg); + } + for (Path logDir: logDirs){ + status.setStatus("Cleaning up log directory..."); + try { + if (fs.exists(logDir) && !fs.delete(logDir, false)) { + LOG.warn("Unable to delete log src dir. Ignoring. " + logDir); + } + } catch (IOException ioe) { + FileStatus[] files = fs.listStatus(logDir); + if (files != null && files.length > 0) { + LOG.warn("returning success without actually splitting and " + + "deleting all the log files in path " + logDir); + } else { + LOG.warn("Unable to delete log src dir. Ignoring. 
" + logDir, ioe); + } } - } catch (IOException ioe) { - FileStatus[] files = fs.listStatus(logDir); - if (files != null && files.length > 0) { - LOG.warn("returning success without actually splitting and " + - "deleting all the log files in path " + logDir); - } else { - LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe); - } + SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet(); } - SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet(); + String msg = "finished splitting (more than or equal to) " + totalSize + + " bytes in " + batch.installed + " log files in " + logDirs + " in " + + (EnvironmentEdgeManager.currentTimeMillis() - t) + "ms"; + status.markComplete(msg); + LOG.info(msg); + } finally { + if (service != null) service.shutdown(); } - String msg = "finished splitting (more than or equal to) " + totalSize + - " bytes in " + batch.installed + " log files in " + logDirs + " in " + - (EnvironmentEdgeManager.currentTimeMillis() - t) + "ms"; - status.markComplete(msg); - LOG.info(msg); - return totalSize; + return totalSize.longValue(); } /** @@ -392,9 +450,10 @@ * * @param taskname the path of the log to be split * @param batch the batch this task belongs to + * @param leaseRecoveryReqTS the time when lease recovery was submitted * @return true if a new entry is created, false if it is already there. */ - boolean enqueueSplitTask(String taskname, TaskBatch batch) { + boolean enqueueSplitTask(String taskname, TaskBatch batch, long leaseRecoveryReqTS) { SplitLogCounters.tot_mgr_log_split_start.incrementAndGet(); // This is a znode path under the splitlog dir with the rest of the path made up of an // url encoding of the passed in log to split. 
@@ -402,7 +461,7 @@ Task oldtask = createTaskIfAbsent(path, batch); if (oldtask == null) { // publish the task in zk - createNode(path, zkretries); + createNode(path, zkretries, leaseRecoveryReqTS); return true; } return false; @@ -699,8 +758,8 @@ return; } - private void createNode(String path, Long retry_count) { - SplitLogTask slt = new SplitLogTask.Unassigned(serverName); + private void createNode(String path, Long retry_count, long leaseRecoveryReqTS) { + SplitLogTask slt = new SplitLogTask.Unassigned(serverName, leaseRecoveryReqTS); ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(), retry_count); SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet(); return; @@ -876,7 +935,8 @@ task.incarnation++; try { // blocking zk call but this is done from the timeout thread - SplitLogTask slt = new SplitLogTask.Unassigned(this.serverName); + SplitLogTask slt = new SplitLogTask.Unassigned(this.serverName, + HConstants.LEASE_RECOVERY_UNREQUESTED); if (ZKUtil.setData(this.watcher, path, slt.toByteArray(), version) == false) { LOG.debug("failed to resubmit task " + path + " version changed"); @@ -1503,7 +1563,7 @@ createNodeFailure(path); } else { SplitLogCounters.tot_mgr_node_create_retry.incrementAndGet(); - createNode(path, retry_count - 1, 0); + createNode(path, retry_count - 1, HConstants.LEASE_RECOVERY_UNREQUESTED); } return; } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (revision 1549385) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (working copy) @@ -1303,10 +1303,12 @@ * @param fs FileSystem handle * @param p Path of file to recover lease * @param conf Configuration handle + * @param leaseRecoveryReqTS timestamp of lease recovery request * @throws IOException */ public abstract void recoverFileLease(final FileSystem fs, final Path p, - Configuration conf,
CancelableProgressable reporter) throws IOException; + Configuration conf, CancelableProgressable reporter, long leaseRecoveryReqTS) + throws IOException; public static List getTableDirs(final FileSystem fs, final Path rootdir) throws IOException { Index: hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSMapRUtils.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSMapRUtils.java (revision 1549385) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSMapRUtils.java (working copy) @@ -36,7 +36,7 @@ private static final Log LOG = LogFactory.getLog(FSMapRUtils.class); public void recoverFileLease(final FileSystem fs, final Path p, - Configuration conf, CancelableProgressable reporter) throws IOException { + Configuration conf, CancelableProgressable reporter, long leaseRecoveryReqTS) throws IOException { LOG.info("Recovering file " + p.toString() + " by changing permission to readonly"); FsPermission roPerm = new FsPermission((short) 0444); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java (revision 1549385) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java (working copy) @@ -47,6 +47,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; @@ -765,7 +766,8 @@ } else { LOG.debug("_balancedSplit file found. 
Replay log to restore state..."); FSUtils.getInstance(fs, table.getConfiguration()) - .recoverFileLease(fs, splitFile, table.getConfiguration(), null); + .recoverFileLease(fs, splitFile, table.getConfiguration(), null, + HConstants.LEASE_RECOVERY_UNREQUESTED); // parse split file and process remaining splits FSDataInputStream tmpIn = fs.open(splitFile); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java (revision 1549385) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java (working copy) @@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; @@ -47,11 +48,11 @@ */ @Override public void recoverFileLease(final FileSystem fs, final Path p, - Configuration conf, CancelableProgressable reporter) + Configuration conf, CancelableProgressable reporter, long leaseRecoveryReqTS) throws IOException { // lease recovery not needed for local file system case. if (!(fs instanceof DistributedFileSystem)) return; - recoverDFSFileLease((DistributedFileSystem)fs, p, conf, reporter); + recoverDFSFileLease((DistributedFileSystem)fs, p, conf, reporter, leaseRecoveryReqTS); } /* @@ -81,14 +82,10 @@ * If HDFS-4525 is available, call it every second and we might be able to exit early. 
*/ boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p, - final Configuration conf, final CancelableProgressable reporter) + final Configuration conf, final CancelableProgressable reporter,final long leaseRecoveryReqTS) throws IOException { + if (leaseRecoveryReqTS == HConstants.LEASE_RECOVERY_SUCCEEDED) return true; LOG.info("Recovering lease on dfs file " + p); - long startWaiting = EnvironmentEdgeManager.currentTimeMillis(); - // Default is 15 minutes. It's huge, but the idea is that if we have a major issue, HDFS - // usually needs 10 minutes before marking the nodes as dead. So we're putting ourselves - // beyond that limit 'to be safe'. - long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 900000) + startWaiting; // This setting should be a little bit above what the cluster dfs heartbeat is set to. long firstPause = conf.getInt("hbase.lease.recovery.first.pause", 4000); // This should be set to how long it'll take for us to timeout against primary datanode if it @@ -97,39 +94,49 @@ long subsequentPause = conf.getInt("hbase.lease.recovery.dfs.timeout", 61 * 1000); Method isFileClosedMeth = null; - // whether we need to look for isFileClosed method - boolean findIsFileClosedMeth = true; - boolean recovered = false; + try { + isFileClosedMeth = dfs.getClass().getMethod("isFileClosed", + new Class[]{ Path.class }); + } catch (NoSuchMethodException nsme) { + LOG.debug("isFileClosed not available"); + } + if (isFileClosedMeth != null && isFileClosed(dfs, isFileClosedMeth, p)) { + return true; + } + long startWaiting = leaseRecoveryReqTS; + // Default is 15 minutes. It's huge, but the idea is that if we have a major issue, HDFS + // usually needs 10 minutes before marking the nodes as dead. So we're putting ourselves + // beyond that limit 'to be safe'. 
+ long recoveryTimeout = conf.getInt("hbase.lease.recovery.timeout", 900000) + + EnvironmentEdgeManager.currentTimeMillis(); // We break the loop if we succeed the lease recovery, timeout, or we throw an exception. - for (int nbAttempt = 0; !recovered; nbAttempt++) { - recovered = recoverLease(dfs, nbAttempt, p, startWaiting); - if (recovered) break; + int recoveryPause = conf.getInt("hbase.lease.recovery.pause", 1000); + for (int nbAttempt = 0; ; nbAttempt++) { + // The very first attempt was made by SplitLogManager + if (leaseRecoveryReqTS == HConstants.LEASE_RECOVERY_UNREQUESTED || nbAttempt > 0) { + startWaiting = EnvironmentEdgeManager.currentTimeMillis(); + if (recoverLease(dfs, nbAttempt, p, startWaiting)) return true; + } checkIfCancelled(reporter); if (checkIfTimedout(conf, recoveryTimeout, nbAttempt, p, startWaiting)) break; try { - // On the first time through wait the short 'firstPause'. - if (nbAttempt == 0) { - Thread.sleep(firstPause); + // On the first time through wait the short 'firstPause' if isFileClosed is not available + // calculation of firstPause takes into account the time when the recovery was started + if (nbAttempt == 0 && leaseRecoveryReqTS != HConstants.LEASE_RECOVERY_UNREQUESTED) { + firstPause -= (EnvironmentEdgeManager.currentTimeMillis() - leaseRecoveryReqTS); + } + if (nbAttempt == 0 && isFileClosedMeth == null) { + if (firstPause > 0) Thread.sleep(firstPause); + else continue; } else { // Cycle here until subsequentPause elapses. While spinning, check isFileClosed if // available (should be in hadoop 2.0.5... not in hadoop 1 though. 
long localStartWaiting = EnvironmentEdgeManager.currentTimeMillis(); while ((EnvironmentEdgeManager.currentTimeMillis() - localStartWaiting) < subsequentPause) { - Thread.sleep(conf.getInt("hbase.lease.recovery.pause", 1000)); - if (findIsFileClosedMeth) { - try { - isFileClosedMeth = dfs.getClass().getMethod("isFileClosed", - new Class[]{ Path.class }); - } catch (NoSuchMethodException nsme) { - LOG.debug("isFileClosed not available"); - } finally { - findIsFileClosedMeth = false; - } - } + Thread.sleep(recoveryPause); if (isFileClosedMeth != null && isFileClosed(dfs, isFileClosedMeth, p)) { - recovered = true; - break; + return true; } checkIfCancelled(reporter); } @@ -140,7 +147,7 @@ throw iioe; } } - return recovered; + return false; } boolean checkIfTimedout(final Configuration conf, final long recoveryTimeout, @@ -164,7 +171,7 @@ * @return True if dfs#recoverLease came by true. * @throws FileNotFoundException */ - boolean recoverLease(final DistributedFileSystem dfs, final int nbAttempt, final Path p, + public boolean recoverLease(final DistributedFileSystem dfs, final int nbAttempt, final Path p, final long startWaiting) throws FileNotFoundException { boolean recovered = false; Index: hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java =================================================================== --- hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (revision 1549385) +++ hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (working copy) @@ -792,6 +792,13 @@ public static final String LOCALHOST_IP = "127.0.0.1"; + /* + * The following two constants are for passing status of lease recovery attempt from + * SplitLogManager to SplitLogWorker + */ + public static final long LEASE_RECOVERY_UNREQUESTED = 0; + public static final long LEASE_RECOVERY_SUCCEEDED = 1; + /** Conf key that enables unflushed WAL edits directly being replayed to region servers */ public static final String DISTRIBUTED_LOG_REPLAY_KEY = 
"hbase.master.distributed.log.replay"; public static final boolean DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG = false;