From f9334eb148c034031979d557996f66d560fc787b Mon Sep 17 00:00:00 2001 From: sunyerui Date: Mon, 21 Nov 2016 21:26:34 +0800 Subject: [PATCH] KYLIN-2192 More Robust Global Dictionary --- .../org/apache/kylin/common/KylinConfigBase.java | 8 + .../org/apache/kylin/common/util/Dictionary.java | 2 +- .../model/validation/rule/DictionaryRuleTest.java | 2 +- .../apache/kylin/dict/AppendTrieDictionary.java | 285 ++++++++++++++---- .../kylin/dict/AppendTrieDictionaryChecker.java | 102 +++++++ .../java/org/apache/kylin/dict/CachedTreeMap.java | 260 ++++++++++------- .../apache/kylin/dict/GlobalDictionaryBuilder.java | 32 +-- .../kylin/dict/AppendTrieDictionaryTest.java | 150 ++++++++-- .../org/apache/kylin/dict/CachedTreeMapTest.java | 320 +++++++++++++-------- .../kylin/measure/bitmap/BitmapCounterTest.java | 6 +- ...test_kylin_cube_without_slr_left_join_desc.json | 8 +- .../query/sql_distinct_precisely/query00.sql | 2 +- .../query/sql_distinct_precisely/query01.sql | 2 +- .../query/sql_distinct_precisely/query02.sql | 2 +- .../query/sql_distinct_precisely/query03.sql | 3 +- .../query/sql_distinct_precisely/query04.sql | 3 +- .../query/sql_distinct_precisely/query05.sql | 2 +- .../query/sql_distinct_precisely/query06.sql | 2 +- .../query/sql_distinct_precisely/query07.sql | 2 +- 19 files changed, 847 insertions(+), 346 deletions(-) create mode 100644 core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index c7dd8a8..e53ad2e 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -834,6 +834,14 @@ abstract public class KylinConfigBase implements Serializable { setProperty("kylin.dict.append.cache.size", String.valueOf(cacheSize)); } + public int getAppendDictMaxVersions() { + return Integer.parseInt(getOptional("kylin.dict.append.max.versions", "3")); + } + + public int getAppendDictVersionTTL() { + return Integer.parseInt(getOptional("kylin.dict.append.version.ttl", "259200000")); + } + @Deprecated public String getCreateFlatHiveTableMethod() { return getOptional("kylin.hive.create.flat.table.method", "1"); diff --git a/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java b/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java index 0fb299c..a8dff71 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java @@ -158,7 +158,7 @@ abstract public class Dictionary implements Serializable { return nullId(); else { int id = getIdFromValueBytesImpl(value, offset, len, roundingFlag); - if (id < 0) + if (isNullId(id)) throw new IllegalArgumentException("Value '" + Bytes.toString(value, offset, len) + "' (" + Bytes.toStringBinary(value, offset, len) + ") not exists!"); return id; } diff --git a/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java b/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java index 9b37507..64fc54d 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java @@ -72,7 +72,7 @@ public class DictionaryRuleTest extends LocalFileMetadataTestCase { @Test public void testBadDesc() throws IOException { - testDictionaryDesc("Column EDW.TEST_SITES.SITE_NAME has inconsistent builders " + "FakeBuilderClass and org.apache.kylin.dict.GlobalDictionaryBuilder", DictionaryDesc.create("SITE_NAME", null, "FakeBuilderClass")); + testDictionaryDesc("Column DEFAULT.TEST_KYLIN_FACT.PRICE has inconsistent builders " + "FakeBuilderClass and org.apache.kylin.dict.GlobalDictionaryBuilder", DictionaryDesc.create("PRICE", null, "FakeBuilderClass")); } @Test diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java index 14980bf..84060a7 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java @@ -31,10 +31,13 @@ import java.lang.ref.SoftReference; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.IdentityHashMap; import java.util.LinkedList; import java.util.List; +import java.util.NavigableSet; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -49,6 +52,8 @@ import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.metadata.MetadataManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,16 +62,16 @@ import org.slf4j.LoggerFactory; * int IDs, used for global dictionary. * * Trie data is split into sub trees, called {@link DictSlice}, and stored in a {@link CachedTreeMap} with a configurable cache size. - * + * * With Trie the memory footprint of the mapping is kinda minimized at the cost * CPU, if compared to HashMap of ID Arrays. Performance test shows Trie is * roughly 10 times slower, so there's a cache layer overlays on top of Trie and * gracefully fall back to Trie using a weak reference. - * + * * The implementation is NOT thread-safe for now. * * TODO making it thread-safe - * + * * @author sunyerui */ @SuppressWarnings({ "rawtypes", "unchecked", "serial" }) @@ -87,7 +92,7 @@ public class AppendTrieDictionary extends Dictionary { transient private int nValues; transient private BytesConverter bytesConverter; - private TreeMap dictSliceMap; + volatile private TreeMap dictSliceMap; transient private boolean enableValueCache = true; transient private SoftReference valueToIdCache; @@ -99,17 +104,23 @@ public class AppendTrieDictionary extends Dictionary { } } - public void update(String baseDir, int baseId, int maxId, int maxValueLength, int nValues, BytesConverter bytesConverter, CachedTreeMap dictMap) throws IOException { + public void initParams(String baseDir, int baseId, int maxId, int maxValueLength, int nValues, BytesConverter bytesConverter) throws IOException { this.baseDir = baseDir; this.baseId = baseId; this.maxId = maxId; this.maxValueLength = maxValueLength; this.nValues = nValues; this.bytesConverter = bytesConverter; + } + public void initDictSliceMap(CachedTreeMap dictMap) throws IOException { int cacheSize = KylinConfig.getInstanceFromEnv().getAppendDictCacheSize(); - dictSliceMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder().maxSize(cacheSize).baseDir(baseDir).persistent(true).immutable(true).keyClazz(DictSliceKey.class).valueClazz(DictSlice.class).build(); - ((CachedTreeMap)dictSliceMap).loadEntry(dictMap); + int maxVersions = KylinConfig.getInstanceFromEnv().getAppendDictMaxVersions(); + long versionTTL = KylinConfig.getInstanceFromEnv().getAppendDictVersionTTL(); + CachedTreeMap newDictSliceMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder().maxSize(cacheSize).baseDir(baseDir) + .immutable(true).maxVersions(maxVersions).versionTTL(versionTTL).keyClazz(DictSliceKey.class).valueClazz(DictSlice.class).build(); + newDictSliceMap.loadEntry(dictMap); + this.dictSliceMap = newDictSliceMap; } public byte[] writeDictMap() throws IOException { @@ -123,6 +134,13 @@ public class AppendTrieDictionary extends Dictionary { return dictMapBytes; } + // The dict id starts from 1 to 2147483647 and 2147483648 to -2, leave 0 and -1 used for uninitialized state + public static void checkValidId(int id) { + if (id == 0 || id == -1) { + throw new IllegalArgumentException("AppendTrieDictionary Id Overflow Unsigned Integer Size 4294967294"); + } + } + public static class DictSliceKey implements WritableComparable { byte[] key; @@ -181,7 +199,8 @@ public class AppendTrieDictionary extends Dictionary { transient private int nValues; transient private int sizeOfId; - transient private int childOffsetMask; + // mask MUST be long, since childOffset maybe 5 bytes at most + transient private long childOffsetMask; transient private int firstByteOffset; private void init(byte[] trieBytes) { @@ -197,7 +216,7 @@ public class AppendTrieDictionary extends Dictionary { this.sizeChildOffset = headIn.read(); this.sizeOfId = headIn.read(); - this.childOffsetMask = ~((BIT_IS_LAST_CHILD | BIT_IS_END_OF_VALUE) << ((sizeChildOffset - 1) * 8)); + this.childOffsetMask = ~(((long)(BIT_IS_LAST_CHILD | BIT_IS_END_OF_VALUE)) << ((sizeChildOffset - 1) * 8)); this.firstByteOffset = sizeChildOffset + 1; // the offset from begin of node to its first value byte } catch (Exception e) { if (e instanceof RuntimeException) @@ -216,7 +235,7 @@ public class AppendTrieDictionary extends Dictionary { if (checkFlag(nodeOffset, BIT_IS_END_OF_VALUE)) { break; } - nodeOffset = headSize + (BytesUtil.readUnsigned(trieBytes, nodeOffset, sizeChildOffset) & childOffsetMask); + nodeOffset = headSize + (int)(BytesUtil.readLong(trieBytes, nodeOffset, sizeChildOffset) & childOffsetMask); if (nodeOffset == headSize) { break; } @@ -258,7 +277,7 @@ public class AppendTrieDictionary extends Dictionary { } // find a child to continue - int c = headSize + (BytesUtil.readUnsigned(trieBytes, n, sizeChildOffset) & childOffsetMask); + int c = headSize + (int)(BytesUtil.readLong(trieBytes, n, sizeChildOffset) & childOffsetMask); if (c == headSize) // has no children return -1; byte inpByte = inp[o]; @@ -297,7 +316,7 @@ public class AppendTrieDictionary extends Dictionary { DictNode root = null; while (true) { int p = n + firstByteOffset; - int childOffset = BytesUtil.readUnsigned(trieBytes, n, sizeChildOffset) & childOffsetMask; + int childOffset = (int)(BytesUtil.readLong(trieBytes, n, sizeChildOffset) & childOffsetMask); int parLen = BytesUtil.readUnsigned(trieBytes, p - 1, 1); boolean isEndOfValue = checkFlag(n, BIT_IS_END_OF_VALUE); @@ -329,6 +348,53 @@ public class AppendTrieDictionary extends Dictionary { return root; } + public boolean doCheck() { + int offset = headSize; + HashSet parentSet = new HashSet<>(); + boolean lastChild = false; + + while (offset < trieBytes.length) { + if (lastChild) { + boolean contained = parentSet.remove(offset - headSize); + // Can't find parent, the data is corrupted + if (!contained) { + return false; + } + lastChild = false; + } + int p = offset + firstByteOffset; + int childOffset = (int)(BytesUtil.readLong(trieBytes, offset, sizeChildOffset) & childOffsetMask); + int parLen = BytesUtil.readUnsigned(trieBytes, p - 1, 1); + boolean isEndOfValue = checkFlag(offset, BIT_IS_END_OF_VALUE); + + // Copy value overflow, the data is corrupted + if (trieBytes.length < p + parLen) { + return false; + } + + // Check id is fine + if (isEndOfValue) { + BytesUtil.readUnsigned(trieBytes, p + parLen, sizeOfId); + } + + // Record it if has children + if (childOffset != 0) { + parentSet.add(childOffset); + } + + // brothers done, move to next parent + if (checkFlag(offset, BIT_IS_LAST_CHILD)) { + lastChild = true; + } + + // move to next node + offset += firstByteOffset + parLen + (isEndOfValue ? sizeOfId : 0); + } + + // ParentMap is empty, meaning all nodes has parent, the data is correct + return parentSet.isEmpty(); + } + public void write(DataOutput out) throws IOException { out.write(trieBytes); } @@ -341,7 +407,7 @@ public class AppendTrieDictionary extends Dictionary { throw new IllegalArgumentException("Wrong file type (magic does not match)"); DataInputStream headIn = new DataInputStream(// - new ByteArrayInputStream(headPartial, HEAD_SIZE_I, headPartial.length - HEAD_SIZE_I)); + new ByteArrayInputStream(headPartial, HEAD_SIZE_I, headPartial.length - HEAD_SIZE_I)); int headSize = headIn.readShort(); int bodyLen = headIn.readInt(); headIn.close(); @@ -398,6 +464,9 @@ public class AppendTrieDictionary extends Dictionary { this.id = o.id; this.isEndOfValue = o.isEndOfValue; this.children = o.children; + for (DictNode child : o.children) { + child.parent = this; + } this.nValuesBeneath = o.nValuesBeneath; this.parent = o.parent; this.childrenCount = o.childrenCount; @@ -602,7 +671,8 @@ public class AppendTrieDictionary extends Dictionary { // nValueBytes if (n.part.length > 255) - throw new RuntimeException(); + throw new RuntimeException("Value length is " + n.part.length + + " and larger than 255: " + Bytes.toStringBinary(n.part)); BytesUtil.writeUnsigned(n.part.length, trieBytes, o, 1); o++; @@ -611,7 +681,7 @@ public class AppendTrieDictionary extends Dictionary { o += n.part.length; if (n.isEndOfValue) { - assert n.id > 0; + checkValidId(n.id); BytesUtil.writeUnsigned(n.id, trieBytes, o, sizeId); o += sizeId; } @@ -715,12 +785,13 @@ public class AppendTrieDictionary extends Dictionary { s.mbpn_sizeId = 4; s.mbpn_sizeValueTotal = s.nValueBytesCompressed + s.nValues * s.mbpn_sizeId; s.mbpn_sizeNoValueBytes = 1; - s.mbpn_sizeChildOffset = 4; + s.mbpn_sizeChildOffset = 5; s.mbpn_footprint = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeChildOffset); while (true) { // minimize the offset size to match the footprint int t = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeChildOffset - 1); // *4 because 2 MSB of offset is used for isEndOfValue & isEndChild flag - if (BytesUtil.sizeForValue(t * 4) <= s.mbpn_sizeChildOffset - 1) { + // expand t to long before *4, avoiding exceed Integer.MAX_VALUE + if (BytesUtil.sizeForValue((long)t * 4) <= s.mbpn_sizeChildOffset - 1) { s.mbpn_sizeChildOffset--; s.mbpn_footprint = t; } else @@ -760,31 +831,97 @@ public class AppendTrieDictionary extends Dictionary { } public static class Builder { - private String baseDir; + private static ConcurrentHashMap> builderInstanceAndCountMap = new ConcurrentHashMap(); + + public static Builder getInstance(String resourcePath) throws IOException { + return getInstance(resourcePath, null); + } + + public synchronized static Builder getInstance(String resourcePath, AppendTrieDictionary dict) throws IOException { + Pair entry = builderInstanceAndCountMap.get(resourcePath); + if (entry == null) { + entry = new Pair<>(0, createNewBuilder(resourcePath, dict)); + builderInstanceAndCountMap.put(resourcePath, entry); + } + entry.setFirst(entry.getFirst() + 1); + return entry.getSecond(); + } + + // return true if entry still in map + private synchronized static boolean releaseInstance(String resourcePath) { + Pair entry = builderInstanceAndCountMap.get(resourcePath); + if (entry != null) { + entry.setFirst(entry.getFirst() - 1); + if (entry.getFirst() <= 0) { + builderInstanceAndCountMap.remove(resourcePath); + return false; + } + return true; + } + return false; + } + + public static Builder createNewBuilder(String resourcePath, AppendTrieDictionary existDict) throws IOException { + String dictDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict" + resourcePath + "/"; + + AppendTrieDictionary dictToUse = existDict; + if (dictToUse == null) { + // Try to load the existing dict from cache, making sure there's only the same one object in memory + NavigableSet dicts = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getStore().listResources(resourcePath); + ArrayList appendDicts = new ArrayList<>(); + if (dicts != null && !dicts.isEmpty()) { + for (String dict : dicts) { + DictionaryInfo info = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getStore().getResource(dict, DictionaryInfo.class, DictionaryInfoSerializer.INFO_SERIALIZER); + if (info.getDictionaryClass().equals(AppendTrieDictionary.class.getName())) { + appendDicts.add(dict); + } + } + } + if (appendDicts.isEmpty()) { + dictToUse = null; + } else if (appendDicts.size() == 1) { + dictToUse = (AppendTrieDictionary) DictionaryManager.getInstance(KylinConfig.getInstanceFromEnv()).getDictionary(appendDicts.get(0)); + } else { + throw new IllegalStateException(String.format("GlobalDict %s should have 0 or 1 append dict but %d", resourcePath, appendDicts.size())); + } + } + + AppendTrieDictionary.Builder builder; + if (dictToUse == null) { + logger.info("GlobalDict {} is empty, create new one", resourcePath); + builder = new Builder<>(resourcePath, null, dictDir, 0, 0, 0, new StringBytesConverter(), null); + } else { + logger.info("GlobalDict {} exist, append value", dictToUse); + builder = new Builder<>(resourcePath, dictToUse, dictToUse.baseDir, dictToUse.maxId, dictToUse.maxValueLength, + dictToUse.nValues, dictToUse.bytesConverter, dictToUse.writeDictMap()); + } + + return builder; + } + + private final String resourcePath; + private final String baseDir; private int maxId; private int maxValueLength; private int nValues; - private BytesConverter bytesConverter; + private final BytesConverter bytesConverter; - private AppendTrieDictionary dict; + private final AppendTrieDictionary dict; - private TreeMap mutableDictSliceMap; - private static int MAX_ENTRY_IN_SLICE = 10_000_000; + private final TreeMap mutableDictSliceMap; + private int MAX_ENTRY_IN_SLICE = 10_000_000; private static final double MAX_ENTRY_OVERHEAD_FACTOR = 1.0; private int processedCount = 0; - public static Builder create(String baseDir) throws IOException { - return new Builder<>(null, baseDir, 0, 0, 0, new StringBytesConverter(), null); - } - - public static Builder create(AppendTrieDictionary dict) throws IOException { - return new Builder<>(dict, dict.baseDir, dict.maxId, dict.maxValueLength, dict.nValues, dict.bytesConverter, dict.writeDictMap()); - } - // Constructor for a new Dict - private Builder(AppendTrieDictionary dict, String baseDir, int maxId, int maxValueLength, int nValues, BytesConverter bytesConverter, byte[] dictMapBytes) throws IOException { - this.dict = dict; + private Builder(String resourcePath, AppendTrieDictionary dict, String baseDir, int maxId, int maxValueLength, int nValues, BytesConverter bytesConverter, byte[] dictMapBytes) throws IOException { + this.resourcePath = resourcePath; + if (dict == null) { + this.dict = new AppendTrieDictionary(); + } else { + this.dict = dict; + } this.baseDir = baseDir; this.maxId = maxId; this.maxValueLength = maxValueLength; @@ -793,8 +930,11 @@ public class AppendTrieDictionary extends Dictionary { MAX_ENTRY_IN_SLICE = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize(); int cacheSize = KylinConfig.getInstanceFromEnv().getAppendDictCacheSize(); + int maxVersions = KylinConfig.getInstanceFromEnv().getAppendDictMaxVersions(); + long versionTTL = KylinConfig.getInstanceFromEnv().getAppendDictVersionTTL(); // create a new cached map with baseDir - mutableDictSliceMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder().maxSize(cacheSize).baseDir(baseDir).keyClazz(DictSliceKey.class).valueClazz(DictNode.class).persistent(true).immutable(false).build(); + mutableDictSliceMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder().maxSize(cacheSize).baseDir(baseDir) + .maxVersions(maxVersions).versionTTL(versionTTL).keyClazz(DictSliceKey.class).valueClazz(DictNode.class).immutable(false).build(); if (dictMapBytes != null) { ((Writable) mutableDictSliceMap).readFields(new DataInputStream(new ByteArrayInputStream(dictMapBytes))); } @@ -804,7 +944,7 @@ public class AppendTrieDictionary extends Dictionary { addValue(bytesConverter.convertToBytes(value)); } - public void addValue(byte[] value) { + private synchronized void addValue(byte[] value) { if (++processedCount % 1_000_000 == 0) { logger.debug("add value count " + processedCount); } @@ -859,15 +999,41 @@ public class AppendTrieDictionary extends Dictionary { private int createNextId() { int id = ++maxId; - if (maxId < 0) { - throw new IllegalArgumentException("AppendTrieDictionary Id overflow Integer.MAX_VALUE"); - } + checkValidId(id); nValues++; return id; } + // Only used for test + public void setMaxId(int id) { + this.maxId = id; + } + + // When add a new node, the value part maybe over 255 bytes, need split it into a sub tree + private DictNode addNodeMaybeOverflow(byte[] value, int start, int end) { + DictNode head = null; + DictNode current = null; + for (; start + 255 < end; start += 255) { + DictNode c = new DictNode(BytesUtil.subarray(value, start, start + 255), false); + if (head == null) { + head = c; + current = c; + } else { + current.addChild(c); + current = c; + } + } + DictNode last = new DictNode(BytesUtil.subarray(value, start, end), true); + last.id = createNextId(); + if (head == null) { + head = last; + } else { + current.addChild(last); + } + return head; + } + private void addValueR(DictNode node, byte[] value, int start) { - assert value.length - start <= 255 : "value bytes overflow than 255"; // match the value part of current node int i = 0, j = start; int n = node.part.length, nn = value.length; @@ -903,8 +1069,7 @@ public class AppendTrieDictionary extends Dictionary { if (i < n) { DictNode c1 = new DictNode(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children); c1.id = node.id; - DictNode c2 = new DictNode(BytesUtil.subarray(value, j, nn), true); - c2.id = createNextId(); + DictNode c2 = addNodeMaybeOverflow(value, j, nn); node.reset(BytesUtil.subarray(node.part, 0, i), false); if (comp < 0) { node.addChild(c1); @@ -940,18 +1105,17 @@ public class AppendTrieDictionary extends Dictionary { addValueR(node.children.get(mid), value, j); } else { // otherwise, make the value a new child - DictNode c = new DictNode(BytesUtil.subarray(value, j, nn), true); - c.id = createNextId(); + DictNode c = addNodeMaybeOverflow(value, j, nn); node.addChild(comp <= 0 ? mid : mid + 1, c); } } - public AppendTrieDictionary build(int baseId) throws IOException { - if (dict == null) { - dict = new AppendTrieDictionary(); - } - dict.update(baseDir, baseId, maxId, maxValueLength, nValues, bytesConverter, (CachedTreeMap)mutableDictSliceMap); - dict.flushIndex((CachedTreeMap) mutableDictSliceMap); + public synchronized AppendTrieDictionary build(int baseId) throws IOException { + boolean keepAppend = releaseInstance(resourcePath); + CachedTreeMap dictSliceMap = (CachedTreeMap)mutableDictSliceMap; + dict.initParams(baseDir, baseId, maxId, maxValueLength, nValues, bytesConverter); + dict.flushIndex(dictSliceMap, keepAppend); + dict.initDictSliceMap(dictSliceMap); return dict; } @@ -970,8 +1134,6 @@ public class AppendTrieDictionary extends Dictionary { } DictSlice slice = dictSliceMap.get(sliceKey); int id = slice.getIdFromValueBytesImpl(value, offset, len, roundingFlag); - if (id < 0) - logger.error("Not a valid value: " + bytesConverter.convertFromBytes(value, offset, len)); return id; } @@ -1031,25 +1193,24 @@ public class AppendTrieDictionary extends Dictionary { throw new UnsupportedOperationException("AppendTrieDictionary can't retrive value from id"); } - public void flushIndex(CachedTreeMap dictSliceMap) throws IOException { - Path filePath = new Path(dictSliceMap.getCurrentDir() + "/.index"); - Configuration conf = new Configuration(); - try (FSDataOutputStream indexOut = (FileSystem.get(filePath.toUri(), conf)).create(filePath, true, 8 * 1024 * 1024, (short) 5, 8 * 1024 * 1024 * 8)) { + public void flushIndex(CachedTreeMap dictSliceMap, boolean keepAppend) throws IOException { + try (FSDataOutputStream indexOut = dictSliceMap.openIndexOutput()) { indexOut.writeInt(baseId); indexOut.writeInt(maxId); indexOut.writeInt(maxValueLength); indexOut.writeInt(nValues); indexOut.writeUTF(bytesConverter.getClass().getName()); dictSliceMap.write(indexOut); + dictSliceMap.commit(keepAppend); } - dictSliceMap.commit(false); } @Override public AppendTrieDictionary copyToAnotherMeta(KylinConfig srcConfig, KylinConfig dstConfig) throws IOException { Configuration conf = new Configuration(); AppendTrieDictionary newDict = new AppendTrieDictionary(); - newDict.update(baseDir.replaceFirst(srcConfig.getHdfsWorkingDirectory(), dstConfig.getHdfsWorkingDirectory()), baseId, maxId, maxValueLength, nValues, bytesConverter, (CachedTreeMap)dictSliceMap); + newDict.initParams(baseDir.replaceFirst(srcConfig.getHdfsWorkingDirectory(), dstConfig.getHdfsWorkingDirectory()), baseId, maxId, maxValueLength, nValues, bytesConverter); + newDict.initDictSliceMap((CachedTreeMap)dictSliceMap); logger.info("Copy AppendDict from {} to {}", this.baseDir, newDict.baseDir); Path srcPath = new Path(this.baseDir); Path dstPath = new Path(newDict.baseDir); @@ -1071,9 +1232,8 @@ public class AppendTrieDictionary extends Dictionary { @Override public void readFields(DataInput in) throws IOException { String baseDir = in.readUTF(); - Path filePath = new Path(baseDir + "/.index"); Configuration conf = new Configuration(); - try (FSDataInputStream input = (FileSystem.get(filePath.toUri(), conf)).open(filePath, 8 * 1024 * 1024)) { + try (FSDataInputStream input = CachedTreeMap.openLatestIndexInput(conf, baseDir)) { int baseId = input.readInt(); int maxId = input.readInt(); int maxValueLength = input.readInt(); @@ -1087,10 +1247,13 @@ public class AppendTrieDictionary extends Dictionary { throw new IOException(e); } } + initParams(baseDir, baseId, maxId, maxValueLength, nValues, converter); + + // Create instance for deserialize data, and update to map in dict CachedTreeMap dictMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder() - .baseDir(baseDir).persistent(true).immutable(true).keyClazz(DictSliceKey.class).valueClazz(DictSlice.class).build(); + .baseDir(baseDir).immutable(true).keyClazz(DictSliceKey.class).valueClazz(DictSlice.class).build(); dictMap.readFields(input); - update(baseDir, baseId, maxId, maxValueLength, nValues, converter, dictMap); + initDictSliceMap(dictMap); } } @@ -1120,4 +1283,6 @@ public class AppendTrieDictionary extends Dictionary { public boolean contains(Dictionary other) { return false; } + } + diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java new file mode 100644 index 0000000..f231275 --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.dict; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.KylinConfig; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Created by sunyerui on 16/11/15. + */ +public class AppendTrieDictionaryChecker { + + public boolean runChecker(String baseDir) throws IOException { + Configuration conf = new Configuration(); + Path basePath = new Path(baseDir); + FileSystem fs = FileSystem.get(basePath.toUri(), conf); + List sliceList = new ArrayList<>(); + List corruptedSliceList = new ArrayList<>(); + listDictSlicePath(fs, fs.getFileStatus(basePath), sliceList); + + for (Path path : sliceList) { + if (!doCheck(fs, path)) { + System.out.println("AppendDict Slice " + path + " corrupted"); + corruptedSliceList.add(path); + } else { + System.out.println("AppendDict Slice " + path + " is right"); + } + } + + if (corruptedSliceList.isEmpty()) { + System.out.println("ALL AppendDict Slices is right"); + return true; + } else { + System.out.println("Some AppendDict Slice(s) corrupted: "); + for (Path path : corruptedSliceList) { + System.out.println(path.toString()); + } + return false; + } + } + + public void listDictSlicePath(FileSystem fs, FileStatus path, List list) throws IOException { + if (path.isDirectory()) { + for (FileStatus status : fs.listStatus(path.getPath())) { + listDictSlicePath(fs, status, list); + } + } else { + if (path.getPath().getName().startsWith(CachedTreeMap.CACHED_PREFIX)) { + list.add(path.getPath()); + } + } + } + + public boolean doCheck(FileSystem fs, Path filePath) { + try (FSDataInputStream input = fs.open(filePath, CachedTreeMap.BUFFER_SIZE)) { + AppendTrieDictionary.DictSlice slice = new AppendTrieDictionary.DictSlice(); + slice.readFields(input); + return slice.doCheck(); + } catch (Exception e) { + return false; + } catch (Error e) { + return false; + } + } + + public static void main(String[] args) throws IOException { + String path = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict/"; + if (args.length > 0) { + path = args[0]; + } + System.out.println("Recursive Check AppendTrieDictionary Slices in path " + path); + AppendTrieDictionaryChecker checker = new AppendTrieDictionaryChecker(); + if (checker.runChecker(path)) { + System.exit(0); + } else { + System.exit(-1); + } + } +} diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java b/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java index 1ea3c1c..6acf764 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java @@ -31,9 +31,11 @@ import java.util.concurrent.ExecutionException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.slf4j.Logger; @@ -56,25 +58,29 @@ public class CachedTreeMap ext private final Class valueClazz; transient volatile Collection values; private final LoadingCache valueCache; - private final TreeSet fileList; private final Configuration conf; - private final String baseDir; - private final String tmpDir; + private final Path baseDir; + private final Path versionDir; + private final Path workingDir; private final FileSystem fs; - private final boolean persistent; private final boolean immutable; - private long writeValueTime = 0; - private long readValueTime = 0; + private final int maxVersions; + private final long versionTTL; + private boolean keepAppend; - private static final int BUFFER_SIZE = 8 * 1024 * 1024; + public static final int BUFFER_SIZE = 8 * 1024 * 1024; + + public static final String CACHED_PREFIX = "cached_"; + public static final String VERSION_PREFIX = "version_"; public static class CachedTreeMapBuilder { private Class keyClazz; private Class valueClazz; private int maxCount = 8; private String baseDir; - private boolean persistent; private boolean immutable; + private int maxVersions; + private long versionTTL; public static CachedTreeMapBuilder newBuilder() { return new CachedTreeMapBuilder(); @@ -103,13 +109,18 @@ public class CachedTreeMap ext return this; } - public CachedTreeMapBuilder persistent(boolean persistent) { - this.persistent = persistent; + public CachedTreeMapBuilder immutable(boolean immutable) { + this.immutable = immutable; return this; } - public CachedTreeMapBuilder immutable(boolean immutable) { - this.immutable = immutable; + public CachedTreeMapBuilder maxVersions(int maxVersions) { + this.maxVersions = maxVersions; + return this; + } + + public CachedTreeMapBuilder versionTTL(long versionTTL) { + this.versionTTL = versionTTL; return this; } @@ -120,26 +131,38 @@ public class CachedTreeMap ext if (keyClazz == null || valueClazz == null) { throw new RuntimeException("CachedTreeMap need key and value clazz to serialize data"); } - CachedTreeMap map = new CachedTreeMap(maxCount, keyClazz, valueClazz, baseDir, persistent, immutable); + CachedTreeMap map = new CachedTreeMap(maxCount, keyClazz, valueClazz, baseDir, immutable, maxVersions, versionTTL); return map; } } - private CachedTreeMap(int maxCount, Class keyClazz, Class valueClazz, String baseDir, boolean persistent, boolean immutable) throws IOException { + private CachedTreeMap(int maxCount, Class keyClazz, Class valueClazz, String basePath, + boolean immutable, int maxVersions, long versionTTL) throws IOException { super(); this.keyClazz = keyClazz; this.valueClazz = valueClazz; - this.fileList = new TreeSet<>(); + this.immutable = immutable; + this.keepAppend = true; + this.maxVersions = maxVersions; + this.versionTTL = versionTTL; this.conf = new Configuration(); - if (baseDir.endsWith("/")) { - this.baseDir = baseDir.substring(0, baseDir.length()-1); - } else { - this.baseDir = baseDir; + if (basePath.endsWith("/")) { + basePath = basePath.substring(0, basePath.length()-1); + } + this.baseDir = new Path(basePath); + this.fs = FileSystem.get(baseDir.toUri(), conf); + if (!fs.exists(baseDir)) { + fs.mkdirs(baseDir); + } + this.versionDir = getLatestVersion(conf, fs, baseDir); + this.workingDir = new Path(baseDir, "working"); + if (!this.immutable) { + // For mutable map, copy all data into working dir and work on it, avoiding suddenly server crash made data corrupt + if (fs.exists(workingDir)) { + fs.delete(workingDir, true); + } + FileUtil.copy(fs, versionDir, fs, workingDir, false, true, conf); } - this.tmpDir = this.baseDir + ".tmp"; - this.fs = FileSystem.get(new Path(baseDir).toUri(), conf); - this.persistent = persistent; - this.immutable = immutable; CacheBuilder builder = CacheBuilder.newBuilder().removalListener(new RemovalListener() { @Override public void onRemoval(RemovalNotification notification) { @@ -152,24 +175,14 @@ public class CachedTreeMap ext deleteValue(notification.getKey()); break; default: - throw new RuntimeException("unexpected evict reason " + notification.getCause()); } } }); - // For immutable values, load all values as much as possible, and evict by soft reference to free memory when gc if (this.immutable) { + // For immutable values, load all values as much as possible, and evict by soft reference to free memory when gc builder.softValues(); } else { builder.maximumSize(maxCount); - // For mutable map, copy all data into tmp and modify on tmp data, avoiding suddenly server crash made data corrupt - if (fs.exists(new Path(tmpDir))) { - fs.delete(new Path(tmpDir), true); - } - if (fs.exists(new Path(this.baseDir))) { - FileUtil.copy(fs, new Path(this.baseDir), fs, new Path(tmpDir), false, true, conf); - } else { - fs.mkdirs(new Path(this.baseDir)); - } } this.valueCache = builder.build(new CacheLoader() { @Override @@ -182,38 +195,108 @@ public class CachedTreeMap ext } private String generateFileName(K key) { - String file = (immutable ? baseDir : tmpDir) + "/cached_" + key.toString(); + String file = getCurrentDir() + "/" + CACHED_PREFIX + key.toString(); return file; } - public String getCurrentDir() { - return immutable ? baseDir : tmpDir; + private String getCurrentDir() { + return immutable ? versionDir.toString() : workingDir.toString(); } - public void commit(boolean stillMutable) throws IOException { - assert !immutable : "Only support commit method with immutable false"; + private static String[] listAllVersions(FileSystem fs, Path baseDir) throws IOException { + FileStatus[] fileStatus = fs.listStatus(baseDir, new PathFilter() { + @Override + public boolean accept(Path path) { + if (path.getName().startsWith(VERSION_PREFIX)) { + return true; + } + return false; + } + }); + TreeSet versions = new TreeSet<>(); + for (FileStatus status : fileStatus) { + versions.add(status.getPath().toString()); + } + return versions.toArray(new String[versions.size()]); + } - Path basePath = new Path(baseDir); - Path backupPath = new Path(baseDir+".bak"); - Path tmpPath = new Path(tmpDir); - try { - fs.rename(basePath, backupPath); - } catch (IOException e) { - logger.info("CachedTreeMap commit backup basedir failed, " + e, e); - throw e; + // only for test + public String getLatestVersion() throws IOException { + return getLatestVersion(conf, fs, baseDir).toUri().getPath(); + } + + private static Path getLatestVersion(Configuration conf, FileSystem fs, Path baseDir) throws IOException { + String[] versions = listAllVersions(fs, baseDir); + if (versions.length > 0) { + return new Path(versions[versions.length - 1]); + } else { + // Old format, directly use base dir, convert to new format + Path newVersionDir = new Path(baseDir, VERSION_PREFIX + System.currentTimeMillis()); + Path tmpNewVersionDir = new Path(baseDir, "tmp_" + VERSION_PREFIX + System.currentTimeMillis()); + Path indexFile = new Path(baseDir, ".index"); + FileStatus[] cachedFiles; + try { + cachedFiles = fs.listStatus(baseDir, new PathFilter() { + @Override + public boolean accept(Path path) { + if (path.getName().startsWith(CACHED_PREFIX)) { + return true; + } + return false; + } + }); + fs.mkdirs(tmpNewVersionDir); + if (fs.exists(indexFile) && cachedFiles.length > 0) { + FileUtil.copy(fs, indexFile, fs, tmpNewVersionDir, false, true, conf); + for (FileStatus file : cachedFiles) { + FileUtil.copy(fs, file.getPath(), fs, tmpNewVersionDir, false, true, conf); + } + } + fs.rename(tmpNewVersionDir, newVersionDir); + if (fs.exists(indexFile) && cachedFiles.length > 0) { + fs.delete(indexFile, true); + for (FileStatus file : cachedFiles) { + fs.delete(file.getPath(), true); + } + } + } finally { + if (fs.exists(tmpNewVersionDir)) { + fs.delete(tmpNewVersionDir, true); + } + } + return newVersionDir; } + } - try { - if (stillMutable) { - FileUtil.copy(fs, tmpPath, fs, basePath, false, true, conf); - } else { - fs.rename(tmpPath, basePath); + public void commit(boolean keepAppend) throws IOException { + assert this.keepAppend & !immutable : "Only support commit method with immutable false and keepAppend true"; + + Path newVersionDir = new Path(baseDir, VERSION_PREFIX + System.currentTimeMillis()); + if (keepAppend) { + // Copy to tmp dir, and rename to new version, make sure it's complete when be visible + Path tmpNewVersionDir = new Path(baseDir, "tmp_" + VERSION_PREFIX + System.currentTimeMillis()); + try { + FileUtil.copy(fs, workingDir, fs, tmpNewVersionDir, false, true, conf); + fs.rename(tmpNewVersionDir, newVersionDir); + } finally { + if (fs.exists(tmpNewVersionDir)) { + fs.delete(tmpNewVersionDir, true); + } + } + } else { + fs.rename(workingDir, newVersionDir); + } + this.keepAppend = keepAppend; + + // Check versions count, delete expired versions + String[] versions = listAllVersions(fs, baseDir); + long timestamp = System.currentTimeMillis(); + for (int i = 0; i < versions.length - maxVersions; i++) { + String versionString = versions[i].substring(versions[i].lastIndexOf(VERSION_PREFIX) + VERSION_PREFIX.length()); + long version = Long.parseLong(versionString); + if (version + versionTTL < timestamp) { + fs.delete(new Path(versions[i]), true); } - fs.delete(backupPath, true); - } catch (IOException e) { - fs.rename(backupPath, basePath); - logger.info("CachedTreeMap commit move/copy tmpdir failed, " + e, e); - throw e; } } @@ -227,25 +310,17 @@ public class CachedTreeMap ext if (immutable) { return; } - long t0 = System.currentTimeMillis(); String fileName = generateFileName(key); Path filePath = new Path(fileName); try (FSDataOutputStream out = fs.create(filePath, true, BUFFER_SIZE, (short) 5, BUFFER_SIZE * 8)) { value.write(out); - if (!persistent) { - fs.deleteOnExit(filePath); - } } catch (Exception e) { logger.error(String.format("write value into %s exception: %s", fileName, e), e); throw new RuntimeException(e.getCause()); - } finally { - fileList.add(fileName); - writeValueTime += System.currentTimeMillis() - t0; } } private V readValue(K key) throws Exception { - long t0 = System.currentTimeMillis(); String fileName = generateFileName(key); Path filePath = new Path(fileName); try (FSDataInputStream input = fs.open(filePath, BUFFER_SIZE)) { @@ -255,13 +330,11 @@ public class CachedTreeMap ext } catch (Exception e) { logger.error(String.format("read value from %s exception: %s", fileName, e), e); return null; - } finally { - readValueTime += System.currentTimeMillis() - t0; } } private void deleteValue(K key) { - if (persistent && immutable) { + if (immutable) { return; } String fileName = generateFileName(key); @@ -272,14 +345,12 @@ public class CachedTreeMap ext } } catch (Exception e) { logger.error(String.format("delete value file %s exception: %s", fileName, e), e); - } finally { - fileList.remove(fileName); } } @Override public V put(K key, V value) { - assert !immutable : "Only support put method with immutable false"; + assert keepAppend & !immutable : "Only support put method with immutable false and keepAppend true"; super.put(key, null); valueCache.put(key, value); return null; @@ -301,7 +372,7 @@ public class CachedTreeMap ext @Override public V remove(Object key) { - assert !immutable : "Only support remove method with immutable false"; + assert keepAppend & !immutable : "Only support remove method with immutable false keepAppend true"; super.remove(key); valueCache.invalidate(key); return null; @@ -357,15 +428,32 @@ public class CachedTreeMap ext @Override public void remove() { - assert !immutable : "Only support remove method with immutable false"; + assert keepAppend & !immutable : "Only support remove method with immutable false and keepAppend true"; keyIterator.remove(); valueCache.invalidate(currentKey); } } + public FSDataOutputStream openIndexOutput() throws IOException { + assert keepAppend & !immutable : "Only support write method with immutable false and keepAppend true"; + Path indexPath = new Path(getCurrentDir(), ".index"); + return fs.create(indexPath, true, 8 * 1024 * 1024, (short) 5, 8 * 1024 * 1024 * 8); + } + + public FSDataInputStream openIndexInput() throws IOException { + Path indexPath = new Path(getCurrentDir(), ".index"); + return fs.open(indexPath, 8 * 1024 * 1024); + } + + public static FSDataInputStream openLatestIndexInput(Configuration conf, String baseDir) throws IOException { + Path basePath = new Path(baseDir); + FileSystem fs = FileSystem.get(basePath.toUri(), conf); + Path indexPath = new Path(getLatestVersion(conf, fs, basePath), ".index"); + return fs.open(indexPath, 8 * 1024 * 1024); + } + @Override public void write(DataOutput out) throws IOException { - assert persistent : "Only support serialize with persistent true"; out.writeInt(size()); for (K key : keySet()) { key.write(out); @@ -378,7 +466,6 @@ public class CachedTreeMap ext @Override public void readFields(DataInput in) throws IOException { - assert persistent : "Only support deserialize with persistent true"; int size = in.readInt(); try { for (int i = 0; i < size; i++) { @@ -390,27 +477,4 @@ public class CachedTreeMap ext throw new IOException(e); } } - - // clean up all tmp files - @Override - public void finalize() throws Throwable { - if (persistent) { - return; - } - try { - this.clear(); - for (String file : fileList) { - try { - Path filePath = new Path(file); - fs.delete(filePath, true); - } catch (Throwable t) { - //do nothing? - } - } - } catch (Throwable t) { - //do nothing - } finally { - super.finalize(); - } - } } diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java index 7adc262..b8caf62 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java @@ -20,13 +20,8 @@ package org.apache.kylin.dict; import java.io.IOException; import java.util.ArrayList; -import java.util.NavigableSet; -import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Dictionary; -import org.apache.kylin.metadata.MetadataManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * GlobalDictinary based on whole cube, to ensure one value has same dict id in different segments. @@ -34,39 +29,14 @@ import org.slf4j.LoggerFactory; * Created by sunyerui on 16/5/24. */ public class GlobalDictionaryBuilder implements IDictionaryBuilder { - private static final Logger logger = LoggerFactory.getLogger(GlobalDictionaryBuilder.class); @Override public Dictionary build(DictionaryInfo dictInfo, IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList returnSamples) throws IOException { if (dictInfo == null) { throw new IllegalArgumentException("GlobalDictinaryBuilder must used with an existing DictionaryInfo"); } - String dictDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict" + dictInfo.getResourceDir() + "/"; - - // Try to load the existing dict from cache, making sure there's only the same one object in memory - NavigableSet dicts = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getStore().listResources(dictInfo.getResourceDir()); - ArrayList appendDicts = new ArrayList<>(); - if (dicts != null && !dicts.isEmpty()) { - for (String dict : dicts) { - DictionaryInfo info = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getStore().getResource(dict, DictionaryInfo.class, DictionaryInfoSerializer.INFO_SERIALIZER); - if (info.getDictionaryClass().equals(AppendTrieDictionary.class.getName())) { - appendDicts.add(dict); - } - } - } - - AppendTrieDictionary.Builder builder; - if (appendDicts.isEmpty()) { - logger.info("GlobalDict {} is empty, create new one", dictInfo.getResourceDir()); - builder = AppendTrieDictionary.Builder.create(dictDir); - } else if (appendDicts.size() == 1) { - logger.info("GlobalDict {} exist, append value", appendDicts.get(0)); - AppendTrieDictionary dict = (AppendTrieDictionary) DictionaryManager.getInstance(KylinConfig.getInstanceFromEnv()).getDictionary(appendDicts.get(0)); - builder = AppendTrieDictionary.Builder.create(dict); - } else { - throw new IllegalStateException(String.format("GlobalDict %s should have 0 or 1 append dict but %d", dictInfo.getResourceDir(), appendDicts.size())); - } + AppendTrieDictionary.Builder builder = AppendTrieDictionary.Builder.getInstance(dictInfo.getResourceDir()); String value; while (valueEnumerator.moveNext()) { value = valueEnumerator.current(); diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java index 4266f2a..28d8f83 100644 --- a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java +++ b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java @@ -20,13 +20,15 @@ package org.apache.kylin.dict; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.BufferedReader; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; -import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; @@ -35,6 +37,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Random; import java.util.TreeMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -51,6 +55,7 @@ import org.junit.Test; public class AppendTrieDictionaryTest { public static final String BASE_DIR = "/tmp/kylin_append_dict"; + public static final String RESOURCE_DIR = "/dict/append_dict_test"; @BeforeClass public static void setUp() { @@ -64,22 +69,28 @@ public class AppendTrieDictionaryTest { @AfterClass public static void tearDown() { - String workingDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory(); + cleanup(); + } + +// @After + public void afterTest() { + cleanup(); + } + + public static void cleanup() { + Configuration conf = new Configuration(); + Path basePath = new Path(BASE_DIR); try { - FileSystem.get(new Path(workingDir).toUri(), new Configuration()).delete(new Path(workingDir), true); - } catch (IOException e) { - } - File tmpLocalDir = new File(BASE_DIR); - if (tmpLocalDir.exists()) { - for (File f : tmpLocalDir.listFiles()) { - f.delete(); - } - tmpLocalDir.delete(); - } + FileSystem.get(basePath.toUri(), conf).delete(basePath, true); + } catch (IOException e) {} } public static final String[] words = new String[] { "paint", "par", "part", "parts", "partition", "partitions", "party", "partie", "parties", "patient", "taste", "tar", "trie", "try", "tries", "字典", "字典树", "字母", // non-ascii characters "", // empty + "paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii", + "paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiipaiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii", + "paintjkjdfklajkdljfkdsajklfjklsadjkjekjrklewjrklewjklrjklewjkljkljkljkljweklrjewkljrklewjrlkjewkljrkljkljkjlkjjkljkljkljkljlkjlkjlkjljdfadfads" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk", "paint", "tar", "try", // some dup }; @@ -131,8 +142,7 @@ public class AppendTrieDictionaryTest { @Ignore("need huge key set") @Test public void testHugeKeySet() throws IOException { - BytesConverter converter = new StringBytesConverter(); - AppendTrieDictionary.Builder b = AppendTrieDictionary.Builder.create(BASE_DIR); + AppendTrieDictionary.Builder b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR); AppendTrieDictionary dict = null; InputStream is = new FileInputStream("src/test/resources/dict/huge_key"); @@ -162,7 +172,7 @@ public class AppendTrieDictionaryTest { } BytesConverter converter = new StringBytesConverter(); - AppendTrieDictionary.Builder b = AppendTrieDictionary.Builder.create(BASE_DIR); + AppendTrieDictionary.Builder b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR); AppendTrieDictionary dict = null; TreeMap checkMap = new TreeMap<>(); int firstAppend = rnd.nextInt(strList.size() / 2); @@ -179,12 +189,14 @@ public class AppendTrieDictionaryTest { String str = strList.get(checkIndex); byte[] bytes = converter.convertToBytes(str); int id = dict.getIdFromValueBytesImpl(bytes, 0, bytes.length, 0); + assertNotEquals(String.format("Value %s not exist", str), -1, id); assertFalse(String.format("Id %d for %s should be empty, but is %s", id, str, checkMap.get(id)), checkMap.containsKey(id) && !str.equals(checkMap.get(id))); checkMap.put(id, str); } // reopen dict and append - b = AppendTrieDictionary.Builder.create(dict); +// b = AppendTrieDictionary.Builder.create(dict); + b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR, dict); for (; appendIndex < secondAppend; appendIndex++) { b.addValue(strList.get(appendIndex)); } @@ -197,6 +209,7 @@ public class AppendTrieDictionaryTest { String str = strList.get(checkIndex); byte[] bytes = converter.convertToBytes(str); int id = dict.getIdFromValueBytesImpl(bytes, 0, bytes.length, 0); + assertNotEquals(String.format("Value %s not exist", str), -1, id); if (checkIndex < firstAppend) { assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id)); } else { @@ -207,7 +220,7 @@ public class AppendTrieDictionaryTest { } // reopen dict and append rest str - b = AppendTrieDictionary.Builder.create(dict); + b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR, dict); for (; appendIndex < strList.size(); appendIndex++) { b.addValue(strList.get(appendIndex)); } @@ -220,6 +233,7 @@ public class AppendTrieDictionaryTest { String str = strList.get(checkIndex); byte[] bytes = converter.convertToBytes(str); int id = dict.getIdFromValueBytesImpl(bytes, 0, bytes.length, 0); + assertNotEquals(String.format("Value %s not exist", str), -1, id); if (checkIndex < secondAppend) { assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id)); } else { @@ -240,6 +254,7 @@ public class AppendTrieDictionaryTest { for (String str : strList) { byte[] bytes = converter.convertToBytes(str); int id = dict.getIdFromValueBytesImpl(bytes, 0, bytes.length, 0); + assertNotEquals(String.format("Value %s not exist", str), -1, id); assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id)); } } @@ -260,4 +275,103 @@ public class AppendTrieDictionaryTest { throw new RuntimeException(e); } } -} \ No newline at end of file + + @Test + public void testMaxInteger() throws IOException { + AppendTrieDictionary.Builder builder = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR); + builder.setMaxId(Integer.MAX_VALUE - 2); + builder.addValue("a"); + builder.addValue("ab"); + builder.addValue("acd"); + builder.addValue("ac"); + AppendTrieDictionary dict = builder.build(0); + assertEquals(2147483646, dict.getIdFromValueImpl("a", 0)); + assertEquals(2147483647, dict.getIdFromValueImpl("ab", 0)); + assertEquals(-2147483647, dict.getIdFromValueImpl("ac", 0)); + assertEquals(-2147483648, dict.getIdFromValueImpl("acd", 0)); + } + + @Ignore("Only occurred when value is very long (>8000 bytes)") + @Test + public void testSuperLongValue() throws IOException { + AppendTrieDictionary.Builder builder = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR); + String value = "a"; + for (int i = 0; i < 10000; i++) { + value += "a"; + try { + builder.addValue(value); + } catch (StackOverflowError e) { + System.out.println("\nstack overflow " + i); + throw e; + } + } + AppendTrieDictionary dictionary = builder.build(0); + dictionary.getMaxId(); + } + + private static class SharedBuilderThread extends Thread { + CountDownLatch startLatch; + CountDownLatch finishLatch; + String resourcePath; + String prefix; + int count; + + SharedBuilderThread(CountDownLatch startLatch, CountDownLatch finishLatch, String resourcePath, String prefix, int count) { + this.startLatch = startLatch; + this.finishLatch = finishLatch; + this.resourcePath = resourcePath; + this.prefix = prefix; + this.count = count; + } + + @Override + public void run() { + try { + AppendTrieDictionary.Builder builder = AppendTrieDictionary.Builder.getInstance(resourcePath); + startLatch.countDown(); + for (int i = 0; i < count; i++) { + builder.addValue(prefix + i); + } + builder.build(0); + finishLatch.countDown(); + } catch (IOException e) {} + } + } + + @Test + public void testSharedBuilder() throws IOException, InterruptedException { + String resourcePath = "shared_builder"; + final CountDownLatch startLatch = new CountDownLatch(3); + final CountDownLatch finishLatch = new CountDownLatch(3); + + AppendTrieDictionary.Builder builder = AppendTrieDictionary.Builder.getInstance(resourcePath); + Thread t1 = new SharedBuilderThread(startLatch, finishLatch, resourcePath, "t1_", 10000); + Thread t2 = new SharedBuilderThread(startLatch, finishLatch, resourcePath, "t2_", 10); + Thread t3 = new SharedBuilderThread(startLatch, finishLatch, resourcePath, "t3_", 100000); + t1.start(); + t2.start(); + t3.start(); + startLatch.await(); + AppendTrieDictionary dict = builder.build(0); + assertTrue("AppendDictBuilder Thread too slow", finishLatch.await(3000, TimeUnit.MILLISECONDS)); + assertEquals(110010, dict.getMaxId()); + try { + builder.addValue("fail"); + fail("Builder should be closed"); + } catch (Exception e) {} + + builder = AppendTrieDictionary.Builder.getInstance(resourcePath, dict); + builder.addValue("success"); + dict = builder.build(0); + for (int i = 0; i < 10000; i ++) { + assertNotEquals(-1, dict.getIdFromValue("t1_" + i)); + } + for (int i = 0; i < 10; i ++) { + assertNotEquals(-1, dict.getIdFromValue("t2_" + i)); + } + for (int i = 0; i < 100000; i ++) { + assertNotEquals(-1, dict.getIdFromValue("t3_" + i)); + } + assertEquals(110011, dict.getIdFromValue("success")); + } +} diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java index df64f3f..381e0b1 100644 --- a/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java +++ b/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java @@ -17,11 +17,12 @@ */ package org.apache.kylin.dict; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.junit.After; import org.junit.AfterClass; -import org.junit.Before; import org.junit.Test; import java.io.*; @@ -91,39 +92,26 @@ public class CachedTreeMapTest { public static class CachedFileFilter implements FileFilter { @Override public boolean accept(File pathname) { - return pathname.getName().startsWith("cached_"); + return pathname.getName().startsWith(CachedTreeMap.CACHED_PREFIX); } } - public static final String baseDir = "/tmp/kylin_cachedtreemap_test/"; - public static final String backupDir = "/tmp/kylin_cachedtreemap_test.bak/"; - public static final String tmpDir = "/tmp/kylin_cachedtreemap_test.tmp/"; - - private static void cleanup() { - File dir = new File(baseDir); - if (dir.exists()) { - for (File f : dir.listFiles()) { - f.delete(); - } - dir.delete(); - } - - dir = new File(tmpDir); - if (dir.exists()) { - for (File f : dir.listFiles()) { - f.delete(); - } - dir.delete(); + public static class VersionFilter implements FileFilter { + @Override + public boolean accept(File pathname) { + return pathname.getName().startsWith(CachedTreeMap.VERSION_PREFIX); } + } - dir = new File(backupDir); - if (dir.exists()) { - for (File f : dir.listFiles()) { - f.delete(); - } - dir.delete(); - } + public static final String baseDir = "/tmp/kylin_cachedtreemap_test/"; + public static final String workingDir = "/tmp/kylin_cachedtreemap_test/working"; + private static void cleanup() { + Configuration conf = new Configuration(); + Path basePath = new Path(baseDir); + try { + FileSystem.get(basePath.toUri(), conf).delete(basePath, true); + } catch (IOException e) {} VALUE_WRITE_ERROR_TOGGLE = false; } @@ -139,154 +127,240 @@ public class CachedTreeMapTest { @Test public void testCachedTreeMap() throws IOException { - CachedTreeMap map = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir(baseDir) - .persistent(true).immutable(false).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build(); + CachedTreeMap map = createMutableMap(); map.put(Key.of(1), Value.of("a")); map.put(Key.of(2), Value.of("b")); map.put(Key.of(3), Value.of("c")); map.put(Key.of(4), Value.of("d")); map.put(Key.of(5), Value.of("e")); - File dir = new File(tmpDir); + File dir = new File(workingDir); assertEquals(3, dir.listFiles(new CachedFileFilter()).length); - DataOutputStream out = new DataOutputStream(new FileOutputStream(tmpDir+"/.index")); - map.write(out); - out.flush(); - out.close(); - map.commit(false); + flushAndCommit(map, true, true, false); + assertFalse(new File(workingDir).exists()); - dir = new File(baseDir); + dir = new File(map.getLatestVersion()); assertEquals(5, dir.listFiles(new CachedFileFilter()).length); - DataInputStream in = new DataInputStream(new FileInputStream(baseDir+".index")); - CachedTreeMap map2 = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir(baseDir) - .persistent(true).immutable(true).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build(); - map2.readFields(in); + CachedTreeMap map2 = createImmutableMap(); assertEquals(5, map2.size()); assertEquals("b", ((Value)map2.get(Key.of(2))).valueStr); try { map2.put(Key.of(6), Value.of("f")); fail("Should be error when put value into immutable map"); - } catch (AssertionError error) { + } catch (AssertionError error) {} + } + + @Test + public void testMultiVersions() throws IOException, InterruptedException { + CachedTreeMap map = createMutableMap(); + Thread.sleep(3000); + map.put(Key.of(1), Value.of("a")); + map.put(Key.of(2), Value.of("b")); + map.put(Key.of(3), Value.of("c")); + flushAndCommit(map, true, true, false); + + CachedTreeMap map2 = createImmutableMap(); + assertEquals("b", ((Value)map2.get(Key.of(2))).valueStr); + + // re-open dict, append new data + map = createMutableMap(); + map.put(Key.of(4), Value.of("d")); + flushAndCommit(map, true, true, true); + + // new data is not visible for map2 + assertNull(map2.get(Key.of(4))); + + // append data, and be visible for new immutable map + map.put(Key.of(5), Value.of("e")); + flushAndCommit(map, true, true, true); + + CachedTreeMap map3 = createImmutableMap(); + assertEquals("d", ((Value)map3.get(Key.of(4))).valueStr); + assertEquals("e", ((Value)map3.get(Key.of(5))).valueStr); + + // Check versions retention + File dir = new File(baseDir); + assertEquals(3, dir.listFiles(new VersionFilter()).length); + } + + @Test + public void testKeepAppend() throws IOException { + CachedTreeMap map = createMutableMap(); + map.put(Key.of(1), Value.of("a")); + map.put(Key.of(2), Value.of("b")); + map.put(Key.of(3), Value.of("c")); + map.put(Key.of(4), Value.of("d")); + map.put(Key.of(5), Value.of("e")); + + // flush with keepAppend false, map can't be append + flushAndCommit(map, true, true, false); + // append into map has closed + try { + map.put(Key.of(6), Value.of("f")); + fail(); + } catch (AssertionError e) { + assertEquals("Only support put method with immutable false and keepAppend true", e.getMessage()); } - assertFalse(new File(tmpDir).exists()); - assertFalse(new File(backupDir).exists()); + CachedTreeMap map2 = createImmutableMap(); + assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr); + assertEquals("d", ((Value)map2.get(Key.of(4))).valueStr); + assertEquals("e", ((Value)map2.get(Key.of(5))).valueStr); + + map = createMutableMap(); + map.put(Key.of(6), Value.of("f")); + map.put(Key.of(7), Value.of("g")); + map.put(Key.of(8), Value.of("h")); + // flush with keepAppend true + flushAndCommit(map, true, true, true); + map.put(Key.of(9), Value.of("i")); + // can still append data + flushAndCommit(map, true, true, false); + + map2 = createImmutableMap(); + assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr); + assertEquals("d", ((Value)map2.get(Key.of(4))).valueStr); + assertEquals("f", ((Value)map2.get(Key.of(6))).valueStr); + assertEquals("i", ((Value)map2.get(Key.of(9))).valueStr); + } + + @Test + public void testVersionRetention() throws IOException, InterruptedException { + File dir = new File(baseDir); + // TTL for 3s and keep 3 versions + CachedTreeMap map = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir(baseDir) + .immutable(false).maxSize(2).keyClazz(Key.class).valueClazz(Value.class) + .maxVersions(3).versionTTL(1000 * 3).build(); + map.put(Key.of(1), Value.of("a")); + + // has version 0 when create map + assertEquals(1, dir.listFiles(new VersionFilter()).length); + Thread.sleep(2500); + + // flush version 1 + flushAndCommit(map, true, true, true); + assertEquals(2, dir.listFiles(new VersionFilter()).length); + + // flush version 2 + flushAndCommit(map, true, true, true); + assertEquals(3, dir.listFiles(new VersionFilter()).length); + + // flush version 3 + flushAndCommit(map, true, true, true); + // won't delete version since 3s TTL + assertEquals(4, dir.listFiles(new VersionFilter()).length); + + // sleep to make version 0 expired + Thread.sleep(500); + // flush verion 4 + flushAndCommit(map, true, true, false); + assertEquals(4, dir.listFiles(new VersionFilter()).length); + + // TTL for 100ms and keep 2 versions + map = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir(baseDir) + .immutable(false).maxSize(2).keyClazz(Key.class).valueClazz(Value.class) + .maxVersions(2).versionTTL(100).build(); + flushAndCommit(map, true, true, false); + assertEquals(2, dir.listFiles(new VersionFilter()).length); + } + + @Test + public void testWithOldFormat() throws IOException { + File dir = new File(baseDir); + CachedTreeMap map = createMutableMap(); + map.put(Key.of(1), Value.of("a")); + map.put(Key.of(2), Value.of("b")); + map.put(Key.of(3), Value.of("c")); + map.put(Key.of(4), Value.of("d")); + map.put(Key.of(5), Value.of("e")); + flushAndCommit(map, true, true, true); + + // move version dir to base dir, to simulate the older format + Path versionPath = new Path(map.getLatestVersion()); + Path tmpVersionPath = new Path(versionPath.getParent().getParent(), versionPath.getName()); + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.get(versionPath.toUri(), conf); + fs.rename(versionPath, tmpVersionPath); + fs.delete(new Path(baseDir), true); + fs.rename(tmpVersionPath, new Path(baseDir)); + assertEquals(0, dir.listFiles(new VersionFilter()).length); + assertEquals(5, dir.listFiles(new CachedFileFilter()).length); + + CachedTreeMap map2 = createImmutableMap(); + assertEquals(5, map2.size()); + assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr); + assertEquals("e", ((Value)map2.get(Key.of(5))).valueStr); + + assertEquals(1, dir.listFiles(new VersionFilter()).length); + assertEquals(0, dir.listFiles(new CachedFileFilter()).length); } @Test public void testWriteFailed() throws IOException { // normal case - CachedTreeMap map = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir(baseDir) - .persistent(true).immutable(false).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build(); + CachedTreeMap map = createMutableMap(); map.put(Key.of(1), Value.of("a")); map.put(Key.of(2), Value.of("b")); map.put(Key.of(3), Value.of("c")); map.remove(Key.of(3)); map.put(Key.of(4), Value.of("d")); - DataOutputStream out = new DataOutputStream(new FileOutputStream(tmpDir+".index")); - map.write(out); - out.flush(); - out.close(); - map.commit(false); + flushAndCommit(map, true, true, false); - DataInputStream in = new DataInputStream(new FileInputStream(baseDir+".index")); - CachedTreeMap map2 = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir(baseDir) - .persistent(true).immutable(true).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build(); - map2.readFields(in); + CachedTreeMap map2 = createImmutableMap(); assertEquals(3, map2.size()); assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr); // suppose write value failed and didn't commit data - map = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir(baseDir) - .persistent(true).immutable(false).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build(); + map = createMutableMap(); VALUE_WRITE_ERROR_TOGGLE = true; map.put(Key.of(1), Value.of("aa")); map.put(Key.of(2), Value.of("bb")); VALUE_WRITE_ERROR_TOGGLE = false; map.put(Key.of(3), Value.of("cc")); map.put(Key.of(4), Value.of("dd")); - out = new DataOutputStream(new FileOutputStream(tmpDir+".index")); - map.write(out); - out.flush(); - out.close(); // suppose write value failed and didn't commit data - //map.commit(false); + flushAndCommit(map, true, false, false); // read map data should not be modified - in = new DataInputStream(new FileInputStream(baseDir+".index")); - map2 = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir(baseDir) - .persistent(true).immutable(true).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build(); - map2.readFields(in); + map2 = createImmutableMap(); assertEquals(3, map2.size()); assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr); - assertTrue(new File(tmpDir).exists()); - assertFalse(new File(backupDir).exists()); + assertTrue(new File(workingDir).exists()); } - @Test - public void testCommit() throws IOException { + private CachedTreeMap createImmutableMap() throws IOException { CachedTreeMap map = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir(baseDir) - .persistent(true).immutable(false).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build(); - map.put(Key.of(1), Value.of("a")); - map.put(Key.of(2), Value.of("b")); - map.put(Key.of(3), Value.of("c")); - map.put(Key.of(4), Value.of("d")); - - DataOutputStream out = new DataOutputStream(new FileOutputStream(tmpDir+".index")); - map.write(out); - out.flush(); - out.close(); - map.commit(true); - - assertTrue(new File(tmpDir).exists()); - assertFalse(new File(backupDir).exists()); + .immutable(true).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build(); + try (DataInputStream in = map.openIndexInput()) { + map.readFields(in); + } + return map; + } - DataInputStream in = new DataInputStream(new FileInputStream(baseDir+".index")); - CachedTreeMap map2 = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir(baseDir) - .persistent(true).immutable(true).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build(); - map2.readFields(in); - assertEquals(4, map2.size()); - assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr); + private CachedTreeMap createMutableMap() throws IOException { + CachedTreeMap map = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir(baseDir) + .immutable(false).maxSize(2).maxVersions(3).versionTTL(1000 * 3).keyClazz(Key.class).valueClazz(Value.class).build(); + try (DataInputStream in = map.openIndexInput()) { + map.readFields(in); + } catch (IOException e) {} + return map; + } - // continue modify map, but not commit - map.put(Key.of(1), Value.of("aa")); - map.put(Key.of(2), Value.of("bb")); - map.put(Key.of(3), Value.of("cc")); - map.put(Key.of(5), Value.of("e")); - map.put(Key.of(6), Value.of("f")); - out = new DataOutputStream(new FileOutputStream(tmpDir+".index")); - map.write(out); - out.flush(); - out.close(); - - assertTrue(new File(tmpDir).exists()); - assertEquals(6, new File(tmpDir).listFiles(new CachedFileFilter()).length); - assertEquals(4, new File(baseDir).listFiles(new CachedFileFilter()).length); - - in = new DataInputStream(new FileInputStream(baseDir+".index")); - map2 = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir(baseDir) - .persistent(true).immutable(true).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build(); - map2.readFields(in); - assertEquals(4, map2.size()); - assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr); + private void flushAndCommit(CachedTreeMap map, boolean doFlush, boolean doCommit, boolean keepAppend) throws IOException { + if (doFlush) { + try (DataOutputStream out = map.openIndexOutput()) { + map.write(out); + } + } - // commit data - map.commit(false); - assertFalse(new File(tmpDir).exists()); - assertEquals(6, new File(baseDir).listFiles(new CachedFileFilter()).length); - - in = new DataInputStream(new FileInputStream(baseDir+".index")); - map2 = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir(baseDir) - .persistent(true).immutable(true).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build(); - map2.readFields(in); - assertEquals(6, map2.size()); - assertEquals("aa", ((Value)map2.get(Key.of(1))).valueStr); - assertEquals("f", ((Value)map2.get(Key.of(6))).valueStr); + if (doCommit) { + map.commit(keepAppend); + } } } diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapCounterTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapCounterTest.java index f0e21cf..f7796c0 100644 --- a/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapCounterTest.java +++ b/core-metadata/src/test/java/org/apache/kylin/measure/bitmap/BitmapCounterTest.java @@ -44,10 +44,12 @@ public class BitmapCounterTest { counter2.add(12273456); counter2.add("4258"); counter2.add(123); - assertEquals(4, counter2.getCount()); + counter2.add(-2147483648); + counter2.add(-2); + assertEquals(6, counter2.getCount()); counter.merge(counter2); - assertEquals(6, counter.getCount()); + assertEquals(8, counter.getCount()); System.out.print("counter size: " + counter.getMemBytes() + ", counter2 size: " + counter2.getMemBytes()); } diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json index 0470dc6..a16f949 100644 --- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json +++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json @@ -116,12 +116,12 @@ }, "dependent_measure_ref" : null }, { - "name" : "SITE_NAME_BITMAP", + "name" : "PRICE_BITMAP", "function" : { "expression" : "COUNT_DISTINCT", "parameter" : { "type" : "column", - "value" : "SITE_NAME", + "value" : "PRICE", "next_parameter" : null }, "returntype" : "bitmap" @@ -211,7 +211,7 @@ } ], "dictionaries" : [ { - "column" : "SITE_NAME", + "column" : "PRICE", "builder": "org.apache.kylin.dict.GlobalDictionaryBuilder" } ], @@ -257,7 +257,7 @@ "name" : "f2", "columns" : [ { "qualifier" : "m", - "measure_refs" : [ "seller_cnt_bitmap", "site_name_bitmap", "seller_format_cnt"] + "measure_refs" : [ "seller_cnt_bitmap", "price_bitmap", "seller_format_cnt"] } ] }, { "name" : "f3", diff --git a/kylin-it/src/test/resources/query/sql_distinct_precisely/query00.sql b/kylin-it/src/test/resources/query/sql_distinct_precisely/query00.sql index a3948c3..28d73bc 100644 --- a/kylin-it/src/test/resources/query/sql_distinct_precisely/query00.sql +++ b/kylin-it/src/test/resources/query/sql_distinct_precisely/query00.sql @@ -19,6 +19,6 @@ select lstg_format_name, cal_dt, sum(price) as GMV, count(1) as TRANS_CNT, - count(distinct seller_id) as seller_count + count(distinct price) as price_count from test_kylin_fact group by lstg_format_name, cal_dt diff --git a/kylin-it/src/test/resources/query/sql_distinct_precisely/query01.sql b/kylin-it/src/test/resources/query/sql_distinct_precisely/query01.sql index e8579ef..a965f77 100644 --- a/kylin-it/src/test/resources/query/sql_distinct_precisely/query01.sql +++ b/kylin-it/src/test/resources/query/sql_distinct_precisely/query01.sql @@ -19,7 +19,7 @@ select lstg_format_name, sum(price) as GMV, count(1) as TRANS_CNT, - count(distinct seller_id) as seller_count + count(distinct price) as price_count from test_kylin_fact where lstg_format_name='FP-GTC' group by lstg_format_name diff --git a/kylin-it/src/test/resources/query/sql_distinct_precisely/query02.sql b/kylin-it/src/test/resources/query/sql_distinct_precisely/query02.sql index 48f49e9..9fa9844 100644 --- a/kylin-it/src/test/resources/query/sql_distinct_precisely/query02.sql +++ b/kylin-it/src/test/resources/query/sql_distinct_precisely/query02.sql @@ -19,7 +19,7 @@ select lstg_format_name, sum(price) as GMV, count(1) as TRANS_CNT, - count(distinct seller_id) as seller_count + count(distinct price) as price_count from test_kylin_fact where lstg_format_name='FP-GTC' group by lstg_format_name diff --git a/kylin-it/src/test/resources/query/sql_distinct_precisely/query03.sql b/kylin-it/src/test/resources/query/sql_distinct_precisely/query03.sql index dbc2fac..1e26891 100644 --- a/kylin-it/src/test/resources/query/sql_distinct_precisely/query03.sql +++ b/kylin-it/src/test/resources/query/sql_distinct_precisely/query03.sql @@ -17,7 +17,8 @@ -- select test_cal_dt.week_beg_dt,sum(test_kylin_fact.price) as GMV - , count(1) as TRANS_CNT, count(distinct seller_id) as seller_count + , count(1) as TRANS_CNT + , count(distinct price) as price_count , count(distinct site_name) as site_count from test_kylin_fact inner JOIN edw.test_cal_dt as test_cal_dt diff --git a/kylin-it/src/test/resources/query/sql_distinct_precisely/query04.sql b/kylin-it/src/test/resources/query/sql_distinct_precisely/query04.sql index 69006ce..60b936e 100644 --- a/kylin-it/src/test/resources/query/sql_distinct_precisely/query04.sql +++ b/kylin-it/src/test/resources/query/sql_distinct_precisely/query04.sql @@ -17,7 +17,8 @@ -- select test_cal_dt.week_beg_dt,sum(test_kylin_fact.price) as GMV - , count(1) as TRANS_CNT, count(distinct seller_id) as seller_count + , count(1) as TRANS_CNT + , count(distinct price) as price_count , count(distinct site_name) as site_count from test_kylin_fact inner JOIN edw.test_cal_dt as test_cal_dt diff --git a/kylin-it/src/test/resources/query/sql_distinct_precisely/query05.sql b/kylin-it/src/test/resources/query/sql_distinct_precisely/query05.sql index dea09f7..1787d8d 100644 --- a/kylin-it/src/test/resources/query/sql_distinct_precisely/query05.sql +++ b/kylin-it/src/test/resources/query/sql_distinct_precisely/query05.sql @@ -19,7 +19,7 @@ select lstg_format_name, sum(price) as GMV, count(1) as TRANS_CNT, - count(distinct seller_id) as seller_count + count(distinct price) as price_count from test_kylin_fact group by lstg_format_name order by lstg_format_name diff --git a/kylin-it/src/test/resources/query/sql_distinct_precisely/query06.sql b/kylin-it/src/test/resources/query/sql_distinct_precisely/query06.sql index eb12620..08e6ffa 100644 --- a/kylin-it/src/test/resources/query/sql_distinct_precisely/query06.sql +++ b/kylin-it/src/test/resources/query/sql_distinct_precisely/query06.sql @@ -19,7 +19,7 @@ select lstg_format_name, sum(price) as GMV, count(1) as TRANS_CNT, - count(distinct seller_id) as seller_count + count(distinct price) as price_count from test_kylin_fact where lstg_format_name='FP-GTC' group by lstg_format_name diff --git a/kylin-it/src/test/resources/query/sql_distinct_precisely/query07.sql b/kylin-it/src/test/resources/query/sql_distinct_precisely/query07.sql index 9bd2663..4ffd606 100644 --- a/kylin-it/src/test/resources/query/sql_distinct_precisely/query07.sql +++ b/kylin-it/src/test/resources/query/sql_distinct_precisely/query07.sql @@ -19,6 +19,6 @@ select lstg_format_name, sum(price) as GMV, count(1) as TRANS_CNT, - count(distinct seller_id) as seller_count + count(distinct price) as price_count from test_kylin_fact group by lstg_format_name -- 2.3.2 (Apple Git-55)