diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 14235b5719..8425dde05a 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -457,6 +457,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "Inteval for cmroot cleanup thread."),
     REPL_FUNCTIONS_ROOT_DIR("hive.repl.replica.functions.root.dir","/user/hive/repl/functions/",
         "Root directory on the replica warehouse where the repl sub-system will store jars from the primary warehouse"),
+    REPL_PARTITIONS_DUMP_PARALLELISM("hive.repl.partitions.dump.parallelism",5,
+        "Number of threads that will be used to dump partition data information during repl dump."),
     LOCALSCRATCHDIR("hive.exec.local.scratchdir",
         "${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
         "Local scratch space for Hive jobs"),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index f9bdff8381..0c2dbef312 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -262,7 +262,7 @@ private void dumpTable(String dbName, String tblName, Path dbRoot) throws Except
     TableSpec ts = new TableSpec(db, conf, dbName + "." + tblName, null);
     TableExport.Paths exportPaths =
         new TableExport.Paths(work.astRepresentationForErrorMsg, dbRoot, tblName, conf);
-    new TableExport(exportPaths, ts, getNewReplicationSpec(), db, conf, LOG).run();
+    new TableExport(exportPaths, ts, getNewReplicationSpec(), db, conf, LOG).write();
     REPL_STATE_LOG.info(
         "Repl Dump: Analyzed dump for table/view: {}.{} and dumping metadata and data to path {}",
         dbName, tblName, exportPaths.exportRootDir.toString());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
