commit bd01b206709979bf2bbb6d4ccfa8d4652731c0d7
Author: Vihang Karajgaonkar
Date:   Tue Feb 21 16:14:12 2017 -0800

    HIVE-15880 : Allow insert overwrite query to use auto.purge table property

diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index 9a0521c27594663c0247bf72c5b80da75d0124f4..f0f208a1e381b6d3026c6ef94342ee1e3f1acfc8 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -616,15 +616,19 @@ static boolean copy(FileSystem srcFS, Path src,
    * @return true if move successful
    * @throws IOException
    */
-  public static boolean moveToTrash(FileSystem fs, Path f, Configuration conf)
+  public static boolean moveToTrash(FileSystem fs, Path f, Configuration conf, boolean purge)
       throws IOException {
     LOG.debug("deleting  " + f);
     boolean result = false;
     try {
-      result = Trash.moveToAppropriateTrash(fs, f, conf);
-      if (result) {
-        LOG.trace("Moved to trash: " + f);
-        return true;
+      if (purge) {
+        LOG.debug("purge is set to true. Not moving to Trash " + f);
+      } else {
+        result = Trash.moveToAppropriateTrash(fs, f, conf);
+        if (result) {
+          LOG.trace("Moved to trash: " + f);
+          return true;
+        }
       }
     } catch (IOException ioe) {
       // for whatever failure reason including that trash has lower encryption zone
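For orientation, here is a minimal standalone sketch of the purge-aware delete path introduced above, written against only the public Hadoop FileSystem and Trash APIs. The class and method names are illustrative, not part of the patch; it mirrors the control flow of the patched FileUtils.moveToTrash under the assumption that a failed or skipped trash move falls through to a direct delete.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Trash;

public class PurgeAwareDelete {
  /**
   * Deletes {@code f}, moving it to the Trash unless {@code purge} is set.
   */
  public static boolean remove(FileSystem fs, Path f, Configuration conf, boolean purge)
      throws IOException {
    if (!purge) {
      try {
        if (Trash.moveToAppropriateTrash(fs, f, conf)) {
          return true; // moved to trash; nothing left to delete
        }
      } catch (IOException ioe) {
        // fall through to a direct delete, e.g. when the trash is in a
        // different (or lower) encryption zone than the source path
      }
    }
    // purge requested, or the trash move failed/was unavailable
    return fs.delete(f, true);
  }
}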
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAutoPurgeTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAutoPurgeTables.java
new file mode 100644
index 0000000000000000000000000000000000000000..2cd2255e35340851c06a97c067133556e9d2f91b
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAutoPurgeTables.java
@@ -0,0 +1,355 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestAutoPurgeTables {
+  private static final String driverName = "org.apache.hive.jdbc.HiveDriver";
+  private static final String testDbName = "auto_purge_test_db";
+  private static final String testTableName = "auto_purge_test_table";
+  private static final String partitionedColumnName = "partCol";
+  private static final String partitionedColumnValue1 = "20090619";
+  private static final String partitionedColumnValue2 = "20100720";
+  private static HiveConf conf;
+  private static Connection con;
+  private static MiniHS2 miniHS2;
+
+  private static Connection getConnection(String url) throws SQLException {
+    Connection con1;
+    con1 = DriverManager.getConnection(url, "", "");
+    Assert.assertNotNull("Connection is null", con1);
+    Assert.assertFalse("Connection should not be closed", con1.isClosed());
+    return con1;
+  }
+
+  private static void createTestTable(Statement stmt, boolean isAutopurge, boolean isExternal,
+      boolean isPartitioned) throws SQLException {
+    stmt.execute("drop table if exists " + testDbName + "." + testTableName);
+    if (isPartitioned) {
+      // create a partitioned table
+      stmt.execute("create table " + testDbName + "." + testTableName + " (id int, value string) "
+          + " partitioned by (" + partitionedColumnName + " STRING)");
+      // load data
+      stmt.execute("insert into " + testDbName + "." + testTableName + " PARTITION ("
+          + partitionedColumnName + "=" + partitionedColumnValue1
+          + ") values (1, \"dummy1\"), (2, \"dummy2\"), (3, \"dummy3\")");
+      stmt.execute("insert into " + testDbName + "." + testTableName + " PARTITION ("
+          + partitionedColumnName + "=" + partitionedColumnValue2
+          + ") values (4, \"dummy4\"), (5, \"dummy5\"), (6, \"dummy6\")");
+    } else {
+      // create a table
+      stmt.execute("create table " + testDbName + "." + testTableName + " (id int, value string)");
+      // load data
+      stmt.execute("insert into " + testDbName + "." + testTableName
+          + " values (1, \"dummy1\"), (2, \"dummy2\"), (3, \"dummy3\")");
+    }
+    if (isAutopurge) {
+      stmt.execute("alter table " + testDbName + "." + testTableName
+          + " set tblproperties (\"auto.purge\"=\"true\")");
+    } else {
+      stmt.execute("alter table " + testDbName + "." + testTableName
+          + " set tblproperties (\"auto.purge\"=\"false\")");
+    }
+    if (isExternal) {
+      stmt.execute("alter table " + testDbName + "." + testTableName
+          + " set tblproperties (\"external\"=\"true\")");
+    }
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf = new HiveConf(TestAutoPurgeTables.class);
+    // enable trash so it can be tested
+    conf.setFloat("fs.trash.checkpoint.interval", 30);
+    conf.setFloat("fs.trash.interval", 30);
+    // Create the test database and base tables once for all the tests
+    miniHS2 = new MiniHS2.Builder().withConf(conf).build();
+    miniHS2.start(new HashMap<String, String>());
+    Class.forName(driverName);
+    con = getConnection(miniHS2.getBaseJdbcURL() + testDbName + ";create=true");
+    try (Statement stmt = con.createStatement()) {
+      Assert.assertNotNull("Statement is null", stmt);
+      stmt.execute("set hive.support.concurrency = false");
+      stmt.execute("drop database if exists " + testDbName + " cascade");
+      stmt.execute("create database " + testDbName);
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() {
+    Statement stmt = null;
+    try {
+      stmt = con.createStatement();
+      // drop the test db and its tables and views
+      stmt.execute("set hive.support.concurrency = false");
+      stmt.execute("drop database if exists " + testDbName + " cascade");
+      FileSystem fs = FileSystem.get(conf);
+      fs.deleteOnExit(ShimLoader.getHadoopShims().getCurrentTrashPath(conf, fs));
+    } catch (SQLException | IOException e) {
+      e.printStackTrace();
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          //
+        }
+      }
+      if (con != null) {
+        try {
+          con.close();
+        } catch (SQLException e) {
+          //
+        }
+      }
+      if (miniHS2 != null) {
+        miniHS2.cleanup();
+        miniHS2.stop();
+        miniHS2 = null;
+      }
+    }
+  }
+
+  @After
+  public void afterTest() throws Exception {
+    try (Statement stmt = con.createStatement()) {
+      stmt.execute("drop table if exists " + testDbName + "." + testTableName);
+    }
+  }
+
+  /**
+   * Tests that the previous table data skips the trash when insert overwrite table .. is run
+   * against a table which has the auto.purge property set.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testAutoPurge() throws Exception {
+    try (Statement stmt = con.createStatement()) {
+      // create a test table with auto.purge = true
+      createTestTable(stmt, true, false, false);
+      int numFilesInTrashBefore = getTrashContents();
+      stmt.execute(
+          "insert overwrite table " + testDbName + "." + testTableName + " select 1, \"test\"");
+      int numFilesInTrashAfter = getTrashContents();
+      Assert.assertEquals(
+          String.format(
+              "Data should not have been moved to trash. Number of files in trash: before : %d after %d",
+              numFilesInTrashBefore, numFilesInTrashAfter),
+          numFilesInTrashBefore, numFilesInTrashAfter);
+    }
+  }
+
+  /**
+   * Tests if the auto.purge property works correctly for external tables. Old data should skip
+   * the trash when insert overwrite table .. is run and auto.purge is set to true.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testExternalTable() throws Exception {
+    try (Statement stmt = con.createStatement()) {
+      // create an external test table with auto.purge = true
+      createTestTable(stmt, true, true, false);
+      int numFilesInTrashBefore = getTrashContents();
+      stmt.execute(
+          "insert overwrite table " + testDbName + "." + testTableName + " select 1, \"test\"");
+      int numFilesInTrashAfter = getTrashContents();
+      Assert.assertEquals(
+          String.format(
+              "Data should not have been moved to trash. Number of files in trash: before : %d after %d",
+              numFilesInTrashBefore, numFilesInTrashAfter),
+          numFilesInTrashBefore, numFilesInTrashAfter);
+    }
+  }
+
+  /**
+   * Tests auto.purge when the managed table is partitioned. Old data should skip the trash when
+   * insert overwrite table .. is run and the auto.purge property is set to true.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testPartitionedTable() throws Exception {
+    try (Statement stmt = con.createStatement()) {
+      // create a managed, partitioned test table with auto.purge = true
+      createTestTable(stmt, true, false, true);
+      int numFilesInTrashBefore = getTrashContents(true);
+      stmt.execute("insert overwrite table " + testDbName + "." + testTableName + " PARTITION ("
+          + partitionedColumnName + "=" + partitionedColumnValue1 + ")" + " select 1, \"test\"");
+      int numFilesInTrashAfter = getTrashContents(true);
+      Assert.assertEquals(
+          String.format(
+              "Data should not have been moved to trash. Number of files in trash: before : %d after %d",
+              numFilesInTrashBefore, numFilesInTrashAfter),
+          numFilesInTrashBefore, numFilesInTrashAfter);
+    }
+  }
+
+  /**
+   * Tests auto.purge for an external, partitioned table. Old partition data should skip the
+   * trash when auto.purge is set to true.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testExternalPartitionedTable() throws Exception {
+    try (Statement stmt = con.createStatement()) {
+      // create an external, partitioned test table with auto.purge = true
+      createTestTable(stmt, true, true, true);
+      int numFilesInTrashBefore = getTrashContents(true);
+      stmt.execute("insert overwrite table " + testDbName + "." + testTableName + " PARTITION ("
+          + partitionedColumnName + "=" + partitionedColumnValue1 + ")" + " select 1, \"test\"");
+      int numFilesInTrashAfter = getTrashContents(true);
+      Assert.assertEquals(
+          String.format(
+              "Data should not have been moved to trash. Number of files in trash: before : %d after %d",
+              numFilesInTrashBefore, numFilesInTrashAfter),
+          numFilesInTrashBefore, numFilesInTrashAfter);
+    }
+  }
+
+  /**
+   * Tests that when auto.purge is set to false, older data is moved to the Trash when
+   * insert overwrite table .. is run.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testNoAutoPurge() throws Exception {
+    try (Statement stmt = con.createStatement()) {
+      // create a test table with auto.purge = false
+      createTestTable(stmt, false, false, false);
+      int numFilesInTrashBefore = getTrashContents();
+      stmt.execute(
+          "insert overwrite table " + testDbName + "." + testTableName + " select 1, \"test\"");
+      int numFilesInTrashAfter = getTrashContents();
+      Assert.assertTrue(
+          String.format(
+              "Data should have been moved to trash. Number of files in trash: before : %d after %d",
+              numFilesInTrashBefore, numFilesInTrashAfter),
+          numFilesInTrashBefore < numFilesInTrashAfter);
+    }
+  }
+
+  /**
+   * Tests that when auto.purge is set to false on an external table, older data is moved to the
+   * Trash when insert overwrite table .. is run.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testExternalNoAutoPurge() throws Exception {
+    try (Statement stmt = con.createStatement()) {
+      // create an external test table with auto.purge = false
+      createTestTable(stmt, false, true, false);
+      int numFilesInTrashBefore = getTrashContents();
+      stmt.execute(
+          "insert overwrite table " + testDbName + "." + testTableName + " select 1, \"test\"");
+      int numFilesInTrashAfter = getTrashContents();
+      Assert.assertTrue(
+          String.format(
+              "Data should have been moved to trash. Number of files in trash: before : %d after %d",
+              numFilesInTrashBefore, numFilesInTrashAfter),
+          numFilesInTrashBefore < numFilesInTrashAfter);
+    }
+  }
+
+  /**
+   * Tests that when auto.purge is set to false on a partitioned table, older data is moved to the
+   * Trash when insert overwrite table .. is run.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testPartitionedNoAutoPurge() throws Exception {
+    try (Statement stmt = con.createStatement()) {
+      // create a partitioned test table with auto.purge = false
+      createTestTable(stmt, false, false, true);
+      int numFilesInTrashBefore = getTrashContents(true);
+      stmt.execute("insert overwrite table " + testDbName + "." + testTableName + " PARTITION ("
+          + partitionedColumnName + "=" + partitionedColumnValue1 + ")" + " select 1, \"test\"");
+      int numFilesInTrashAfter = getTrashContents(true);
+      Assert.assertTrue(
+          String.format(
+              "Data should have been moved to trash. Number of files in trash: before : %d after %d",
+              numFilesInTrashBefore, numFilesInTrashAfter),
+          numFilesInTrashBefore < numFilesInTrashAfter);
+    }
+  }
+
+  /**
+   * Tests that when auto.purge is set to false on a partitioned external table, older data is
+   * moved to the Trash when insert overwrite table .. is run.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testPartitionedExternalNoAutoPurge() throws Exception {
+    try (Statement stmt = con.createStatement()) {
+      // create an external, partitioned test table with auto.purge = false
+      createTestTable(stmt, false, true, true);
+      // TODO: add partitioned directory check in the trash
+      int numFilesInTrashBefore = getTrashContents(true);
+      stmt.execute("insert overwrite table " + testDbName + "." + testTableName + " PARTITION ("
+          + partitionedColumnName + "=" + partitionedColumnValue1 + ")" + " select 1, \"test\"");
+      int numFilesInTrashAfter = getTrashContents(true);
+      Assert.assertTrue(
+          String.format(
+              "Data should have been moved to trash. Number of files in trash: before : %d after %d",
+              numFilesInTrashBefore, numFilesInTrashAfter),
+          numFilesInTrashBefore < numFilesInTrashAfter);
+    }
+  }
+
+  private int getTrashContents() throws Exception {
+    return getTrashContents(false);
+  }
+
+  private int getTrashContents(boolean checkPartition) throws Exception {
+    FileSystem fs = FileSystem.get(conf);
+    Path trashDir = ShimLoader.getHadoopShims().getCurrentTrashPath(conf, fs);
+    Path tableLocation = new Path(conf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname)
+        + Path.SEPARATOR + testDbName + ".db" + Path.SEPARATOR + testTableName);
+    String child = Path.getPathWithoutSchemeAndAuthority(tableLocation).toString().substring(1);
+    if (checkPartition) {
+      child = child + Path.SEPARATOR + partitionedColumnName + "=" + partitionedColumnValue1;
+    }
+    Path trashLocation = new Path(trashDir, child);
+    FileStatus[] filestatus = fs.globStatus(trashLocation.suffix("/*"));
+    return filestatus == null ? 0 : filestatus.length;
+  }
+}
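The getTrashContents() helper above relies on Hadoop's default trash layout, in which a deleted path reappears under the current trash checkpoint with its scheme and authority stripped. A minimal sketch of that convention, with illustrative class and method names (not part of the patch):

import org.apache.hadoop.fs.Path;

public class TrashPaths {
  /**
   * Returns where a deleted path is expected to appear under the current trash
   * directory, assuming the default TrashPolicy layout:
   *   /user/(user)/.Trash/Current/(absolute path of deleted file, minus scheme)
   */
  public static Path expectedTrashLocation(Path trashCurrent, Path deleted) {
    // strip scheme/authority and the leading '/' so the path nests under the trash root
    String relative = Path.getPathWithoutSchemeAndAuthority(deleted).toString().substring(1);
    return new Path(trashCurrent, relative);
  }
}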
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index f13781943da240bcc81873daf4682ffee7415a5a..bb85896d5dd341c36a7594233b99f5d66069ec67 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -4652,7 +4652,7 @@ private int truncateTable(Hive db, TruncateTableDesc truncateTableDesc) throws H
     Map<String, String> partSpec = truncateTableDesc.getPartSpec();
 
     Table table = db.getTable(tableName, true);
-
+    boolean isAutopurge = "true".equalsIgnoreCase(table.getProperty("auto.purge"));
     try {
       // this is not transactional
       for (Path location : getLocations(db, table, partSpec)) {
@@ -4663,7 +4663,7 @@ private int truncateTable(Hive db, TruncateTableDesc truncateTableDesc) throws H
           HdfsUtils.HadoopFileStatus status = new HdfsUtils.HadoopFileStatus(conf, fs, location);
           FileStatus targetStatus = fs.getFileStatus(location);
           String targetGroup = targetStatus == null ? null : targetStatus.getGroup();
-          FileUtils.moveToTrash(fs, location, conf);
+          FileUtils.moveToTrash(fs, location, conf, isAutopurge);
           fs.mkdirs(location);
           HdfsUtils.setFullFileStatus(conf, status, targetGroup, fs, location, false);
         } else {
@@ -4671,7 +4671,7 @@ private int truncateTable(Hive db, TruncateTableDesc truncateTableDesc) throws H
           if (statuses == null || statuses.length == 0) {
             continue;
           }
-          boolean success = Hive.trashFiles(fs, statuses, conf);
+          boolean success = Hive.trashFiles(fs, statuses, conf, isAutopurge);
          if (!success) {
             throw new HiveException("Error in deleting the contents of " + location.toString());
           }
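With the DDLTask change above, truncate table honors auto.purge as well: when the property is true, the truncated data bypasses the trash. A hedged JDBC sketch of how a client would exercise this; the connection URL, database, and table name are placeholders, not part of the patch:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class TruncateAutoPurgeExample {
  public static void main(String[] args) throws Exception {
    try (Connection con =
             DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "", "");
         Statement stmt = con.createStatement()) {
      stmt.execute("create table purge_demo (id int)");
      stmt.execute("alter table purge_demo set tblproperties ('auto.purge'='true')");
      stmt.execute("insert into purge_demo values (1), (2), (3)");
      // With auto.purge=true, the old files are deleted directly instead of
      // being moved to the user's .Trash directory.
      stmt.execute("truncate table purge_demo");
    }
  }
}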
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index ed854bf48b7507857e28c155a90963204a05c913..f64cfda6da2b82ec277889378ea111e17f044ed3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1646,8 +1646,9 @@ public Partition loadPartition(Path loadPath, Table tbl,
       PerfLogger perfLogger = SessionState.getPerfLogger();
       perfLogger.PerfLogBegin("MoveTask", "FileMoves");
       if (replace || (oldPart == null && !isAcid)) {
+        boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
         replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, getConf(),
-            isSrcLocal);
+            isSrcLocal, isAutoPurge);
       } else {
         if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null) {
           newFiles = Collections.synchronizedList(new ArrayList<Path>());
@@ -2012,7 +2013,8 @@ public void loadTable(Path loadPath, String tableName, boolean replace, boolean
     }
     if (replace) {
       Path tableDest = tbl.getPath();
-      replaceFiles(tableDest, loadPath, tableDest, tableDest, sessionConf, isSrcLocal);
+      boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
+      replaceFiles(tableDest, loadPath, tableDest, tableDest, sessionConf, isSrcLocal, isAutopurge);
     } else {
       FileSystem fs;
       try {
@@ -3398,11 +3400,13 @@ private static void moveAcidDeltaFiles(String deltaFileType, PathFilter pathFilt
    * @param oldPath
    *          The directory where the old data location, need to be cleaned up.  Most of time, will be the same
    *          as destf, unless its across FileSystem boundaries.
+   * @param purge
+   *          When set to true, files that need to be deleted are not moved to the Trash
    * @param isSrcLocal
    *          If the source directory is LOCAL
    */
   protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf,
-      boolean isSrcLocal) throws HiveException {
+      boolean isSrcLocal, boolean purge) throws HiveException {
     try {
       FileSystem destFs = destf.getFileSystem(conf);
@@ -3435,7 +3439,7 @@ protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath,
           // existing content might result in incorrect (extra) data.
           // But not sure why we changed not to delete the oldPath in HIVE-8750 if it is
           // not the destf or its subdir?
-          oldPathDeleted = trashFiles(oldFs, statuses, conf);
+          oldPathDeleted = trashFiles(oldFs, statuses, conf, purge);
         }
       } catch (IOException e) {
         if (isOldPathUnderDestf) {
@@ -3495,7 +3499,8 @@ protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath,
    * @return true if deletion successful
    * @throws IOException
    */
-  public static boolean trashFiles(final FileSystem fs, final FileStatus[] statuses, final Configuration conf)
+  public static boolean trashFiles(final FileSystem fs, final FileStatus[] statuses,
+      final Configuration conf, final boolean purge)
       throws IOException {
     boolean result = true;
@@ -3509,13 +3514,13 @@ public static boolean trashFiles(final FileSystem fs, final FileStatus[] statuse
     final SessionState parentSession = SessionState.get();
     for (final FileStatus status : statuses) {
       if (null == pool) {
-        result &= FileUtils.moveToTrash(fs, status.getPath(), conf);
+        result &= FileUtils.moveToTrash(fs, status.getPath(), conf, purge);
       } else {
         futures.add(pool.submit(new Callable<Boolean>() {
           @Override
           public Boolean call() throws Exception {
             SessionState.setCurrentSessionState(parentSession);
-            return FileUtils.moveToTrash(fs, status.getPath(), conf);
+            return FileUtils.moveToTrash(fs, status.getPath(), conf, purge);
           }
         }));
       }
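Taken together, the patch makes insert overwrite delete the replaced files directly, rather than trashing them, whenever the target table sets auto.purge=true. An illustrative end-to-end JDBC session; the connection URL and identifiers are placeholders, not part of the patch:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class InsertOverwriteAutoPurgeExample {
  public static void main(String[] args) throws Exception {
    try (Connection con =
             DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "", "");
         Statement stmt = con.createStatement()) {
      stmt.execute("create table if not exists purge_demo (id int, value string)");
      stmt.execute("insert into purge_demo values (1, 'old')");

      // Default behavior (auto.purge unset or false): the replaced files are
      // moved to the current user's HDFS .Trash directory.
      stmt.execute("insert overwrite table purge_demo select 2, 'new'");

      // With auto.purge=true, the replaced files skip the Trash entirely,
      // avoiding double storage during large overwrites.
      stmt.execute("alter table purge_demo set tblproperties ('auto.purge'='true')");
      stmt.execute("insert overwrite table purge_demo select 3, 'newer'");
    }
  }
}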