From a8d39d2400e8fbcc3feed0715e034003bc417313 Mon Sep 17 00:00:00 2001 From: wangxianbin1987 Date: Fri, 11 Dec 2015 15:25:07 +0800 Subject: [PATCH] KYLIN-1079 add time filter for job history metastore --- .../common/persistence/FileResourceStore.java | 6 ++ .../kylin/common/persistence/ResourceStore.java | 22 +++++++ .../kylin/job/constant/JobTimeFilterEnum.java | 43 +++++++++++++ .../org/apache/kylin/job/dao/ExecutableDao.java | 32 ++++++++++ .../kylin/job/manager/ExecutableManager.java | 32 ++++++++++ .../kylin/rest/controller/JobController.java | 5 +- .../apache/kylin/rest/request/JobListRequest.java | 8 +++ .../apache/kylin/rest/service/BasicService.java | 37 +++++++++++ .../org/apache/kylin/rest/service/JobService.java | 73 ++++++++++++++++++++-- .../kylin/storage/hbase/HBaseResourceStore.java | 50 +++++++++++++++ webapp/app/js/controllers/job.js | 4 +- webapp/app/js/model/jobConfig.js | 7 +++ webapp/app/partials/jobs/jobs.html | 6 ++ 13 files changed, 318 insertions(+), 7 deletions(-) create mode 100644 core-job/src/main/java/org/apache/kylin/job/constant/JobTimeFilterEnum.java diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java index 89e3a1d..49ff441 100644 --- a/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java +++ b/core-common/src/main/java/org/apache/kylin/common/persistence/FileResourceStore.java @@ -95,6 +95,12 @@ public class FileResourceStore extends ResourceStore { } @Override + protected List getAllResources(String rangeStart, String rangeEnd, long timeStartInMillis, long timeEndInMillis) throws IOException { + //just ignore time filter + return getAllResources(rangeStart, rangeEnd); + } + + @Override protected RawResource getResourceImpl(String resPath) throws IOException { File f = file(resPath); if (f.exists() && f.isFile()) { diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java index 848d412..b2a4ce3 100644 --- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java +++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java @@ -181,8 +181,30 @@ abstract public class ResourceStore { } } + final public List getAllResources(String rangeStart, String rangeEnd, long timeStartInMillis, long timeEndInMillis, Class clazz, Serializer serializer) throws IOException { + final List allResources = getAllResources(rangeStart, rangeEnd, timeStartInMillis, timeEndInMillis); + if (allResources.isEmpty()) { + return Collections.emptyList(); + } + List result = Lists.newArrayList(); + try { + for (RawResource rawResource : allResources) { + final T element = serializer.deserialize(new DataInputStream(rawResource.inputStream)); + element.setLastModified(rawResource.timestamp); + result.add(element); + } + return result; + } finally { + for (RawResource rawResource : allResources) { + IOUtils.closeQuietly(rawResource.inputStream); + } + } + } + abstract protected List getAllResources(String rangeStart, String rangeEnd) throws IOException; + abstract protected List getAllResources(String rangeStart, String rangeEnd, long timeStartInMillis, long timeEndInMillis) throws IOException; + /** returns null if not exists */ abstract protected RawResource getResourceImpl(String resPath) throws IOException; diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/JobTimeFilterEnum.java b/core-job/src/main/java/org/apache/kylin/job/constant/JobTimeFilterEnum.java new file mode 100644 index 0000000..c4787f7 --- /dev/null +++ b/core-job/src/main/java/org/apache/kylin/job/constant/JobTimeFilterEnum.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.job.constant; + +public enum JobTimeFilterEnum { + LAST_ONE_DAY(0), LAST_ONE_WEEK(1), LAST_ONE_MONTH(2), LAST_ONE_YEAR(3), ALL(4); + + private final int code; + + private JobTimeFilterEnum(int code) { + this.code = code; + } + + public static JobTimeFilterEnum getByCode(int code) { + for (JobTimeFilterEnum timeFilter : values()) { + if (timeFilter.getCode() == code) { + return timeFilter; + } + } + + return null; + } + + public int getCode() { + return code; + } +} diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java index 18e36b4..4b1336f 100644 --- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java +++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java @@ -108,6 +108,22 @@ public class ExecutableDao { } } + public List getJobOutputs(long timeStartInMillis, long timeEndInMillis) throws PersistentException { + try { + ArrayList resources = store.listResources(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT); + if (resources == null || resources.isEmpty()) { + return Collections.emptyList(); + } + Collections.sort(resources); + String rangeStart = resources.get(0); + String rangeEnd = resources.get(resources.size() - 1); + return store.getAllResources(rangeStart, rangeEnd, timeStartInMillis, timeEndInMillis, ExecutableOutputPO.class, JOB_OUTPUT_SERIALIZER); + } catch (IOException e) { + logger.error("error get all Jobs:", e); + throw new PersistentException(e); + } + } + public List getJobs() throws PersistentException { try { final List jobIds = store.listResources(ResourceStore.EXECUTE_RESOURCE_ROOT); @@ -124,6 +140,22 @@ public class ExecutableDao { } } + public List getJobs(long timeStartInMillis, long timeEndInMillis) throws PersistentException { + try { + final List jobIds = store.listResources(ResourceStore.EXECUTE_RESOURCE_ROOT); + if (jobIds == null || jobIds.isEmpty()) { + return Collections.emptyList(); + } + Collections.sort(jobIds); + String rangeStart = jobIds.get(0); + String rangeEnd = jobIds.get(jobIds.size() - 1); + return store.getAllResources(rangeStart, rangeEnd, timeStartInMillis, timeEndInMillis, ExecutablePO.class, JOB_SERIALIZER); + } catch (IOException e) { + logger.error("error get all Jobs:", e); + throw new PersistentException(e); + } + } + public List getJobIds() throws PersistentException { try { ArrayList resources = store.listResources(ResourceStore.EXECUTE_RESOURCE_ROOT); diff --git a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java index ff92a50..7d513a1 100644 --- a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java +++ b/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java @@ -148,6 +148,20 @@ public class ExecutableManager { } } + public Map getAllOutputs(long timeStartInMillis, long timeEndInMillis) { + try { + final List jobOutputs = executableDao.getJobOutputs(timeStartInMillis, timeEndInMillis); + HashMap result = Maps.newHashMap(); + for (ExecutableOutputPO jobOutput : jobOutputs) { + result.put(jobOutput.getId(), parseOutput(jobOutput)); + } + return result; + } catch (PersistentException e) { + logger.error("fail to get all job output:", e); + throw new RuntimeException(e); + } + } + public List getAllExecutables() { try { List ret = Lists.newArrayList(); @@ -166,6 +180,24 @@ public class ExecutableManager { } } + public List getAllExecutables(long timeStartInMillis, long timeEndInMillis) { + try { + List ret = Lists.newArrayList(); + for (ExecutablePO po : executableDao.getJobs(timeStartInMillis, timeEndInMillis)) { + try { + AbstractExecutable ae = parseTo(po); + ret.add(ae); + } catch (IllegalArgumentException e) { + logger.error("error parsing one executabePO: ", e); + } + } + return ret; + } catch (PersistentException e) { + logger.error("error get All Jobs", e); + throw new RuntimeException(e); + } + } + public List getAllJobIds() { try { return executableDao.getJobIds(); diff --git a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java index f6323ed..5c835ac 100644 --- a/server/src/main/java/org/apache/kylin/rest/controller/JobController.java +++ b/server/src/main/java/org/apache/kylin/rest/controller/JobController.java @@ -29,6 +29,7 @@ import java.util.TimeZone; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.job.JobInstance; import org.apache.kylin.job.constant.JobStatusEnum; +import org.apache.kylin.job.constant.JobTimeFilterEnum; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.impl.threadpool.DefaultScheduler; import org.apache.kylin.job.lock.JobLock; @@ -118,8 +119,10 @@ public class JobController extends BasicController implements InitializingBean { } } + JobTimeFilterEnum timeFilter = JobTimeFilterEnum.getByCode(jobRequest.getTimeFilter()); + try { - jobInstanceList = jobService.listAllJobs(jobRequest.getCubeName(), jobRequest.getProjectName(), statusList, jobRequest.getLimit(), jobRequest.getOffset()); + jobInstanceList = jobService.listAllJobs(jobRequest.getCubeName(), jobRequest.getProjectName(), statusList, jobRequest.getLimit(), jobRequest.getOffset(), timeFilter); } catch (Exception e) { logger.error(e.getLocalizedMessage(), e); throw new InternalErrorException(e); diff --git a/server/src/main/java/org/apache/kylin/rest/request/JobListRequest.java b/server/src/main/java/org/apache/kylin/rest/request/JobListRequest.java index 9ce8e3a..51160d2 100644 --- a/server/src/main/java/org/apache/kylin/rest/request/JobListRequest.java +++ b/server/src/main/java/org/apache/kylin/rest/request/JobListRequest.java @@ -31,6 +31,7 @@ public class JobListRequest { private String projectName; private Integer offset; private Integer limit; + private Integer timeFilter; public JobListRequest() { } @@ -75,4 +76,11 @@ public class JobListRequest { this.limit = limit; } + public Integer getTimeFilter() { + return timeFilter; + } + + public void setTimeFilter(Integer timeFilter) { + this.timeFilter = timeFilter; + } } diff --git a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java index 9135dfa..20fff47 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java @@ -136,6 +136,43 @@ public abstract class BasicService { return results; } + protected List listAllCubingJobs(final String cubeName, final String projectName, final Set statusList, long timeStartInMillis, long timeEndInMillis, final Map allOutputs) { + List results = Lists.newArrayList(FluentIterable.from(getExecutableManager().getAllExecutables(timeStartInMillis, timeEndInMillis)).filter(new Predicate() { + @Override + public boolean apply(AbstractExecutable executable) { + if (executable instanceof CubingJob) { + if (cubeName == null) { + return true; + } + return ((CubingJob) executable).getCubeName().equalsIgnoreCase(cubeName); + } else { + return false; + } + } + }).transform(new Function() { + @Override + public CubingJob apply(AbstractExecutable executable) { + return (CubingJob) executable; + } + }).filter(new Predicate() { + @Override + public boolean apply(CubingJob executable) { + if (null == projectName || null == getProjectManager().getProject(projectName)) { + return true; + } else { + ProjectInstance project = getProjectManager().getProject(projectName); + return project.containsRealization(RealizationType.CUBE, executable.getCubeName()); + } + } + }).filter(new Predicate() { + @Override + public boolean apply(CubingJob executable) { + return statusList.contains(allOutputs.get(executable.getId()).getState()); + } + })); + return results; + } + protected List listAllCubingJobs(final String cubeName, final String projectName, final Set statusList) { return listAllCubingJobs(cubeName, projectName, statusList, getExecutableManager().getAllOutputs()); } diff --git a/server/src/main/java/org/apache/kylin/rest/service/JobService.java b/server/src/main/java/org/apache/kylin/rest/service/JobService.java index 1386656..a19e4f1 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -19,11 +19,7 @@ package org.apache.kylin.rest.service; import java.io.IOException; -import java.util.Collections; -import java.util.EnumSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.cube.CubeInstance; @@ -38,6 +34,7 @@ import org.apache.kylin.job.JobInstance; import org.apache.kylin.job.common.ShellExecutable; import org.apache.kylin.job.constant.JobStatusEnum; import org.apache.kylin.job.constant.JobStepStatusEnum; +import org.apache.kylin.job.constant.JobTimeFilterEnum; import org.apache.kylin.job.exception.JobException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.DefaultChainedExecutable; @@ -69,6 +66,32 @@ public class JobService extends BasicService { @Autowired private AccessService accessService; + public List listAllJobs(final String cubeName, final String projectName, final List statusList, final Integer limitValue, final Integer offsetValue, final JobTimeFilterEnum timeFilter) throws IOException, JobException { + Integer limit = (null == limitValue) ? 30 : limitValue; + Integer offset = (null == offsetValue) ? 0 : offsetValue; + List jobs = listAllJobs(cubeName, projectName, statusList, timeFilter); + Collections.sort(jobs); + + if (jobs.size() <= offset) { + return Collections.emptyList(); + } + + if ((jobs.size() - offset) < limit) { + return jobs.subList(offset, jobs.size()); + } + + return jobs.subList(offset, offset + limit); + } + + public List listAllJobs(final String cubeName, final String projectName, final List statusList, final JobTimeFilterEnum timeFilter) { + Calendar calendar= Calendar.getInstance(); + calendar.setTime(new Date()); + long currentTimeMillis = calendar.getTimeInMillis(); + long timeStartInMillis = getTimeStartInMillis(calendar, timeFilter); + return listCubeJobInstance(cubeName, projectName, statusList, timeStartInMillis, currentTimeMillis); + } + + @Deprecated public List listAllJobs(final String cubeName, final String projectName, final List statusList, final Integer limitValue, final Integer offsetValue) throws IOException, JobException { Integer limit = (null == limitValue) ? 30 : limitValue; Integer offset = (null == offsetValue) ? 0 : offsetValue; @@ -90,6 +113,25 @@ public class JobService extends BasicService { return listCubeJobInstance(cubeName, projectName, statusList); } + private List listCubeJobInstance(final String cubeName, final String projectName, List statusList, final long timeStartInMillis, final long timeEndInMillis) { + Set states; + if (statusList == null || statusList.isEmpty()) { + states = EnumSet.allOf(ExecutableState.class); + } else { + states = Sets.newHashSet(); + for (JobStatusEnum status : statusList) { + states.add(parseToExecutableState(status)); + } + } + final Map allOutputs = getExecutableManager().getAllOutputs(timeStartInMillis, timeEndInMillis); + return Lists.newArrayList(FluentIterable.from(listAllCubingJobs(cubeName, projectName, states, timeStartInMillis, timeEndInMillis, allOutputs)).transform(new Function() { + @Override + public JobInstance apply(CubingJob cubingJob) { + return parseToJobInstance(cubingJob, allOutputs); + } + })); + } + private List listCubeJobInstance(final String cubeName, final String projectName, List statusList) { Set states; if (statusList == null || statusList.isEmpty()) { @@ -109,6 +151,27 @@ public class JobService extends BasicService { })); } + private long getTimeStartInMillis(Calendar calendar, JobTimeFilterEnum timeFilter) { + switch (timeFilter) { + case LAST_ONE_DAY: + calendar.add(Calendar.DAY_OF_MONTH, -1); + return calendar.getTimeInMillis(); + case LAST_ONE_WEEK: + calendar.add(Calendar.WEEK_OF_MONTH, -1); + return calendar.getTimeInMillis(); + case LAST_ONE_MONTH: + calendar.add(Calendar.MONTH, -1); + return calendar.getTimeInMillis(); + case LAST_ONE_YEAR: + calendar.add(Calendar.YEAR, -1); + return calendar.getTimeInMillis(); + case ALL: + return 0; + default: + throw new RuntimeException("illegal timeFilter for job history:" + timeFilter); + } + } + private ExecutableState parseToExecutableState(JobStatusEnum status) { switch (status) { case DISCARDED: diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java index 6f638c4..ab94bf7 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java @@ -40,7 +40,10 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.KeyOnlyFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.RawResource; import org.apache.kylin.common.persistence.ResourceStore; @@ -175,6 +178,53 @@ public class HBaseResourceStore extends ResourceStore { return result; } + @Override + protected List getAllResources(String rangeStart, String rangeEnd, long timeStartInMillis, long timeEndInMillis) throws IOException { + byte[] startRow = Bytes.toBytes(rangeStart); + byte[] endRow = plusZero(Bytes.toBytes(rangeEnd)); + + Scan scan = new Scan(startRow, endRow); + scan.addColumn(B_FAMILY, B_COLUMN_TS); + scan.addColumn(B_FAMILY, B_COLUMN); + scan.setFilter(generateTimeFilterList(timeStartInMillis, timeEndInMillis)); + + HTableInterface table = getConnection().getTable(getAllInOneTableName()); + List result = Lists.newArrayList(); + try { + ResultScanner scanner = table.getScanner(scan); + for (Result r : scanner) { + result.add(new RawResource(getInputStream(Bytes.toString(r.getRow()), r), getTimestamp(r))); + } + } catch (IOException e) { + for (RawResource rawResource : result) { + IOUtils.closeQuietly(rawResource.inputStream); + } + throw e; + } finally { + IOUtils.closeQuietly(table); + } + return result; + } + + private FilterList generateTimeFilterList(long timeStartInMillis, long timeEndInMillis) { + FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); + SingleColumnValueFilter timeStartFilter = new SingleColumnValueFilter( + B_FAMILY, + B_COLUMN_TS, + CompareFilter.CompareOp.GREATER, + Bytes.toBytes(timeStartInMillis) + ); + filterList.addFilter(timeStartFilter); + SingleColumnValueFilter timeEndFilter = new SingleColumnValueFilter( + B_FAMILY, + B_COLUMN_TS, + CompareFilter.CompareOp.LESS_OR_EQUAL, + Bytes.toBytes(timeEndInMillis) + ); + filterList.addFilter(timeEndFilter); + return filterList; + } + private InputStream getInputStream(String resPath, Result r) throws IOException { if (r == null) { return null; diff --git a/webapp/app/js/controllers/job.js b/webapp/app/js/controllers/job.js index 4838529..c859c12 100644 --- a/webapp/app/js/controllers/job.js +++ b/webapp/app/js/controllers/job.js @@ -27,6 +27,7 @@ KylinApp $scope.cubeName = null; //$scope.projects = []; $scope.action = {}; + $scope.timeFilter = jobConfig.timeFilter[1]; $scope.status = []; $scope.toggleSelection = function toggleSelection(current) { @@ -61,7 +62,8 @@ KylinApp projectName: $scope.state.projectName, status: statusIds, offset: offset, - limit: limit + limit: limit, + timeFilter: $scope.timeFilter.value }; $scope.state.loading = true; diff --git a/webapp/app/js/model/jobConfig.js b/webapp/app/js/model/jobConfig.js index 2e74845..af7044c 100644 --- a/webapp/app/js/model/jobConfig.js +++ b/webapp/app/js/model/jobConfig.js @@ -25,6 +25,13 @@ KylinApp.constant('jobConfig', { {name: 'ERROR', value: 8}, {name: 'DISCARDED', value: 16} ], + timeFilter: [ + {name: 'LAST ONE DAY', value: 0}, + {name: 'LAST ONE WEEK', value: 1}, + {name: 'LAST ONE MONTH', value: 2}, + {name: 'LAST ONE YEAR', value: 3}, + {name: 'ALL', value: 4}, + ], theaditems: [ {attr: 'name', name: 'Job Name'}, {attr: 'related_cube', name: 'Cube'}, diff --git a/webapp/app/partials/jobs/jobs.html b/webapp/app/partials/jobs/jobs.html index fca5f88..124a1f6 100644 --- a/webapp/app/partials/jobs/jobs.html +++ b/webapp/app/partials/jobs/jobs.html @@ -49,6 +49,12 @@
+ +