From 467b8f5fae0d57fd536cef44162df5528daab744 Mon Sep 17 00:00:00 2001 From: wuxinqiang Date: Fri, 11 Nov 2022 14:37:23 +0800 Subject: [PATCH] introducing redis as a distributed cache for kylin --- cache/pom.xml | 5 + .../AbstractRemoteCacheManager.java | 29 +++ .../cachemanager/MultiLevelCacheManager.java | 135 +++++++++++ .../cache/cachemanager/RedisManager.java | 191 +++++++++++++++ .../RemoteLocalFailOverCacheManager.java | 4 +- .../cache/redis/AbstractRedisClient.java | 226 ++++++++++++++++++ .../apache/kylin/cache/redis/RedisClient.java | 33 +++ .../cache/redis/RedisClientTypeEnum.java | 22 ++ .../apache/kylin/cache/redis/RedisConfig.java | 145 +++++++++++ .../cache/redis/jedis/JedisPoolClient.java | 94 ++++++++ .../cache/redis/JedisPoolClientTest.java | 76 ++++++ pom.xml | 5 + .../src/main/resources/applicationContext.xml | 18 ++ 13 files changed, 981 insertions(+), 2 deletions(-) create mode 100644 cache/src/main/java/org/apache/kylin/cache/cachemanager/AbstractRemoteCacheManager.java create mode 100644 cache/src/main/java/org/apache/kylin/cache/cachemanager/MultiLevelCacheManager.java create mode 100644 cache/src/main/java/org/apache/kylin/cache/cachemanager/RedisManager.java create mode 100644 cache/src/main/java/org/apache/kylin/cache/redis/AbstractRedisClient.java create mode 100644 cache/src/main/java/org/apache/kylin/cache/redis/RedisClient.java create mode 100644 cache/src/main/java/org/apache/kylin/cache/redis/RedisClientTypeEnum.java create mode 100644 cache/src/main/java/org/apache/kylin/cache/redis/RedisConfig.java create mode 100644 cache/src/main/java/org/apache/kylin/cache/redis/jedis/JedisPoolClient.java create mode 100644 cache/src/test/java/org/apache/kylin/cache/redis/JedisPoolClientTest.java diff --git a/cache/pom.xml b/cache/pom.xml index 27997b73d..b93a12cd4 100644 --- a/cache/pom.xml +++ b/cache/pom.xml @@ -69,6 +69,11 @@ spymemcached + + redis.clients + jedis + + junit diff --git a/cache/src/main/java/org/apache/kylin/cache/cachemanager/AbstractRemoteCacheManager.java b/cache/src/main/java/org/apache/kylin/cache/cachemanager/AbstractRemoteCacheManager.java new file mode 100644 index 000000000..8b9f1935f --- /dev/null +++ b/cache/src/main/java/org/apache/kylin/cache/cachemanager/AbstractRemoteCacheManager.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kylin.cache.cachemanager; + +import org.springframework.cache.support.AbstractCacheManager; + + +public abstract class AbstractRemoteCacheManager extends AbstractCacheManager { + + public abstract boolean isClusterDown(); + + abstract void setClusterHealth(boolean health); + +} diff --git a/cache/src/main/java/org/apache/kylin/cache/cachemanager/MultiLevelCacheManager.java b/cache/src/main/java/org/apache/kylin/cache/cachemanager/MultiLevelCacheManager.java new file mode 100644 index 000000000..7592bcddf --- /dev/null +++ b/cache/src/main/java/org/apache/kylin/cache/cachemanager/MultiLevelCacheManager.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.cache.cachemanager; + +import org.apache.kylin.shaded.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cache.Cache; +import org.springframework.cache.CacheManager; +import org.springframework.cache.support.AbstractCacheManager; +import java.util.Collection; +import java.util.concurrent.Callable; + +public class MultiLevelCacheManager extends AbstractCacheManager { + + private static final Logger logger = LoggerFactory.getLogger(MultiLevelCacheManager.class); + + @Autowired + private AbstractRemoteCacheManager remoteCacheManager; + + @Autowired + private CacheManager localCacheManager; + + @Override + protected Collection loadCaches() { + Cache successCache = new MultiLevelCacheManager.MultiLevelCacheAdaptor(remoteCacheManager, localCacheManager, CacheConstants.QUERY_CACHE); + addCache(successCache); + + Collection names = getCacheNames(); + Collection caches = Lists.newArrayList(); + for (String name : names) { + caches.add(getCache(name)); + } + return caches; + } + + public static class MultiLevelCacheAdaptor implements Cache { + + private AbstractRemoteCacheManager remoteCacheManager; + private CacheManager localCacheManager; + private String name; + + public MultiLevelCacheAdaptor(AbstractRemoteCacheManager remoteCacheManager, CacheManager localCacheManager, String name) { + this.remoteCacheManager = remoteCacheManager; + this.localCacheManager = localCacheManager; + this.name = name; + } + + @Override + public String getName() { + return name; + } + + @Override + public Object getNativeCache() { + throw new UnsupportedOperationException(); + } + + @Override + public ValueWrapper get(Object key) { + ValueWrapper localResult = localCacheManager.getCache(name).get(key); + if (localResult != null) { + return localResult; + } + if (!remoteCacheManager.isClusterDown()) { + return remoteCacheManager.getCache(name).get(key); + } + return null; + } + + @Override + public T get(Object key, Class type) { + T localResult = localCacheManager.getCache(name).get(key, type); + if (localResult != null) { + return localResult; + } + if (!remoteCacheManager.isClusterDown()) { + return remoteCacheManager.getCache(name).get(key, type); + } + return null; + } + + @Override + public T get(Object key, Callable valueLoader) { + throw new UnsupportedOperationException(); + } + + @Override + public void put(Object key, Object value) { + localCacheManager.getCache(name).put(key, value); + remoteCacheManager.getCache(name).put(key, value); + } + + @Override + public void evict(Object key) { + remoteCacheManager.getCache(name).evict(key); + localCacheManager.getCache(name).evict(key); + } + + @Override + public void clear() { + throw new UnsupportedOperationException(); + } + + @Override + public ValueWrapper putIfAbsent(Object key, Object value) { + ValueWrapper existing = get(key); + if (existing == null) { + put(key, value); + return null; + } else { + return existing; + } + } + + } + +} diff --git a/cache/src/main/java/org/apache/kylin/cache/cachemanager/RedisManager.java b/cache/src/main/java/org/apache/kylin/cache/cachemanager/RedisManager.java new file mode 100644 index 000000000..27bf01a11 --- /dev/null +++ b/cache/src/main/java/org/apache/kylin/cache/cachemanager/RedisManager.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kylin.cache.cachemanager; + +import org.apache.commons.lang3.SerializationUtils; +import org.apache.kylin.cache.redis.RedisClient; +import org.apache.kylin.cache.redis.RedisConfig; +import org.apache.kylin.cache.redis.jedis.JedisPoolClient; +import org.apache.kylin.shaded.com.google.common.annotations.VisibleForTesting; +import org.apache.kylin.shaded.com.google.common.collect.Lists; +import org.apache.kylin.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cache.Cache; +import org.springframework.cache.support.SimpleValueWrapper; +import java.util.Collection; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import static org.apache.kylin.cache.redis.RedisClientTypeEnum.JEDIS_POOL; + + +public class RedisManager extends AbstractRemoteCacheManager { + + private static final Logger logger = LoggerFactory.getLogger(RedisManager.class); + private static final Long ONE_MINUTE = 60 * 1000L; + + @Autowired + private RedisConfig redisConfig; + + private ScheduledExecutorService timer = Executors.newScheduledThreadPool(1, + new ThreadFactoryBuilder().setNameFormat("Redis-HealthChecker").build()); + private AtomicBoolean clusterHealth = new AtomicBoolean(true); + + @Override + protected Collection loadCaches() { + Cache successCache = new RedisManager.RedisCacheAdaptor(createRedisClient(redisConfig), CacheConstants.QUERY_CACHE); + + addCache(successCache); + + Collection names = getCacheNames(); + Collection caches = Lists.newArrayList(); + for (String name : names) { + caches.add(getCache(name)); + } + timer.scheduleWithFixedDelay(new RedisManager.RedisClusterHealthChecker(), ONE_MINUTE, ONE_MINUTE, + TimeUnit.MILLISECONDS); + return caches; + } + + public static class RedisCacheAdaptor implements Cache { + + private RedisClient redisClient; + private String name; + + public RedisCacheAdaptor(RedisClient redisClient, String name) { + this.redisClient = redisClient; + this.name = name; + } + + @Override + public String getName() { + return name; + } + + @Override + public Object getNativeCache() { + return redisClient; + } + + @Override + public ValueWrapper get(Object key) { + byte[] value = redisClient.get(key); + if (value == null) { + return null; + } + return new SimpleValueWrapper(SerializationUtils.deserialize(value)); + } + + @Override + public T get(Object key, Class type) { + byte[] value = redisClient.get(key); + if (value == null) { + return null; + } + Object obj = SerializationUtils.deserialize(value); + if (obj != null && type != null && !type.isInstance(value)) { + throw new IllegalStateException( + "Cached value is not of required type [" + type.getName() + "]: " + value); + } + return (T) obj; + } + + @Override + public T get(Object key, Callable valueLoader) { + throw new UnsupportedOperationException(); + } + + @Override + public void put(Object key, Object value) { + redisClient.put(key, value); + } + + @Override + public ValueWrapper putIfAbsent(Object key, Object value) { + byte[] existing = redisClient.get(key); + if (existing == null) { + redisClient.put(key, value); + return null; + } else { + return new SimpleValueWrapper(SerializationUtils.deserialize(existing)); + } + } + + @Override + public void evict(Object key) { + redisClient.del(key); + } + + @Override + public void clear() { + throw new UnsupportedOperationException(); + } + + } + + protected RedisClient createRedisClient(RedisConfig redisConfig) { + RedisClient redisClient = null; + if (redisConfig.getRedisClientType() == JEDIS_POOL) { + redisClient = new JedisPoolClient(redisConfig); + } + assert redisClient != null; + return redisClient; + } + + @Override + public boolean isClusterDown() { + return !clusterHealth.get(); + } + + @VisibleForTesting + void setClusterHealth(boolean ifHealth) { + clusterHealth.set(ifHealth); + } + + private class RedisClusterHealthChecker implements Runnable { + @Override + public void run() { + Cache cache = getCache(CacheConstants.QUERY_CACHE); + RedisClient redisClient = (RedisClient) cache.getNativeCache(); + clusterHealth.set(isConnected(redisClient)); + } + + private boolean isConnected(RedisClient redisClient) { + int success = 0; + int total = 5; + for (int i = 0; i < total; i++) { + if (redisClient.ping()) { + success++; + } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + logger.warn("RedisClusterHealthChecker sleep is Interrupted"); + } + } + boolean connected = success * 2 + 1 > total; + logger.info("RedisClusterHealthChecker, connected:{}, success:{}, total:{}", connected, success, total); + return connected; + } + } + +} diff --git a/cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java b/cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java index aae0d7c62..bb30c52d5 100644 --- a/cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java +++ b/cache/src/main/java/org/apache/kylin/cache/cachemanager/RemoteLocalFailOverCacheManager.java @@ -34,7 +34,7 @@ public class RemoteLocalFailOverCacheManager extends AbstractCacheManager { private static final Logger logger = LoggerFactory.getLogger(RemoteLocalFailOverCacheManager.class); @Autowired - private MemcachedCacheManager remoteCacheManager; + private AbstractRemoteCacheManager remoteCacheManager; @Autowired private CacheManager localCacheManager; @@ -70,7 +70,7 @@ public class RemoteLocalFailOverCacheManager extends AbstractCacheManager { } @VisibleForTesting - MemcachedCacheManager getRemoteCacheManager() { + AbstractRemoteCacheManager getRemoteCacheManager() { return remoteCacheManager; } } \ No newline at end of file diff --git a/cache/src/main/java/org/apache/kylin/cache/redis/AbstractRedisClient.java b/cache/src/main/java/org/apache/kylin/cache/redis/AbstractRedisClient.java new file mode 100644 index 000000000..e9215c787 --- /dev/null +++ b/cache/src/main/java/org/apache/kylin/cache/redis/AbstractRedisClient.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kylin.cache.redis; + +import com.codahale.metrics.Gauge; +import com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.lang3.SerializationUtils; +import org.apache.directory.api.util.Strings; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.CompressionUtils; +import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.shaded.com.google.common.base.Charsets; +import org.apache.kylin.shaded.com.google.common.base.Joiner; +import org.apache.kylin.shaded.com.google.common.primitives.Ints; +import org.apache.kylin.shaded.com.google.common.primitives.Shorts; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import java.util.zip.DataFormatException; +import static com.codahale.metrics.MetricRegistry.name; +import static org.apache.kylin.metrics.lib.impl.MetricsSystem.Metrics; + + +public abstract class AbstractRedisClient implements RedisClient { + + private static final Logger logger = LoggerFactory.getLogger(AbstractRedisClient.class); + + protected final RedisConfig config; + protected final String redisPrefix; + protected final int compressThreshold; + protected final int timeToLiveSeconds; + + protected final AtomicLong hitCount = new AtomicLong(0); + protected final AtomicLong missCount = new AtomicLong(0); + protected final AtomicLong readBytes = new AtomicLong(0); + protected final AtomicLong errorCount = new AtomicLong(0); + protected final AtomicLong putCount = new AtomicLong(0); + protected final AtomicLong putBytes = new AtomicLong(0); + protected final AtomicLong delCount = new AtomicLong(0); + + public AbstractRedisClient(RedisConfig redisConfig) { + this.config = redisConfig; + compressThreshold = redisConfig.getMaxObjectSize() / 2; + redisPrefix = redisConfig.getPrefix(); + timeToLiveSeconds = redisConfig.getTtl(); + + Map metricsConfig = KylinConfig.getInstanceFromEnv().getKylinMetricsConf(); + if ("true".equalsIgnoreCase(metricsConfig.get("redis.enabled"))) { + final String prefix = name(config.getRedisClientType().name(), redisPrefix); + Metrics.register(name(prefix, "hits"), new Gauge() { + @Override + public Long getValue() { + return hitCount.longValue(); + } + }); + + Metrics.register(name(prefix, "misses"), new Gauge() { + @Override + public Long getValue() { + return missCount.longValue(); + } + }); + + Metrics.register(name(prefix, "eviction-count"), new Gauge() { + @Override + public Long getValue() { + return delCount.longValue(); + } + }); + + Metrics.register(name(prefix, "put-count"), new Gauge() { + @Override + public Long getValue() { + return putCount.longValue(); + } + }); + } + } + + public abstract byte[] internalGet(String key); + public abstract void internalPut(String hashedKey, byte[] encodedValue, int expiration); + public abstract void internalDel(String key); + + @Override + public void put(Object key, Object value) { + String keyString = serializeKey(key); + if (Strings.isEmpty(keyString)) { + return; + } + String hashKey = computeKeyHash(keyString); + byte[] encodedValue = encodeValue(keyString.getBytes(Charsets.UTF_8), serializeValue(value)); + if (encodedValue.length > config.getMaxObjectSize()) { + logger.info("AbstractRedisClient#put, value oversize, value size:{}, maxObjectSize:{}", encodedValue.length, config.getMaxObjectSize()); + return; + } + internalPut(hashKey, encodedValue, timeToLiveSeconds); + putCount.incrementAndGet(); + putBytes.addAndGet(encodedValue.length); + } + + @Override + public byte[] get(Object key) { + String keyString = serializeKey(key); + if (Strings.isEmpty(keyString)) { + return null; + } + byte[] result = internalGet(computeKeyHash(keyString)); + if (result == null) { + missCount.incrementAndGet(); + } else { + hitCount.incrementAndGet(); + readBytes.addAndGet(result.length); + } + return decodeValue(keyString.getBytes(Charsets.UTF_8), result); + } + + @Override + public void del(Object key) { + if (key == null) { + return; + } + String keyString = computeKeyHash(serializeKey(key)); + if (Strings.isEmpty(keyString)) { + return; + } + internalDel(keyString); + delCount.incrementAndGet(); + } + + @Override + public String getName(){ + return redisPrefix; + } + + protected String serializeKey(Object key) { + try { + return JsonUtil.writeValueAsString(key); + } catch (JsonProcessingException e) { + logger.warn("Can not convert key to String.", e); + } + return null; + } + + protected byte[] serializeValue(Object value) { + return SerializationUtils.serialize((Serializable) value); + } + + protected byte[] encodeValue(byte[] key, byte[] valueB) { + byte[] compressed = null; + if (config.isEnableCompression() && (valueB.length + Ints.BYTES + key.length > compressThreshold)) { + try { + compressed = CompressionUtils.compress(ByteBuffer.allocate(Ints.BYTES + key.length + valueB.length) + .putInt(key.length).put(key).put(valueB).array()); + } catch (IOException e) { + compressed = null; + logger.warn("Compressing value bytes error.", e); + } + } + if (compressed != null) { + return ByteBuffer.allocate(Shorts.BYTES + compressed.length).putShort((short) 1).put(compressed).array(); + } else { + return ByteBuffer.allocate(Shorts.BYTES + Ints.BYTES + key.length + valueB.length).putShort((short) 0) + .putInt(key.length).put(key).put(valueB).array(); + } + } + + protected byte[] decodeValue(byte[] key, byte[] valueE) { + if (valueE == null) + return null; + ByteBuffer buf = ByteBuffer.wrap(valueE); + short enableCompression = buf.getShort(); + byte[] uncompressed = null; + if (enableCompression == 1) { + byte[] value = new byte[buf.remaining()]; + buf.get(value); + try { + uncompressed = CompressionUtils.decompress(value); + } catch (IOException | DataFormatException e) { + logger.error("Decompressing value bytes error.", e); + return null; + } + } + if (uncompressed != null) { + buf = ByteBuffer.wrap(uncompressed); + } + final int keyLength = buf.getInt(); + byte[] keyBytes = new byte[keyLength]; + buf.get(keyBytes); + if (!Arrays.equals(keyBytes, key)) { + logger.error("Keys do not match, possible hash collision!, keyBytes:{}, key:{}", Arrays.toString(keyBytes), Arrays.toString(key)); + errorCount.incrementAndGet(); + return null; + } + byte[] value = new byte[buf.remaining()]; + buf.get(value); + return value; + } + + protected String computeKeyHash(String key) { + // hash keys to keep things under 250 characters for redis + return Joiner.on(":").skipNulls().join(redisPrefix, KylinConfig.getInstanceFromEnv().getDeployEnv(), + DigestUtils.shaHex(key)); + } + +} diff --git a/cache/src/main/java/org/apache/kylin/cache/redis/RedisClient.java b/cache/src/main/java/org/apache/kylin/cache/redis/RedisClient.java new file mode 100644 index 000000000..c3509e07e --- /dev/null +++ b/cache/src/main/java/org/apache/kylin/cache/redis/RedisClient.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kylin.cache.redis; + + +public interface RedisClient { + + void put(Object key, Object value); + + byte[] get(Object key); + + void del(Object key); + + boolean ping(); + + String getName(); + +} diff --git a/cache/src/main/java/org/apache/kylin/cache/redis/RedisClientTypeEnum.java b/cache/src/main/java/org/apache/kylin/cache/redis/RedisClientTypeEnum.java new file mode 100644 index 000000000..602660555 --- /dev/null +++ b/cache/src/main/java/org/apache/kylin/cache/redis/RedisClientTypeEnum.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kylin.cache.redis; + +public enum RedisClientTypeEnum { + JEDIS_POOL +} diff --git a/cache/src/main/java/org/apache/kylin/cache/redis/RedisConfig.java b/cache/src/main/java/org/apache/kylin/cache/redis/RedisConfig.java new file mode 100644 index 000000000..e67c33835 --- /dev/null +++ b/cache/src/main/java/org/apache/kylin/cache/redis/RedisConfig.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kylin.cache.redis; + +public class RedisConfig { + + private RedisClientTypeEnum redisClientType = RedisClientTypeEnum.JEDIS_POOL; + + private String appName; + private String host; + private int port; + + private int maxTotal = 50; + private int maxIdle = 20; + private int minIdle = 10; + private long maxWaitMillis = 500; + private int timeout = 500; + private int ttl = 24 * 3600; + private String prefix; + + // whether enable compress the value data or not + private boolean enableCompression = true; + private int maxObjectSize = 1024 * 1024; + + + public String getAppName() { + return appName; + } + + public void setAppName(String appName) { + this.appName = appName; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public int getTimeout() { + return timeout; + } + + public void setTimeout(int timeout) { + this.timeout = timeout; + } + + public int getMaxTotal() { + return maxTotal; + } + + public void setMaxTotal(int maxTotal) { + this.maxTotal = maxTotal; + } + + public int getMaxIdle() { + return maxIdle; + } + + public void setMaxIdle(int maxIdle) { + this.maxIdle = maxIdle; + } + + public int getMinIdle() { + return minIdle; + } + + public void setMinIdle(int minIdle) { + this.minIdle = minIdle; + } + + public long getMaxWaitMillis() { + return maxWaitMillis; + } + + public void setMaxWaitMillis(long maxWaitMillis) { + this.maxWaitMillis = maxWaitMillis; + } + + public boolean isEnableCompression() { + return enableCompression; + } + + public void setEnableCompression(boolean enableCompression) { + this.enableCompression = enableCompression; + } + + public int getMaxObjectSize() { + return maxObjectSize; + } + + public void setMaxObjectSize(int maxObjectSize) { + this.maxObjectSize = maxObjectSize; + } + + public int getTtl() { + return ttl; + } + + public void setTtl(int ttl) { + this.ttl = ttl; + } + + public String getPrefix() { + return prefix; + } + + public void setPrefix(String prefix) { + this.prefix = prefix; + } + + public RedisClientTypeEnum getRedisClientType() { + return redisClientType; + } + + public void setRedisClientType(RedisClientTypeEnum redisClientType) { + this.redisClientType = redisClientType; + } + +} diff --git a/cache/src/main/java/org/apache/kylin/cache/redis/jedis/JedisPoolClient.java b/cache/src/main/java/org/apache/kylin/cache/redis/jedis/JedisPoolClient.java new file mode 100644 index 000000000..4829ea6e2 --- /dev/null +++ b/cache/src/main/java/org/apache/kylin/cache/redis/jedis/JedisPoolClient.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kylin.cache.redis.jedis; + +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.kylin.cache.redis.AbstractRedisClient; +import org.apache.kylin.cache.redis.RedisClient; +import org.apache.kylin.cache.redis.RedisConfig; +import org.apache.kylin.shaded.com.google.common.base.Charsets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; + +public class JedisPoolClient extends AbstractRedisClient implements RedisClient { + + private static final Logger logger = LoggerFactory.getLogger(JedisPoolClient.class); + + private final JedisPool jedisPool; + + public JedisPoolClient(RedisConfig redisConfig) { + super(redisConfig); + logger.info("JedisPoolClient init, redisConfig:{}", redisConfig); + // init jedis config + GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); + poolConfig.setMaxIdle(redisConfig.getMaxIdle()); + poolConfig.setMinIdle(redisConfig.getMinIdle()); + poolConfig.setMaxTotal(redisConfig.getMaxTotal()); + poolConfig.setMaxWaitMillis(redisConfig.getMaxWaitMillis()); + jedisPool = new JedisPool(poolConfig, redisConfig.getHost(), redisConfig.getPort(), redisConfig.getTimeout()); + } + + @Override + public void internalPut(String hashedKey, byte[] encodedValue, int expiration) { + logger.debug("JedisPoolClient internalPut, key:{}, value size:{}, expiration:{}", hashedKey, encodedValue.length, expiration); + try (Jedis jedis = jedisPool.getResource()) { + jedis.setex(hashedKey.getBytes(Charsets.UTF_8), expiration, encodedValue); + } catch (Exception e) { + errorCount.incrementAndGet(); + logger.error("JedisPoolClient put error, ", e); + } + } + + @Override + public byte[] internalGet(String key) { + logger.debug("JedisPoolClient internalGet, key:{}", key); + try (Jedis jedis = jedisPool.getResource()) { + return jedis.get(key.getBytes(Charsets.UTF_8)); + } catch (Exception e) { + errorCount.incrementAndGet(); + logger.error("JedisPoolClient Get error", e); + } + return null; + } + + @Override + public void internalDel(String key) { + logger.debug("JedisPoolClient internalDel, key:{}", key); + try (Jedis jedis = jedisPool.getResource()) { + jedis.del(key); + } catch (Exception e) { + errorCount.incrementAndGet(); + logger.error("JedisPoolClient del error, ", e); + } + } + + @Override + public boolean ping() { + try (Jedis jedis = jedisPool.getResource()) { + jedis.ping(); + return true; + } catch (Exception e) { + errorCount.incrementAndGet(); + logger.error("JedisPoolClient isConnected error, ", e); + return false; + } + } + +} diff --git a/cache/src/test/java/org/apache/kylin/cache/redis/JedisPoolClientTest.java b/cache/src/test/java/org/apache/kylin/cache/redis/JedisPoolClientTest.java new file mode 100644 index 000000000..8b22c656e --- /dev/null +++ b/cache/src/test/java/org/apache/kylin/cache/redis/JedisPoolClientTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kylin.cache.redis; + +import org.apache.kylin.cache.cachemanager.CacheConstants; +import org.apache.kylin.cache.cachemanager.RedisManager; +import org.apache.kylin.cache.redis.jedis.JedisPoolClient; +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.shaded.com.google.common.base.Charsets; +import org.apache.kylin.shaded.com.google.common.collect.Maps; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import java.util.Map; +import static org.mockito.Mockito.when; + +public class JedisPoolClientTest extends LocalFileMetadataTestCase { + + private Map keyValueMap; + + private RedisManager.RedisCacheAdaptor redisCacheAdaptor; + + + @Before + public void setUp() { + this.createTestMetadata(); + + keyValueMap = Maps.newHashMap(); + keyValueMap.put("sql1", "value1"); + keyValueMap.put("sql11", "value11"); + + RedisConfig redisConfig = new RedisConfig(); + JedisPoolClient jedisPoolClient = new JedisPoolClient(redisConfig); + JedisPoolClient clientSpy = Mockito.spy(jedisPoolClient); + redisCacheAdaptor = new RedisManager.RedisCacheAdaptor(clientSpy, CacheConstants.QUERY_CACHE); + + for (String key : keyValueMap.keySet()) { + String keyS = jedisPoolClient.serializeKey(key); + String hashedKey = jedisPoolClient.computeKeyHash(keyS); + String value = keyValueMap.get(key); + byte[] valueE = jedisPoolClient.encodeValue(keyS.getBytes(Charsets.UTF_8), jedisPoolClient.serializeValue(value)); + when(clientSpy.internalGet(hashedKey)).thenReturn(valueE); + } + + } + + @After + public void after() { + this.cleanupTestMetadata(); + } + + @Test + public void testGet() { + for (String key : keyValueMap.keySet()) { + Assert.assertEquals("The value should not change", keyValueMap.get(key), redisCacheAdaptor.get(key).get()); + } + } + +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index a7be37a0f..a60908101 100644 --- a/pom.xml +++ b/pom.xml @@ -1133,6 +1133,11 @@ netty 3.9.9.Final + + redis.clients + jedis + 2.10.0 + diff --git a/server/src/main/resources/applicationContext.xml b/server/src/main/resources/applicationContext.xml index be13a89f5..8e525fdaf 100644 --- a/server/src/main/resources/applicationContext.xml +++ b/server/src/main/resources/applicationContext.xml @@ -118,6 +118,24 @@ --> + + +