diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExport.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExport.java
new file mode 100644
index 0000000000..0907b9f150
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestExport.java
@@ -0,0 +1,60 @@
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class TestExport {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(TestExport.class);
+  private static WarehouseInstance hiveWarehouse;
+
+  @Rule
+  public final TestName testName = new TestName();
+  private String dbName;
+
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set("dfs.client.use.datanode.hostname", "true");
+    MiniDFSCluster miniDFSCluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+    hiveWarehouse = new WarehouseInstance(LOG, miniDFSCluster, false);
+  }
+
+  @AfterClass
+  public static void classLevelTearDown() throws IOException {
+    hiveWarehouse.close();
+  }
+
+  @Before
+  public void setup() throws Throwable {
+    dbName = testName.getMethodName() + "_" + System.currentTimeMillis();
+    hiveWarehouse.run("create database " + dbName);
+  }
+
+  @Test
+  public void shouldExportImportATemporaryTable() throws Throwable {
+    String path = "hdfs:///tmp/" + dbName + "/";
+    String exportPath = "'" + path + "'";
+    String importDataPath = path + "/data";
+    hiveWarehouse
+        .run("use " + dbName)
+        .run("create temporary table t1 (i int)")
+        .run("insert into table t1 values (1),(2)")
+        .run("export table t1 to " + exportPath)
+        .run("create temporary table t2 like t1")
+        .run("load data inpath '" + importDataPath + "' overwrite into table t2")
+        .run("select * from t2")
+        .verifyResults(new String[] { "1", "2" });
+  }
+}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 1128eae6f9..8dfab084c9 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -64,7 +64,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 
   private final static String LISTENER_CLASS = DbNotificationListener.class.getCanonicalName();
 
-  WarehouseInstance(Logger logger, MiniDFSCluster cluster) throws Exception {
+  WarehouseInstance(Logger logger, MiniDFSCluster cluster, boolean hiveInTests) throws Exception {
     this.logger = logger;
     this.miniDFSCluster = cluster;
     assert miniDFSCluster.isClusterUp();
@@ -73,10 +73,14 @@ Licensed to the Apache Software Foundation (ASF) under one
     Path cmRootPath = mkDir(fs, "/cmroot" + uniqueIdentifier);
     this.functionsRoot = mkDir(fs, "/functions" + uniqueIdentifier).toString();
 
-    initialize(cmRootPath.toString());
+    initialize(cmRootPath.toString(), hiveInTests);
+  }
+
+  WarehouseInstance(Logger logger, MiniDFSCluster cluster) throws Exception {
+    this(logger, cluster, true);
   }
 
-  private void initialize(String cmRoot) throws Exception {
+  private void initialize(String cmRoot, boolean hiveInTest) throws Exception {
     hiveConf = new HiveConf(miniDFSCluster.getConfiguration(0), TestReplicationScenarios.class);
     String metaStoreUri = System.getProperty("test." + HiveConf.ConfVars.METASTOREURIS.varname);
     String hiveWarehouseLocation = System.getProperty("test.warehouse.dir", "/tmp")
@@ -89,6 +93,7 @@ private void initialize(String cmRoot) throws Exception {
       return;
     }
 
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST, hiveInTest);
     // turn on db notification listener on meta store
     hiveConf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS, LISTENER_CLASS);
     hiveConf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
@@ -177,8 +182,7 @@ WarehouseInstance load(String replicatedDbName, String dumpLocation) throws Thro
   }
 
   WarehouseInstance verify(String data) throws IOException {
-    verifyResults(data == null ? new String[] {} : new String[] { data });
-    return this;
+    return verifyResults(data == null ? new String[] {} : new String[] { data });
   }
 
   /**
@@ -188,7 +192,7 @@ WarehouseInstance verify(String data) throws IOException {
    * Unless for Null Values it actually returns in UpperCase and hence explicitly lowering case
    * before assert.
    */
