From efd904813baa4b489665647899195395446fed32 Mon Sep 17 00:00:00 2001 From: shenhua Date: Wed, 7 Nov 2018 21:40:52 -0800 Subject: [PATCH] zookeeper cache sync --- core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java | 2 ++ core-common/src/main/java/org/apache/kylin/common/util/CuratorFrameworkSingleton.java | 184 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java | 105 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------------------------- server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java | 30 ++++++++++++++++++++++++++++++ 8 files changed, 285 insertions(+), 48 deletions(-) create mode 100644 core-common/src/main/java/org/apache/kylin/common/util/CuratorFrameworkSingleton.java diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java index 93f5e19..0d7b121 100644 --- a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java +++ b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java @@ -64,6 +64,7 @@ public class RestClient { private int httpConnectionTimeoutMs = 30000; private int httpSocketTimeoutMs = 120000; + private static final String rest_prot = KylinConfig.getInstanceFromEnv().getRestPort(); public static final String SCHEME_HTTP = "http://"; @@ -100,6 +101,7 @@ public class RestClient { String host = m.group(3); String portStr = m.group(4); int port = Integer.parseInt(portStr == null ? "7070" : portStr); +// int port = Integer.parseInt(rest_prot == null ? "7070" : rest_prot); if (httpConnectionTimeoutMs != null) this.httpConnectionTimeoutMs = httpConnectionTimeoutMs; diff --git a/core-common/src/main/java/org/apache/kylin/common/util/CuratorFrameworkSingleton.java b/core-common/src/main/java/org/apache/kylin/common/util/CuratorFrameworkSingleton.java new file mode 100644 index 0000000..ef53352 --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/util/CuratorFrameworkSingleton.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.common.util; + +import java.lang.management.ManagementFactory; +import java.net.Inet4Address; +import java.net.Inet6Address; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.List; +import java.util.Set; + +import javax.management.AttributeNotFoundException; +import javax.management.InstanceNotFoundException; +import javax.management.MBeanException; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.management.Query; +import javax.management.QueryExp; +import javax.management.ReflectionException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.kylin.common.KylinConfig; +import org.apache.zookeeper.CreateMode; + + +public class CuratorFrameworkSingleton { + + public static KylinConfig kylinConfig; + private static CuratorFramework sharedClient = null; + private static String restNode = "rest_server"; + static final Log LOG = LogFactory.getLog("CuratorFrameworkSingleton"); + static{ + synchronized(CuratorFrameworkSingleton.class){ + if (sharedClient == null) { + kylinConfig = KylinConfig.getInstanceFromEnv(); + String connectString = kylinConfig.getZookeeperConnectString(); + sharedClient = CuratorFrameworkFactory.builder().connectString(connectString) + .sessionTimeoutMs(6000) + .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); + sharedClient.start(); + } + } + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + closeAndReleaseInstance(); + } + }); + } + + public static synchronized CuratorFramework getInstance() { + if (sharedClient == null) { + kylinConfig = KylinConfig.getInstanceFromEnv(); + String connectString = kylinConfig.getZookeeperConnectString(); + LOG.info("zookeepr server info :" + connectString); + sharedClient = CuratorFrameworkFactory.builder().connectString(connectString) + .sessionTimeoutMs(6000) + .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); + sharedClient.start(); + } + return sharedClient; + } + + public static synchronized void closeAndReleaseInstance() { + if (sharedClient != null) { + sharedClient.close(); + sharedClient = null; + String shutdownMsg = "Closing ZooKeeper client."; + LOG.info(shutdownMsg); + } + } + + public static InetAddress getFirstNonLoopbackAddress(boolean preferIpv4, boolean preferIPv6) throws SocketException { + Enumeration en = NetworkInterface.getNetworkInterfaces(); + while (en.hasMoreElements()) { + NetworkInterface i = (NetworkInterface) en.nextElement(); + for (Enumeration en2 = i.getInetAddresses(); en2.hasMoreElements();) { + InetAddress addr = (InetAddress) en2.nextElement(); + if (!addr.isLoopbackAddress()) { + if (addr instanceof Inet4Address) { + if (preferIPv6) { + continue; + } + return addr; + } + if (addr instanceof Inet6Address) { + if (preferIpv4) { + continue; + } + return addr; + } + } + } + } + return null; + } + + public static String port() throws AttributeNotFoundException, MBeanException, ReflectionException, InstanceNotFoundException, MalformedObjectNameException { + MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer(); + final QueryExp queryExp = Query.and(Query.eq(Query.attr("protocol"), Query.value("HTTP/1.1")), + Query.eq(Query.attr("scheme"), Query.value("http"))); + Set objectNames = beanServer.queryNames(new ObjectName("*:type=Connector,*"), queryExp); + ObjectName objectName = objectNames.iterator().next(); + String port = objectName.getKeyProperty("port"); + return port; + } + + public static void registerServer() throws Exception{ + String zkRootNode = kylinConfig.getZookeeperBasePath(); + String kylinCluster = kylinConfig.getClusterName(); + InetAddress address = getFirstNonLoopbackAddress(true, false); + String port = port(); + String serverNode = zkRootNode + "/" + kylinCluster + "/" + restNode + "/" + address.getHostAddress() + ":" + port; + LOG.info("server register zk, znode is :" + serverNode); + if(sharedClient.checkExists().forPath(serverNode) != null){ + sharedClient.delete().deletingChildrenIfNeeded().forPath(serverNode); + sharedClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(serverNode); + }else{ + sharedClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(serverNode); + } + } + + public static void updateData() throws Exception{ + String zkRootNode = kylinConfig.getZookeeperBasePath(); + String kylinCluster = kylinConfig.getClusterName(); + InetAddress address = getFirstNonLoopbackAddress(true, false); + String port = port(); + String serverNode = zkRootNode + "/" + kylinCluster + "/" + restNode + "/" + address.getHostAddress() + ":" + port; + if(sharedClient.checkExists().forPath(serverNode)!=null) + sharedClient.setData().forPath(serverNode, address.getHostAddress().getBytes()); + } + + public static List listServer() throws Exception{ + String zkRootNode = kylinConfig.getZookeeperBasePath(); + String kylinCluster = kylinConfig.getClusterName(); + String serverParentNode = zkRootNode + "/" + kylinCluster + "/" + restNode; + List nodes = sharedClient.getChildren().forPath(serverParentNode); + LOG.info("server register zk, znodes are :" + nodes); + return nodes; + } + + public static List listAliveServer(long time) throws Exception{ + List aliveNodes = new ArrayList(); + String zkRootNode = kylinConfig.getZookeeperBasePath(); + String kylinCluster = kylinConfig.getClusterName(); + String serverParentNode = zkRootNode + "/" + kylinCluster + "/" + restNode; + List nodes = sharedClient.getChildren().forPath(serverParentNode); + long currentTime = System.currentTimeMillis(); + for(String node: nodes){ + String childNode = serverParentNode + "/" + node; + long mtime = sharedClient.checkExists().forPath(childNode).getMtime(); + if(currentTime - mtime < time){ + aliveNodes.add(node); + } + } + LOG.info("server register zk, alive znodes are :" + aliveNodes); + return aliveNodes; + } +} \ No newline at end of file diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java index 6462a27..ae2ab24 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java @@ -36,6 +36,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.restclient.RestClient; import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.CuratorFrameworkSingleton; import org.apache.kylin.common.util.DaemonThreadFactory; import org.apache.kylin.metadata.project.ProjectManager; import org.slf4j.Logger; @@ -65,6 +66,7 @@ public class Broadcaster { public static final String SYNC_PRJ_SCHEMA = "project_schema"; // the special entity to indicate project schema has change, e.g. table/model/cube_desc update public static final String SYNC_PRJ_DATA = "project_data"; // the special entity to indicate project data has change, e.g. cube/raw_table update public static final String SYNC_PRJ_ACL = "project_acl"; // the special entity to indicate query ACL has change, e.g. table_acl/learn_kylin update + private static final long difftime = 10000; public static Broadcaster getInstance(KylinConfig config) { return config.getManager(Broadcaster.class); @@ -82,6 +84,7 @@ public class Broadcaster { private KylinConfig config; private ExecutorService announceMainLoop; private ExecutorService announceThreadPool; + private ExecutorService maydeadNodeAnnounceThreadPool; private SyncErrorHandler syncErrorHandler; private BlockingDeque broadcastEvents = new LinkedBlockingDeque<>(); private Map> listenerMap = Maps.newConcurrentMap(); @@ -93,6 +96,8 @@ public class Broadcaster { this.announceMainLoop = Executors.newSingleThreadExecutor(new DaemonThreadFactory()); this.announceThreadPool = new ThreadPoolExecutor(1, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(), new DaemonThreadFactory()); + this.maydeadNodeAnnounceThreadPool = new ThreadPoolExecutor(1, 10, 60L, TimeUnit.SECONDS, + new LinkedBlockingQueue(), new DaemonThreadFactory()); final String[] nodes = config.getRestServers(); if (nodes == null || nodes.length < 1) { @@ -103,53 +108,67 @@ public class Broadcaster { announceMainLoop.execute(new Runnable() { @Override public void run() { - final Map restClientMap = Maps.newHashMap(); - - while (!announceThreadPool.isShutdown()) { - try { - final BroadcastEvent broadcastEvent = broadcastEvents.takeFirst(); - - String[] restServers = config.getRestServers(); - logger.debug("Servers in the cluster: " + Arrays.toString(restServers)); - for (final String node : restServers) { - if (restClientMap.containsKey(node) == false) { - restClientMap.put(node, new RestClient(node)); - } - } - - String toWhere = broadcastEvent.getTargetNode(); - if (toWhere == null) - toWhere = "all"; - logger.debug("Announcing new broadcast to " + toWhere + ": " + broadcastEvent); - - for (final String node : restServers) { - if (!(toWhere.equals("all") || toWhere.equals(node))) - continue; - - announceThreadPool.execute(new Runnable() { - @Override - public void run() { - RestClient restClient = restClientMap.get(node); - try { - restClient.wipeCache(broadcastEvent.getEntity(), broadcastEvent.getEvent(), - broadcastEvent.getCacheKey()); - } catch (IOException e) { - logger.error( - "Announce broadcast event failed, targetNode {} broadcastEvent {}, error msg: {}", - node, broadcastEvent, e); - syncErrorHandler.handleAnnounceError(node, restClient, broadcastEvent); - } - } - }); - } - } catch (Exception e) { - logger.error("error running wiping", e); - } + syncCache(); + } + }); + } + + private void syncCache(){ + boolean flag = true; + while (flag) { + try { + final BroadcastEvent broadcastEvent = broadcastEvents.takeFirst(); + List allServers = CuratorFrameworkSingleton.listServer(); + final List aliveServers = CuratorFrameworkSingleton.listAliveServer(difftime); + syncCacheToPeer(broadcastEvent, announceThreadPool, aliveServers); + allServers.removeAll(aliveServers); + final List maybeDaedServers = allServers; + syncCacheToPeer(broadcastEvent, maydeadNodeAnnounceThreadPool, maybeDaedServers); + if(announceThreadPool.isShutdown()){ + flag = false; } + } catch (Exception e) { + logger.error("broadcast error , error running wiping", e); } - }); + } } + private void syncCacheToPeer(final BroadcastEvent broadcastEvent, ExecutorService es, List servers){ + final Map restClientMap = Maps.newHashMap(); + logger.debug("Servers in the cluster: " + servers); + for (final String node : servers) { + if (restClientMap.containsKey(node) == false) { + restClientMap.put(node, new RestClient(node)); + } + } + String toWhere = broadcastEvent.getTargetNode(); + if (toWhere == null) + toWhere = "all"; + logger.debug("Announcing new broadcast to " + toWhere + ": " + broadcastEvent); + + for (final String node : servers) { + if (!(toWhere.equals("all") || toWhere.equals(node))) + continue; + + es.execute(new Runnable() { + @Override + public void run() { + RestClient restClient = restClientMap.get(node); + try { + restClient.wipeCache(broadcastEvent.getEntity(), broadcastEvent.getEvent(), + broadcastEvent.getCacheKey()); + logger.info("Success: Announcing new broadcast to " + node); + } catch (IOException e) { + logger.error("Failed: Announcing new broadcast to " + node); + logger.error( + "Announce broadcast event failed, targetNode {} broadcastEvent {}, error msg: {}", + node, broadcastEvent, e); + syncErrorHandler.handleAnnounceError(node, restClient, broadcastEvent); + } + } + }); + } + } private SyncErrorHandler getSyncErrorHandler(KylinConfig config) { String clzName = config.getCacheSyncErrorHandler(); if (StringUtils.isEmpty(clzName)) { diff --git a/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java b/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java index 467ef82..151aa18 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java +++ b/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java @@ -20,6 +20,7 @@ package org.apache.kylin.rest.init; import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.CuratorFrameworkSingleton; import org.apache.kylin.rest.metrics.QueryMetrics2Facade; import org.apache.kylin.rest.metrics.QueryMetricsFacade; import org.slf4j.Logger; @@ -32,6 +33,7 @@ import org.springframework.beans.factory.InitializingBean; public class InitialTaskManager implements InitializingBean { private static final Logger logger = LoggerFactory.getLogger(InitialTaskManager.class); + private static final int REGISTERINTERVAL = 5000; @Override public void afterPropertiesSet() throws Exception { @@ -48,6 +50,9 @@ public class InitialTaskManager implements InitializingBean { KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); String initTasks = kylinConfig.getInitTasks(); + Runnable run = new Node(); + Thread registerRunner = new Thread(run); + registerRunner.start(); if (!StringUtils.isEmpty(initTasks)) { String[] taskClasses = initTasks.split(","); for (String taskClass : taskClasses) { @@ -62,4 +67,29 @@ public class InitialTaskManager implements InitializingBean { logger.info("All initial tasks finished."); } } + + public class Node implements Runnable{ + + @Override + public void run() { + logger.info("server register start..."); + CuratorFrameworkSingleton.getInstance(); + try { + CuratorFrameworkSingleton.registerServer(); + } catch (Exception e) { + logger.error("kylin server register zookeeper failed:", e); + } + while(true) { + try { + Thread.sleep(REGISTERINTERVAL); + CuratorFrameworkSingleton.updateData(); + } catch (InterruptedException e) { + logger.error("thread sleep failed:", e); + } catch (Exception e) { + logger.error("kylin server update zookeeper node data failed:", e); + } + } + } + } + }