From b656cacec426c882b01f539f64ae11301cf25129 Mon Sep 17 00:00:00 2001 From: Alexander Kolbasov Date: Fri, 20 Apr 2018 17:51:16 -0700 Subject: [PATCH 1/1] HIVE-18743: CREATE TABLE on S3 data can be extremely slow. DO_NOT_UPDATE_STATS workaround is buggy. --- .../hadoop/hive/metastore/HiveAlterHandler.java | 2 +- .../hadoop/hive/metastore/HiveMetaStore.java | 4 +- .../hadoop/hive/metastore/MetaStoreUtils.java | 96 +++++----- .../hadoop/hive/metastore/TestMetaStoreUtils.java | 212 +++++++++++++++++++++ .../ql/metadata/SessionHiveMetaStoreClient.java | 3 +- 5 files changed, 259 insertions(+), 58 deletions(-) diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java index 83c68a25ab..3d7df221fe 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java @@ -289,7 +289,7 @@ public void alterTable(RawStore msdb, Warehouse wh, String dbname, !isPartitionedTable) { Database db = msdb.getDatabase(newDbName); // Update table stats. 
For partitioned table, we update stats in alterPartition() - MetaStoreUtils.updateTableStatsFast(db, newt, wh, false, true, environmentContext); + MetaStoreUtils.updateTableStatsSlow(db, newt, wh, false, true, environmentContext); } if (cascade && isPartitionedTable) { diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index aa233dda03..02c345bb5f 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -63,7 +63,6 @@ import com.google.common.collect.Multimaps; import org.apache.commons.cli.OptionBuilder; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.JvmPauseMonitor; @@ -85,7 +84,6 @@ import org.apache.hadoop.hive.metastore.events.AddPartitionEvent; import org.apache.hadoop.hive.metastore.events.AlterIndexEvent; import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent; -import org.apache.hadoop.hive.metastore.events.AlterTableEvent; import org.apache.hadoop.hive.metastore.events.ConfigChangeEvent; import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent; import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent; @@ -1455,7 +1453,7 @@ private void create_table_core(final RawStore ms, final Table tbl, } if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVESTATSAUTOGATHER) && !MetaStoreUtils.isView(tbl)) { - MetaStoreUtils.updateTableStatsFast(db, tbl, wh, madeDir, envContext); + MetaStoreUtils.updateTableStatsSlow(db, tbl, wh, madeDir, false, envContext); } // set create time diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java index 13bdf8e083..3b2da1089b 100644 --- 
a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java @@ -188,72 +188,64 @@ public static boolean containsAllFastStats(Map partParams) { return true; } - public static boolean updateTableStatsFast(Database db, Table tbl, Warehouse wh, - boolean madeDir, EnvironmentContext environmentContext) throws MetaException { - return updateTableStatsFast(db, tbl, wh, madeDir, false, environmentContext); - } - - public static boolean updateTableStatsFast(Database db, Table tbl, Warehouse wh, - boolean madeDir, boolean forceRecompute, EnvironmentContext environmentContext) throws MetaException { - if (tbl.getPartitionKeysSize() == 0) { - // Update stats only when unpartitioned - FileStatus[] fileStatuses = wh.getFileStatusesForUnpartitionedTable(db, tbl); - return updateTableStatsFast(tbl, fileStatuses, madeDir, forceRecompute, environmentContext); - } else { - return false; - } - } - /** * Updates the numFiles and totalSize parameters for the passed Table by querying * the warehouse if the passed Table does not already have values for these parameters. + * NOTE: This function is rather expensive since it needs to traverse the file system to get all + * the information. 
+ * * @param tbl - * @param fileStatus * @param newDir if true, the directory was just created and can be assumed to be empty * @param forceRecompute Recompute stats even if the passed Table already has * these parameters set - * @return true if the stats were updated, false otherwise */ - public static boolean updateTableStatsFast(Table tbl, FileStatus[] fileStatus, boolean newDir, - boolean forceRecompute, EnvironmentContext environmentContext) throws MetaException { - + public static void updateTableStatsSlow(Database db, Table tbl, Warehouse wh, + boolean newDir, boolean forceRecompute, + EnvironmentContext environmentContext) throws MetaException { + // DO_NOT_UPDATE_STATS is supposed to be a transient parameter that is only passed via RPC + // We want to avoid this property from being persistent. + // + // NOTE: If this property *is* set as table property we will remove it which is incorrect but + // we can't distinguish between these two cases + // + // This problem was introduced by HIVE-10228. A better approach would be to pass the property + // via the environment context. Map params = tbl.getParameters(); - - if ((params!=null) && params.containsKey(StatsSetupConst.DO_NOT_UPDATE_STATS)){ - boolean doNotUpdateStats = Boolean.valueOf(params.get(StatsSetupConst.DO_NOT_UPDATE_STATS)); + boolean updateStats = true; + if ((params != null) && params.containsKey(StatsSetupConst.DO_NOT_UPDATE_STATS)) { + updateStats = !Boolean.valueOf(params.get(StatsSetupConst.DO_NOT_UPDATE_STATS)); params.remove(StatsSetupConst.DO_NOT_UPDATE_STATS); - tbl.setParameters(params); // to make sure we remove this marker property - if (doNotUpdateStats){ - return false; - } } - boolean updated = false; - if (forceRecompute || - params == null || - !containsAllFastStats(params)) { - if (params == null) { - params = new HashMap(); - } - if (!newDir) { - // The table location already exists and may contain data. - // Let's try to populate those stats that don't require full scan. 
- LOG.info("Updating table stats fast for " + tbl.getTableName()); - populateQuickStats(fileStatus, params); - LOG.info("Updated size of table " + tbl.getTableName() +" to "+ params.get(StatsSetupConst.TOTAL_SIZE)); - if (environmentContext != null - && environmentContext.isSetProperties() - && StatsSetupConst.TASK.equals(environmentContext.getProperties().get( - StatsSetupConst.STATS_GENERATED))) { - StatsSetupConst.setBasicStatsState(params, StatsSetupConst.TRUE); - } else { - StatsSetupConst.setBasicStatsState(params, StatsSetupConst.FALSE); - } - } + if (!updateStats || newDir || tbl.getPartitionKeysSize() != 0) { + return; + } + + // If stats are already present and forceRecompute isn't set, nothing to do + if (!forceRecompute && params != null && containsAllFastStats(params)) { + return; + } + + // NOTE: wh.getFileStatusesForUnpartitionedTable() can be REALLY slow + FileStatus[] fileStatus = wh.getFileStatusesForUnpartitionedTable(db, tbl); + if (params == null) { + params = new HashMap<>(); tbl.setParameters(params); - updated = true; } - return updated; + // The table location already exists and may contain data. + // Let's try to populate those stats that don't require full scan. 
+ LOG.info("Updating table stats for {}", tbl.getTableName()); + populateQuickStats(fileStatus, params); + LOG.info("Updated size of table {} to {}", + tbl.getTableName(), params.get(StatsSetupConst.TOTAL_SIZE)); + if (environmentContext != null + && environmentContext.isSetProperties() + && StatsSetupConst.TASK.equals(environmentContext.getProperties().get( + StatsSetupConst.STATS_GENERATED))) { + StatsSetupConst.setBasicStatsState(params, StatsSetupConst.TRUE); + } else { + StatsSetupConst.setBasicStatsState(params, StatsSetupConst.FALSE); + } } public static void populateQuickStats(FileStatus[] fileStatus, Map params) { diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/TestMetaStoreUtils.java b/metastore/src/test/org/apache/hadoop/hive/metastore/TestMetaStoreUtils.java index e5c8a40e39..0ffe78244a 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/TestMetaStoreUtils.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/TestMetaStoreUtils.java @@ -18,13 +18,51 @@ package org.apache.hadoop.hive.metastore; +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.EnvironmentContext; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.thrift.TException; import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.hive.common.StatsSetupConst.COLUMN_STATS_ACCURATE; +import static org.apache.hadoop.hive.common.StatsSetupConst.NUM_FILES; +import static 
org.apache.hadoop.hive.common.StatsSetupConst.STATS_GENERATED; +import static org.apache.hadoop.hive.common.StatsSetupConst.TOTAL_SIZE; +import static org.apache.hadoop.hive.metastore.MetaStoreUtils.updateTableStatsSlow; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class TestMetaStoreUtils { + private static final String DB_NAME = "db1"; + private static final String TABLE_NAME = "tbl1"; + private static final String SERDE_LIB = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"; + + + private final Map paramsWithStats = ImmutableMap.of(NUM_FILES, "1", TOTAL_SIZE, "2"); + + private Database db; + + public TestMetaStoreUtils() { + db = new Database(DB_NAME, "", "/", null); + } @Test public void testcolumnsIncludedByNameType() { @@ -40,4 +78,178 @@ public void testcolumnsIncludedByNameType() { Assert.assertTrue(MetaStoreUtils.columnsIncludedByNameType(Arrays.asList(col1, col2), Arrays.asList(col3, col2, col1))); Assert.assertFalse(MetaStoreUtils.columnsIncludedByNameType(Arrays.asList(col1, col2), Arrays.asList(col1))); } + + /** + * Verify that updateTableStatsSlow really updates table statistics. + * The test does the following: + *
    + *
  1. Create database
  2. + *
  3. Create unpartitioned table
  4. + *
  5. Create unpartitioned table which has params
  6. + *
  7. Call updateTableStatsSlow with arguments which should cause stats calculation
  8. + *
  9. Verify table statistics using mocked warehouse
  10. + *
  11. Create table which already has stats
  12. + *
  13. Call updateTableStatsSlow forcing stats recompute
  14. + *
  15. Verify table statistics using mocked warehouse
  16. + *
  17. Verifies behavior when STATS_GENERATED is set in environment context
  18. + *
+ */ + @Test + public void testUpdateTableStatsSlow_statsUpdated() throws TException { + long fileLength = 5; + + // Create database and table + Table tbl = new Table(); + tbl.setDbName(DB_NAME); + tbl.setTableName(TABLE_NAME); + StorageDescriptor sd = new StorageDescriptor(); + sd.setCols(Collections.singletonList(new FieldSchema("id", "int", ""))); + sd.setLocation("/tmp"); + tbl.setSd(sd); + tbl.setParameters(new HashMap()); + + // Set up mock warehouse + FileStatus fs1 = new FileStatus(1, true, 2, 3, + 4, new Path("/tmp/0")); + FileStatus fs2 = new FileStatus(fileLength, false, 3, 4, + 5, new Path("/tmp/1")); + FileStatus fs3 = new FileStatus(fileLength, false, 3, 4, + 5, new Path("/tmp/1")); + FileStatus[] fileStatus = {fs1, fs2, fs3}; + Warehouse wh = mock(Warehouse.class); + when(wh.getFileStatusesForUnpartitionedTable(db, tbl)).thenReturn(fileStatus); + + Map expected = ImmutableMap.of(NUM_FILES, "2", + TOTAL_SIZE, String.valueOf(2 * fileLength)); + updateTableStatsSlow(db, tbl, wh, false, false, null); + assertThat(tbl.getParameters(), is(expected)); + + // Verify that when stats are already present and forceRecompute is specified they are recomputed + Table tbl1 = new Table(); + tbl1.setDbName(DB_NAME); + tbl1.setTableName(TABLE_NAME); + tbl1.setSd(sd); + tbl1.setParameters(new HashMap()); + tbl1.getParameters().put(NUM_FILES, "0"); + tbl1.getParameters().put(TOTAL_SIZE, "0"); + + when(wh.getFileStatusesForUnpartitionedTable(db, tbl1)).thenReturn(fileStatus); + updateTableStatsSlow(db, tbl1, wh, false, true, null); + assertThat(tbl1.getParameters(), is(expected)); + + // Verify that COLUMN_STATS_ACCURATE is removed from params + Table tbl2 = new Table(); + tbl2.setDbName(DB_NAME); + tbl2.setTableName(TABLE_NAME); + tbl2.setSd(sd); + tbl2.setParameters(new HashMap()); + tbl2.getParameters().put(COLUMN_STATS_ACCURATE, "true"); + + when(wh.getFileStatusesForUnpartitionedTable(db, tbl2)).thenReturn(fileStatus); + updateTableStatsSlow(db, tbl2, wh, false, 
true, null); + assertThat(tbl2.getParameters(), is(expected)); + + EnvironmentContext context = new EnvironmentContext(ImmutableMap.of(STATS_GENERATED, + StatsSetupConst.TASK)); + + // Verify that if environment context has STATS_GENERATED set to task, + // COLUMN_STATS_ACCURATE in params is set to correct value + Table tbl3 = new Table(); + tbl3.setDbName(DB_NAME); + tbl3.setTableName(TABLE_NAME); + tbl3.setSd(sd); + tbl3.setParameters(new HashMap()); + tbl3.getParameters().put(COLUMN_STATS_ACCURATE, "foo"); + when(wh.getFileStatusesForUnpartitionedTable(db, tbl3)).thenReturn(fileStatus); + updateTableStatsSlow(db, tbl3, wh, false, true, context); + + Map expected1 = ImmutableMap.of(NUM_FILES, "2", + TOTAL_SIZE, String.valueOf(2 * fileLength), + COLUMN_STATS_ACCURATE, "{\"BASIC_STATS\":\"true\"}"); + assertThat(tbl3.getParameters(), is(expected1)); + } + + /** + * Verify that the call to updateTableStatsSlow() removes DO_NOT_UPDATE_STATS from table params. + */ + @Test + public void testUpdateTableStatsSlow_removesDoNotUpdateStats() throws TException { + // Create database and table + Table tbl = new Table(); + tbl.setDbName(DB_NAME); + tbl.setTableName(TABLE_NAME); + StorageDescriptor sd = new StorageDescriptor(); + sd.setCols(Collections.singletonList(new FieldSchema("id", "int", ""))); + sd.setLocation("/tmp"); + tbl.setSd(sd); + tbl.setParameters(new HashMap()); + tbl.getParameters().put(StatsSetupConst.DO_NOT_UPDATE_STATS, "true"); + + Table tbl1 = new Table(); + tbl1.setDbName(DB_NAME); + tbl1.setTableName(TABLE_NAME); + tbl1.setSd(sd); + tbl1.setParameters(new HashMap()); + tbl1.getParameters().put(StatsSetupConst.DO_NOT_UPDATE_STATS, "false"); + + Warehouse wh = mock(Warehouse.class); + updateTableStatsSlow(db, tbl, wh, false, true, null); + Map expected = Collections.emptyMap(); + assertThat(tbl.getParameters(), is(expected)); + verify(wh, never()).getFileStatusesForUnpartitionedTable(db, tbl); + updateTableStatsSlow(db, tbl1, wh, true, false, null); + 
assertThat(tbl.getParameters(), is(expected)); + verify(wh, never()).getFileStatusesForUnpartitionedTable(db, tbl1); + } + + /** + * Verify that updateTableStatsSlow() does not calculate table statistics when + *
    + *
  1. newDir is true
  2. + *
  3. Table is partitioned
  4. + *
  5. Stats are already present and forceRecompute isn't set
  6. + *
+ */ + @Test + public void testUpdateTableStatsSlow_doesNotUpdateStats() throws TException { + FieldSchema fs = new FieldSchema("date", "string", "date column"); + List cols = Collections.singletonList(fs); + + // Create database and table + Table tbl = new Table(); + tbl.setDbName(DB_NAME); + tbl.setTableName(TABLE_NAME); + StorageDescriptor sd = new StorageDescriptor(); + sd.setCols(Collections.singletonList(new FieldSchema("id", "int", ""))); + sd.setLocation("/tmp"); + tbl.setSd(sd); + tbl.setParameters(new HashMap()); + + Warehouse wh = mock(Warehouse.class); + // newDir(true) => stats not updated + updateTableStatsSlow(db, tbl, wh, true, false, null); + verify(wh, never()).getFileStatusesForUnpartitionedTable(db, tbl); + + // partitioned table => stats not updated + Table tbl1 = new Table(); + tbl1.setDbName(DB_NAME); + tbl1.setTableName(TABLE_NAME); + tbl1.setPartitionKeys(cols); + tbl1.setSd(sd); + tbl.setParameters(new HashMap()); + + updateTableStatsSlow(db, tbl1, wh, false, false, null); + verify(wh, never()).getFileStatusesForUnpartitionedTable(db, tbl1); + + // Already contains stats => stats not updated when forceRecompute isn't set + Table tbl2 = new Table(); + tbl2.setDbName(DB_NAME); + tbl2.setTableName(TABLE_NAME); + tbl2.setSd(sd); + tbl2.setParameters(paramsWithStats); + + updateTableStatsSlow(db, tbl2, wh, false, false, null); + verify(wh, never()).getFileStatusesForUnpartitionedTable(db, tbl2); + } + } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java index 109bc3a50e..fd867f43ca 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java @@ -447,8 +447,7 @@ private void alterTempTable(String dbname, String tbl_name, } org.apache.hadoop.hive.metastore.api.Table newtCopy = deepCopyAndLowerCaseTable(newt); - 
MetaStoreUtils.updateTableStatsFast(newtCopy, - getWh().getFileStatusesForSD(newtCopy.getSd()), false, true, envContext); + MetaStoreUtils.updateTableStatsSlow(null, newtCopy, getWh(), false, true, envContext); Table newTable = new Table(newtCopy); String newDbName = newTable.getDbName(); String newTableName = newTable.getTableName(); -- 2.16.3