From 1a24246d2031ab9962974bb6e8a7290dcdd85538 Mon Sep 17 00:00:00 2001 From: kangkaisen Date: Wed, 6 Sep 2017 16:11:48 +0800 Subject: [PATCH] DeployCoprocessorCLI Log failed tables info --- .../storage/hbase/util/DeployCoprocessorCLI.java | 27 +++++++++++++++------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java index c437e6613..56d549762 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.KylinVersion; import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; @@ -122,7 +123,7 @@ public class DeployCoprocessorCLI { Path hdfsCoprocessorJar = uploadCoprocessorJar(localCoprocessorJar, fileSystem, oldJarPaths); logger.info("New coprocessor jar: " + hdfsCoprocessorJar); - List processedTables = resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames); + Pair, List> results = resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames); // Don't remove old jars, missing coprocessor jar will fail hbase // removeOldJars(oldJarPaths, fileSystem); @@ -130,8 +131,10 @@ public class DeployCoprocessorCLI { hbaseAdmin.close(); logger.info("Processed time: " + (System.currentTimeMillis() - start)); - logger.info("Processed tables count: " + processedTables.size()); - logger.info("Processed tables: " + processedTables); + logger.info("Processed tables count: " + results.getFirst().size()); + logger.info("Processed tables: " + results.getFirst()); + logger.error("Failed tables count: " + results.getSecond().size()); + logger.error("Failed tables : " + results.getSecond()); logger.info("Active coprocessor jar: " + hdfsCoprocessorJar); } @@ -157,7 +160,7 @@ public class DeployCoprocessorCLI { ProjectInstance projectInstance = projectManager.getProject(p); List cubeList = projectInstance.getRealizationEntries(RealizationType.CUBE); - for (RealizationEntry cube: cubeList) { + for (RealizationEntry cube : cubeList) { CubeInstance cubeInstance = cubeManager.getCube(cube.getRealization()); for (CubeSegment segment : cubeInstance.getSegments()) { String tableName = segment.getStorageLocationIdentifier(); @@ -279,13 +282,15 @@ public class DeployCoprocessorCLI { return true; } - private static List resetCoprocessorOnHTables(final Admin hbaseAdmin, final Path hdfsCoprocessorJar, List tableNames) throws IOException { + private static Pair, List> resetCoprocessorOnHTables(final Admin hbaseAdmin, final Path hdfsCoprocessorJar, List tableNames) throws IOException { List processedTables = Collections.synchronizedList(new ArrayList()); + List failedTables = Collections.synchronizedList(new ArrayList()); + ExecutorService coprocessorPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); CountDownLatch countDownLatch = new CountDownLatch(tableNames.size()); for (final String tableName : tableNames) { - coprocessorPool.execute(new ResetCoprocessorWorker(countDownLatch, hbaseAdmin, hdfsCoprocessorJar, tableName, processedTables)); + coprocessorPool.execute(new ResetCoprocessorWorker(countDownLatch, hbaseAdmin, hdfsCoprocessorJar, tableName, processedTables, failedTables)); } try { @@ -295,7 +300,7 @@ public class DeployCoprocessorCLI { } coprocessorPool.shutdown(); - return processedTables; + return new Pair<>(processedTables, failedTables); } private static class ResetCoprocessorWorker implements Runnable { @@ -304,13 +309,16 @@ public class DeployCoprocessorCLI { private final Path hdfsCoprocessorJar; private final String tableName; private final List processedTables; + private final List failedTables; + + public ResetCoprocessorWorker(CountDownLatch countDownLatch, Admin hbaseAdmin, Path hdfsCoprocessorJar, String tableName, List processedTables, List failedTables) { - public ResetCoprocessorWorker(CountDownLatch countDownLatch, Admin hbaseAdmin, Path hdfsCoprocessorJar, String tableName, List processedTables) { this.countDownLatch = countDownLatch; this.hbaseAdmin = hbaseAdmin; this.hdfsCoprocessorJar = hdfsCoprocessorJar; this.tableName = tableName; this.processedTables = processedTables; + this.failedTables = failedTables; } @Override @@ -319,8 +327,11 @@ public class DeployCoprocessorCLI { boolean isProcessed = resetCoprocessor(tableName, hbaseAdmin, hdfsCoprocessorJar); if (isProcessed) { processedTables.add(tableName); + } else { + failedTables.add(tableName); } } catch (Exception ex) { + failedTables.add(tableName); logger.error("Error processing " + tableName, ex); } finally { countDownLatch.countDown(); -- 2.13.6 (Apple Git-96)