From 4c7708af15ec16c80f7346dda787f40428dc1946 Mon Sep 17 00:00:00 2001 From: kangkaisen Date: Wed, 12 Oct 2016 20:12:15 +0800 Subject: [PATCH] KYLIN-2089 Make update HBase coprocessor concurrent --- .../storage/hbase/util/DeployCoprocessorCLI.java | 53 ++++++++++++++++++---- 1 file changed, 45 insertions(+), 8 deletions(-) diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java index a1193e7..f38f8d5 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java @@ -23,10 +23,14 @@ import java.io.FileInputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.regex.Matcher; import org.apache.commons.io.IOUtils; @@ -107,14 +111,15 @@ public class DeployCoprocessorCLI { Path hdfsCoprocessorJar = uploadCoprocessorJar(localCoprocessorJar, fileSystem, oldJarPaths); logger.info("New coprocessor jar: " + hdfsCoprocessorJar); - List processedTables = resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames); + resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames); // Don't remove old jars, missing coprocessor jar will fail hbase // removeOldJars(oldJarPaths, fileSystem); hbaseAdmin.close(); - logger.info("Processed " + processedTables); + logger.info("Processed tables count: " + processedTables.size()); + logger.info("Processed tables: " + processedTables); logger.info("Active coprocessor jar: " + hdfsCoprocessorJar); } @@ -220,18 +225,50 @@ public class DeployCoprocessorCLI { hbaseAdmin.enableTable(tableName); } - private static List resetCoprocessorOnHTables(HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar, List tableNames) throws IOException { - List processed = new ArrayList(); + private static List processedTables = Collections.synchronizedList(new ArrayList()); - for (String tableName : tableNames) { + private static void resetCoprocessorOnHTables(final HBaseAdmin hbaseAdmin, final Path hdfsCoprocessorJar, List tableNames) throws IOException { + ExecutorService coprocessorPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); + CountDownLatch countDownLatch = new CountDownLatch(tableNames.size()); + + for (final String tableName : tableNames) { + coprocessorPool.execute(new ResetCoprocessorWorker(countDownLatch, hbaseAdmin, hdfsCoprocessorJar, tableName)); + } + + try { + countDownLatch.await(); + } catch (InterruptedException e) { + logger.error("reset coprocessor failed: ", e); + } + + coprocessorPool.shutdown(); + } + + private static class ResetCoprocessorWorker implements Runnable { + private final CountDownLatch countDownLatch; + private final HBaseAdmin hbaseAdmin; + private final Path hdfsCoprocessorJar; + private final String tableName; + + public ResetCoprocessorWorker(CountDownLatch countDownLatch, HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar, String tableName) { + this.countDownLatch = countDownLatch; + this.hbaseAdmin = hbaseAdmin; + this.hdfsCoprocessorJar = hdfsCoprocessorJar; + this.tableName = tableName; + } + + @Override + public void run() { try { resetCoprocessor(tableName, hbaseAdmin, hdfsCoprocessorJar); - processed.add(tableName); - } catch (IOException ex) { + processedTables.add(tableName); + } catch (Exception ex) { logger.error("Error processing " + tableName, ex); + } finally { + countDownLatch.countDown(); } + } - return processed; } public static Path getNewestCoprocessorJar(KylinConfig config, FileSystem fileSystem) throws IOException { -- 2.8.4 (Apple Git-73)