From b30718f4ebe4b724425b884fec09531e59591954 Mon Sep 17 00:00:00 2001 From: "Ma,Gang" Date: Mon, 4 Jun 2018 15:00:22 +0800 Subject: [PATCH] KYLIN-3396 NPE throws when materialize lookup table to HBase --- .../kylin/engine/mr/BatchCubingJobBuilder2.java | 13 ++-- .../kylin/engine/mr/ILookupMaterializer.java | 18 ++++- .../apache/kylin/engine/mr/JobBuilderSupport.java | 5 +- .../kylin/engine/mr/LookupMaterializeContext.java | 84 ++++++++++++++++++++++ .../kylin/engine/mr/LookupSnapshotJobBuilder.java | 6 +- .../kylin/engine/mr/common/BatchConstants.java | 6 +- .../mr/steps/UpdateCubeInfoAfterBuildStep.java | 21 ++++-- .../steps/lookup/UpdateCubeAfterSnapshotStep.java | 15 ++-- .../mr/steps/LookupMaterializeContextTest.java | 41 +++++++++++ .../org/apache/kylin/rest/service/CubeService.java | 1 + .../storage/hbase/lookup/HBaseLookupMRSteps.java | 20 +++--- .../hbase/lookup/HBaseLookupMaterializer.java | 10 +-- .../hbase/lookup/LookupTableToHFileJob.java | 11 --- 13 files changed, 200 insertions(+), 51 deletions(-) create mode 100644 engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupMaterializeContext.java create mode 100644 engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/LookupMaterializeContextTest.java diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java index a840bf7..b1149ed 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java @@ -72,7 +72,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { result.addTask(createSaveStatisticsStep(jobId)); // add materialize lookup tables if needed - addMaterializeLookupTableSteps(result); + LookupMaterializeContext lookupMaterializeContext = addMaterializeLookupTableSteps(result); outputSide.addStepPhase2_BuildDictionary(result); @@ -82,7 +82,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { outputSide.addStepPhase3_BuildCube(result); // Phase 4: Update Metadata & Cleanup - result.addTask(createUpdateCubeInfoAfterBuildStep(jobId)); + result.addTask(createUpdateCubeInfoAfterBuildStep(jobId, lookupMaterializeContext)); inputSide.addStepPhase4_Cleanup(result); outputSide.addStepPhase4_Cleanup(result); @@ -102,16 +102,19 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { return true; } - private void addMaterializeLookupTableSteps(final CubingJob result) { + private LookupMaterializeContext addMaterializeLookupTableSteps(final CubingJob result) { + LookupMaterializeContext lookupMaterializeContext = new LookupMaterializeContext(result); CubeDesc cubeDesc = seg.getCubeDesc(); List allSnapshotTypes = cubeDesc.getAllExtLookupSnapshotTypes(); if (allSnapshotTypes.isEmpty()) { - return; + return null; } for (String snapshotType : allSnapshotTypes) { + logger.info("add lookup table materialize steps for storage type:{}", snapshotType); ILookupMaterializer materializer = MRUtil.getExtLookupMaterializer(snapshotType); - materializer.materializeLookupTablesForCube(result, seg.getCubeInstance()); + materializer.materializeLookupTablesForCube(lookupMaterializeContext, seg.getCubeInstance()); } + return lookupMaterializeContext; } protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) { diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/ILookupMaterializer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/ILookupMaterializer.java index f103da2..6ec4857 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/ILookupMaterializer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/ILookupMaterializer.java @@ -19,10 +19,22 @@ package org.apache.kylin.engine.mr; import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.job.execution.DefaultChainedExecutable; public interface ILookupMaterializer { - void materializeLookupTable(DefaultChainedExecutable jobFlow, CubeInstance cube, String lookupTableName); + /** + * materialize lookup table + * @param context materialize context, the snapshotPath of lookup table should be put into context + * via {@code LookupMaterializeContext.addLookupSnapshotPath} method + * @param cube + * @param lookupTableName + */ + void materializeLookupTable(LookupMaterializeContext context, CubeInstance cube, String lookupTableName); - void materializeLookupTablesForCube(DefaultChainedExecutable jobFlow, CubeInstance cube); + /** + * materialize all ext lookup tables in the cube + * @param context materialize context, the snapshotPath of lookup table should be put into context + * via {@code LookupMaterializeContext.addLookupSnapshotPath} method + * @param cube + */ + void materializeLookupTablesForCube(LookupMaterializeContext context, CubeInstance cube); } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java index 8a420df..6458a6a 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java @@ -135,10 +135,13 @@ public class JobBuilderSupport { return buildDictionaryStep; } - public UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(String jobId) { + public UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(String jobId, LookupMaterializeContext lookupMaterializeContext) { final UpdateCubeInfoAfterBuildStep result = new UpdateCubeInfoAfterBuildStep(); result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO); result.getParams().put(BatchConstants.CFG_OUTPUT_PATH, getFactDistinctColumnsPath(jobId)); + if (lookupMaterializeContext != null) { + result.getParams().put(BatchConstants.ARG_EXT_LOOKUP_SNAPSHOTS_INFO, lookupMaterializeContext.getAllLookupSnapshotsInString()); + } CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams()); CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams()); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupMaterializeContext.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupMaterializeContext.java new file mode 100644 index 0000000..f235283 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupMaterializeContext.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.engine.mr; + +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.kylin.job.execution.DefaultChainedExecutable; + +import com.google.common.collect.Maps; + +public class LookupMaterializeContext { + private DefaultChainedExecutable jobFlow; + private Map lookupSnapshotMap; + + public LookupMaterializeContext(DefaultChainedExecutable jobFlow) { + this.jobFlow = jobFlow; + this.lookupSnapshotMap = Maps.newHashMap(); + } + + public DefaultChainedExecutable getJobFlow() { + return jobFlow; + } + + /** + * add snapshot path info into the context + * @param lookupTable + * @param snapshotPath + */ + public void addLookupSnapshotPath(String lookupTable, String snapshotPath) { + lookupSnapshotMap.put(lookupTable, snapshotPath); + } + + /** + * + * @return string format of lookup snapshotPath info, it will return like: "lookup1=/path/uuid1,lookup2=/path/uuid2" + * + */ + public String getAllLookupSnapshotsInString() { + StringBuilder result = new StringBuilder(); + boolean first = true; + for (Entry lookupSnapshotEntry : lookupSnapshotMap.entrySet()) { + if (!first) { + result.append(","); + } + first = false; + result.append(lookupSnapshotEntry.getKey()); + result.append("="); + result.append(lookupSnapshotEntry.getValue()); + } + return result.toString(); + } + + /** + * parse the lookup snapshot string to lookup snapshot path map. + * @param snapshotsString + * @return + */ + public static Map parseLookupSnapshots(String snapshotsString) { + Map lookupSnapshotMap = Maps.newHashMap(); + String[] lookupSnapshotEntries = snapshotsString.split(","); + for (String lookupSnapshotEntryStr : lookupSnapshotEntries) { + String[] split = lookupSnapshotEntryStr.split("="); + lookupSnapshotMap.put(split[0], split[1]); + } + return lookupSnapshotMap; + } +} diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupSnapshotJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupSnapshotJobBuilder.java index e7888a5..fff9bcf 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupSnapshotJobBuilder.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupSnapshotJobBuilder.java @@ -24,6 +24,7 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.SnapshotTableDesc; +import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.steps.lookup.LookupExecutableUtil; import org.apache.kylin.engine.mr.steps.lookup.LookupSnapshotToMetaStoreStep; import org.apache.kylin.engine.mr.steps.lookup.UpdateCubeAfterSnapshotStep; @@ -62,11 +63,14 @@ public class LookupSnapshotJobBuilder { private void addExtMaterializeLookupTableSteps(final LookupSnapshotBuildJob result, SnapshotTableDesc snapshotTableDesc) { + LookupMaterializeContext lookupMaterializeContext = new LookupMaterializeContext(result); ILookupMaterializer materializer = MRUtil.getExtLookupMaterializer(snapshotTableDesc.getStorageType()); - materializer.materializeLookupTable(result, cube, lookupTable); + materializer.materializeLookupTable(lookupMaterializeContext, cube, lookupTable); UpdateCubeAfterSnapshotStep afterSnapshotStep = new UpdateCubeAfterSnapshotStep(); afterSnapshotStep.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_LOOKUP_TABLE_UPDATE_CUBE); + + afterSnapshotStep.getParams().put(BatchConstants.ARG_EXT_LOOKUP_SNAPSHOTS_INFO, lookupMaterializeContext.getAllLookupSnapshotsInString()); LookupExecutableUtil.setCubeName(cube.getName(), afterSnapshotStep.getParams()); LookupExecutableUtil.setLookupTableName(lookupTable, afterSnapshotStep.getParams()); LookupExecutableUtil.setSegments(segments, afterSnapshotStep.getParams()); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java index 36f2566..18ac4ac 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java @@ -99,6 +99,7 @@ public interface BatchConstants { String ARG_DICT_PATH = "dictPath"; String ARG_TABLE_NAME = "tableName"; String ARG_LOOKUP_SNAPSHOT_ID = "snapshotID"; + String ARG_EXT_LOOKUP_SNAPSHOTS_INFO = "extlookupsnapshots"; /** * logger and counter @@ -111,10 +112,5 @@ public interface BatchConstants { */ String GLOBAL_DICTIONNARY_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder"; - /** - * the prefix of ext lookup table snapshot resource path that stored in the build context - */ - String LOOKUP_EXT_SNAPSHOT_CONTEXT_PFX = "lookup.ext.snapshot.res.path."; - String LOOKUP_EXT_SNAPSHOT_SRC_RECORD_CNT_PFX = "lookup.ext.snapshot.src.record.cnt."; } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java index 3167bca..f749c80 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java @@ -22,6 +22,7 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.util.List; +import java.util.Map; import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FSDataInputStream; @@ -33,6 +34,7 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.model.SnapshotTableDesc; import org.apache.kylin.engine.mr.CubingJob; +import org.apache.kylin.engine.mr.LookupMaterializeContext; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; @@ -73,7 +75,7 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable { segment.setInputRecordsSize(sourceSizeBytes); try { - saveExtSnapshotIfNeeded(cubeManager, cubingJob, cube, segment); + saveExtSnapshotIfNeeded(cubeManager, cube, segment); if (segment.isOffsetCube()) { updateTimeRange(segment); } @@ -86,19 +88,26 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable { } } - private void saveExtSnapshotIfNeeded(CubeManager cubeManager, CubingJob cubingJob, CubeInstance cube, CubeSegment segment) throws IOException { + private void saveExtSnapshotIfNeeded(CubeManager cubeManager, CubeInstance cube, CubeSegment segment) throws IOException { + String extLookupSnapshotStr = this.getParam(BatchConstants.ARG_EXT_LOOKUP_SNAPSHOTS_INFO); + if (extLookupSnapshotStr == null || extLookupSnapshotStr.isEmpty()) { + return; + } + Map extLookupSnapshotMap = LookupMaterializeContext.parseLookupSnapshots(extLookupSnapshotStr); + logger.info("update ext lookup snapshots:{}", extLookupSnapshotMap); List snapshotTableDescList = cube.getDescriptor().getSnapshotTableDescList(); for (SnapshotTableDesc snapshotTableDesc : snapshotTableDescList) { String tableName = snapshotTableDesc.getTableName(); if (snapshotTableDesc.isExtSnapshotTable()) { - String contextKey = BatchConstants.LOOKUP_EXT_SNAPSHOT_CONTEXT_PFX + tableName; - String newSnapshotResPath = cubingJob.getExtraInfo(contextKey); - if (newSnapshotResPath == null) { + String newSnapshotResPath = extLookupSnapshotMap.get(tableName); + if (newSnapshotResPath == null || newSnapshotResPath.isEmpty()) { continue; } if (snapshotTableDesc.isGlobal()) { - cubeManager.updateCubeLookupSnapshot(cube, tableName, newSnapshotResPath); + if (!newSnapshotResPath.equals(cube.getSnapshotResPath(tableName))) { + cubeManager.updateCubeLookupSnapshot(cube, tableName, newSnapshotResPath); + } } else { segment.putSnapshotResPath(tableName, newSnapshotResPath); } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/UpdateCubeAfterSnapshotStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/UpdateCubeAfterSnapshotStep.java index 42290b0..514c940 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/UpdateCubeAfterSnapshotStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/UpdateCubeAfterSnapshotStep.java @@ -20,15 +20,16 @@ package org.apache.kylin.engine.mr.steps.lookup; import java.io.IOException; import java.util.List; +import java.util.Map; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.engine.mr.LookupMaterializeContext; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecuteResult; import org.slf4j.Logger; @@ -53,17 +54,21 @@ public class UpdateCubeAfterSnapshotStep extends AbstractExecutable { CubeInstance cube = cubeManager.getCube(LookupExecutableUtil.getCubeName(this.getParams())); List segmentIDs = LookupExecutableUtil.getSegments(this.getParams()); String lookupTableName = LookupExecutableUtil.getLookupTableName(this.getParams()); - DefaultChainedExecutable job = (DefaultChainedExecutable) getManager().getJob(LookupExecutableUtil.getJobID(this.getParams())); - String contextKey = BatchConstants.LOOKUP_EXT_SNAPSHOT_CONTEXT_PFX + lookupTableName; - String snapshotResPath = job.getExtraInfo(contextKey); + String extLookupSnapshotStr = this.getParam(BatchConstants.ARG_EXT_LOOKUP_SNAPSHOTS_INFO); + if (extLookupSnapshotStr == null || extLookupSnapshotStr.isEmpty()) { + return new ExecuteResult(); + } + + Map extLookupSnapshotMap = LookupMaterializeContext.parseLookupSnapshots(extLookupSnapshotStr); + String snapshotResPath = extLookupSnapshotMap.get(lookupTableName); if (snapshotResPath == null) { logger.info("no snapshot path exist in the context, so no need to update snapshot path"); return new ExecuteResult(); } CubeDesc cubeDesc = cube.getDescriptor(); try { - logger.info("update snapshot path to cube metadata"); + logger.info("update snapshot path:{} to cube:{}", snapshotResPath, cube.getName()); if (cubeDesc.isGlobalSnapshotTable(lookupTableName)) { if (!snapshotResPath.equals(cube.getSnapshotResPath(lookupTableName))) { LookupExecutableUtil.updateSnapshotPathToCube(cubeManager, cube, lookupTableName, diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/LookupMaterializeContextTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/LookupMaterializeContextTest.java new file mode 100644 index 0000000..a876cb2 --- /dev/null +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/LookupMaterializeContextTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.engine.mr.steps; + +import org.apache.kylin.engine.mr.LookupMaterializeContext; +import org.junit.Test; + +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class LookupMaterializeContextTest { + @Test + public void parseAndToStringTest() throws Exception { + LookupMaterializeContext context = new LookupMaterializeContext(null); + context.addLookupSnapshotPath("lookup1", "/ext_snapshot/uuid1"); + context.addLookupSnapshotPath("lookup2", "/ext_snapshot/uuid2"); + + String lookupSnapshotsStr = context.getAllLookupSnapshotsInString(); + Map lookupSnapshotMap = LookupMaterializeContext.parseLookupSnapshots(lookupSnapshotsStr); + assertEquals(2, lookupSnapshotMap.size()); + assertEquals("/ext_snapshot/uuid1", lookupSnapshotMap.get("lookup1")); + assertEquals("/ext_snapshot/uuid2", lookupSnapshotMap.get("lookup2")); + } +} diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java index 953fa58..da90771 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java @@ -512,6 +512,7 @@ public class CubeService extends BasicService implements InitializingBean { update.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()])); update.setCuboids(Maps. newHashMap()); update.setCuboidsRecommend(Sets. newHashSet()); + update.setUpdateTableSnapshotPath(Maps.newHashMap()); CubeManager.getInstance(getConfig()).updateCube(update); } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java index 6101fca..1c91be2 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java @@ -31,6 +31,7 @@ import org.apache.kylin.cube.model.SnapshotTableDesc; import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo; import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager; import org.apache.kylin.engine.mr.JobBuilderSupport; +import org.apache.kylin.engine.mr.LookupMaterializeContext; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.common.HadoopShellExecutable; import org.apache.kylin.engine.mr.common.MapReduceExecutable; @@ -59,7 +60,7 @@ public class HBaseLookupMRSteps { this.config = new JobEngineConfig(cube.getConfig()); } - public void addMaterializeLookupTablesSteps(DefaultChainedExecutable jobFlow) { + public void addMaterializeLookupTablesSteps(LookupMaterializeContext context) { CubeDesc cubeDesc = cube.getDescriptor(); Set allLookupTables = Sets.newHashSet(); for (DimensionDesc dim : cubeDesc.getDimensions()) { @@ -72,22 +73,22 @@ public class HBaseLookupMRSteps { for (SnapshotTableDesc snapshotTableDesc : snapshotTableDescs) { if (ExtTableSnapshotInfo.STORAGE_TYPE_HBASE.equals(snapshotTableDesc.getStorageType()) && allLookupTables.contains(snapshotTableDesc.getTableName())) { - addMaterializeLookupTableSteps(jobFlow, snapshotTableDesc.getTableName(), snapshotTableDesc); + addMaterializeLookupTableSteps(context, snapshotTableDesc.getTableName(), snapshotTableDesc); } } } - public void addMaterializeLookupTableSteps(DefaultChainedExecutable jobFlow, String tableName, SnapshotTableDesc snapshotTableDesc) { + public void addMaterializeLookupTableSteps(LookupMaterializeContext context, String tableName, SnapshotTableDesc snapshotTableDesc) { KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); ExtTableSnapshotInfoManager extTableSnapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(kylinConfig); TableDesc tableDesc = TableMetadataManager.getInstance(kylinConfig).getTableDesc(tableName, cube.getProject()); IReadableTable sourceTable = SourceManager.createReadableTable(tableDesc); try { - ExtTableSnapshotInfo latestSnapshot = extTableSnapshotInfoManager.getLatestSnapshot(sourceTable.getSignature(), tableName); + ExtTableSnapshotInfo latestSnapshot = extTableSnapshotInfoManager.getLatestSnapshot( + sourceTable.getSignature(), tableName); if (latestSnapshot != null) { logger.info("there is latest snapshot exist for table:{}, skip build snapshot step.", tableName); - jobFlow.addExtraInfo(BatchConstants.LOOKUP_EXT_SNAPSHOT_CONTEXT_PFX + latestSnapshot.getTableName(), - latestSnapshot.getResourcePath()); + context.addLookupSnapshotPath(tableName, latestSnapshot.getResourcePath()); return; } } catch (IOException ioException) { @@ -95,10 +96,11 @@ public class HBaseLookupMRSteps { } logger.info("add build snapshot steps for table:{}", tableName); String snapshotID = genLookupSnapshotID(); - addLookupTableConvertToHFilesStep(jobFlow, tableName, snapshotID); - addLookupTableHFilesBulkLoadStep(jobFlow, tableName, snapshotID); + context.addLookupSnapshotPath(tableName, ExtTableSnapshotInfo.getResourcePath(tableName, snapshotID)); + addLookupTableConvertToHFilesStep(context.getJobFlow(), tableName, snapshotID); + addLookupTableHFilesBulkLoadStep(context.getJobFlow(), tableName, snapshotID); if (snapshotTableDesc !=null && snapshotTableDesc.isEnableLocalCache()) { - addUpdateSnapshotQueryCacheStep(jobFlow, tableName, snapshotID); + addUpdateSnapshotQueryCacheStep(context.getJobFlow(), tableName, snapshotID); } } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMaterializer.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMaterializer.java index cf28ed6..31d9150 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMaterializer.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMaterializer.java @@ -21,20 +21,20 @@ package org.apache.kylin.storage.hbase.lookup; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.model.SnapshotTableDesc; import org.apache.kylin.engine.mr.ILookupMaterializer; -import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.engine.mr.LookupMaterializeContext; public class HBaseLookupMaterializer implements ILookupMaterializer{ @Override - public void materializeLookupTable(DefaultChainedExecutable jobFlow, CubeInstance cube, String lookupTableName) { + public void materializeLookupTable(LookupMaterializeContext context, CubeInstance cube, String lookupTableName) { HBaseLookupMRSteps lookupMRSteps = new HBaseLookupMRSteps(cube); SnapshotTableDesc snapshotTableDesc = cube.getDescriptor().getSnapshotTableDesc(lookupTableName); - lookupMRSteps.addMaterializeLookupTableSteps(jobFlow, lookupTableName, snapshotTableDesc); + lookupMRSteps.addMaterializeLookupTableSteps(context, lookupTableName, snapshotTableDesc); } @Override - public void materializeLookupTablesForCube(DefaultChainedExecutable jobFlow, CubeInstance cube) { + public void materializeLookupTablesForCube(LookupMaterializeContext context, CubeInstance cube) { HBaseLookupMRSteps lookupMRSteps = new HBaseLookupMRSteps(cube); - lookupMRSteps.addMaterializeLookupTablesSteps(jobFlow); + lookupMRSteps.addMaterializeLookupTablesSteps(context); } } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java index aac0108..39ebe99 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java @@ -52,8 +52,6 @@ import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat; import org.apache.kylin.engine.mr.MRUtil; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.job.execution.DefaultChainedExecutable; -import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.metadata.TableMetadataManager; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.JoinDesc; @@ -91,7 +89,6 @@ public class LookupTableToHFileJob extends AbstractHadoopJob { Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(); String tableName = getOptionValue(OPTION_TABLE_NAME); - String cubingJobID = getOptionValue(OPTION_CUBING_JOB_ID); String lookupSnapshotID = getOptionValue(OPTION_LOOKUP_SNAPSHOT_ID); KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); @@ -112,7 +109,6 @@ public class LookupTableToHFileJob extends AbstractHadoopJob { ExtTableSnapshotInfo snapshot = createSnapshotResource(extSnapshotInfoManager, tableName, lookupSnapshotID, keyColumns, hTableNameAndShard.getFirst(), hTableNameAndShard.getSecond(), sourceTable); logger.info("created snapshot information at:{}", snapshot.getResourcePath()); - saveSnapshotInfoToJobContext(kylinConfig, cubingJobID, snapshot); job = Job.getInstance(HBaseConfiguration.create(getConf()), getOptionValue(OPTION_JOB_NAME)); @@ -194,13 +190,6 @@ public class LookupTableToHFileJob extends AbstractHadoopJob { return result; } - private void saveSnapshotInfoToJobContext(KylinConfig kylinConfig, String jobID, ExtTableSnapshotInfo snapshot) { - ExecutableManager execMgr = ExecutableManager.getInstance(kylinConfig); - DefaultChainedExecutable job = (DefaultChainedExecutable) execMgr.getJob(jobID); - job.addExtraInfo(BatchConstants.LOOKUP_EXT_SNAPSHOT_CONTEXT_PFX + snapshot.getTableName(), - snapshot.getResourcePath()); - } - /** * * @param sourceTableName -- 2.6.4