index 0932dff283..86575e076d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
@@ -74,7 +74,7 @@ public void analyzeInternal(ASTNode ast) throws SemanticException {
       TableExport.Paths exportPaths =
           new TableExport.Paths(ErrorMsg.INVALID_PATH.getMsg(ast), tmpPath, conf);
       TableExport.AuthEntities authEntities =
-          new TableExport(exportPaths, ts, replicationSpec, db, conf, LOG).run();
+          new TableExport(exportPaths, ts, replicationSpec, db, conf, LOG).write();
       inputs.addAll(authEntities.inputs);
       outputs.addAll(authEntities.outputs);
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
new file mode 100644
index 0000000000..9cc5fe28c0
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
@@ -0,0 +1,99 @@
+package org.apache.hadoop.hive.ql.parse.repl.dump;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hive.ql.parse.repl.dump.TableExport.AuthEntities;
+import static org.apache.hadoop.hive.ql.parse.repl.dump.TableExport.Paths;
+
+/**
+ * This class manages writing the _data files of multiple partitions simultaneously.
+ * A producer thread puts the partitions to be dumped on a blocking queue, and a worker
+ * thread pool reads off the queue to perform the actual export of each partition.
+ */
+class PartitionExport {
+  private final Paths paths;
+  private final PartitionIterable partitionIterable;
+  private final HiveConf hiveConf;
+  private final int nThreads;
+  private final AuthEntities authEntities;
+
+  private static final Logger LOG = LoggerFactory.getLogger(PartitionExport.class);
+  private BlockingQueue<Partition> queue;
+
+  PartitionExport(Paths paths, PartitionIterable partitionIterable, HiveConf hiveConf,
+      AuthEntities authEntities) {
+    this.paths = paths;
+    this.partitionIterable = partitionIterable;
+    this.hiveConf = hiveConf;
+    this.authEntities = authEntities;
+    this.nThreads = hiveConf.getIntVar(HiveConf.ConfVars.REPL_PARTITIONS_DUMP_PARALLELISM);
+    this.queue = new ArrayBlockingQueue<>(2 * nThreads);
+  }
+
+  void write(final ReplicationSpec forReplicationSpec) throws InterruptedException {
+    ExecutorService producer = Executors.newFixedThreadPool(1);
+    producer.submit(() -> {
+      for (Partition partition : partitionIterable) {
+        try {
+          queue.put(partition);
+        } catch (InterruptedException e) {
+          throw new RuntimeException(
+              "Error while queuing up the partitions for export of data files", e);
+        }
+      }
+    });
+    producer.shutdown();
+
+    ThreadFactory namingThreadFactory =
+        new ThreadFactoryBuilder().setNameFormat("partition-dump-thread-%d").build();
+    ExecutorService consumer = Executors.newFixedThreadPool(nThreads, namingThreadFactory);
+
+    while (!producer.isTerminated() || !queue.isEmpty()) {
+      /*
+       * Use a timed poll rather than a blocking take: the producer and consumer are started
+       * simultaneously, so this loop condition can hold while the partitions iterator has not
+       * produced anything yet (producer not terminated, queue empty). In that case we only
+       * wait a bounded time before re-checking; once the producer terminates and the queue is
+       * drained, the loop condition fails and we exit.
+       */
+      Partition partition = queue.poll(1, TimeUnit.SECONDS);
+      if (partition == null) {
+        continue;
+      }
+      LOG.debug("scheduling partition dump {}", partition.getName());
+      consumer.submit(() -> {
+        String partitionName = partition.getName();
+        String threadName = Thread.currentThread().getName();
+        LOG.debug("Thread: {}, start partition dump {}", threadName, partitionName);
+        Path fromPath = partition.getDataLocation();
+        try {
+          // this is the data copy
+          Path rootDataDumpDir = paths.partitionExportDir(partitionName);
+          new FileOperations(fromPath, rootDataDumpDir, hiveConf).export(forReplicationSpec);
+          authEntities.inputs.add(new ReadEntity(partition));
+          LOG.debug("Thread: {}, finish partition dump {}", threadName, partitionName);
+        } catch (Exception e) {
+          throw new RuntimeException("Error while export of data files", e);
+        }
+      });
+    }
+    consumer.shutdown();
+    // maybe drive this timeout via configuration as well
+    consumer.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index 9f22f230b4..06d88940b6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -26,7 +26,6 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
@@ -38,8 +37,10 @@ Licensed to the Apache Software Foundation (ASF) under one
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.toWriteEntity;
 
@@ -70,7 +71,7 @@ public TableExport(Paths paths, TableSpec tableSpec,
     this.paths = paths;
   }
 
-  public AuthEntities run() throws SemanticException {
+  public AuthEntities write() throws SemanticException {
     if (tableSpec == null) {
       writeMetaData(null);
     } else if (shouldExport()) {
@@ -139,13 +140,7 @@ private void writeData(PartitionIterable partitions) throws SemanticException {
         throw new IllegalStateException(
             "partitions cannot be null for partitionTable :" + tableSpec.tableName);
       }
-      for (Partition partition : partitions) {
-        Path fromPath = partition.getDataLocation();
-        // this the data copy
-        Path rootDataDumpDir = paths.partitionExportDir(partition.getName());
-        new FileOperations(fromPath, rootDataDumpDir, conf).export(replicationSpec);
-        authEntities.inputs.add(new ReadEntity(partition));
-      }
+      new PartitionExport(paths, partitions, conf, authEntities).write(replicationSpec);
     } else {
       Path fromPath = tableSpec.tableHandle.getDataLocation();
       //this is the data copy
@@ -210,7 +205,7 @@ public Paths(String astRepresentationForErrorMsg, String path, HiveConf conf)
       }
     }
 
-    private Path partitionExportDir(String partitionName) throws SemanticException {
+    Path partitionExportDir(String partitionName) throws SemanticException {
      return exportDir(new Path(exportRootDir, partitionName));
    }
 
@@ -271,7 +266,12 @@ private void validateTargetDir(URI rootDirExportFile) throws SemanticException {
   }
 
   public static class AuthEntities {
-    public final Set<ReadEntity> inputs = new HashSet<>();
+    /**
+     * This is a concurrent implementation since
+     * {@link org.apache.hadoop.hive.ql.parse.repl.dump.PartitionExport}
+     * uses multiple threads to flush out the partitions.
+     */
+    public final Set<ReadEntity> inputs = Collections.newSetFromMap(new ConcurrentHashMap<>());
     public final Set<WriteEntity> outputs = new HashSet<>();
   }
 }