From 45c30b49efaf7034d49049d5cea1022b6272bb47 Mon Sep 17 00:00:00 2001 From: gaodayue Date: Sat, 6 May 2017 05:21:22 +0800 Subject: [PATCH] KYLIN-2619 Use newCachedThreadPool instead of newFixedThreadPool in Broadcaster --- .../main/java/org/apache/kylin/common/restclient/RestClient.java | 6 +++++- .../main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java | 7 +++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java index 33a4e7a5c..fb0527424 100644 --- a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java +++ b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java @@ -113,7 +113,7 @@ public class RestClient { } } - public void wipeCache(String entity, String event, String cacheKey) throws IOException { + public synchronized void wipeCache(String entity, String event, String cacheKey) throws IOException { String url = baseUrl + "/cache/" + entity + "/" + cacheKey + "/" + event; HttpPut request = new HttpPut(url); @@ -287,4 +287,8 @@ public class RestClient { return result.toString(); } + @Override + public String toString() { + return String.format("RestClient(%s:%d)", host, port); + } } diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java index 1394f7b74..4f691e042 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java @@ -35,6 +35,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.restclient.RestClient; import org.apache.kylin.common.util.DaemonThreadFactory; +import org.apache.kylin.common.util.SetThreadName; import org.apache.kylin.metadata.project.ProjectManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -116,7 +117,7 @@ public class Broadcaster { for (String node : config.getRestServers()) { restClients.add(new RestClient(node)); } - final ExecutorService wipingCachePool = Executors.newFixedThreadPool(restClients.size(), new DaemonThreadFactory()); + final ExecutorService wipingCachePool = Executors.newCachedThreadPool(new DaemonThreadFactory()); while (true) { try { final BroadcastEvent broadcastEvent = broadcastEvents.takeFirst(); @@ -125,8 +126,10 @@ public class Broadcaster { wipingCachePool.execute(new Runnable() { @Override public void run() { - try { + try (SetThreadName ignored = new SetThreadName("CacheWiper %s %s", restClient, broadcastEvent)) { + logger.info("wipe cache start"); restClient.wipeCache(broadcastEvent.getEntity(), broadcastEvent.getEvent(), broadcastEvent.getCacheKey()); + logger.info("wipe cache success"); } catch (IOException e) { logger.warn("Thread failed during wipe cache at " + broadcastEvent, e); } -- 2.11.0 (Apple Git-81)