From 8ee4abff18234dd5f19b8f2f7950823ef92b5639 Mon Sep 17 00:00:00 2001 From: jian Date: Tue, 8 Dec 2015 20:04:09 +0800 Subject: [PATCH] KYLIN-1204 update rule when save and update cube --- .../kylin/engine/streaming/StreamingManager.java | 2 +- .../kylin/rest/controller/CubeController.java | 291 ++++++++++++++------- webapp/app/js/controllers/cubeEdit.js | 2 + 3 files changed, 199 insertions(+), 96 deletions(-) diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java index 583eb7b..af04a11 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java @@ -187,7 +187,7 @@ public class StreamingManager { public StreamingConfig updateStreamingConfig(StreamingConfig desc) throws IOException { // Validate CubeDesc if (desc.getUuid() == null || desc.getName() == null) { - throw new IllegalArgumentException(); + throw new IllegalArgumentException("SteamingConfig Illegal."); } String name = desc.getName(); if (!streamingMap.containsKey(name)) { diff --git a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java index 52047ac..41f767d 100644 --- a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java +++ b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java @@ -92,13 +92,13 @@ public class CubeController extends BasicController { @Autowired private JobService jobService; - @RequestMapping(value = "", method = { RequestMethod.GET }) + @RequestMapping(value = "", method = {RequestMethod.GET}) @ResponseBody public List getCubes(@RequestParam(value = "cubeName", required = false) String cubeName, @RequestParam(value = "modelName", required = false) String modelName, @RequestParam(value = "projectName", required = false) String projectName, @RequestParam(value = "limit", required = false) Integer limit, @RequestParam(value = "offset", required = false) Integer offset) { return cubeService.getCubes(cubeName, projectName, modelName, limit, offset); } - @RequestMapping(value = "/{cubeName}", method = { RequestMethod.GET }) + @RequestMapping(value = "/{cubeName}", method = {RequestMethod.GET}) @ResponseBody public CubeInstance getCube(@PathVariable String cubeName) { CubeInstance cube = cubeService.getCubeManager().getCube(cubeName); @@ -117,7 +117,7 @@ public class CubeController extends BasicController { * @throws UnknownHostException * @throws IOException */ - @RequestMapping(value = "/{cubeName}/segs/{segmentName}/sql", method = { RequestMethod.GET }) + @RequestMapping(value = "/{cubeName}/segs/{segmentName}/sql", method = {RequestMethod.GET}) @ResponseBody public GeneralResponse getSql(@PathVariable String cubeName, @PathVariable String segmentName) { CubeInstance cube = cubeService.getCubeManager().getCube(cubeName); @@ -139,7 +139,7 @@ public class CubeController extends BasicController { * @param notifyList * @throws IOException */ - @RequestMapping(value = "/{cubeName}/notify_list", method = { RequestMethod.PUT }) + @RequestMapping(value = "/{cubeName}/notify_list", method = {RequestMethod.PUT}) @ResponseBody public void updateNotifyList(@PathVariable String cubeName, @RequestBody List notifyList) { CubeInstance cube = cubeService.getCubeManager().getCube(cubeName); @@ -157,7 +157,7 @@ public class CubeController extends BasicController { } - @RequestMapping(value = "/{cubeName}/cost", method = { RequestMethod.PUT }) + @RequestMapping(value = "/{cubeName}/cost", method = {RequestMethod.PUT}) @ResponseBody public CubeInstance updateCubeCost(@PathVariable String cubeName, @RequestParam(value = "cost") int cost) { try { @@ -169,7 +169,7 @@ public class CubeController extends BasicController { } } - @RequestMapping(value = "/{cubeName}/coprocessor", method = { RequestMethod.PUT }) + @RequestMapping(value = "/{cubeName}/coprocessor", method = {RequestMethod.PUT}) @ResponseBody public Map updateCubeCoprocessor(@PathVariable String cubeName, @RequestParam(value = "force") String force) { try { @@ -187,7 +187,7 @@ public class CubeController extends BasicController { * * @throws IOException */ - @RequestMapping(value = "/{cubeName}/segs/{segmentName}/refresh_lookup", method = { RequestMethod.PUT }) + @RequestMapping(value = "/{cubeName}/segs/{segmentName}/refresh_lookup", method = {RequestMethod.PUT}) @ResponseBody public CubeInstance rebuildLookupSnapshot(@PathVariable String cubeName, @PathVariable String segmentName, @RequestParam(value = "lookupTable") String lookupTable) { try { @@ -205,7 +205,7 @@ public class CubeController extends BasicController { * @return * @throws IOException */ - @RequestMapping(value = "/{cubeName}/rebuild", method = { RequestMethod.PUT }) + @RequestMapping(value = "/{cubeName}/rebuild", method = {RequestMethod.PUT}) @ResponseBody public JobInstance rebuild(@PathVariable String cubeName, @RequestBody JobBuildRequest jobBuildRequest) { try { @@ -222,7 +222,7 @@ public class CubeController extends BasicController { } } - @RequestMapping(value = "/{cubeName}/disable", method = { RequestMethod.PUT }) + @RequestMapping(value = "/{cubeName}/disable", method = {RequestMethod.PUT}) @ResponseBody public CubeInstance disableCube(@PathVariable String cubeName) { try { @@ -240,7 +240,7 @@ public class CubeController extends BasicController { } } - @RequestMapping(value = "/{cubeName}/purge", method = { RequestMethod.PUT }) + @RequestMapping(value = "/{cubeName}/purge", method = {RequestMethod.PUT}) @ResponseBody public CubeInstance purgeCube(@PathVariable String cubeName) { try { @@ -258,7 +258,7 @@ public class CubeController extends BasicController { } } - @RequestMapping(value = "/{cubeName}/enable", method = { RequestMethod.PUT }) + @RequestMapping(value = "/{cubeName}/enable", method = {RequestMethod.PUT}) @ResponseBody public CubeInstance enableCube(@PathVariable String cubeName) { try { @@ -275,7 +275,7 @@ public class CubeController extends BasicController { } } - @RequestMapping(value = "/{cubeName}", method = { RequestMethod.DELETE }) + @RequestMapping(value = "/{cubeName}", method = {RequestMethod.DELETE}) @ResponseBody public void deleteCube(@PathVariable String cubeName) { CubeInstance cube = cubeService.getCubeManager().getCube(cubeName); @@ -320,7 +320,7 @@ public class CubeController extends BasicController { * @return Table metadata array * @throws IOException */ - @RequestMapping(value = "", method = { RequestMethod.POST }) + @RequestMapping(value = "", method = {RequestMethod.POST}) @ResponseBody public CubeRequest saveCubeDesc(@RequestBody CubeRequest cubeRequest) { @@ -346,58 +346,69 @@ public class CubeController extends BasicController { throw new InternalErrorException(e.getLocalizedMessage(), e); } - //streaming Cube - if (cubeRequest.getStreamingCube() != null && cubeRequest.getStreamingCube().equals("true")) { - StreamingConfig streamingConfig = deserializeStreamingDesc(cubeRequest); - KafkaConfig kafkaConfig = deserializeKafkaDesc(cubeRequest); - if (streamingConfig == null) { - cubeRequest.setMessage("No StreamingConfig info defined."); - return cubeRequest; - } - if (kafkaConfig == null) { - cubeRequest.setMessage("No KafkaConfig info defined."); - return cubeRequest; - } - if (StringUtils.isEmpty(streamingConfig.getName())) { - logger.info("StreamingConfig Name should not be empty."); - throw new BadRequestException("StremingConfig name should not be empty."); - } - - if (StringUtils.isEmpty(kafkaConfig.getName())) { - logger.info("KafkaConfig Name should not be empty."); - throw new BadRequestException("KafkaConfig name should not be empty."); - } + boolean createStreamingConfigSuccess = false,createKafkaConfigSuccess = false; + StreamingConfig streamingConfig = null; + KafkaConfig kafkaConfig = null; - try { - streamingConfig.setUuid(UUID.randomUUID().toString()); - streamingService.createStreamingConfig(streamingConfig); + boolean isStreamingCube = cubeRequest.getStreamingCube() != null && cubeRequest.getStreamingCube().equals("true"); + try { + //streaming Cube + if (isStreamingCube) { + streamingConfig = deserializeStreamingDesc(cubeRequest); + kafkaConfig = deserializeKafkaDesc(cubeRequest); + // validate before create, rollback when error + if (kafkaConfig == null) { + cubeRequest.setMessage("No KafkaConfig info defined."); + return cubeRequest; + } + if(streamingConfig == null){ + cubeRequest.setMessage("No StreamingConfig info defined."); + return cubeRequest; } - } catch (IOException e) { try { - cubeService.deleteCube(cubeInstance); - } catch (Exception ex) { - throw new InternalErrorException("Failed to rollback on delete cube. " + " Caused by: " + e.getMessage(), ex); + streamingConfig.setUuid(UUID.randomUUID().toString()); + streamingService.createStreamingConfig(streamingConfig); + createStreamingConfigSuccess = true; + } catch (IOException e) { + logger.error("Failed to save StreamingConfig:" + e.getLocalizedMessage(), e); + throw new InternalErrorException("Failed to save StreamingConfig: " + e.getLocalizedMessage()); } - logger.error("Failed to save StreamingConfig:" + e.getLocalizedMessage(), e); - throw new InternalErrorException("Failed to save StreamingConfig: " + e.getLocalizedMessage()); - } - try { - kafkaConfig.setUuid(UUID.randomUUID().toString()); - kafkaConfigService.createKafkaConfig(kafkaConfig); - } catch (IOException e) { try { - streamingService.dropStreamingConfig(streamingConfig); - } catch (IOException e1) { - throw new InternalErrorException("StreamingConfig is created, but failed to create KafkaConfig: " + e.getLocalizedMessage()); + kafkaConfig.setUuid(UUID.randomUUID().toString()); + kafkaConfigService.createKafkaConfig(kafkaConfig); + createKafkaConfigSuccess = true; + } catch (IOException e) { + logger.error("Failed to save KafkaConfig:" + e.getLocalizedMessage(), e); + throw new InternalErrorException("Failed to save KafkaConfig: " + e.getLocalizedMessage()); } - try { - cubeService.deleteCube(cubeInstance); - } catch (Exception ex) { - throw new InternalErrorException("Failed to rollback on delete cube. " + " Caused by: " + e.getMessage(), ex); + } + }finally { + //rollback if failed + if (isStreamingCube) { + if(createStreamingConfigSuccess == false || createKafkaConfigSuccess == false){ + try { + cubeService.deleteCube(cubeInstance); + } catch (Exception ex) { + throw new InternalErrorException("Failed to rollback on delete cube. " + " Caused by: " + ex.getMessage(), ex); + } + if(createStreamingConfigSuccess == true){ + try { + streamingService.dropStreamingConfig(streamingConfig); + } catch (IOException e) { + throw new InternalErrorException("Failed to create cube, and StreamingConfig created and failed to delete: " + e.getLocalizedMessage()); + } + } + if(createKafkaConfigSuccess == true){ + try { + kafkaConfigService.dropKafkaConfig(kafkaConfig); + } catch (IOException e) { + throw new InternalErrorException("Failed to create cube, and KafkaConfig created and failed to delete: " + e.getLocalizedMessage()); + } + } + } - logger.error("Failed to save KafkaConfig:" + e.getLocalizedMessage(), e); - throw new InternalErrorException("Failed to save KafkaConfig: " + e.getLocalizedMessage()); + } } @@ -414,12 +425,13 @@ public class CubeController extends BasicController { * @throws JsonProcessingException * @throws IOException */ - @RequestMapping(value = "", method = { RequestMethod.PUT }) + @RequestMapping(value = "", method = {RequestMethod.PUT}) @ResponseBody public CubeRequest updateCubeDesc(@RequestBody CubeRequest cubeRequest) throws JsonProcessingException { //update cube CubeDesc desc = deserializeCubeDesc(cubeRequest); + CubeDesc oldCubeDesc = null; if (desc == null) { return cubeRequest; @@ -438,9 +450,10 @@ public class CubeController extends BasicController { saveCubeDesc(cubeRequest); } + String projectName = (null == cubeRequest.getProject()) ? ProjectInstance.DEFAULT_PROJECT_NAME : cubeRequest.getProject(); try { CubeInstance cube = cubeService.getCubeManager().getCube(cubeRequest.getCubeName()); - String projectName = (null == cubeRequest.getProject()) ? ProjectInstance.DEFAULT_PROJECT_NAME : cubeRequest.getProject(); + oldCubeDesc = cube.getDescriptor(); desc = cubeService.updateCubeAndDesc(cube, desc, projectName); } catch (AccessDeniedException accessDeniedException) { @@ -450,50 +463,138 @@ public class CubeController extends BasicController { throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage()); } - if (desc.getError().isEmpty()) { - cubeRequest.setSuccessful(true); - } else { + if (!desc.getError().isEmpty()) { logger.warn("Cube " + desc.getName() + " fail to update because " + desc.getError()); updateRequest(cubeRequest, false, omitMessage(desc.getError())); + return cubeRequest; } - //streaming Cube - if (cubeRequest.getStreamingCube().equals("true")) { - StreamingConfig streamingConfig = deserializeStreamingDesc(cubeRequest); - KafkaConfig kafkaConfig = deserializeKafkaDesc(cubeRequest); - if (streamingConfig == null) { - cubeRequest.setMessage("No StreamingConfig info to update."); - return cubeRequest; - } - if (kafkaConfig == null) { - cubeRequest.setMessage("No KafkaConfig info to update."); - return cubeRequest; - } - if (StringUtils.isEmpty(streamingConfig.getName())) { - logger.info("StreamingConfig Name should not be empty."); - throw new BadRequestException("StremingConfig name should not be empty."); - } + boolean updateStreamingConfigSuccess = false,updateKafkaConfigSuccess = false; - if (StringUtils.isEmpty(kafkaConfig.getName())) { - logger.info("KafkaConfig Name should not be empty."); - throw new BadRequestException("KafkaConfig name should not be empty."); - } + boolean isStreamingCube = cubeRequest.getStreamingCube() != null && cubeRequest.getStreamingCube().equals("true"); + //oldConfig is for recover use + StreamingConfig streamingConfig = null,oldStreamingConfig =null; + KafkaConfig kafkaConfig = null,oldKafkaConfig = null; + if(isStreamingCube){ + streamingConfig = deserializeStreamingDesc(cubeRequest); + kafkaConfig = deserializeKafkaDesc(cubeRequest); try { - streamingService.updateStreamingConfig(streamingConfig); - + oldKafkaConfig = kafkaConfigService.getKafkaConfig(kafkaConfig.getName()); } catch (IOException e) { - logger.error("Failed to update StreamingConfig:" + e.getLocalizedMessage(), e); - throw new InternalErrorException("Failed to update StreamingConfig: " + e.getLocalizedMessage()); + e.printStackTrace(); } - try { - kafkaConfigService.updateKafkaConfig(kafkaConfig); - } catch (IOException e) { - logger.error("Failed to update KafkaConfig:" + e.getLocalizedMessage(), e); - throw new InternalErrorException("Failed to update KafkaConfig: " + e.getLocalizedMessage()); + oldStreamingConfig = streamingService.getStreamingManager().getStreamingConfig(streamingConfig.getName()); + } + try { + //streaming Cube + if (isStreamingCube) { + if (streamingConfig == null) { + cubeRequest.setMessage("No StreamingConfig info to update."); + return cubeRequest; + } + if (kafkaConfig == null) { + cubeRequest.setMessage("No KafkaConfig info to update."); + return cubeRequest; + } + + if(oldStreamingConfig == null){ + streamingConfig.setUuid(UUID.randomUUID().toString()); + try { + streamingService.createStreamingConfig(streamingConfig); + updateStreamingConfigSuccess = true; + } catch (IOException e) { + logger.error("Failed to add StreamingConfig:" + e.getLocalizedMessage(), e); + throw new InternalErrorException("Failed to add StreamingConfig: " + e.getLocalizedMessage()); + } + }else{ + try { + streamingConfig = streamingService.updateStreamingConfig(streamingConfig); + updateStreamingConfigSuccess = true; + + } catch (IOException e) { + logger.error("Failed to update StreamingConfig:" + e.getLocalizedMessage(), e); + throw new InternalErrorException("Failed to update StreamingConfig: " + e.getLocalizedMessage()); + } + } + if(oldKafkaConfig == null){ + kafkaConfig.setUuid(UUID.randomUUID().toString()); + try { + kafkaConfigService.createKafkaConfig(kafkaConfig); + updateKafkaConfigSuccess = true; + } catch (IOException e) { + logger.error("Failed to add KafkaConfig:" + e.getLocalizedMessage(), e); + throw new InternalErrorException("Failed to add KafkaConfig: " + e.getLocalizedMessage()); + } + + }else{ + try { + kafkaConfig = kafkaConfigService.updateKafkaConfig(kafkaConfig); + updateKafkaConfigSuccess = true; + } catch (IOException e) { + logger.error("Failed to update KafkaConfig:" + e.getLocalizedMessage(), e); + throw new InternalErrorException("Failed to update KafkaConfig: " + e.getLocalizedMessage()); + } + } + } + }finally { + if (isStreamingCube) { + //recover cube desc + if(updateStreamingConfigSuccess == false || updateKafkaConfigSuccess ==false){ + oldCubeDesc.setLastModified(desc.getLastModified()); + CubeInstance cube = cubeService.getCubeManager().getCube(cubeRequest.getCubeName()); + try { + desc = cubeService.updateCubeAndDesc(cube, oldCubeDesc, projectName); + } catch (Exception e) { + logger.error("Failed to recover CubeDesc:" + e.getLocalizedMessage(), e); + throw new InternalErrorException("Failed to recover CubeDesc: " + e.getLocalizedMessage()); + } + + if(updateStreamingConfigSuccess == true){ + + if(oldStreamingConfig!=null){ + + oldStreamingConfig.setLastModified(streamingConfig.getLastModified()); + try { + streamingService.updateStreamingConfig(oldStreamingConfig); + } catch (IOException e) { + logger.error("Failed to recover StreamingConfig:" + e.getLocalizedMessage(), e); + throw new InternalErrorException("Failed to recover StreamingConfig: " + e.getLocalizedMessage()); + } + } else{ + try { + streamingService.dropStreamingConfig(streamingConfig); + } catch (IOException e) { + logger.error("Failed to remove added StreamingConfig:" + e.getLocalizedMessage(), e); + throw new InternalErrorException("Failed to remove added StreamingConfig: " + e.getLocalizedMessage()); + } + } + } + + if(updateKafkaConfigSuccess == true){ + if(oldKafkaConfig!=null) { + oldKafkaConfig.setLastModified(kafkaConfig.getLastModified()); + try { + kafkaConfigService.updateKafkaConfig(oldKafkaConfig); + } catch (IOException e) { + logger.error("Failed to recover KafkaConfig:" + e.getLocalizedMessage(), e); + throw new InternalErrorException("Failed to recover KafkaConfig: " + e.getLocalizedMessage()); + } + }else{ + try { + kafkaConfigService.dropKafkaConfig(kafkaConfig); + } catch (IOException e) { + logger.error("Failed to remove added KafkaConfig:" + e.getLocalizedMessage(), e); + throw new InternalErrorException("Failed to remove added KafkaConfig: " + e.getLocalizedMessage()); + } + } + } - } + } + } + + } String descData = JsonUtil.writeValueAsIndentString(desc); cubeRequest.setCubeDescData(descData); @@ -507,7 +608,7 @@ public class CubeController extends BasicController { * @return true * @throws IOException */ - @RequestMapping(value = "/{cubeName}/hbase", method = { RequestMethod.GET }) + @RequestMapping(value = "/{cubeName}/hbase", method = {RequestMethod.GET}) @ResponseBody public List getHBaseInfo(@PathVariable String cubeName) { List hbase = new ArrayList(); @@ -544,7 +645,7 @@ public class CubeController extends BasicController { return hbase; } - @RequestMapping(value = "/{cubeName}/segments", method = { RequestMethod.POST }) + @RequestMapping(value = "/{cubeName}/segments", method = {RequestMethod.POST}) @ResponseBody public CubeSegmentRequest appendSegment(@PathVariable String cubeName, @RequestBody CubeSegmentRequest cubeSegmentRequest) { CubeInstance cube = cubeService.getCubeManager().getCube(cubeName); @@ -650,7 +751,7 @@ public class CubeController extends BasicController { */ private String omitMessage(List errors) { StringBuffer buffer = new StringBuffer(); - for (Iterator iterator = errors.iterator(); iterator.hasNext();) { + for (Iterator iterator = errors.iterator(); iterator.hasNext(); ) { String string = (String) iterator.next(); buffer.append(string); buffer.append("\n"); diff --git a/webapp/app/js/controllers/cubeEdit.js b/webapp/app/js/controllers/cubeEdit.js index 30f2ab4..c7b1fed 100755 --- a/webapp/app/js/controllers/cubeEdit.js +++ b/webapp/app/js/controllers/cubeEdit.js @@ -123,6 +123,8 @@ KylinApp.controller('CubeEditCtrl', function ($scope, $q, $routeParams, $locatio //fetch cube info and model info in edit model // ~ init if ($scope.isEdit = !!$routeParams.cubeName) { + $scope.streamingMeta = StreamingModel.createStreamingConfig(); + $scope.kafkaMeta = StreamingModel.createKafkaConfig(); CubeDescService.query({cube_name: $routeParams.cubeName}, function (detail) { if (detail.length > 0) { $scope.cubeMetaFrame = detail[0]; -- 2.4.9 (Apple Git-60)