diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 3f46fc0..3bcce32 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -372,7 +372,8 @@ Status RaftConsensus::BecomeLeaderUnlocked() {
                state_->GetActiveQuorumStateUnlocked().majority_size);
 
   // Create the peers so that we're able to replicate messages remotely and locally
-  RETURN_NOT_OK(peer_manager_->UpdateQuorum(state_->GetPendingQuorumUnlocked()));
+  RETURN_NOT_OK_PREPEND(peer_manager_->UpdateQuorum(state_->GetPendingQuorumUnlocked()),
+                        "Unable to create peers");
 
   // Initiate a config change transaction. This is mostly acting as the NO_OP
   // transaction that is sent at the beginning of every term change in raft.
@@ -393,9 +394,11 @@ Status RaftConsensus::BecomeLeaderUnlocked() {
   gscoped_ptr<ConsensusRound> round(
       new ConsensusRound(this, make_scoped_refptr(new RefCountedReplicate(replicate))));
-  RETURN_NOT_OK(AppendNewRoundToQueueUnlocked(round.get()));
-  RETURN_NOT_OK(state_->GetReplicaTransactionFactoryUnlocked()->StartReplicaTransaction(
-      round.Pass()));
+  RETURN_NOT_OK_PREPEND(AppendNewRoundToQueueUnlocked(round.get()),
+                        "Unable to append config change upon becoming leader");
+  RETURN_NOT_OK_PREPEND(state_->GetReplicaTransactionFactoryUnlocked()->StartReplicaTransaction(
+      round.Pass()),
+                        "Unable to start config change upon becoming leader");
 
   return Status::OK();
 }
 
@@ -505,7 +508,14 @@ void RaftConsensus::UpdateMajorityReplicatedUnlocked(const OpId& majority_replic
 
 void RaftConsensus::NotifyTermChange(uint64_t term) {
   ReplicaState::UniqueLock lock;
-  CHECK_OK(state_->LockForConfigChange(&lock));
+  Status s = state_->LockForConfigChange(&lock);
+  if (!s.ok()) {
+    DCHECK(state_->IsShuttingDown());
+    LOG_WITH_PREFIX_LK(WARNING) << "Ignoring term change to term " << term
+                                << " when already shutting down";
+    return;
+  }
+
   WARN_NOT_OK(HandleTermAdvanceUnlocked(term), "Couldn't advance consensus term.");
 }
 
@@ -1253,7 +1263,8 @@ void RaftConsensus::ElectionCallback(const ElectionResult& result) {
   ReplicaState::UniqueLock lock;
   Status s = state_->LockForConfigChange(&lock);
   if (PREDICT_FALSE(!s.ok())) {
-    LOG_WITH_PREFIX(INFO) << "Received election callback for term "
+    DCHECK(state_->IsShuttingDown()) << "unexpected state: " << s.ToString();
+    LOG_WITH_PREFIX_LK(INFO) << "Received election callback for term "
                           << result.election_term << " while not running: "
                           << s.ToString();
     return;
diff --git a/src/kudu/consensus/raft_consensus_state.cc b/src/kudu/consensus/raft_consensus_state.cc
index 1d1135f..35783ee 100644
--- a/src/kudu/consensus/raft_consensus_state.cc
+++ b/src/kudu/consensus/raft_consensus_state.cc
@@ -604,6 +604,13 @@ ReplicaState::State ReplicaState::state() const {
   return state_;
 }
 
+bool ReplicaState::IsShuttingDown() const {
+  ReplicaState::UniqueLock lock;
+  CHECK_OK(LockForRead(&lock));
+  return state_ == kShuttingDown || state_ == kShutDown;
+}
+
+
 string ReplicaState::ToString() const {
   ReplicaState::UniqueLock lock(&update_lock_);
   return ToStringUnlocked();
diff --git a/src/kudu/consensus/raft_consensus_state.h b/src/kudu/consensus/raft_consensus_state.h
index 880c29c..90e91b1 100644
--- a/src/kudu/consensus/raft_consensus_state.h
+++ b/src/kudu/consensus/raft_consensus_state.h
@@ -96,7 +96,7 @@ class ReplicaState {
  public:
   enum State {
     // State after the replica is built.
-    kInitialized,
+    kInitialized = 0,
 
     // State signaling the replica accepts requests (from clients
     // if leader, from leader if follower)
@@ -323,6 +323,8 @@ class ReplicaState {
   // The update_lock_ must be held.
   ReplicaState::State state() const;
 
+  bool IsShuttingDown() const;
+
  private:
   // Helper method to update the active quorum state for peers, etc.
   void ResetActiveQuorumStateUnlocked(const metadata::QuorumPB& quorum);
diff --git a/src/kudu/integration-tests/create-table-stress-test.cc b/src/kudu/integration-tests/create-table-stress-test.cc
index 1fcc647..dc3fd06 100644
--- a/src/kudu/integration-tests/create-table-stress-test.cc
+++ b/src/kudu/integration-tests/create-table-stress-test.cc
@@ -27,8 +27,9 @@
 using kudu::master::MasterServiceProxy;
 DECLARE_int32(heartbeat_interval_ms);
 DECLARE_bool(log_preallocate_segments);
-DEFINE_int32(num_test_tablets, 100, "Number of tablets for stress test");
+DEFINE_int32(num_test_tablets, 60, "Number of tablets for stress test");
 DECLARE_bool(use_hybrid_clock);
+DECLARE_bool(enable_leader_failure_detection);
 DECLARE_int32(max_clock_sync_error_usec);
 
 namespace kudu {
@@ -56,6 +57,8 @@ class CreateTableStressTest : public KuduTest {
     // Increase the max error tolerance to 10 seconds.
     FLAGS_max_clock_sync_error_usec = 10000000;
 
+    FLAGS_enable_leader_failure_detection = true;
+
     // Don't preallocate log segments, since we're creating thousands
     // of tablets here. If each preallocates 64M or so, we use
     // a ton of disk space in this test, and it fails on normal
@@ -65,7 +68,9 @@
     FLAGS_log_preallocate_segments = false;
 
     KuduTest::SetUp();
-    cluster_.reset(new MiniCluster(env_.get(), MiniClusterOptions()));
+    MiniClusterOptions opts;
+    opts.num_tablet_servers = 3;
+    cluster_.reset(new MiniCluster(env_.get(), opts));
     ASSERT_STATUS_OK(cluster_->Start());
 
     ASSERT_STATUS_OK(KuduClientBuilder()
@@ -95,6 +100,7 @@ void CreateTableStressTest::CreateBigTable(const string& table_name, int num_tab
 
   ASSERT_STATUS_OK(client_->NewTableCreator()
                    ->table_name(table_name)
+                   .num_replicas(3)
                    .schema(&schema_)
                    .split_keys(keys)
                    .wait(false)
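
Note on the pattern (commentary, not part of the patch): the raft_consensus.cc changes exist because consensus callbacks such as NotifyTermChange() and ElectionCallback() can fire after the replica has begun shutting down; taking the config-change lock then fails, and the old CHECK_OK would bring the whole process down. The patch instead tolerates the lock failure, DCHECKing that shutdown really is the cause. A minimal standalone sketch of that idea, using a hypothetical Replica class rather than Kudu's actual API:

#include <cstdint>
#include <iostream>
#include <mutex>

// Hypothetical stand-in for ReplicaState: once Shutdown() runs,
// state-changing callbacks are ignored rather than allowed to crash.
class Replica {
 public:
  void Shutdown() {
    std::lock_guard<std::mutex> l(mu_);
    shutting_down_ = true;
  }

  // Mirrors the shape of the patched NotifyTermChange(): a callback
  // arriving during shutdown is logged and dropped, not CHECK-failed.
  void NotifyTermChange(uint64_t term) {
    std::lock_guard<std::mutex> l(mu_);
    if (shutting_down_) {
      std::cerr << "Ignoring term change to term " << term
                << " when already shutting down" << std::endl;
      return;
    }
    term_ = term;
  }

 private:
  std::mutex mu_;
  bool shutting_down_ = false;
  uint64_t term_ = 0;
};

int main() {
  Replica r;
  r.NotifyTermChange(2);  // applied normally
  r.Shutdown();
  r.NotifyTermChange(3);  // ignored instead of crashing the process
  return 0;
}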