commit a1d811037df97b717c20e9677a6be3ea03834ebc
Author: Vihang Karajgaonkar
Date:   Wed Feb 8 11:41:44 2017 -0800

    HIVE-15879 : Fix HiveMetaStoreChecker.checkPartitionDirs method

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
index 7c94c95f00492467ba27dedc9ce513e13c85ea61..5d2ae2ca28b78280434437614518b13a0f3a95cd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveMetaStoreChecker.java
@@ -19,19 +19,20 @@
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.slf4j.Logger;
@@ -403,16 +404,11 @@ static String getPartitionName(Path tablePath, Path partitionPath,
    */
   private void checkPartitionDirs(Path basePath, Set<Path> allDirs, int maxDepth)
       throws IOException, HiveException {
-    ConcurrentLinkedQueue<Path> basePaths = new ConcurrentLinkedQueue<>();
-    basePaths.add(basePath);
-    Set<Path> dirSet = Collections.newSetFromMap(new ConcurrentHashMap<Path, Boolean>());
-    // Here we just reuse the THREAD_COUNT configuration for
-    // HIVE_MOVE_FILES_THREAD_COUNT
     int poolSize = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 15);
     // Check if too low config is provided for move files. 2x CPU is reasonable max count.
     poolSize = poolSize == 0 ? poolSize : Math.max(poolSize,
-        Runtime.getRuntime().availableProcessors() * 2);
+        getMinPoolSize());
     // Fixed thread pool on need basis
     final ThreadPoolExecutor pool = poolSize > 0 ? (ThreadPoolExecutor)
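
The hunk above reuses HIVE_MOVE_FILES_THREAD_COUNT as the MSCK thread count. A standalone restatement of the effective sizing rule (a sketch only; the class and variable names here are mine, not Hive's):

    public class MsckPoolSizing {
      public static void main(String[] args) {
        // Stand-in for conf.getInt(HIVE_MOVE_FILES_THREAD_COUNT.varname, 15).
        int configured = 15;
        // Stand-in for the getMinPoolSize() method introduced by this patch.
        int floor = Runtime.getRuntime().availableProcessors() * 2;
        // 0 keeps the walk single-threaded; any other value is floored at 2x CPUs.
        int poolSize = configured == 0 ? 0 : Math.max(configured, floor);
        System.out.println("effective MSCK pool size: " + poolSize);
      }
    }

With poolSize == 0 the driver takes the checkPartitionDirsSingleThreaded() path below; otherwise it builds a fixed pool and calls checkPartitionDirsInParallel().
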
@@ -421,135 +417,176 @@ private void checkPartitionDirs(Path basePath, Set<Path> allDirs, int maxDepth)
     if (pool == null) {
       LOG.debug("Not-using threaded version of MSCK-GetPaths");
+      Queue<Path> basePaths = new LinkedList<>();
+      basePaths.add(basePath);
+      checkPartitionDirsSingleThreaded(basePaths, allDirs, basePath.getFileSystem(conf), maxDepth,
+          maxDepth);
     } else {
-      LOG.debug("Using threaded version of MSCK-GetPaths with number of threads "
+      LOG.debug("Using multi-threaded version of MSCK-GetPaths with number of threads "
           + pool.getMaximumPoolSize());
+      checkPartitionDirsInParallel((ThreadPoolExecutor) pool, basePath, allDirs,
+          basePath.getFileSystem(conf), maxDepth);
     }
-    checkPartitionDirs(pool, basePaths, dirSet, basePath.getFileSystem(conf), maxDepth, maxDepth);
     if (pool != null) {
       pool.shutdown();
     }
-    allDirs.addAll(dirSet);
   }
 
-  // process the basePaths in parallel and then the next level of basePaths
-  private void checkPartitionDirs(final ThreadPoolExecutor pool,
-      final ConcurrentLinkedQueue<Path> basePaths, final Set<Path> allDirs,
-      final FileSystem fs, final int depth, final int maxDepth) throws IOException, HiveException {
-    final ConcurrentLinkedQueue<Path> nextLevel = new ConcurrentLinkedQueue<>();
-
-    // Check if thread pool can be used.
-    boolean useThreadPool = false;
-    if (pool != null) {
-      synchronized (pool) {
-        // In case of recursive calls, it is possible to deadlock with TP. Check TP usage here.
-        if (pool.getActiveCount() < pool.getMaximumPoolSize()) {
-          useThreadPool = true;
-        }
-
-        if (!useThreadPool) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Not using threadPool as active count:" + pool.getActiveCount()
-                + ", max:" + pool.getMaximumPoolSize());
-          }
-        }
-      }
-    }
-    if (null == pool || !useThreadPool) {
-      for (final Path path : basePaths) {
-        FileStatus[] statuses = fs.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER);
-        boolean fileFound = false;
-        for (FileStatus status : statuses) {
-          if (status.isDirectory()) {
-            nextLevel.add(status.getPath());
-          } else {
-            fileFound = true;
-          }
-        }
-        if (depth != 0) {
-          // we are in the middle of the search and we find a file
-          if (fileFound) {
-            if ("throw".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION))) {
-              throw new HiveException(
-                  "MSCK finds a file rather than a folder when it searches for " + path.toString());
-            } else {
-              LOG.warn("MSCK finds a file rather than a folder when it searches for "
-                  + path.toString());
-            }
-          }
-          if (!nextLevel.isEmpty()) {
-            checkPartitionDirs(pool, nextLevel, allDirs, fs, depth - 1, maxDepth);
-          } else if (depth != maxDepth) {
-            // since nextLevel is empty, we are missing partition columns.
- if ("throw".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION))) { - throw new HiveException("MSCK is missing partition columns under " + path.toString()); + "MSCK finds a file rather than a directory when it searches for " + + fileStatus.getPath().toString()); } else { - LOG.warn("MSCK is missing partition columns under " + path.toString()); + LOG.warn("MSCK finds a file rather than a directory when it searches for " + + fileStatus.getPath().toString()); } + } else if (fileStatus.isDirectory() && currentDepth < maxDepth) { + // add sub-directory to the work queue if maxDepth is not yet reached + pendingPaths.add(new PathDepthInfo(fileStatus.getPath(), currentDepth + 1)); } - } else { - allDirs.add(path); + } + if (currentDepth == maxDepth) { + return currentPath; } } - } else { - final List> futures = new LinkedList<>(); - for (final Path path : basePaths) { - futures.add(pool.submit(new Callable() { - @Override - public Void call() throws Exception { - FileStatus[] statuses = fs.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER); - boolean fileFound = false; - for (FileStatus status : statuses) { - if (status.isDirectory()) { - nextLevel.add(status.getPath()); - } else { - fileFound = true; - } - } - if (depth != 0) { - // we are in the middle of the search and we find a file - if (fileFound) { - if ("throw".equals(HiveConf.getVar(conf, - HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION))) { - throw new HiveException( - "MSCK finds a file rather than a folder when it searches for " - + path.toString()); - } else { - LOG.warn("MSCK finds a file rather than a folder when it searches for " - + path.toString()); - } - } - if (!nextLevel.isEmpty()) { - checkPartitionDirs(pool, nextLevel, allDirs, fs, depth - 1, maxDepth); - } else if (depth != maxDepth) { - // since nextLevel is empty, we are missing partition columns. - if ("throw".equals(HiveConf.getVar(conf, - HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION))) { - throw new HiveException("MSCK is missing partition columns under " - + path.toString()); - } else { - LOG.warn("MSCK is missing partition columns under " + path.toString()); - } - } - } else { - allDirs.add(path); - } - return null; + return null; + } + } + + private static class PathDepthInfo { + private final Path p; + private final int depth; + PathDepthInfo(Path p, int depth) { + this.p = p; + this.depth = depth; + } + } + + private void checkPartitionDirsInParallel(final ThreadPoolExecutor pool, + final Path basePath, final Set result, + final FileSystem fs, final int maxDepth) throws HiveException { + try { + Queue> futures = new LinkedList>(); + ConcurrentLinkedQueue nextLevel = new ConcurrentLinkedQueue<>(); + nextLevel.add(new PathDepthInfo(basePath, 0)); + //Uses level parallel implementation of a bfs. Recursive DFS implementations + //have a issue where the number of threads can run out if the number of + //nested sub-directories is more than the pool size. 
+      // Using two queues is simpler than a single queue: with one queue we would
+      // need extra machinery (e.g. notify()/wait()) to let idle worker threads know
+      // when new levels are discovered, which is easy to get wrong.
+      while (!nextLevel.isEmpty()) {
+        ConcurrentLinkedQueue<PathDepthInfo> tempQueue = new ConcurrentLinkedQueue<>();
+        // process each level in parallel
+        while (!nextLevel.isEmpty()) {
+          futures.add(
+              pool.submit(new PathDepthInfoCallable(nextLevel.poll(), maxDepth, fs, tempQueue)));
+        }
+        while (!futures.isEmpty()) {
+          Path p = futures.poll().get();
+          if (p != null) {
+            result.add(p);
+          }
+        }
+        // update nextLevel with the sub-directories newly discovered above
+        nextLevel = tempQueue;
+      }
+    } catch (InterruptedException | ExecutionException e) {
+      LOG.error(e.getMessage());
+      pool.shutdownNow();
+      throw new HiveException(e.getCause());
+    }
+  }
+
+  /*
+   * The original recursive implementation works well for the single-threaded use case,
+   * but has limitations if we attempt to parallelize it directly
+   */
+  private void checkPartitionDirsSingleThreaded(Queue<Path> basePaths, final Set<Path> allDirs,
+      final FileSystem fs, final int depth, final int maxDepth) throws IOException, HiveException {
+    final Queue<Path> nextLevel = new LinkedList<>();
+    for (final Path path : basePaths) {
+      FileStatus[] statuses = fs.listStatus(path, FileUtils.HIDDEN_FILES_PATH_FILTER);
+      boolean fileFound = false;
+      for (FileStatus status : statuses) {
+        if (status.isDirectory()) {
+          nextLevel.add(status.getPath());
+        } else {
+          fileFound = true;
+        }
+      }
+      if (depth != 0) {
+        // we are in the middle of the search and we find a file
+        if (fileFound) {
+          if ("throw".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION))) {
+            throw new HiveException(
+                "MSCK finds a file rather than a folder when it searches for " + path.toString());
+          } else {
+            LOG.warn("MSCK finds a file rather than a folder when it searches for "
+                + path.toString());
+          }
+        }
+        if (!nextLevel.isEmpty()) {
+          checkPartitionDirsSingleThreaded(nextLevel, allDirs, fs, depth - 1, maxDepth);
+        } else if (depth != maxDepth) {
+          // since nextLevel is empty, we are missing partition columns.
+ if ("throw".equals(HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION))) { + throw new HiveException("MSCK is missing partition columns under " + path.toString()); + } else { + LOG.warn("MSCK is missing partition columns under " + path.toString()); + } + } + } else { + allDirs.add(path); } } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java index 35f52cd522e0e48a333e30966245bec65cc2ec9c..f95c130b0e76ef27ffa2ddf02a5bc22fb52934b9 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.ql.metadata; +import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -29,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -37,8 +39,8 @@ import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.mapred.TextInputFormat; -import org.apache.hadoop.util.Shell; import org.apache.thrift.TException; +import org.mockito.Mockito; /** * TestHiveMetaStoreChecker. @@ -63,6 +65,8 @@ protected void setUp() throws Exception { super.setUp(); hive = Hive.get(); + hive.getConf().setIntVar(HiveConf.ConfVars.HIVE_MOVE_FILES_THREAD_COUNT, 15); + hive.getConf().set(HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION.varname, "throw"); checker = new HiveMetaStoreChecker(hive); partCols = new ArrayList(); @@ -104,7 +108,6 @@ protected void tearDown() throws Exception { public void testTableCheck() throws HiveException, MetaException, IOException, TException, AlreadyExistsException { - CheckResult result = new CheckResult(); checker.checkMetastore(dbName, null, null, result); // we haven't added anything so should return an all ok @@ -194,7 +197,6 @@ public void testTableCheck() throws HiveException, MetaException, public void testPartitionsCheck() throws HiveException, MetaException, IOException, TException, AlreadyExistsException { - Database db = new Database(); db.setName(dbName); hive.createDatabase(db); @@ -311,4 +313,256 @@ public void testDataDeletion() throws HiveException, MetaException, hive.dropDatabase(dbName); assertFalse(fs.exists(fakeTable)); } + + /* + * Test multi-threaded implementation of checker to find out missing partitions + */ + public void testPartitionsNotInMs() throws HiveException, AlreadyExistsException, IOException { + Table testTable = createPartitionedTestTable(dbName, tableName, 2, 0); + // add 10 partitions on the filesystem + createPartitionsDirectoriesOnFS(testTable, 10); + CheckResult result = new CheckResult(); + checker.checkMetastore(dbName, tableName, null, result); + assertEquals(Collections.emptySet(), result.getTablesNotInMs()); + assertEquals(Collections.emptySet(), result.getTablesNotOnFs()); + assertEquals(Collections.emptySet(), result.getPartitionsNotOnFs()); + assertEquals(10, result.getPartitionsNotInMs().size()); + } + + /* + * Tests single threaded implementation of checkMetastore + */ + public void testSingleThreadedCheckMetastore() + throws HiveException, AlreadyExistsException, IOException 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java
index 35f52cd522e0e48a333e30966245bec65cc2ec9c..f95c130b0e76ef27ffa2ddf02a5bc22fb52934b9 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveMetaStoreChecker.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.metadata;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -29,6 +30,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -37,8 +39,8 @@
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.util.Shell;
 import org.apache.thrift.TException;
+import org.mockito.Mockito;
 
 /**
  * TestHiveMetaStoreChecker.
@@ -63,6 +65,8 @@ protected void setUp() throws Exception {
     super.setUp();
     hive = Hive.get();
+    hive.getConf().setIntVar(HiveConf.ConfVars.HIVE_MOVE_FILES_THREAD_COUNT, 15);
+    hive.getConf().set(HiveConf.ConfVars.HIVE_MSCK_PATH_VALIDATION.varname, "throw");
     checker = new HiveMetaStoreChecker(hive);
 
     partCols = new ArrayList<FieldSchema>();
@@ -104,7 +108,6 @@ protected void tearDown() throws Exception {
   public void testTableCheck() throws HiveException, MetaException,
       IOException, TException, AlreadyExistsException {
-
     CheckResult result = new CheckResult();
     checker.checkMetastore(dbName, null, null, result);
     // we haven't added anything so should return an all ok
@@ -194,7 +197,6 @@ public void testTableCheck() throws HiveException, MetaException,
   public void testPartitionsCheck() throws HiveException, MetaException,
       IOException, TException, AlreadyExistsException {
-
     Database db = new Database();
     db.setName(dbName);
     hive.createDatabase(db);
@@ -311,4 +313,256 @@ public void testDataDeletion() throws HiveException, MetaException,
     hive.dropDatabase(dbName);
     assertFalse(fs.exists(fakeTable));
   }
+
+  /*
+   * Tests the multi-threaded implementation of the checker for finding partitions
+   * missing from the metastore
+   */
+  public void testPartitionsNotInMs() throws HiveException, AlreadyExistsException, IOException {
+    Table testTable = createPartitionedTestTable(dbName, tableName, 2, 0);
+    // add 10 partitions on the filesystem
+    createPartitionsDirectoriesOnFS(testTable, 10);
+    CheckResult result = new CheckResult();
+    checker.checkMetastore(dbName, tableName, null, result);
+    assertEquals(Collections.emptySet(), result.getTablesNotInMs());
+    assertEquals(Collections.emptySet(), result.getTablesNotOnFs());
+    assertEquals(Collections.emptySet(), result.getPartitionsNotOnFs());
+    assertEquals(10, result.getPartitionsNotInMs().size());
+  }
+
+  /*
+   * Tests the single-threaded implementation of checkMetastore
+   */
+  public void testSingleThreadedCheckMetastore()
+      throws HiveException, AlreadyExistsException, IOException {
+    // set the number of threads to 0 so that the single-threaded checkMetastore is called
+    hive.getConf().setIntVar(HiveConf.ConfVars.HIVE_MOVE_FILES_THREAD_COUNT, 0);
+    Table testTable = createPartitionedTestTable(dbName, tableName, 2, 0);
+    // add 10 partitions on the filesystem
+    createPartitionsDirectoriesOnFS(testTable, 10);
+    CheckResult result = new CheckResult();
+    checker.checkMetastore(dbName, tableName, null, result);
+    assertEquals(Collections.<String> emptySet(), result.getTablesNotInMs());
+    assertEquals(Collections.<String> emptySet(), result.getTablesNotOnFs());
+    assertEquals(Collections.<String> emptySet(), result.getPartitionsNotOnFs());
+    assertEquals(10, result.getPartitionsNotInMs().size());
+  }
+
+  /**
+   * Tests the single-threaded implementation for deeply nested partitioned tables
+   *
+   * @throws HiveException
+   * @throws AlreadyExistsException
+   * @throws IOException
+   */
+  public void testSingleThreadedDeeplyNestedTables()
+      throws HiveException, AlreadyExistsException, IOException {
+    // set the number of threads to 0 so that the single-threaded checkMetastore is called
+    hive.getConf().setIntVar(HiveConf.ConfVars.HIVE_MOVE_FILES_THREAD_COUNT, 0);
+    // currently HiveMetaStoreChecker uses a minimum pool size of 2*numOfProcs;
+    // there is no other easy way to set it deterministically for this test case
+    checker = Mockito.spy(checker);
+    Mockito.when(checker.getMinPoolSize()).thenReturn(2);
+    int poolSize = checker.getMinPoolSize();
+    // create a deeply nested table which has more partition keys than the pool size
+    Table testTable = createPartitionedTestTable(dbName, tableName, poolSize + 2, 0);
+    // add 10 partitions on the filesystem
+    createPartitionsDirectoriesOnFS(testTable, 10);
+    CheckResult result = new CheckResult();
+    checker.checkMetastore(dbName, tableName, null, result);
+    assertEquals(Collections.<String> emptySet(), result.getTablesNotInMs());
+    assertEquals(Collections.<String> emptySet(), result.getTablesNotOnFs());
+    assertEquals(Collections.<String> emptySet(), result.getPartitionsNotOnFs());
+    assertEquals(10, result.getPartitionsNotInMs().size());
+  }
+
+  /**
+   * Tests the case when the number of partition keys is greater than the thread pool size.
+   *
+   * @throws HiveException
+   * @throws AlreadyExistsException
+   * @throws IOException
+   */
+  public void testDeeplyNestedPartitionedTables()
+      throws HiveException, AlreadyExistsException, IOException {
+    // currently HiveMetaStoreChecker uses a minimum pool size of 2*numOfProcs;
+    // there is no other easy way to set it deterministically for this test case
+    int poolSize = checker.getMinPoolSize();
+    checker = Mockito.spy(checker);
+    Mockito.when(checker.getMinPoolSize()).thenReturn(2);
+    // create a deeply nested table which has more partition keys than the pool size
+    Table testTable = createPartitionedTestTable(dbName, tableName, poolSize + 2, 0);
+    // add 10 partitions on the filesystem
+    createPartitionsDirectoriesOnFS(testTable, 10);
+    CheckResult result = new CheckResult();
+    checker.checkMetastore(dbName, tableName, null, result);
+    assertEquals(Collections.<String> emptySet(), result.getTablesNotInMs());
+    assertEquals(Collections.<String> emptySet(), result.getTablesNotOnFs());
+    assertEquals(Collections.<String> emptySet(), result.getPartitionsNotOnFs());
+    assertEquals(10, result.getPartitionsNotInMs().size());
+  }
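
Both nested-table tests pin the machine-dependent pool floor through the @VisibleForTesting getMinPoolSize() hook. The same spy technique on a toy class (illustrative only, not Hive code):

    import org.mockito.Mockito;

    public class SpyFloorDemo {
      static class Checker {
        int getMinPoolSize() {
          return Runtime.getRuntime().availableProcessors() * 2;
        }
      }

      public static void main(String[] args) {
        Checker spied = Mockito.spy(new Checker());
        // Pin the machine-dependent floor to a constant so that a "more partition
        // keys than pool threads" scenario reproduces deterministically anywhere.
        Mockito.when(spied.getMinPoolSize()).thenReturn(2);
        System.out.println(spied.getMinPoolSize()); // prints 2
      }
    }

Note that Mockito.when(...) on a spy invokes the real method once during stubbing, which is harmless for a pure getter like this; Mockito.doReturn(2).when(spied).getMinPoolSize() would avoid even that call.
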
+
+  /**
+   * Tests that the checker throws HiveException when there is a dummy directory present
+   * among the nested levels of partition sub-directories
+   * @throws AlreadyExistsException
+   * @throws IOException
+   * @throws HiveException
+   */
+  public void testErrorForMissingPartitionColumn() throws AlreadyExistsException, IOException, HiveException {
+    Table testTable = createPartitionedTestTable(dbName, tableName, 2, 0);
+    // add 10 partitions on the filesystem
+    createPartitionsDirectoriesOnFS(testTable, 10);
+    // create a fake directory to throw exception
+    StringBuilder sb = new StringBuilder(testTable.getDataLocation().toString());
+    sb.append(Path.SEPARATOR);
+    sb.append("dummyPart=error");
+    createDirectory(sb.toString());
+    // check result now
+    CheckResult result = new CheckResult();
+    try {
+      checker.checkMetastore(dbName, tableName, null, result);
+      fail("Expected HiveException");
+    } catch (Exception e) {
+      assertTrue("Expected exception HiveException got " + e.getClass(), e instanceof HiveException);
+    }
+    createFile(sb.toString(), "dummyFile");
+    result = new CheckResult();
+    try {
+      checker.checkMetastore(dbName, tableName, null, result);
+      fail("Expected HiveException");
+    } catch (Exception e) {
+      assertTrue("Expected exception HiveException got " + e.getClass(), e instanceof HiveException);
+    }
+  }
+
+  /*
+   * Tests that the single-threaded implementation of the checker throws HiveException
+   * when there is a dummy directory present among the nested levels
+   */
+  public void testErrorForMissingPartitionsSingleThreaded()
+      throws AlreadyExistsException, HiveException, IOException {
+    // set the number of threads to 0 so that the single-threaded checkMetastore is called
+    hive.getConf().setIntVar(HiveConf.ConfVars.HIVE_MOVE_FILES_THREAD_COUNT, 0);
+    Table testTable = createPartitionedTestTable(dbName, tableName, 2, 0);
+    // add 10 partitions on the filesystem
+    createPartitionsDirectoriesOnFS(testTable, 10);
+    // create a fake directory to throw exception
+    StringBuilder sb = new StringBuilder(testTable.getDataLocation().toString());
+    sb.append(Path.SEPARATOR);
+    sb.append("dummyPart=error");
+    createDirectory(sb.toString());
+    // check result now
+    CheckResult result = new CheckResult();
+    try {
+      checker.checkMetastore(dbName, tableName, null, result);
+      fail("Expected HiveException");
+    } catch (Exception e) {
+      assertTrue("Expected exception HiveException got " + e.getClass(),
+          e instanceof HiveException);
+    }
+    createFile(sb.toString(), "dummyFile");
+    result = new CheckResult();
+    try {
+      checker.checkMetastore(dbName, tableName, null, result);
+      fail("Expected HiveException");
+    } catch (Exception e) {
+      assertTrue("Expected exception HiveException got " + e.getClass(),
+          e instanceof HiveException);
+    }
+  }
+
+  /**
+   * Creates a test partitioned table with the required number of partition keys and
+   * partitions
+   *
+   * @param dbName - Database name
+   * @param tableName - Table name
+   * @param numOfPartKeys - Number of partition keys (nested levels of sub-directories in the
+   *          base table path)
+   * @param valuesPerPartition - If greater than 0, creates valuesPerPartition dummy partitions
+   * @return the created Table
+   * @throws AlreadyExistsException
+   * @throws HiveException
+   */
+  private Table createPartitionedTestTable(String dbName, String tableName, int numOfPartKeys,
+      int valuesPerPartition) throws AlreadyExistsException, HiveException {
+    Database db = new Database();
+    db.setName(dbName);
+    hive.createDatabase(db);
+
+    Table table = new Table(dbName, tableName);
+    table.setDbName(dbName);
+    table.setInputFormatClass(TextInputFormat.class);
+    table.setOutputFormatClass(HiveIgnoreKeyTextOutputFormat.class);
+    // create partition key schema
+    ArrayList<FieldSchema> partKeys = new ArrayList<FieldSchema>();
+    for (int i = 1; i <= numOfPartKeys; i++) {
+      String partName = "part" + String.valueOf(i);
+      partKeys.add(new FieldSchema(partName, serdeConstants.STRING_TYPE_NAME, ""));
+    }
+    table.setPartCols(partKeys);
+    // create table
+    hive.createTable(table);
+    table = hive.getTable(dbName, tableName);
+    if (valuesPerPartition == 0) {
+      return table;
+    }
+    // create partition specs
+    ArrayList<Map<String, String>> partitionSpecs = new ArrayList<Map<String, String>>();
+    for (int partKeyIndex = 0; partKeyIndex < numOfPartKeys; partKeyIndex++) {
+      String partName = partKeys.get(partKeyIndex).getName();
+      Map<String, String> partMap = new HashMap<>();
+      for (int val = 1; val <= valuesPerPartition; val++) {
+        partMap.put(partName, String.valueOf(val));
+      }
+      partitionSpecs.add(partMap);
+    }
+
+    // create partitions
+    for (Map<String, String> partSpec : partitionSpecs) {
+      hive.createPartition(table, partSpec);
+    }
+
+    List<Partition> partitions = hive.getPartitions(table);
+    assertEquals(numOfPartKeys * valuesPerPartition, partitions.size());
+    return table;
+  }
+
+  /**
+   * Creates partition sub-directories for a given table on the file system. Used to test the
+   * use-cases when partitions for the table are not present in the metastore db
+   *
+   * @param table - Table which provides the base location and partition specs for creating
+   *          the sub-directories
+   * @param numPartitions - Number of partitions to be created
+   * @throws IOException
+   */
+  private void createPartitionsDirectoriesOnFS(Table table, int numPartitions) throws IOException {
+    String path = table.getDataLocation().toString();
+    fs = table.getPath().getFileSystem(hive.getConf());
+    int numPartKeys = table.getPartitionKeys().size();
+    for (int i = 0; i < numPartitions; i++) {
+      StringBuilder partPath = new StringBuilder(path);
+      partPath.append(Path.SEPARATOR);
+      for (int j = 0; j < numPartKeys; j++) {
+        FieldSchema field = table.getPartitionKeys().get(j);
+        partPath.append(field.getName());
+        partPath.append('=');
+        partPath.append("val_");
+        partPath.append(i);
+        if (j < (numPartKeys - 1)) {
+          partPath.append(Path.SEPARATOR);
+        }
+      }
+      createDirectory(partPath.toString());
+    }
+  }
+
+  private void createFile(String partPath, String filename) throws IOException {
+    Path part = new Path(partPath);
+    fs.mkdirs(part);
+    fs.createNewFile(new Path(partPath + Path.SEPARATOR + filename));
+    fs.deleteOnExit(part);
+  }
+
+  private void createDirectory(String partPath) throws IOException {
+    Path part = new Path(partPath);
+    fs.mkdirs(part);
+    fs.deleteOnExit(part);
+  }
 }
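
For context on why the recursive version had to go: the removed code guarded pool.getActiveCount() < pool.getMaximumPoolSize() precisely because a task that blocks on its children can starve a bounded pool. A toy demonstration of that failure mode (invented names, not Hive code):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    // Every running task blocks waiting on a child task; once pool-size tasks are
    // blocked, no thread is left to run any child, so no future ever completes.
    public class RecursiveStarvation {
      static ExecutorService pool = Executors.newFixedThreadPool(2);

      static Integer descend(int depth) throws Exception {
        if (depth == 0) {
          return 0;
        }
        // Parent blocks here while the child needs a pool thread of its own.
        Future<Integer> child = pool.submit(() -> descend(depth - 1));
        return child.get() + 1;
      }

      public static void main(String[] args) throws Exception {
        // Depth 1 fits in the 2-thread pool and completes.
        System.out.println(pool.submit(() -> descend(1)).get());
        // Depth 3 exceeds the pool and hangs forever.
        System.out.println(pool.submit(() -> descend(3)).get()); // never returns
      }
    }

The level-order rewrite sidesteps this entirely: tasks never wait on other tasks, so the pool size only affects throughput, never correctness.
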