From e2663118697325d8727023f9ef1b0a30ccbed8d2 Mon Sep 17 00:00:00 2001
From: sunyerui
Date: Wed, 8 Jun 2016 19:08:07 +0800
Subject: [PATCH] KYLIN-1775 Add Cube Migrate Support for Global Dictionary

---
 .../org/apache/kylin/common/util/Dictionary.java   | 16 +++++++
 .../apache/kylin/dict/AppendTrieDictionary.java    | 46 ++++++++++++++++++++
 .../apache/kylin/dict/GlobalDictionaryBuilder.java |  2 +-
 .../kylin/storage/hbase/util/CubeMigrationCLI.java |  4 +-
 4 files changed, 66 insertions(+), 2 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java b/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java
index 818b4d8..f561cf8 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java
@@ -18,6 +18,8 @@
 
 package org.apache.kylin.common.util;
 
+import org.apache.kylin.common.KylinConfig;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -207,6 +209,20 @@ abstract public class Dictionary implements Serializable {
         return (nullId & id) == nullId;
     }
 
+    /**
+     * Returns the dictionary to store when a cube is copied from one metadata environment
+     * to another. This default returns the instance itself, unchanged; a dictionary that
+     * keeps data outside the metadata store overrides it to move that data as well.
+     *
+     * @param srcConfig config of the metadata environment being copied from
+     * @param dstConfig config of the metadata environment being copied to
+     * @return the dictionary to save in the destination environment
+     * @throws IOException declared for overriding implementations that perform I/O
+     */
+    public Dictionary copyToAnotherMeta(KylinConfig srcConfig, KylinConfig dstConfig) throws IOException {
+        return this;
+    }
+
     /** utility that converts a dictionary ID to string, preserving order */
     public static String dictIdToString(byte[] idBytes, int offset, int length) {
         try {
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
index 0d5b7df..f6d97c2 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -1066,6 +1067,51 @@ public class AppendTrieDictionary extends Dictionary {
     }
 
+    /**
+     * Builds the dictionary to store in the destination metadata environment and copies the
+     * files under {@code baseDir} to the corresponding directory of the destination HDFS
+     * working directory, replacing anything already there.
+     *
+     * @param srcConfig config whose HDFS working directory is expected to occur in {@code baseDir}
+     * @param dstConfig config whose HDFS working directory takes its place in the new base dir
+     * @return a new dictionary whose base dir points at the copied files
+     * @throws IOException if a file system operation fails or the copy is reported unsuccessful
+     */
+    @Override
+    public AppendTrieDictionary copyToAnotherMeta(KylinConfig srcConfig, KylinConfig dstConfig) throws IOException {
+        // Substitute the working directory literally; String.replaceFirst would treat it as a regex
+        String srcWorkingDir = srcConfig.getHdfsWorkingDirectory();
+        String dstWorkingDir = dstConfig.getHdfsWorkingDirectory();
+        int pos = baseDir.indexOf(srcWorkingDir);
+        String dstBaseDir = pos < 0 ? baseDir
+                : baseDir.substring(0, pos) + dstWorkingDir + baseDir.substring(pos + srcWorkingDir.length());
+
+        AppendTrieDictionary newDict = new AppendTrieDictionary();
+        newDict.update(dstBaseDir, baseId, maxId, maxValueLength, nValues, bytesConverter, writeDictMap());
+
+        Path srcPath = new Path(this.baseDir);
+        Path dstPath = new Path(newDict.baseDir);
+        if (srcPath.equals(dstPath)) {
+            // Deleting the "existing destination" below would wipe out the source itself
+            logger.info("AppendDict {} is at the same path in both environments, skip copying", srcPath);
+            return newDict;
+        }
+
+        logger.info("Copy AppendDict from {} to {}", this.baseDir, newDict.baseDir);
+        Configuration conf = new Configuration();
+        FileSystem srcFs = FileSystem.get(srcPath.toUri(), conf);
+        FileSystem dstFs = FileSystem.get(dstPath.toUri(), conf);
+        if (dstFs.exists(dstPath)) {
+            logger.info("Delete existing AppendDict {}", dstPath);
+            dstFs.delete(dstPath, true);
+        }
+        if (!FileUtil.copy(srcFs, srcPath, dstFs, dstPath, false, true, conf)) {
+            throw new IOException("Failed to copy AppendDict from " + srcPath + " to " + dstPath);
+        }
+
+        return newDict;
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
         out.writeUTF(baseDir);
         flushIndex();
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
index 0f4d8bb..9405f46 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
@@ -42,7 +42,7 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
         if (dictInfo == null) {
             throw new IllegalArgumentException("GlobalDictinaryBuilder must used with an existing DictionaryInfo");
         }
-        String dictDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/resources/GlobalDict/" + dictInfo.getResourceDir() + "/";
+        String dictDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict" + dictInfo.getResourceDir() + "/";
 
         // Try to load the existing dict from cache, making sure there's only the same one object in memory
         NavigableSet dicts = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getStore().listResources(dictInfo.getResourceDir());
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
index 5acecbc..d33975e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
@@ -30,6 +30,7 @@ import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -336,7 +337,8 @@ public class CubeMigrationCLI {
 
             long ts = dictSrc.getLastModified();
             dictSrc.setLastModified(0);//to avoid resource store write conflict
-            DictionaryInfo dictSaved = dstDictMgr.trySaveNewDict(dictSrc.getDictionaryObject(), dictSrc);
+            Dictionary dictObj = dictSrc.getDictionaryObject().copyToAnotherMeta(srcConfig, dstConfig);
+            DictionaryInfo dictSaved = dstDictMgr.trySaveNewDict(dictObj, dictSrc);
             dictSrc.setLastModified(ts);
 
             if (dictSaved == dictSrc) {
-- 
2.3.2 (Apple Git-55)