commit bb68b887736afe3ded0f32c78ac8bda56bd8b214
Author: Vihang Karajgaonkar
Date:   Thu Mar 30 16:33:09 2017 -0700

    HIVE-16299 : MSCK REPAIR TABLE should enforce partition key order when adding unknown partitions

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
index 84c090239df39d7ea987d561bf4ab1e852f75624..da24c7041fae72768794f3ba47b2d416d8cf8083 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
@@ -331,7 +331,7 @@ void findUnknownPartitions(Table table, Set<Path> partPaths,
     // now check the table folder and see if we find anything
     // that isn't in the metastore
     Set<Path> allPartDirs = new HashSet<>();
-    checkPartitionDirs(tablePath, allPartDirs, table.getPartCols().size());
+    checkPartitionDirs(tablePath, allPartDirs, Collections.unmodifiableList(table.getPartColNames()));

     // don't want the table dir
     allPartDirs.remove(tablePath);
@@ -415,14 +415,14 @@ static String getPartitionName(Path tablePath, Path partitionPath,
    *          Start directory
    * @param allDirs
    *          This set will contain the leaf paths at the end.
-   * @param maxDepth
-   *          Specify how deep the search goes.
+   * @param partColNames
+   *          Partition column names in order; the search goes as deep as this list's size.
    * @throws IOException
    *           Thrown if we can't get lists from the fs.
    * @throws HiveException
    */
-  private void checkPartitionDirs(Path basePath, Set<Path> allDirs, int maxDepth) throws IOException, HiveException {
+  private void checkPartitionDirs(Path basePath, Set<Path> allDirs, final List<String> partColNames) throws IOException, HiveException {
     // Here we just reuse the THREAD_COUNT configuration for
     // METASTORE_FS_HANDLER_THREADS_COUNT since this results in better performance
     // The number of missing partitions discovered are later added by metastore using a
@@ -440,21 +440,21 @@ private void checkPartitionDirs(Path basePath, Set<Path> allDirs, int maxDepth)
           new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MSCK-GetPaths-%d").build();
       executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(poolSize, threadFactory);
     }
-    checkPartitionDirs(executor, basePath, allDirs, basePath.getFileSystem(conf), maxDepth);
+    checkPartitionDirs(executor, basePath, allDirs, basePath.getFileSystem(conf), partColNames);

    executor.shutdown();
  }

  private final class PathDepthInfoCallable implements Callable<Path> {
-    private final int maxDepth;
+    private final List<String> partColNames;
    private final FileSystem fs;
    private final ConcurrentLinkedQueue<PathDepthInfo> pendingPaths;
    private final boolean throwException;
    private final PathDepthInfo pd;

-    private PathDepthInfoCallable(PathDepthInfo pd, int maxDepth, FileSystem fs,
+    private PathDepthInfoCallable(PathDepthInfo pd, List<String> partColNames, FileSystem fs,
        ConcurrentLinkedQueue<PathDepthInfo> basePaths) {
-      this.maxDepth = maxDepth;
+      this.partColNames = partColNames;
      this.pd = pd;
      this.fs = fs;
      this.pendingPaths = basePaths;
@@ -474,39 +474,49 @@ private Path processPathDepthInfo(final PathDepthInfo pd)
       FileStatus[] fileStatuses = fs.listStatus(currentPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
       // found no files under a sub-directory under table base path; it is possible that the table
       // is empty and hence there are no partition sub-directories created under base path
-      if (fileStatuses.length == 0 && currentDepth > 0 && currentDepth < maxDepth) {
+      if (fileStatuses.length == 0 && currentDepth > 0 && currentDepth < partColNames.size()) {
         // since maxDepth is not yet reached, we are missing partition
         // columns in currentPath
-        if (throwException) {
-          throw new HiveException(
-              "MSCK is missing partition columns under " + currentPath.toString());
-        } else {
-          LOG.warn("MSCK is missing partition columns under " + currentPath.toString());
-        }
+        logOrThrowExceptionWithMsg(
+            "MSCK is missing partition columns under " + currentPath.toString());
       } else {
         // found files under currentPath add them to the queue if it is a directory
         for (FileStatus fileStatus : fileStatuses) {
-          if (!fileStatus.isDirectory() && currentDepth < maxDepth) {
+          if (!fileStatus.isDirectory() && currentDepth < partColNames.size()) {
             // found a file at depth which is less than number of partition keys
-            if (throwException) {
-              throw new HiveException(
-                  "MSCK finds a file rather than a directory when it searches for "
-                      + fileStatus.getPath().toString());
-            } else {
-              LOG.warn("MSCK finds a file rather than a directory when it searches for "
-                  + fileStatus.getPath().toString());
+            logOrThrowExceptionWithMsg(
+                "MSCK finds a file rather than a directory when it searches for "
+                    + fileStatus.getPath().toString());
+          } else if (fileStatus.isDirectory() && currentDepth < partColNames.size()) {
+            // found a sub-directory at a depth less than number of partition keys
+            // validate if the partition directory name matches with the corresponding
+            // partition colName at currentDepth
+            Path nextPath = fileStatus.getPath();
+            String[] parts = nextPath.getName().split("=");
+            if (parts.length != 2) {
+              logOrThrowExceptionWithMsg("Invalid partition name " + nextPath);
+            } else if (!parts[0].equalsIgnoreCase(partColNames.get(currentDepth))) {
+              logOrThrowExceptionWithMsg(
+                  "Unexpected partition key " + parts[0] + " found at " + nextPath);
             }
-          } else if (fileStatus.isDirectory() && currentDepth < maxDepth) {
             // add sub-directory to the work queue if maxDepth is not yet reached
-            pendingPaths.add(new PathDepthInfo(fileStatus.getPath(), currentDepth + 1));
+            pendingPaths.add(new PathDepthInfo(nextPath, currentDepth + 1));
           }
         }
-        if (currentDepth == maxDepth) {
+        if (currentDepth == partColNames.size()) {
           return currentPath;
         }
       }
       return null;
     }
+
+    private void logOrThrowExceptionWithMsg(String msg) throws HiveException {
+      if (throwException) {
+        throw new HiveException(msg);
+      } else {
+        LOG.warn(msg);
+      }
+    }
   }

   private static class PathDepthInfo {
@@ -520,7 +530,7 @@ private Path processPathDepthInfo(final PathDepthInfo pd)
   private void checkPartitionDirs(final ExecutorService executor,
       final Path basePath, final Set<Path> result,
-      final FileSystem fs, final int maxDepth) throws HiveException {
+      final FileSystem fs, final List<String> partColNames) throws HiveException {
     try {
       Queue<Future<Path>> futures = new LinkedList<Future<Path>>();
       ConcurrentLinkedQueue<PathDepthInfo> nextLevel = new ConcurrentLinkedQueue<>();
@@ -537,7 +547,7 @@ private void checkPartitionDirs(final ExecutorService executor,
       //process each level in parallel
       while(!nextLevel.isEmpty()) {
         futures.add(
-            executor.submit(new PathDepthInfoCallable(nextLevel.poll(), maxDepth, fs, tempQueue)));
+            executor.submit(new PathDepthInfoCallable(nextLevel.poll(), partColNames, fs, tempQueue)));
       }
       while(!futures.isEmpty()) {
         Path p = futures.poll().get();
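The core of the change above is the per-level name check: at depth d, every sub-directory must be named partColNames.get(d)=<value>, which is what enforces partition key order. A minimal standalone sketch of that rule (the class and method names here are hypothetical, and IllegalStateException stands in for HiveException so the example is self-contained):

import java.util.Arrays;
import java.util.List;

// Standalone sketch of the per-level validation introduced by the patch.
// Like the patch, it assumes directory names of the form "<key>=<value>";
// a value containing '=' would fail the split-length check, as in the patch.
final class PartitionDirNameCheck {
  static void validateLevel(List<String> partColNames, int depth, String dirName) {
    String[] parts = dirName.split("=");
    if (parts.length != 2) {
      throw new IllegalStateException("Invalid partition name " + dirName);
    }
    if (!parts[0].equalsIgnoreCase(partColNames.get(depth))) {
      throw new IllegalStateException("Unexpected partition key " + parts[0]
          + " at depth " + depth + "; expected " + partColNames.get(depth));
    }
  }

  public static void main(String[] args) {
    List<String> keys = Arrays.asList("p1", "p2");
    validateLevel(keys, 0, "p1=a"); // ok
    validateLevel(keys, 1, "p2=b"); // ok
    validateLevel(keys, 0, "p2=c"); // throws: p2 found where p1 was expected
  }
}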
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java
index 21bc8eed06c8fcd99d1e4eee3b5e5f8a4fb812be..d7fbbce2d8dbbd28b91978ba79639a2616b687bd 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hive.ql.metadata;

+import static org.junit.Assert.*;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -27,7 +29,6 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -37,17 +38,17 @@
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.thrift.TException;
-import org.mockito.Mockito;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;

 import com.google.common.collect.Lists;

-import junit.framework.TestCase;
-
 /**
  * TestHiveMetaStoreChecker.
  *
  */
-public class TestHiveMetaStoreChecker extends TestCase {
+public class TestHiveMetaStoreChecker {

   private Hive hive;
   private FileSystem fs;
@@ -62,9 +63,8 @@
   private List<FieldSchema> partCols;
   private List<Map<String, String>> parts;

-  @Override
-  protected void setUp() throws Exception {
-    super.setUp();
+  @Before
+  public void setUp() throws Exception {
     hive = Hive.get();
     hive.getConf().setIntVar(HiveConf.ConfVars.METASTORE_FS_HANDLER_THREADS_COUNT, 15);
     hive.getConf().set(HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION.varname, "throw");
@@ -100,13 +100,13 @@ private void dropDbTable() {
     }
   }

-  @Override
-  protected void tearDown() throws Exception {
+  @After
+  public void tearDown() throws Exception {
     dropDbTable();
-    super.tearDown();
     Hive.closeCurrent();
   }

+  @Test
   public void testTableCheck() throws HiveException, MetaException,
       IOException, TException, AlreadyExistsException {
     CheckResult result = new CheckResult();
@@ -196,8 +196,46 @@ public void testTableCheck() throws HiveException, MetaException,
     assertEquals(Collections.emptySet(), result.getPartitionsNotInMs());
   }

-  public void testPartitionsCheck() throws HiveException, MetaException,
-      IOException, TException, AlreadyExistsException {
+  /*
+   * Tests that a layout like tblPath/p1=a/p2=b/p3=c/file for a table partitioned by (p1, p2)
+   * does not throw a HiveException
+   */
+  @Test
+  public void testAdditionalPartitionDirs()
+      throws HiveException, AlreadyExistsException, IOException {
+    Table table = createTestTable();
+    List<Partition> partitions = hive.getPartitions(table);
+    assertEquals(2, partitions.size());
+    // add a fake partition dir on fs
+    fs = partitions.get(0).getDataLocation().getFileSystem(hive.getConf());
+    Path fakePart = new Path(table.getDataLocation().toString(),
+        partDateName + "=2017-01-01/" + partCityName + "=paloalto/fakePartCol=fakepartValue");
+    fs.mkdirs(fakePart);
+    fs.deleteOnExit(fakePart);
+    CheckResult result = new CheckResult();
+    checker.checkMetastore(dbName, tableName, null, result);
+    assertEquals(Collections.<String> emptySet(), result.getTablesNotInMs());
+    assertEquals(Collections.<String> emptySet(), result.getTablesNotOnFs());
+    assertEquals(Collections.<CheckResult.PartitionResult> emptySet(), result.getPartitionsNotOnFs());
+    // the fakePart path is added as a partition since the defined partition keys are valid
+    assertEquals(1, result.getPartitionsNotInMs().size());
+  }
+
+  @Test(expected = HiveException.class)
+  public void testInvalidPartitionKeyName() throws HiveException, AlreadyExistsException, IOException {
+    Table table = createTestTable();
+    List<Partition> partitions = hive.getPartitions(table);
+    assertEquals(2, partitions.size());
+    // add a fake partition dir on fs
+    fs = partitions.get(0).getDataLocation().getFileSystem(hive.getConf());
+    Path fakePart = new Path(table.getDataLocation().toString(),
+        "fakedate=2009-01-01/fakecity=sanjose");
+    fs.mkdirs(fakePart);
+    fs.deleteOnExit(fakePart);
+    checker.checkMetastore(dbName, tableName, null, new CheckResult());
+  }
+
+  private Table createTestTable() throws AlreadyExistsException, HiveException {
     Database db = new Database();
     db.setName(dbName);
     hive.createDatabase(db);
@@ -214,6 +252,13 @@ public void testPartitionsCheck() throws HiveException, MetaException,
     for (Map<String, String> partSpec : parts) {
       hive.createPartition(table, partSpec);
     }
+    return table;
+  }
+
+  @Test
+  public void testPartitionsCheck() throws HiveException, MetaException,
+      IOException, TException, AlreadyExistsException {
+    Table table = createTestTable();

     CheckResult result = new CheckResult();
     checker.checkMetastore(dbName, tableName, null, result);
@@ -225,18 +270,6 @@ public void testPartitionsCheck() throws HiveException, MetaException,
     List<Partition> partitions = hive.getPartitions(table);
     assertEquals(2, partitions.size());

-    // add a fake partition dir on fs to ensure that it does not get added
-    fs = partitions.get(0).getDataLocation().getFileSystem(hive.getConf());
-    Path fakePart = new Path(table.getDataLocation().toString(),
-        "fakedate=2009-01-01/fakecity=sanjose");
-    fs.mkdirs(fakePart);
-    fs.deleteOnExit(fakePart);
-    checker.checkMetastore(dbName, tableName, null, result);
-    assertEquals(Collections.emptySet(), result.getTablesNotInMs());
-    assertEquals(Collections.emptySet(), result.getTablesNotOnFs());
-    assertEquals(0, result.getPartitionsNotOnFs().size());
-    assertEquals(0, result.getPartitionsNotInMs().size());
-    assertEquals(2, partitions.size()); //no additional partitions got added

     Partition partToRemove = partitions.get(0);
     // As this partition (partdate=2008-01-01/partcity=london) is the only
@@ -282,6 +315,7 @@ public void testPartitionsCheck() throws HiveException, MetaException,
     System.err.println("Test completed - partition check");
   }

+  @Test
   public void testDataDeletion() throws HiveException, MetaException,
       IOException, TException, AlreadyExistsException, NoSuchObjectException {

@@ -318,6 +352,7 @@ public void testDataDeletion() throws HiveException, MetaException,
   /*
    * Test multi-threaded implementation of checker to find out missing partitions
    */
+  @Test
   public void testPartitionsNotInMs() throws HiveException, AlreadyExistsException, IOException {
     Table testTable = createPartitionedTestTable(dbName, tableName, 2, 0);
     // add 10 partitions on the filesystem
@@ -333,6 +368,7 @@ public void testPartitionsNotInMs() throws HiveException, AlreadyExistsException
   /*
    * Tests single threaded implementation of checkMetastore
    */
+  @Test
   public void testSingleThreadedCheckMetastore()
       throws HiveException, AlreadyExistsException, IOException {
     // set num of threads to 0 so that single-threaded checkMetastore is called
@@ -355,6 +391,7 @@ public void testSingleThreadedCheckMetastore()
    * @throws AlreadyExistsException
    * @throws IOException
    */
+  @Test
   public void testSingleThreadedDeeplyNestedTables()
       throws HiveException, AlreadyExistsException, IOException {
     // set num of threads to 0 so that single-threaded checkMetastore is called
@@ -379,6 +416,7 @@ public void testSingleThreadedDeeplyNestedTables()
    * @throws AlreadyExistsException
    * @throws IOException
    */
+  @Test
   public void testDeeplyNestedPartitionedTables()
       throws HiveException, AlreadyExistsException, IOException {
     hive.getConf().setIntVar(HiveConf.ConfVars.METASTORE_FS_HANDLER_THREADS_COUNT, 2);
@@ -402,6 +440,7 @@ public void testDeeplyNestedPartitionedTables()
    * @throws IOException
    * @throws HiveException
    */
+  @Test
   public void testErrorForMissingPartitionColumn() throws AlreadyExistsException, IOException, HiveException {
     Table testTable = createPartitionedTestTable(dbName, tableName, 2, 0);
     // add 10 partitions on the filesystem
@@ -431,10 +470,28 @@ public void testErrorForMissingPartitionColumn() throws AlreadyExistsException,
     assertTrue("Expected HiveException", exception != null && exception instanceof HiveException);
   }

+  /**
+   * Tests the case where an unknown partition directory on the FS has its partition keys in an
+   * invalid order compared to the partition keys specified for the table.
+   *
+   * @throws AlreadyExistsException
+   * @throws HiveException
+   * @throws IOException
+   */
+  @Test(expected = HiveException.class)
+  public void testInvalidOrderForPartitionKeysOnFS()
+      throws AlreadyExistsException, HiveException, IOException {
+    Table testTable = createPartitionedTestTable(dbName, tableName, 2, 0);
+    // add 10 invalid partition directories on the filesystem
+    createInvalidPartitionDirsOnFS(testTable, 10);
+    CheckResult result = new CheckResult();
+    checker.checkMetastore(dbName, tableName, null, result);
+  }
+
   /*
    * Test if single-threaded implementation checker throws HiveException when there is a dummy
    * directory present in the nested level
    */
+  @Test
   public void testErrorForMissingPartitionsSingleThreaded()
       throws AlreadyExistsException, HiveException, IOException {
     // set num of threads to 0 so that single-threaded checkMetastore is called
@@ -530,29 +587,59 @@ private Table createPartitionedTestTable(String dbName, String tableName, int nu
    * @param table - Table which provides the base locations and partition specs for creating the
    *          sub-directories
    * @param numPartitions - Number of partitions to be created
+   * @param reverseOrder - If set to true, creates the partition sub-directories in the reverse
+   *          order of the partition keys defined for the table
    * @throws IOException
    */
-  private void createPartitionsDirectoriesOnFS(Table table, int numPartitions) throws IOException {
+  private void createPartitionsDirectoriesOnFS(Table table, int numPartitions, boolean reverseOrder) throws IOException {
     String path = table.getDataLocation().toString();
     fs = table.getPath().getFileSystem(hive.getConf());
     int numPartKeys = table.getPartitionKeys().size();
     for (int i = 0; i < numPartitions; i++) {
       StringBuilder partPath = new StringBuilder(path);
       partPath.append(Path.SEPARATOR);
-      for (int j = 0; j < numPartKeys; j++) {
-        FieldSchema field = table.getPartitionKeys().get(j);
-        partPath.append(field.getName());
-        partPath.append('=');
-        partPath.append("val_");
-        partPath.append(i);
-        if (j < (numPartKeys - 1)) {
-          partPath.append(Path.SEPARATOR);
+      if (!reverseOrder) {
+        for (int j = 0; j < numPartKeys; j++) {
+          FieldSchema field = table.getPartitionKeys().get(j);
+          partPath.append(field.getName());
+          partPath.append('=');
+          partPath.append("val_");
+          partPath.append(i);
+          if (j < (numPartKeys - 1)) {
+            partPath.append(Path.SEPARATOR);
+          }
+        }
+      } else {
+        for (int j = numPartKeys - 1; j >= 0; j--) {
+          FieldSchema field = table.getPartitionKeys().get(j);
+          partPath.append(field.getName());
+          partPath.append('=');
+          partPath.append("val_");
+          partPath.append(i);
+          if (j > 0) {
+            partPath.append(Path.SEPARATOR);
+          }
         }
       }
       createDirectory(partPath.toString());
     }
   }

+  private void createPartitionsDirectoriesOnFS(Table table, int numPartitions) throws IOException {
+    createPartitionsDirectoriesOnFS(table, numPartitions, false);
+  }
+
+  /**
+   * Creates a partition directory structure on the file system, but with the sub-directories in
+   * the reverse order of the partition keys defined in the table. E.g. if the partition keys
+   * defined in the table are (a int, b int, c int), this method creates an invalid directory
+   * c=val_1/b=val_1/a=val_1
+   * @param table
+   * @param numPartitions - Number of invalid partition directories to be created
+   * @throws IOException
+   */
+  private void createInvalidPartitionDirsOnFS(Table table, int numPartitions) throws IOException {
+    createPartitionsDirectoriesOnFS(table, numPartitions, true);
+  }
+
   private void createFile(String partPath, String filename) throws IOException {
     Path part = new Path(partPath);
     fs.mkdirs(part);
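The negative tests above depend on setUp() pinning HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION to "throw", which routes logOrThrowExceptionWithMsg into its exception branch. A sketch of a test that would instead exercise the warn-only branch, assuming "skip" is an accepted value of hive.msck.repair path validation (this method is illustrative, not part of the patch):

// Hypothetical JUnit 4 test for the same class: with path validation relaxed,
// an out-of-order partition directory should only be logged as a warning
// instead of aborting the whole check.
@Test
public void testInvalidOrderOnlyWarnsWhenValidationRelaxed() throws Exception {
  // "skip" is an assumption; setUp() uses "throw" for the tests above
  hive.getConf().set(HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION.varname, "skip");
  Table testTable = createPartitionedTestTable(dbName, tableName, 2, 0);
  createInvalidPartitionDirsOnFS(testTable, 10);
  // must complete without a HiveException; the malformed directories are
  // reported via LOG.warn inside logOrThrowExceptionWithMsg
  checker.checkMetastore(dbName, tableName, null, new CheckResult());
}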
diff --git a/ql/src/test/queries/clientnegative/msck_repair_4.q b/ql/src/test/queries/clientnegative/msck_repair_4.q
new file mode 100644
index 0000000000000000000000000000000000000000..f38b75925d991ae46d6145200f29114c0e7fc598
--- /dev/null
+++ b/ql/src/test/queries/clientnegative/msck_repair_4.q
@@ -0,0 +1,14 @@
+set hive.msck.repair.batch.size=1;
+
+DROP TABLE IF EXISTS repairtable;
+
+CREATE TABLE repairtable(col STRING) PARTITIONED BY (p1 STRING, p2 STRING);
+
+MSCK TABLE repairtable;
+
+dfs ${system:test.dfs.mkdir} ${system:test.warehouse.dir}/repairtable/p2=c/p1=a/p3=b;
+dfs -touchz ${system:test.warehouse.dir}/repairtable/p2=c/p1=a/p3=b/datafile;
+
+MSCK REPAIR TABLE default.repairtable;
+
+DROP TABLE default.repairtable;
diff --git a/ql/src/test/results/clientnegative/msck_repair_4.q.out b/ql/src/test/results/clientnegative/msck_repair_4.q.out
new file mode 100644
index 0000000000000000000000000000000000000000..bb9cf47b08cd77f0bccc49bffbfc4424ca37e796
--- /dev/null
+++ b/ql/src/test/results/clientnegative/msck_repair_4.q.out
@@ -0,0 +1,22 @@
+PREHOOK: query: DROP TABLE IF EXISTS repairtable
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: DROP TABLE IF EXISTS repairtable
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: CREATE TABLE repairtable(col STRING) PARTITIONED BY (p1 STRING, p2 STRING)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@repairtable
+POSTHOOK: query: CREATE TABLE repairtable(col STRING) PARTITIONED BY (p1 STRING, p2 STRING)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@repairtable
+PREHOOK: query: MSCK TABLE repairtable
+PREHOOK: type: MSCK
+PREHOOK: Output: default@repairtable
+POSTHOOK: query: MSCK TABLE repairtable
+POSTHOOK: type: MSCK
+POSTHOOK: Output: default@repairtable
+PREHOOK: query: MSCK REPAIR TABLE default.repairtable
+PREHOOK: type: MSCK
+PREHOOK: Output: default@repairtable
+FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask
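The qfile fails because repairtable is partitioned by (p1, p2) while the directory tree created on the filesystem starts with p2=c, so the very first level violates the key-order check (and p3=b would add a third level a two-key table cannot have). Traced against the hypothetical PartitionDirNameCheck sketch from earlier:

import java.util.Arrays;
import java.util.List;

// Traces the msck_repair_4.q layout (repairtable/p2=c/p1=a/p3=b) through the
// earlier PartitionDirNameCheck sketch; the table's keys are (p1, p2).
public class MsckRepair4Trace {
  public static void main(String[] args) {
    List<String> keys = Arrays.asList("p1", "p2");
    // the depth-0 directory is "p2=c" but the key expected at depth 0 is "p1",
    // so the check throws; MSCK REPAIR surfaces this as the return code 1
    // seen at the end of msck_repair_4.q.out
    PartitionDirNameCheck.validateLevel(keys, 0, "p2=c");
  }
}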