From e972642c26d3e192b31299a3c3da1aaeb0a2927f Mon Sep 17 00:00:00 2001 From: "qianhao.zhou" Date: Tue, 2 Dec 2014 18:53:35 +0800 Subject: [PATCH 1/3] use all-in-one htable for meta data --- .settings/org.eclipse.core.resources.prefs | 2 - .../common/persistence/HBaseResourceStore.java | 74 +++++++++------------- .../kylinolap/cube/project/ProjectManagerTest.java | 29 +++++---- 3 files changed, 46 insertions(+), 59 deletions(-) delete mode 100644 .settings/org.eclipse.core.resources.prefs diff --git a/.settings/org.eclipse.core.resources.prefs b/.settings/org.eclipse.core.resources.prefs deleted file mode 100644 index 4824b80..0000000 --- a/.settings/org.eclipse.core.resources.prefs +++ /dev/null @@ -1,2 +0,0 @@ -eclipse.preferences.version=1 -encoding/=UTF-8 diff --git a/common/src/main/java/com/kylinolap/common/persistence/HBaseResourceStore.java b/common/src/main/java/com/kylinolap/common/persistence/HBaseResourceStore.java index 3e101af..341ee27 100644 --- a/common/src/main/java/com/kylinolap/common/persistence/HBaseResourceStore.java +++ b/common/src/main/java/com/kylinolap/common/persistence/HBaseResourceStore.java @@ -70,7 +70,7 @@ final String tableNameBase; final String hbaseUrl; - final Map tableNameMap; // path prefix ==> HBase table name +// final Map tableNameMap; // path prefix ==> HBase table name private HConnection getConnection() throws IOException { return HBaseConnection.get(hbaseUrl); @@ -85,13 +85,15 @@ public HBaseResourceStore(KylinConfig kylinConfig) throws IOException { tableNameBase = cut < 0 ? DEFAULT_TABLE_NAME : metadataUrl.substring(0, cut); hbaseUrl = cut < 0 ? metadataUrl : metadataUrl.substring(cut + 1); - tableNameMap = new LinkedHashMap(); - for (Entry entry : TABLE_SUFFIX_MAP.entrySet()) { - String pathPrefix = entry.getKey(); - String tableName = tableNameBase + entry.getValue(); - tableNameMap.put(pathPrefix, tableName); - createHTableIfNeeded(tableName); - } + createHTableIfNeeded(getAllInOneTableName()); + +// tableNameMap = new LinkedHashMap(); +// for (Entry entry : TABLE_SUFFIX_MAP.entrySet()) { +// String pathPrefix = entry.getKey(); +// String tableName = tableNameBase + entry.getValue(); +// tableNameMap.put(pathPrefix, tableName); +// createHTableIfNeeded(tableName); +// } } @@ -99,13 +101,8 @@ private void createHTableIfNeeded(String tableName) throws IOException { HBaseConnection.createHTableIfNeeded(getConnection(), tableName, FAMILY); } - private String getTableName(String path) { - for (Entry entry : tableNameMap.entrySet()) { - String pathPrefix = entry.getKey(); - if (path.startsWith(pathPrefix)) - return entry.getValue(); - } - throw new IllegalStateException("failed to find HBase table for path -- " + path); + private String getAllInOneTableName() { + return tableNameBase; } @Override @@ -118,30 +115,21 @@ private String getTableName(String path) { ArrayList result = new ArrayList(); - for (Entry entry : tableNameMap.entrySet()) { - String pathPrefix = entry.getKey(); - String tableName = entry.getValue(); - - if ((pathPrefix.startsWith(lookForPrefix) || lookForPrefix.startsWith(pathPrefix)) == false) - continue; - - HTableInterface table = getConnection().getTable(tableName); - - Scan scan = new Scan(startRow, endRow); - scan.setFilter(new KeyOnlyFilter()); - try { - ResultScanner scanner = table.getScanner(scan); - for (Result r : scanner) { - String path = Bytes.toString(r.getRow()); - assert path.startsWith(lookForPrefix); - int cut = path.indexOf('/', lookForPrefix.length()); - String child = cut < 0 ? path : path.substring(0, cut); - if (result.contains(child) == false) - result.add(child); - } - } finally { - IOUtils.closeQuietly(table); + HTableInterface table = getConnection().getTable(getAllInOneTableName()); + Scan scan = new Scan(startRow, endRow); + scan.setFilter(new KeyOnlyFilter()); + try { + ResultScanner scanner = table.getScanner(scan); + for (Result r : scanner) { + String path = Bytes.toString(r.getRow()); + assert path.startsWith(lookForPrefix); + int cut = path.indexOf('/', lookForPrefix.length()); + String child = cut < 0 ? path : path.substring(0, cut); + if (result.contains(child) == false) + result.add(child); } + } finally { + IOUtils.closeQuietly(table); } // return null to indicate not a folder return result.isEmpty() ? null : result; @@ -186,7 +174,7 @@ protected void putResourceImpl(String resPath, InputStream content, long ts) thr IOUtils.copy(content, bout); bout.close(); - HTableInterface table = getConnection().getTable(getTableName(resPath)); + HTableInterface table = getConnection().getTable(getAllInOneTableName()); try { byte[] row = Bytes.toBytes(resPath); Put put = buildPut(resPath, ts, row, bout.toByteArray(), table); @@ -200,7 +188,7 @@ protected void putResourceImpl(String resPath, InputStream content, long ts) thr @Override protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException { - HTableInterface table = getConnection().getTable(getTableName(resPath)); + HTableInterface table = getConnection().getTable(getAllInOneTableName()); try { byte[] row = Bytes.toBytes(resPath); byte[] bOldTS = oldTS == 0 ? null : Bytes.toBytes(oldTS); @@ -220,7 +208,7 @@ protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldT @Override protected void deleteResourceImpl(String resPath) throws IOException { - HTableInterface table = getConnection().getTable(getTableName(resPath)); + HTableInterface table = getConnection().getTable(getAllInOneTableName()); try { Delete del = new Delete(Bytes.toBytes(resPath)); table.delete(del); @@ -232,7 +220,7 @@ protected void deleteResourceImpl(String resPath) throws IOException { @Override protected String getReadableResourcePathImpl(String resPath) { - return tableNameBase + "(key='" + resPath + "')@" + kylinConfig.getMetadataUrl(); + return getAllInOneTableName() + "(key='" + resPath + "')@" + kylinConfig.getMetadataUrl(); } private Result getByScan(String path, byte[] family, byte[] column) throws IOException { @@ -246,7 +234,7 @@ private Result getByScan(String path, byte[] family, byte[] column) throws IOExc scan.addColumn(family, column); } - HTableInterface table = getConnection().getTable(getTableName(path)); + HTableInterface table = getConnection().getTable(getAllInOneTableName()); try { ResultScanner scanner = table.getScanner(scan); Result result = null; diff --git a/cube/src/test/java/com/kylinolap/cube/project/ProjectManagerTest.java b/cube/src/test/java/com/kylinolap/cube/project/ProjectManagerTest.java index de54438..6f9067a 100644 --- a/cube/src/test/java/com/kylinolap/cube/project/ProjectManagerTest.java +++ b/cube/src/test/java/com/kylinolap/cube/project/ProjectManagerTest.java @@ -64,9 +64,10 @@ public void testDropNonemptyProject2() throws IOException { @Test public void testNewProject() throws Exception { - int originalProjectCount = ProjectManager.getInstance(this.getTestConfig()).listAllProjects().size(); + ProjectManager projectmanager = ProjectManager.getInstance(this.getTestConfig()); + int originalProjectCount = projectmanager.listAllProjects().size(); int originalCubeCount = CubeManager.getInstance(this.getTestConfig()).listAllCubes().size(); - int originalCubeCountInDefault = ProjectManager.getInstance(this.getTestConfig()).listAllCubes("default").size(); + int originalCubeCountInDefault = projectmanager.listAllCubes("default").size(); ResourceStore store = getStore(); // clean legacy in case last run failed @@ -80,31 +81,31 @@ public void testNewProject() throws Exception { System.out.println(JsonUtil.writeValueAsIndentString(createdCube)); - assertTrue(ProjectManager.getInstance(this.getTestConfig()).listAllProjects().size() == originalProjectCount + 1); - assertTrue(ProjectManager.getInstance(this.getTestConfig()).listAllCubes("ALIEN").get(0).getName().equalsIgnoreCase("CUBE_IN_ALIEN_PROJECT")); + assertTrue(projectmanager.listAllProjects().size() == originalProjectCount + 1); + assertTrue(projectmanager.listAllCubes("ALIEN").get(0).getName().equalsIgnoreCase("CUBE_IN_ALIEN_PROJECT")); assertTrue(CubeManager.getInstance(this.getTestConfig()).listAllCubes().size() == originalCubeCount + 1); - ProjectManager.getInstance(this.getTestConfig()).updateCubeToProject("cube_in_alien_project", "default", null); - assertTrue(ProjectManager.getInstance(this.getTestConfig()).listAllCubes("ALIEN").size() == 0); - assertTrue(ProjectManager.getInstance(this.getTestConfig()).listAllCubes("default").size() == originalCubeCountInDefault + 1); + projectmanager.updateCubeToProject("cube_in_alien_project", "default", null); + assertTrue(projectmanager.listAllCubes("ALIEN").size() == 0); + assertTrue(projectmanager.listAllCubes("default").size() == originalCubeCountInDefault + 1); assertTrue(ProjectManager.getInstance(getTestConfig()).listAllCubes("default").contains(createdCube)); - ProjectManager.getInstance(this.getTestConfig()).updateCubeToProject("cube_in_alien_project", "alien", null); - assertTrue(ProjectManager.getInstance(this.getTestConfig()).listAllCubes("ALIEN").size() == 1); - assertTrue(ProjectManager.getInstance(this.getTestConfig()).listAllCubes("default").size() == originalCubeCountInDefault); + projectmanager.updateCubeToProject("cube_in_alien_project", "alien", null); + assertTrue(projectmanager.listAllCubes("ALIEN").size() == 1); + assertTrue(projectmanager.listAllCubes("default").size() == originalCubeCountInDefault); assertTrue(ProjectManager.getInstance(getTestConfig()).listAllCubes("alien").contains(createdCube)); - assertTrue(ProjectManager.getInstance(this.getTestConfig()).isCubeInProject("alien", createdCube)); + assertTrue(projectmanager.isCubeInProject("alien", createdCube)); CubeInstance droppedCube = CubeManager.getInstance(this.getTestConfig()).dropCube("cube_in_alien_project", true); assertTrue(createdCube == droppedCube); assertNull(CubeManager.getInstance(this.getTestConfig()).getCube("cube_in_alien_project")); - assertTrue(ProjectManager.getInstance(this.getTestConfig()).listAllProjects().size() == originalProjectCount + 1); + assertTrue(projectmanager.listAllProjects().size() == originalProjectCount + 1); assertTrue(CubeManager.getInstance(this.getTestConfig()).listAllCubes().size() == originalCubeCount); - ProjectManager.getInstance(this.getTestConfig()).dropProject("alien"); - assertTrue(ProjectManager.getInstance(this.getTestConfig()).listAllProjects().size() == originalProjectCount); + projectmanager.dropProject("alien"); + assertTrue(projectmanager.listAllProjects().size() == originalProjectCount); } @Test From 8177cde55f7f690cd8a2fae8a7d0192ea1dbc8a4 Mon Sep 17 00:00:00 2001 From: "qianhao.zhou" Date: Tue, 2 Dec 2014 20:05:25 +0800 Subject: [PATCH 2/3] Refactor ShellCmd to reuse CliCommandExecutor --- .../main/java/com/kylinolap/job/cmd/ShellCmd.java | 127 ++------------------- 1 file changed, 9 insertions(+), 118 deletions(-) diff --git a/job/src/main/java/com/kylinolap/job/cmd/ShellCmd.java b/job/src/main/java/com/kylinolap/job/cmd/ShellCmd.java index d8d99c5..7ca8d61 100644 --- a/job/src/main/java/com/kylinolap/job/cmd/ShellCmd.java +++ b/job/src/main/java/com/kylinolap/job/cmd/ShellCmd.java @@ -28,6 +28,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; +import com.kylinolap.common.util.CliCommandExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,26 +50,16 @@ private final String executeCommand; private final ICommandOutput output; - private final String remoteHost; - private final String remoteUser; - private final String remotePassword; - private final String identityPath; private final boolean isAsync; + private final CliCommandExecutor cliCommandExecutor; private FutureTask future; protected ShellCmd(String executeCmd, ICommandOutput out, String host, String user, String password, boolean async) { this.executeCommand = executeCmd; this.output = out; - this.remoteHost = host; - this.remoteUser = user; - if (new File(password).exists()) { - this.identityPath = new File(password).getAbsolutePath(); - this.remotePassword = null; - } else { - this.remotePassword = password; - this.identityPath = null; - } + cliCommandExecutor = new CliCommandExecutor(); + cliCommandExecutor.setRunAtRemote(host, user, password); this.isAsync = async; } @@ -81,7 +72,7 @@ public ICommandOutput execute() throws JobException { final ExecutorService executor = Executors.newSingleThreadExecutor(); future = new FutureTask(new Callable() { - public Integer call() throws JobException { + public Integer call() throws JobException, IOException { executor.shutdown(); return executeCommand(executeCommand); } @@ -112,111 +103,11 @@ public Integer call() throws JobException { return output; } - protected int executeCommand(String command) throws JobException { + protected int executeCommand(String command) throws JobException, IOException { output.reset(); - if (remoteHost != null) { - log.debug("Executing remote cmd: " + command); - return remoteExec(command); - } else { - log.debug("Executing local cmd: " + command); - return localExec(command); - } - } - - private int localExec(String command) throws JobException { - output.setStatus(JobStepStatusEnum.RUNNING); - String[] cmd = new String[3]; - cmd[0] = "/bin/bash"; - cmd[1] = "-c"; - cmd[2] = command; - - BufferedReader reader = null; - int exitCode = -1; - try { - ProcessBuilder builder = new ProcessBuilder(cmd); - builder.redirectErrorStream(true); - Process proc = builder.start(); - - reader = new BufferedReader(new InputStreamReader(proc.getInputStream())); - String line = null; - while ((line = reader.readLine()) != null) { - output.appendOutput(line); - } - - exitCode = proc.waitFor(); - } catch (Exception e) { - throw new JobException(e); - } finally { - if (reader != null) { - try { - reader.close(); - } catch (IOException e) { - throw new JobException(e); - } - } - } - return exitCode; - } - - private int remoteExec(String command) throws JobException { - output.setStatus(JobStepStatusEnum.RUNNING); - Session session = null; - Channel channel = null; - int exitCode = -1; - try { - JSch jsch = new JSch(); - if (identityPath != null) { - jsch.addIdentity(identityPath); - } - - session = jsch.getSession(remoteUser, remoteHost, 22); - if (remotePassword != null) { - session.setPassword(remotePassword); - } - session.setConfig("StrictHostKeyChecking", "no"); - session.connect(); - - channel = session.openChannel("exec"); - ((ChannelExec) channel).setCommand(command); - channel.setInputStream(null); - PipedInputStream in = new PipedInputStream(64 * 1024); - PipedOutputStream out = new PipedOutputStream(in); - channel.setOutputStream(out); - ((ChannelExec) channel).setErrStream(out); // redirect error to out - channel.connect(); - - byte[] tmp = new byte[1024]; - while (true) { - while (in.available() > 0) { - int i = in.read(tmp, 0, 1024); - if (i < 0) - break; - output.appendOutput(new String(tmp, 0, i)); - } - if (channel.isClosed()) { - if (in.available() > 0) { - continue; - } - exitCode = channel.getExitStatus(); - break; - } - try { - Thread.sleep(1000); - } catch (Exception ee) { - throw ee; - } - } - } catch (Exception e) { - throw new JobException(e); - } finally { - if (channel != null) { - channel.disconnect(); - } - if (session != null) { - session.disconnect(); - } - } - return exitCode; + String result = cliCommandExecutor.execute(command); + //if cliCommandExecutor doesn't throw IOException, it means command is executed correctly + return 0; } @Override From a5e243bb2946818320f5c13a4314fe22455986e8 Mon Sep 17 00:00:00 2001 From: "qianhao.zhou" Date: Tue, 2 Dec 2014 20:07:38 +0800 Subject: [PATCH 3/3] format code --- .../main/java/com/kylinolap/job/cmd/ShellCmd.java | 25 +++++----------------- .../java/com/kylinolap/job/cmd/ShellHadoopCmd.java | 9 ++++---- 2 files changed, 9 insertions(+), 25 deletions(-) diff --git a/job/src/main/java/com/kylinolap/job/cmd/ShellCmd.java b/job/src/main/java/com/kylinolap/job/cmd/ShellCmd.java index 7ca8d61..811c06f 100644 --- a/job/src/main/java/com/kylinolap/job/cmd/ShellCmd.java +++ b/job/src/main/java/com/kylinolap/job/cmd/ShellCmd.java @@ -16,33 +16,18 @@ package com.kylinolap.job.cmd; -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; -import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.FutureTask; - import com.kylinolap.common.util.CliCommandExecutor; +import com.kylinolap.job.constant.JobStepStatusEnum; +import com.kylinolap.job.exception.JobException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.jcraft.jsch.Channel; -import com.jcraft.jsch.ChannelExec; -import com.jcraft.jsch.JSch; -import com.jcraft.jsch.Session; -import com.kylinolap.job.constant.JobStepStatusEnum; -import com.kylinolap.job.exception.JobException; +import java.io.IOException; +import java.util.concurrent.*; /** * @author xjiang * - * FIXME should reuse common.util.SSHClient */ public class ShellCmd implements IJobCommand { @@ -105,7 +90,7 @@ public Integer call() throws JobException, IOException { protected int executeCommand(String command) throws JobException, IOException { output.reset(); - String result = cliCommandExecutor.execute(command); + cliCommandExecutor.execute(command); //if cliCommandExecutor doesn't throw IOException, it means command is executed correctly return 0; } diff --git a/job/src/main/java/com/kylinolap/job/cmd/ShellHadoopCmd.java b/job/src/main/java/com/kylinolap/job/cmd/ShellHadoopCmd.java index f0b7a4e..75f34b6 100644 --- a/job/src/main/java/com/kylinolap/job/cmd/ShellHadoopCmd.java +++ b/job/src/main/java/com/kylinolap/job/cmd/ShellHadoopCmd.java @@ -16,15 +16,14 @@ package com.kylinolap.job.cmd; -import java.io.IOException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.kylinolap.job.JobDAO; import com.kylinolap.job.JobInstance; import com.kylinolap.job.engine.JobEngineConfig; import com.kylinolap.job.exception.JobException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; /** * @author xjiang