From 61974f7db753141a85b38b18bb0bfd78fc4eeba4 Mon Sep 17 00:00:00 2001
From: Tim Armstrong <tarmstrong@cloudera.com>
Date: Mon, 16 Jul 2018 14:01:25 -0700
Subject: [PATCH] Repro CDH-70703

---
 be/src/statestore/statestore.cc | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)

diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 680eada..49e0ecb 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -52,7 +52,7 @@ DEFINE_int32(statestore_update_frequency_ms, 2000, "(Advanced) Frequency (in ms)
 
 DEFINE_int32(statestore_num_heartbeat_threads, 10, "(Advanced) Number of threads used to "
     " send heartbeats in parallel to all registered subscribers.");
-DEFINE_int32(statestore_heartbeat_frequency_ms, 1000, "(Advanced) Frequency (in ms) with"
+DEFINE_int32(statestore_heartbeat_frequency_ms, 100, "(Advanced) Frequency (in ms) with"
     " which the statestore sends heartbeat heartbeats to subscribers.");
 
 DEFINE_int32(state_store_port, 24000, "port where StatestoreService is running");
@@ -466,6 +466,12 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, bool* update_skipped)
     return Status::OK();
   }
 
+   if (subscriber->id() == "impalad@tarmstrong-box:22002" && response.topic_updates.size() > 0) {
+     LOG(INFO) << "Simulating slow update for " << subscriber->id();
+     SleepForMs(15000);
+     LOG(INFO) << "Update complete";
+   }
+
   // At this point the updates are assumed to have been successfully processed by the
   // subscriber. Update the subscriber's max version of each topic.
   for (const auto& topic_delta: update_state_request.topic_deltas) {
@@ -483,7 +489,7 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, bool* update_skipped)
         continue;
       }
 
-      VLOG_RPC << "Received update for topic " << update.topic_name
+      VLOG_QUERY << "Received update for topic " << update.topic_name
                << " from  " << subscriber->id() << ", number of entries: "
                << update.topic_entries.size();
 
@@ -499,6 +505,7 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, bool* update_skipped)
 
       Topic* topic = &topic_it->second;
       for (const TTopicItem& item: update.topic_entries) {
+        LOG(INFO) << "Add transient " << item.key << " " << update.topic_name << " " << subscriber->id();
         subscriber->AddTransientUpdate(update.topic_name, item.key,
             topic->Put(item.key, item.value, item.deleted));
       }
@@ -712,7 +719,10 @@ void Statestore::DoSubscriberUpdate(bool is_heartbeat, int thread_id,
     // message.
     SubscriberMap::iterator it = subscribers_.find(update.subscriber_id);
     if (it == subscribers_.end() ||
-        it->second->registration_id() != update.registration_id) return;
+        it->second->registration_id() != update.registration_id) {
+      LOG(INFO) << "Not found";
+      return;
+    }
     if (!status.ok()) {
       LOG(INFO) << "Unable to send " << hb_type << " message to subscriber "
                 << update.subscriber_id << ", received error: " << status.GetDetail();
-- 
2.7.4

