diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index c25e6e2159..09d7e0f5ef 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -220,9 +220,10 @@ public void externalTableReplicationWithCustomPaths() throws Throwable {
     // Create base directory but use HDFS path without schema or authority details.
     // Hive should pick up the local cluster's HDFS schema/authority.
     externalTableBasePathWithClause();
-    List<String> loadWithClause = Collections.singletonList(
+    List<String> loadWithClause = Arrays.asList(
         "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='"
-            + REPLICA_EXTERNAL_BASE + "'"
+            + REPLICA_EXTERNAL_BASE + "'",
+        "'distcp.options.update'=''"
     );

     WarehouseInstance.Tuple bootstrapTuple = primary.run("use " + primaryDbName)
@@ -359,19 +360,62 @@ public void externalTableIncrementalReplication() throws Throwable {
     WarehouseInstance.Tuple tuple = primary.dump("repl dump " + primaryDbName);
     replica.load(replicatedDbName, tuple.dumpLocation);

+    Path externalTableLocation =
+        new Path("/" + testName.getMethodName() + "/t1/");
+    DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
+    fs.mkdirs(externalTableLocation, new FsPermission("777"));
+
     tuple = primary.run("use " + primaryDbName)
-        .run("create external table t1 (place string) partitioned by (country string)")
+        .run("create external table t1 (place string) partitioned by (country string) row format "
+            + "delimited fields terminated by ',' location '" + externalTableLocation.toString()
+            + "'")
         .run("alter table t1 add partition(country='india')")
         .run("alter table t1 add partition(country='us')")
         .dump(primaryDbName, tuple.lastReplicationId);

+    assertExternalFileInfo(Collections.singletonList("t1"), new Path(tuple.dumpLocation, FILE_NAME));
+
+    // Add new data externally to a partition, under the partition-level top directory.
+    // It is added after dumping the events, but the data should still be seen at the target after REPL LOAD.
+    Path partitionDir = new Path(externalTableLocation, "country=india");
+    try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file.txt"))) {
+      outputStream.write("pune\n".getBytes());
+      outputStream.write("mumbai\n".getBytes());
+    }
+
+    try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file1.txt"))) {
+      outputStream.write("bangalore\n".getBytes());
+    }
+
     List<String> loadWithClause = externalTableBasePathWithClause();
     replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
         .run("use " + replicatedDbName)
         .run("show tables like 't1'")
         .verifyResult("t1")
         .run("show partitions t1")
-        .verifyResults(new String[] { "country=india", "country=us" });
+        .verifyResults(new String[] { "country=india", "country=us" })
+        .run("select place from t1 order by place")
+        .verifyResults(new String[] { "bangalore", "mumbai", "pune" });
+
+    // Delete one of the files and update the other one.
+    fs.delete(new Path(partitionDir, "file.txt"), true);
+    fs.delete(new Path(partitionDir, "file1.txt"), true);
+    try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file1.txt"))) {
+      outputStream.write("chennai\n".getBytes());
+    }
+
+    // Repl dump with zero events, but the external tables location info should still be present.
+    tuple = primary.dump(primaryDbName, tuple.lastReplicationId);
+    assertExternalFileInfo(Collections.singletonList("t1"), new Path(tuple.dumpLocation, FILE_NAME));
+
+    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+        .run("use " + replicatedDbName)
+        .run("show tables like 't1'")
+        .verifyResult("t1")
+        .run("show partitions t1")
+        .verifyResults(new String[] { "country=india", "country=us" })
+        .run("select place from t1 order by place")
+        .verifyResults(new String[] { "chennai" });

     Hive hive = Hive.get(replica.getConf());
     Set<Partition> partitions =
@@ -391,7 +435,6 @@ public void externalTableIncrementalReplication() throws Throwable {
     for (String path : paths) {
       assertTrue(replica.miniDFSCluster.getFileSystem().exists(new Path(path)));
     }
-
   }

   @Test
@@ -492,9 +535,10 @@ public void bootstrapExternalTablesDuringIncrementalPhase() throws Throwable {
     fileSystem.mkdirs(externalTableLocation);

     // this is required since the same filesystem is used in both source and target
-    return Collections.singletonList(
-        "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='"
-            + externalTableLocation.toString() + "'"
+    return Arrays.asList(
+        "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='"
+            + externalTableLocation.toString() + "'",
+        "'distcp.options.pugpb'=''"
     );
   }

diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index b6f70ebe63..e774419eb8 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -1110,20 +1110,30 @@ public void setStoragePolicy(Path path, StoragePolicyValue policy)
   private static final String DISTCP_OPTIONS_PREFIX = "distcp.options.";

   List<String> constructDistCpParams(List<Path> srcPaths, Path dst, Configuration conf) {
+    // -update and -delete are mandatory options for directory copy to work.
+    // -pbx is the default preserve option if the user doesn't pass any.
     List<String> params = new ArrayList<String>();
+    boolean needToAddPreserveOption = true;
     for (Map.Entry<String, String> entry : conf.getPropsWithPrefix(DISTCP_OPTIONS_PREFIX).entrySet()){
       String distCpOption = entry.getKey();
       String distCpVal = entry.getValue();
+      if (distCpOption.startsWith("p")) {
+        needToAddPreserveOption = false;
+      }
       params.add("-" + distCpOption);
       if ((distCpVal != null) && (!distCpVal.isEmpty())){
         params.add(distCpVal);
       }
     }
-    if (params.size() == 0){
-      // if no entries were added via conf, we initiate our defaults
-      params.add("-update");
+    if (needToAddPreserveOption) {
       params.add("-pbx");
     }
+    if (!params.contains("-update")) {
+      params.add("-update");
+    }
+    if (!params.contains("-delete")) {
+      params.add("-delete");
+    }
     for (Path src : srcPaths) {
       params.add(src.toString());
     }
@@ -1150,10 +1160,11 @@ public Boolean run() throws Exception {
   @Override
   public boolean runDistCp(List<Path> srcPaths, Path dst, Configuration conf) throws IOException {
     DistCpOptions options = new DistCpOptions.Builder(srcPaths, dst)
-        .withSyncFolder(true)
-        .withCRC(true)
-        .preserve(FileAttribute.BLOCKSIZE)
-        .build();
+        .withSyncFolder(true)
+        .withDeleteMissing(true)
+        .preserve(FileAttribute.BLOCKSIZE)
+        .preserve(FileAttribute.XATTR)
+        .build();

     // Creates the command-line parameters for distcp
     List<String> params = constructDistCpParams(srcPaths, dst, conf);
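
For reference, the new default-flag behavior of constructDistCpParams can be exercised in isolation. The following is a minimal standalone sketch, not part of the patch: the class name DistCpParamsSketch and method buildParams are hypothetical, and a plain Map stands in for Configuration.getPropsWithPrefix(DISTCP_OPTIONS_PREFIX), which in the real code returns the matching keys with the "distcp.options." prefix already stripped. It shows how a user-supplied preserve flag such as "pugpb" suppresses the default -pbx, while -update and -delete are always appended.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DistCpParamsSketch {

  // Mirrors the patched logic: copy the user's "distcp.options.*" flags through,
  // append a preserve option (-pbx) only if the user supplied none, and always
  // enforce -update and -delete so directory sync works.
  static List<String> buildParams(Map<String, String> optionsWithPrefixStripped) {
    List<String> params = new ArrayList<>();
    boolean needToAddPreserveOption = true;
    for (Map.Entry<String, String> entry : optionsWithPrefixStripped.entrySet()) {
      // Any option key starting with "p" (e.g. "pugpb") counts as a preserve option.
      if (entry.getKey().startsWith("p")) {
        needToAddPreserveOption = false;
      }
      params.add("-" + entry.getKey());
      if (entry.getValue() != null && !entry.getValue().isEmpty()) {
        params.add(entry.getValue());
      }
    }
    if (needToAddPreserveOption) {
      params.add("-pbx");
    }
    if (!params.contains("-update")) {
      params.add("-update");
    }
    if (!params.contains("-delete")) {
      params.add("-delete");
    }
    return params;
  }

  public static void main(String[] args) {
    // "'distcp.options.pugpb'=''" from the test arrives here as key "pugpb", empty value.
    // (LinkedHashMap keeps insertion order for a deterministic demo; the real
    // Configuration map makes no ordering guarantee.)
    Map<String, String> opts = new LinkedHashMap<>();
    opts.put("pugpb", "");
    // Prints [-pugpb, -update, -delete]: the user's preserve flag suppresses -pbx,
    // while -update and -delete are still appended.
    System.out.println(buildParams(opts));
  }
}

This also explains why the test's "'distcp.options.update'=''" WITH clause stays harmless: the stripped key "update" is emitted verbatim as -update, and the contains() check merely avoids adding it a second time.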