diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index efe9fff780..376007b14d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -1503,6 +1503,11 @@ public void testIncrementalLoad() throws IOException {
 
     Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDbName);
 
+    //Verify dump data structure
+    Path hiveDumpDir = new Path(incrementalDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), hconf);
+    verifyDataFileExist(fs, hiveDumpDir, null, new Path(unptnLocn).getName());
+
     verifyRun("SELECT * from " + replDbName + ".unptned_late", unptnData, driverMirror);
 
     run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)", driver);
@@ -1523,7 +1528,11 @@ public void testIncrementalLoad() throws IOException {
         + ".ptned WHERE b=2", driver);
     verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptnData2, driver);
 
-    incrementalLoadAndVerify(dbName, replDbName);
+    incrementalDump = incrementalLoadAndVerify(dbName, replDbName);
+    hiveDumpDir = new Path(incrementalDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    verifyDataFileExist(fs, hiveDumpDir, "b=1", new Path(ptnLocn1).getName());
+    verifyDataFileExist(fs, hiveDumpDir, "b=2", new Path(ptnLocn2).getName());
+
     verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=1", ptnData1, driverMirror);
     verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=2", ptnData2, driverMirror);
     verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=1", ptnData1, driverMirror);
@@ -3784,6 +3793,24 @@ private void verifyFail(String cmd, IDriver myDriver) throws RuntimeException {
     assertFalse(success);
   }
 
+  private void verifyDataFileExist(FileSystem fs, Path hiveDumpDir, String part, String dataFile) throws IOException {
+    FileStatus[] eventFileStatuses = fs.listStatus(hiveDumpDir);
+    boolean dataFileFound = false;
+    for (FileStatus eventFileStatus: eventFileStatuses) {
+      String dataRelativePath = null;
+      if (part == null) {
+        dataRelativePath = EximUtil.DATA_PATH_NAME + File.separator + dataFile;
+      } else {
+        dataRelativePath = EximUtil.DATA_PATH_NAME + File.separator + part + File.separator + dataFile;
+      }
+      if (fs.exists(new Path(eventFileStatus.getPath(), dataRelativePath))) {
+        dataFileFound = true;
+        break;
+      }
+    }
+    assertTrue(dataFileFound);
+  }
+
   private void verifyRunWithPatternMatch(String cmd, String key, String pattern, IDriver myDriver) throws IOException {
     run(cmd, myDriver);
     List<String> results = getOutput(myDriver);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 7354a3e7e0..68628250c7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -77,6 +77,7 @@
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
@@ -505,9 +506,8 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
 
     Task<?> copyTask = null;
     if (replicationSpec.isInReplicationScope()) {
-      boolean isImport = ReplicationSpec.Type.IMPORT.equals(replicationSpec.getReplSpecType());
       copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf(),
-          isAutoPurge, needRecycle, copyToMigratedTxnTable, !isImport);
+          isAutoPurge, needRecycle, copyToMigratedTxnTable, false);
     } else {
       copyTask = TaskFactory.get(new CopyWork(dataPath, destPath, false));
     }
@@ -597,6 +597,18 @@ private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
       return addPartTask;
     } else {
       String srcLocation = partSpec.getLocation();
+      if (!(replicationSpec.isInReplicationScope()
+          && ReplicationSpec.Type.IMPORT.equals(replicationSpec.getReplSpecType()))) {
+        Path partLocation = new Path(partSpec.getLocation());
+        Path dataDirBase = partLocation.getParent();
+        String bucketDir = partLocation.getName();
+        for (int i=1; i<partSpec.getPartSpec().size(); i++) {
+          bucketDir = dataDirBase.getName() + File.separator + bucketDir;
+          dataDirBase = dataDirBase.getParent();
+        }
+        String relativePartDataPath = EximUtil.DATA_PATH_NAME + File.separator + bucketDir;
+        srcLocation = new Path(dataDirBase, relativePartDataPath).toString();
+      }
@@ ... @@
       Task<?> copyTask = null;
       if (replicationSpec.isInReplicationScope()) {
-        boolean isImport = ReplicationSpec.Type.IMPORT.equals(replicationSpec.getReplSpecType());
         copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, new Path(srcLocation), destPath,
-            x.getConf(), isAutoPurge, needRecycle, copyToMigratedTxnTable, !isImport);
+            x.getConf(), isAutoPurge, needRecycle, copyToMigratedTxnTable, false);
       } else {
         copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), destPath, false));
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
index 049c06b3dd..107eaeccde 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
@@ -37,7 +37,6 @@
 import org.slf4j.LoggerFactory;
 
 import javax.security.auth.login.LoginException;
-import java.io.BufferedWriter;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -89,13 +88,12 @@ public long toEventId() {
     return event.getEventId();
   }
 