-  private void verifyResults(String[] data) throws IOException {
+  WarehouseInstance verifyResults(String[] data) throws IOException {
     List<String> results = getOutput();
     logger.info("Expecting {}", StringUtils.join(data, ","));
     logger.info("Got {}", results);
@@ -196,6 +200,7 @@ private void verifyResults(String[] data) throws IOException {
     for (int i = 0; i < data.length; i++) {
       assertEquals(data[i].toLowerCase(), results.get(i).toLowerCase());
     }
+    return this;
   }
 
   List<String> getOutput() throws IOException {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index 9f22f230b4..4d23efc5e5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -57,7 +57,7 @@ public TableExport(Paths paths, TableSpec tableSpec,
       throws SemanticException {
     this.tableSpec = (tableSpec != null
         && tableSpec.tableHandle.isTemporary()
-        && !replicationSpec.isInReplicationScope())
+        && replicationSpec.isInReplicationScope())
         ? null
         : tableSpec;
     this.replicationSpec = replicationSpec;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/CopyUtils.java
new file mode 100644
index 0000000000..f20bfaa61b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/CopyUtils.java
@@ -0,0 +1,121 @@
+package org.apache.hadoop.hive.ql.parse.repl.dump.io;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+class CopyUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CopyUtils.class);
+
+  private final HiveConf hiveConf;
+  private final long maxCopyFileSize;
+  private final long maxNumberOfFiles;
+  private final boolean hiveInTest;
+  private final String copyAsUser;
+
+  CopyUtils(HiveConf hiveConf) {
+    this.hiveConf = hiveConf;
+    maxNumberOfFiles = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES);
+    maxCopyFileSize = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE);
+    hiveInTest = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST);
+    this.copyAsUser = hiveConf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
+  }
+
+  void doCopy(Path destination, List<Path> srcPaths) throws IOException {
+    Map<FileSystem, List<Path>> map = fsToFileMap(srcPaths);
+    FileSystem destinationFs = destination.getFileSystem(hiveConf);
+
+    for (Map.Entry<FileSystem, List<Path>> entry : map.entrySet()) {
+      if (regularCopy(destinationFs, entry)) {
+        Path[] paths = entry.getValue().toArray(new Path[] {});
+        FileUtil.copy(entry.getKey(), paths, destinationFs, destination, false, true, hiveConf);
+      } else {
+        FileUtils.distCp(
+            entry.getKey(),       // source file system
+            entry.getValue(),     // list of source paths
+            destination,
+            false,
+            copyAsUser,
+            hiveConf,
+            ShimLoader.getHadoopShims()
+        );
+      }
+    }
+  }
+
+  /*
+      Check for conditions that will lead to local copy, checks are:
+      1. we are testing hive.
+      2. both source and destination are same FileSystem
+      3. either source or destination is a "local" FileSystem("file")
+      4. aggregate fileSize of all source Paths(can be directory / file) is less than configured size.
+      5. number of files of all source Paths(can be directory / file) is less than configured size.
+  */
+  private boolean regularCopy(FileSystem destinationFs, Map.Entry<FileSystem, List<Path>> entry)
+      throws IOException {
+    if (hiveInTest) {
+      return true;
+    }
+    FileSystem sourceFs = entry.getKey();
+    boolean isLocalFs = isLocal(sourceFs) || isLocal(destinationFs);
+    boolean sameFs = sourceFs.equals(destinationFs);
+    if (isLocalFs || sameFs) {
+      return true;
+    }
+
+    /*
+       we have reached the point where we are transferring files across fileSystems.
+    */
+    long size = 0;
+    long numberOfFiles = 0;
+
+    for (Path path : entry.getValue()) {
+      ContentSummary contentSummary = sourceFs.getContentSummary(path);
+      size += contentSummary.getLength();
+      numberOfFiles += contentSummary.getFileCount();
+      if (limitReachedForLocalCopy(size, numberOfFiles)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private boolean limitReachedForLocalCopy(long size, long numberOfFiles) {
+    boolean result = size > maxCopyFileSize || numberOfFiles > maxNumberOfFiles;
+    if (result) {
+      LOG.info("Source is {} bytes. (MAX: {})", size, maxCopyFileSize);
+      LOG.info("Source is {} files. (MAX: {})", numberOfFiles, maxNumberOfFiles);
+      LOG.info("going to launch distributed copy (distcp) job.");
+    }
+    return result;
+  }
+
+  private boolean isLocal(FileSystem fs) {
+    return fs.getScheme().equals("file");
+  }
+
+  private Map<FileSystem, List<Path>> fsToFileMap(List<Path> srcPaths) throws IOException {
+    Map<FileSystem, List<Path>> result = new HashMap<>();
+    for (Path path : srcPaths) {
+      FileSystem fileSystem = path.getFileSystem(hiveConf);
+      if (!result.containsKey(fileSystem)) {
+        result.put(fileSystem, new ArrayList<>());
+      }
+      result.get(fileSystem).add(path);
+    }
+    return result;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
index 164ca74e18..e1e3ae1412 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
@@ -22,7 +22,6 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
-import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
@@ -70,8 +69,7 @@ private void copyFiles() throws IOException {
     for (FileStatus fileStatus : fileStatuses) {
       srcPaths.add(fileStatus.getPath());
     }
-
-    ReplCopyTask.doCopy(exportRootDataDir, exportFileSystem, srcPaths, dataFileSystem, hiveConf);
+    new CopyUtils(hiveConf).doCopy(exportRootDataDir, srcPaths);
   }
 
   /**