diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index e598a6b97a..6d7ee4cb82 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -219,7 +219,7 @@ public String next() { FileStatus file = files[i]; i++; return ReplChangeManager.encodeFileUri(file.getPath().toString(), - ReplChangeManager.getChksumString(file.getPath(), fs)); + ReplChangeManager.checksumFor(file.getPath(), fs)); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java index 1495c1a8a1..ebbb0b6db3 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -156,15 +156,15 @@ public void testRecyclePartTable() throws Exception { Path part1Path = new Path(warehouse.getDefaultPartitionPath(db, tblName, ImmutableMap.of("dt", "20160101")), "part"); createFile(part1Path, "p1"); - String path1Chksum = ReplChangeManager.getChksumString(part1Path, fs); + String path1Chksum = ReplChangeManager.checksumFor(part1Path, fs); Path part2Path = new Path(warehouse.getDefaultPartitionPath(db, tblName, ImmutableMap.of("dt", "20160102")), "part"); createFile(part2Path, "p2"); - String path2Chksum = ReplChangeManager.getChksumString(part2Path, fs); + String path2Chksum = ReplChangeManager.checksumFor(part2Path, fs); Path part3Path = new Path(warehouse.getDefaultPartitionPath(db, tblName, ImmutableMap.of("dt", "20160103")), "part"); createFile(part3Path, "p3"); - String path3Chksum = ReplChangeManager.getChksumString(part3Path, fs); + String path3Chksum = ReplChangeManager.checksumFor(part3Path, fs); assertTrue(part1Path.getFileSystem(hiveConf).exists(part1Path)); assertTrue(part2Path.getFileSystem(hiveConf).exists(part2Path)); @@ -226,15 +226,15 @@ public void testRecycleNonPartTable() throws Exception { Path filePath1 = new Path(warehouse.getDefaultTablePath(db, tblName), "part1"); createFile(filePath1, "f1"); - String fileChksum1 = ReplChangeManager.getChksumString(filePath1, fs); + String fileChksum1 = ReplChangeManager.checksumFor(filePath1, fs); Path filePath2 = new Path(warehouse.getDefaultTablePath(db, tblName), "part2"); createFile(filePath2, "f2"); - String fileChksum2 = ReplChangeManager.getChksumString(filePath2, fs); + String fileChksum2 = ReplChangeManager.checksumFor(filePath2, fs); Path filePath3 = new Path(warehouse.getDefaultTablePath(db, tblName), "part3"); createFile(filePath3, "f3"); - String fileChksum3 = ReplChangeManager.getChksumString(filePath3, fs); + String fileChksum3 = ReplChangeManager.checksumFor(filePath3, fs); assertTrue(filePath1.getFileSystem(hiveConf).exists(filePath1)); assertTrue(filePath2.getFileSystem(hiveConf).exists(filePath2)); @@ -272,26 +272,26 @@ public void testClearer() throws Exception { fs.mkdirs(dirTbl1); Path part11 = new Path(dirTbl1, "part1"); createFile(part11, "testClearer11"); - String fileChksum11 = ReplChangeManager.getChksumString(part11, fs); + String fileChksum11 = ReplChangeManager.checksumFor(part11, fs); Path part12 = new Path(dirTbl1, "part2"); createFile(part12, "testClearer12"); - String fileChksum12 = ReplChangeManager.getChksumString(part12, fs); + String fileChksum12 = ReplChangeManager.checksumFor(part12, fs); Path dirTbl2 = new Path(dirDb, "tbl2"); fs.mkdirs(dirTbl2); Path part21 = new Path(dirTbl2, "part1"); createFile(part21, "testClearer21"); - String fileChksum21 = ReplChangeManager.getChksumString(part21, fs); + String fileChksum21 = ReplChangeManager.checksumFor(part21, fs); Path part22 = new Path(dirTbl2, "part2"); createFile(part22, "testClearer22"); - String fileChksum22 = ReplChangeManager.getChksumString(part22, fs); + String fileChksum22 = ReplChangeManager.checksumFor(part22, fs); Path dirTbl3 = new Path(dirDb, "tbl3"); fs.mkdirs(dirTbl3); Path part31 = new Path(dirTbl3, "part1"); createFile(part31, "testClearer31"); - String fileChksum31 = ReplChangeManager.getChksumString(part31, fs); + String fileChksum31 = ReplChangeManager.checksumFor(part31, fs); Path part32 = new Path(dirTbl3, "part2"); createFile(part32, "testClearer32"); - String fileChksum32 = ReplChangeManager.getChksumString(part32, fs); + String fileChksum32 = ReplChangeManager.checksumFor(part32, fs); ReplChangeManager.getInstance(hiveConf).recycle(dirTbl1, false); ReplChangeManager.getInstance(hiveConf).recycle(dirTbl2, false); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index 6713dff73e..41e834d752 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -107,8 +107,8 @@ public void testCreateFunctionIncrementalReplication() throws Throwable { @Test public void testDropFunctionIncrementalReplication() throws Throwable { primary.run("CREATE FUNCTION " + primaryDbName - + ".testFunction as 'com.yahoo.sketches.hive.theta.DataToSketchUDAF' " - + "using jar 'ivy://com.yahoo.datasketches:sketches-hive:0.8.2'"); + + ".testFunction as 'hivemall.tools.string.StopwordUDF' " + + "using jar 'ivy://io.github.myui:hivemall:0.4.0-2'"); WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null); replica.load(replicatedDbName, bootStrapDump.dumpLocation) .run("REPL STATUS " + replicatedDbName) diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java index c955470f91..6f17d23649 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -54,9 +54,9 @@ private String msGroup; private FileSystem fs; - public static final String ORIG_LOC_TAG = "user.original-loc"; - public static final String REMAIN_IN_TRASH_TAG = "user.remain-in-trash"; - public static final String URI_FRAGMENT_SEPARATOR = "#"; + private static final String ORIG_LOC_TAG = "user.original-loc"; + static final String REMAIN_IN_TRASH_TAG = "user.remain-in-trash"; + private static final String URI_FRAGMENT_SEPARATOR = "#"; public static ReplChangeManager getInstance(HiveConf hiveConf) throws MetaException { if (instance == null) { @@ -65,7 +65,7 @@ public static ReplChangeManager getInstance(HiveConf hiveConf) throws MetaExcept return instance; } - ReplChangeManager(HiveConf hiveConf) throws MetaException { + private ReplChangeManager(HiveConf hiveConf) throws MetaException { try { if (!inited) { if (hiveConf.getBoolVar(HiveConf.ConfVars.REPLCMENABLED)) { @@ -109,7 +109,7 @@ void addFile(Path path) throws MetaException { if (fs.isDirectory(path)) { throw new IllegalArgumentException(path + " cannot be a directory"); } - Path cmPath = getCMPath(hiveConf, getChksumString(path, fs)); + Path cmPath = getCMPath(hiveConf, checksumFor(path, fs)); boolean copySuccessful = FileUtils .copy(path.getFileSystem(hiveConf), path, cmPath.getFileSystem(hiveConf), cmPath, false, false, hiveConf); @@ -134,10 +134,10 @@ void addFile(Path path) throws MetaException { * recursively move files inside directory to cmroot. Note the table must be managed table * @param path a single file or directory * @param ifPurge if the file should skip Trash when delete - * @return + * @return int * @throws MetaException */ - public int recycle(Path path, boolean ifPurge) throws MetaException { + int recycle(Path path, boolean ifPurge) throws MetaException { if (!enabled) { return 0; } @@ -151,7 +151,7 @@ public int recycle(Path path, boolean ifPurge) throws MetaException { count += recycle(file.getPath(), ifPurge); } } else { - Path cmPath = getCMPath(hiveConf, getChksumString(path, fs)); + Path cmPath = getCMPath(hiveConf, checksumFor(path, fs)); if (LOG.isDebugEnabled()) { LOG.debug("Moving " + path.toString() + " to " + cmPath.toString()); @@ -207,7 +207,7 @@ public int recycle(Path path, boolean ifPurge) throws MetaException { } // Get checksum of a file - static public String getChksumString(Path path, FileSystem fs) throws IOException { + static public String checksumFor(Path path, FileSystem fs) throws IOException { // TODO: fs checksum only available on hdfs, need to // find a solution for other fs (eg, local fs, s3, etc) String checksumString = null; @@ -228,13 +228,10 @@ static public void setCmRoot(Path cmRoot) { * to a deterministic location of cmroot. So user can retrieve the file back * with the original location plus checksum. * @param conf - * @param checkSum checksum of the file, can be retrieved by {@link getCksumString} - * @return - * @throws IOException - * @throws MetaException + * @param checkSum checksum of the file, can be retrieved by {@link #checksumFor(Path, FileSystem)} + * @return Path */ - static Path getCMPath(Configuration conf, String checkSum) - throws IOException, MetaException { + static Path getCMPath(Configuration conf, String checkSum) throws IOException, MetaException { String newFileName = checkSum; int maxLength = conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_KEY, DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_DEFAULT); @@ -250,28 +247,27 @@ static Path getCMPath(Configuration conf, String checkSum) * Get original file specified by src and chksumString. If the file exists and checksum * matches, return the file; otherwise, use chksumString to retrieve it from cmroot * @param src Original file location - * @param chksumString Checksum of the original file - * @param conf + * @param checksumString Checksum of the original file + * @param hiveConf * @return Corresponding FileStatus object - * @throws MetaException */ - static public FileStatus getFileStatus(Path src, String chksumString, - HiveConf conf) throws MetaException { + static public FileStatus getFileStatus(Path src, String checksumString, + HiveConf hiveConf) throws MetaException { try { - FileSystem srcFs = src.getFileSystem(conf); - if (chksumString == null) { + FileSystem srcFs = src.getFileSystem(hiveConf); + if (checksumString == null) { return srcFs.getFileStatus(src); } if (!srcFs.exists(src)) { - return srcFs.getFileStatus(getCMPath(conf, chksumString)); + return srcFs.getFileStatus(getCMPath(hiveConf, checksumString)); } - String currentChksumString = getChksumString(src, srcFs); - if (currentChksumString == null || chksumString.equals(currentChksumString)) { + String currentChecksumString = checksumFor(src, srcFs); + if (currentChecksumString == null || checksumString.equals(currentChecksumString)) { return srcFs.getFileStatus(src); } else { - return srcFs.getFileStatus(getCMPath(conf, chksumString)); + return srcFs.getFileStatus(getCMPath(hiveConf, checksumString)); } } catch (IOException e) { throw new MetaException(StringUtils.stringifyException(e)); @@ -372,7 +368,7 @@ public void run() { } // Schedule CMClearer thread. Will be invoked by metastore - public static void scheduleCMClearer(HiveConf hiveConf) { + static void scheduleCMClearer(HiveConf hiveConf) { if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.REPLCMENABLED)) { ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( new BasicThreadFactory.Builder() diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java index 71db33289f..d0f30bcc2d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -167,7 +167,7 @@ protected int execute(DriverContext driverContext) { }else{ LOG.debug("ReplCopyTask _files now tracks:" + oneSrc.getPath().toUri()); console.printInfo("Tracking file: " + oneSrc.getPath().toUri()); - String chksumString = ReplChangeManager.getChksumString(oneSrc.getPath(), actualSrcFs); + String chksumString = ReplChangeManager.checksumFor(oneSrc.getPath(), actualSrcFs); listBW.write(ReplChangeManager.encodeFileUri (oneSrc.getPath().toUri().toString(), chksumString) + "\n"); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java index 6c2a4021de..f72f430a09 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java @@ -55,7 +55,7 @@ public void writeTo(JsonWriter writer, ReplicationSpec additionalPropertiesProvi if ("hdfs".equals(inputPath.toUri().getScheme())) { FileSystem fileSystem = inputPath.getFileSystem(hiveConf); Path qualifiedUri = PathBuilder.fullyQualifiedHDFSUri(inputPath, fileSystem); - String checkSum = ReplChangeManager.getChksumString(qualifiedUri, fileSystem); + String checkSum = ReplChangeManager.checksumFor(qualifiedUri, fileSystem); String newFileUri = ReplChangeManager.encodeFileUri(qualifiedUri.toString(), checkSum); resourceUris.add(new ResourceUri(uri.getResourceType(), newFileUri)); } else {