-  protected void writeFileEntry(String dbName, Table table, String file, BufferedWriter fileListWriter,
-                                Context withinContext)
+  protected void writeFileEntry(Table table, String file, Context withinContext)
       throws IOException, LoginException, MetaException, HiveFatalException {
     HiveConf hiveConf = withinContext.hiveConf;
     String distCpDoAsUser = hiveConf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
     if (!Utils.shouldDumpMetaDataOnly(withinContext.hiveConf)) {
-      Path dataPath = new Path(withinContext.dumpRoot.toString(), EximUtil.DATA_PATH_NAME);
+      Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
       List<ReplChangeManager.FileInfo> filePaths = new ArrayList<>();
       String[] decodedURISplits = ReplChangeManager.decodeFileUri(file);
       String srcDataFile = decodedURISplits[0];
@@ -103,7 +101,7 @@ protected void writeFileEntry(String dbName, Table table, String file, BufferedW
       if (dataPath.toUri().getScheme() == null) {
         dataPath = new Path(srcDataPath.toUri().getScheme(), srcDataPath.toUri().getAuthority(), dataPath.toString());
       }
-      String eventTblPath = event.getEventId() + File.separator + dbName + File.separator + table.getTableName();
+
       String srcDataFileRelativePath = null;
       if (srcDataFile.contains(table.getPath().toString())) {
         srcDataFileRelativePath = srcDataFile.substring(table.getPath().toString().length() + 1);
@@ -112,9 +110,7 @@ protected void writeFileEntry(String dbName, Table table, String file, BufferedW
       } else {
         srcDataFileRelativePath = srcDataFileRelativePath + File.separator + srcDataPath.getName();
       }
-      Path targetPath = new Path(dataPath, eventTblPath + File.separator + srcDataFileRelativePath);
-      String encodedTargetPath = ReplChangeManager.encodeFileUri(
-          targetPath.toString(), decodedURISplits[1], decodedURISplits[3]);
+      Path targetPath = new Path(dataPath, srcDataFileRelativePath);
       ReplChangeManager.FileInfo f = ReplChangeManager.getFileInfo(new Path(decodedURISplits[0]),
           decodedURISplits[1], decodedURISplits[2], decodedURISplits[3], hiveConf);
       filePaths.add(f);
@@ -124,7 +120,6 @@ protected void writeFileEntry(String dbName, Table table, String file, BufferedW
         finalTargetPath = finalTargetPath.getParent();
       }
       new CopyUtils(distCpDoAsUser, hiveConf, dstFs).copyAndVerify(finalTargetPath, filePaths, srcDataPath);
-      fileListWriter.write(encodedTargetPath + "\n");
     }
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
index a06b90d6ef..ab8c15ad0b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
@@ -108,10 +108,8 @@ public void handle(Context withinContext) throws Exception {
       Iterable<String> files = partitionFilesIter.next().getFiles();
       if (files != null) {
         // encoded filename/checksum of files, write into _files
-        try (BufferedWriter fileListWriter = writer(withinContext, qlPtn)) {
-          for (String file : files) {
-            writeFileEntry(qlMdTable.getDbName(), qlMdTable, file, fileListWriter, withinContext);
-          }
+        for (String file : files) {
+          writeFileEntry(qlMdTable, file, withinContext);
         }
       }
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
index dc87506d2e..cfa650b0eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
@@ -68,10 +68,8 @@ private BufferedWriter writer(Context withinContext, Path dataPath) throws IOExc
   private void writeDumpFiles(Table qlMdTable, Context withinContext, Iterable<String> files, Path dataPath)
       throws IOException, LoginException, MetaException, HiveFatalException {
     // encoded filename/checksum of files, write into _files
-    try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
-      for (String file : files) {
-        writeFileEntry(qlMdTable.getDbName(), qlMdTable, file, fileListWriter, withinContext);
-      }
+    for (String file : files) {
+      writeFileEntry(qlMdTable, file, withinContext);
     }
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
index 7a6ddf9e19..6037ba3494 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
@@ -79,14 +79,11 @@ public void handle(Context withinContext) throws Exception {
         withinContext.replicationSpec,
         withinContext.hiveConf);
 
-    Path dataPath = new Path(withinContext.eventRoot, "data");
     Iterable<String> files = eventMessage.getFiles();
     if (files != null) {
       // encoded filename/checksum of files, write into _files
-      try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
-        for (String file : files) {
-          writeFileEntry(qlMdTable.getDbName(), qlMdTable, file, fileListWriter, withinContext);
-        }
+      for (String file : files) {
+        writeFileEntry(qlMdTable, file, withinContext);
       }
     }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
index 4e02620ece..20dc7bc58b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
@@ -81,24 +81,9 @@ public void handle(Context withinContext) throws Exception {
 
     Iterable<String> files = eventMessage.getFiles();
     if (files != null) {
-      Path dataPath;
-      if ((null == qlPtns) || qlPtns.isEmpty()) {
-        dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
-      } else {
-        /*
-         * Insert into/overwrite operation shall operate on one or more partitions or even partitions from multiple
-         * tables. But, Insert event is generated for each partition to which the data is inserted. So, qlPtns list
-         * will have only one entry.
-         */
-        assert(1 == qlPtns.size());
-        dataPath = new Path(withinContext.eventRoot, qlPtns.get(0).getName());
-      }
-
       // encoded filename/checksum of files, write into _files
-      try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
-        for (String file : files) {
-          writeFileEntry(qlMdTable.getDbName(), qlMdTable, file, fileListWriter, withinContext);
-        }
+      for (String file : files) {
+        writeFileEntry(qlMdTable, file, withinContext);
       }
     }