From afa943c3c0c4302adc03ddfa6a037bbef08fecb5 Mon Sep 17 00:00:00 2001 From: Alexander Kolbasov Date: Thu, 19 Apr 2018 08:46:41 -0700 Subject: [PATCH 1/1] HIVE-18743: CREATE TABLE on S3 data can be extremely slow. DO_NOT_UPDATE_STATS workaround is buggy. --- .../hadoop/hive/metastore/HiveAlterHandler.java | 2 +- .../hadoop/hive/metastore/HiveMetaStore.java | 2 +- .../hive/metastore/utils/MetaStoreUtils.java | 90 ++++----- .../hive/metastore/utils/TestMetaStoreUtils.java | 215 +++++++++++++++++++-- 4 files changed, 244 insertions(+), 65 deletions(-) diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java index 60bed9841f..0be0aaa10c 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java @@ -313,7 +313,7 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam !isPartitionedTable) { Database db = msdb.getDatabase(catName, newDbName); // Update table stats. For partitioned table, we update stats in alterPartition() - MetaStoreUtils.updateTableStatsFast(db, newt, wh, false, true, environmentContext, false); + MetaStoreUtils.updateTableStatsSlow(db, newt, wh, false, true, environmentContext); } if (isPartitionedTable) { diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index ae9ec5cad8..f6dc91d0ed 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -1796,7 +1796,7 @@ private void create_table_core(final RawStore ms, final Table tbl, } if (MetastoreConf.getBoolVar(conf, ConfVars.STATS_AUTO_GATHER) && !MetaStoreUtils.isView(tbl)) { - MetaStoreUtils.updateTableStatsFast(db, tbl, wh, madeDir, false, envContext, true); + MetaStoreUtils.updateTableStatsSlow(db, tbl, wh, madeDir, false, envContext); } // set create time diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java index d022bc0343..8c159e9aa3 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreUtils.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hive.metastore.utils; -import org.apache.hadoop.hive.metastore.api.ISchemaName; import org.apache.hadoop.hive.metastore.api.WMPoolSchedulingPolicy; import com.google.common.base.Predicates; @@ -642,70 +641,63 @@ public static boolean isFastStatsSame(Partition oldPart, Partition newPart) { return false; } - public static boolean updateTableStatsFast(Database db, Table tbl, Warehouse wh, boolean madeDir, - boolean forceRecompute, EnvironmentContext environmentContext, boolean isCreate) throws MetaException { - if (tbl.getPartitionKeysSize() != 0) return false; - // Update stats only when unpartitioned - // TODO: this is also invalid for ACID tables, except for the create case by coincidence; - // because the methods in metastore get all the files in the table directory without - // regard for ACID state. - List fileStatuses = wh.getFileStatusesForUnpartitionedTable(db, tbl); - return updateTableStatsFast( - tbl, fileStatuses, madeDir, forceRecompute, environmentContext, isCreate); - } - /** * Updates the numFiles and totalSize parameters for the passed Table by querying * the warehouse if the passed Table does not already have values for these parameters. - * @param tbl - * @param fileStatus + * NOTE: This function is rather expensive since it needs to traverse the file system to get all + * the information. + * * @param newDir if true, the directory was just created and can be assumed to be empty * @param forceRecompute Recompute stats even if the passed Table already has * these parameters set - * @return true if the stats were updated, false otherwise */ - private static boolean updateTableStatsFast(Table tbl, List fileStatus, - boolean newDir, boolean forceRecompute, EnvironmentContext environmentContext, - boolean isCreate) throws MetaException { - + public static void updateTableStatsSlow(Database db, Table tbl, Warehouse wh, + boolean newDir, boolean forceRecompute, + EnvironmentContext environmentContext) throws MetaException { + // DO_NOT_UPDATE_STATS is supposed to be a transient parameter that is only passed via RPC + // We want to avoid this property from being persistent. + // + // NOTE: If this property *is* set as table property we will remove it which is incorrect but + // we can't distinguish between these two cases + // + // This problem was introduced by HIVE-10228. A better approach would be to pass the property + // via the environment context. Map params = tbl.getParameters(); - - if ((params!=null) && params.containsKey(StatsSetupConst.DO_NOT_UPDATE_STATS)){ - boolean doNotUpdateStats = Boolean.valueOf(params.get(StatsSetupConst.DO_NOT_UPDATE_STATS)); + boolean updateStats = true; + if ((params != null) && params.containsKey(StatsSetupConst.DO_NOT_UPDATE_STATS)) { + updateStats = !Boolean.valueOf(params.get(StatsSetupConst.DO_NOT_UPDATE_STATS)); params.remove(StatsSetupConst.DO_NOT_UPDATE_STATS); - tbl.setParameters(params); // to make sure we remove this marker property - if (doNotUpdateStats){ - return false; - } } - if (!forceRecompute && params != null && containsAllFastStats(params)) return false; + if (!updateStats || newDir || tbl.getPartitionKeysSize() != 0) { + return; + } + + // If stats are already present and forceRecompute isn't set, nothing to do + if (!forceRecompute && params != null && containsAllFastStats(params)) { + return; + } + + // NOTE: wh.getFileStatusesForUnpartitionedTable() can be REALLY slow + List fileStatus = wh.getFileStatusesForUnpartitionedTable(db, tbl); if (params == null) { params = new HashMap<>(); - } - if (!isCreate && MetaStoreUtils.isTransactionalTable(tbl.getParameters())) { - // TODO: we should use AcidUtils.getAcidFilesForStats, but cannot access it from metastore. - LOG.warn("Not updating fast stats for a transactional table " + tbl.getTableName()); tbl.setParameters(params); - return true; } - if (!newDir) { - // The table location already exists and may contain data. - // Let's try to populate those stats that don't require full scan. - LOG.info("Updating table stats fast for " + tbl.getTableName()); - populateQuickStats(fileStatus, params); - LOG.info("Updated size of table " + tbl.getTableName() +" to "+ params.get(StatsSetupConst.TOTAL_SIZE)); - if (environmentContext != null - && environmentContext.isSetProperties() - && StatsSetupConst.TASK.equals(environmentContext.getProperties().get( - StatsSetupConst.STATS_GENERATED))) { - StatsSetupConst.setBasicStatsState(params, StatsSetupConst.TRUE); - } else { - StatsSetupConst.setBasicStatsState(params, StatsSetupConst.FALSE); - } + // The table location already exists and may contain data. + // Let's try to populate those stats that don't require full scan. + LOG.info("Updating table stats for {}", tbl.getTableName()); + populateQuickStats(fileStatus, params); + LOG.info("Updated size of table {} to {}", + tbl.getTableName(), params.get(StatsSetupConst.TOTAL_SIZE)); + if (environmentContext != null + && environmentContext.isSetProperties() + && StatsSetupConst.TASK.equals(environmentContext.getProperties().get( + StatsSetupConst.STATS_GENERATED))) { + StatsSetupConst.setBasicStatsState(params, StatsSetupConst.TRUE); + } else { + StatsSetupConst.setBasicStatsState(params, StatsSetupConst.FALSE); } - tbl.setParameters(params); - return true; } /** This method is invalid for MM and ACID tables unless fileStatus comes from AcidUtils. */ diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/utils/TestMetaStoreUtils.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/utils/TestMetaStoreUtils.java index d6c13d3f2a..55ff1502d4 100644 --- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/utils/TestMetaStoreUtils.java +++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/utils/TestMetaStoreUtils.java @@ -18,21 +18,60 @@ package org.apache.hadoop.hive.metastore.utils; +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.EnvironmentContext; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.client.builder.DatabaseBuilder; +import org.apache.hadoop.hive.metastore.client.builder.TableBuilder; +import org.apache.thrift.TException; import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import static org.apache.hadoop.hive.common.StatsSetupConst.COLUMN_STATS_ACCURATE; +import static org.apache.hadoop.hive.common.StatsSetupConst.NUM_FILES; +import static org.apache.hadoop.hive.common.StatsSetupConst.STATS_GENERATED; +import static org.apache.hadoop.hive.common.StatsSetupConst.TOTAL_SIZE; +import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.updateTableStatsSlow; +import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @Category(MetastoreUnitTest.class) public class TestMetaStoreUtils { + private static final String DB_NAME = "db1"; + private static final String TABLE_NAME = "tbl1"; + + private final Map paramsWithStats = ImmutableMap.of(NUM_FILES, "1", TOTAL_SIZE, "2"); + + private Database db; + + public TestMetaStoreUtils() { + try { + db = new DatabaseBuilder().setName(DB_NAME).build(null); + } catch (TException e) { + e.printStackTrace(); + } + } + @Test public void testTrimMapNullsXform() throws Exception { Map m = new HashMap<>(); @@ -40,14 +79,11 @@ public void testTrimMapNullsXform() throws Exception { m.put("blank",""); m.put("null",null); + Map expected = ImmutableMap.of("akey", "aval", + "blank", "", "null", ""); + Map xformed = MetaStoreUtils.trimMapNulls(m,true); - assertEquals(3,xformed.size()); - assert(xformed.containsKey("akey")); - assert(xformed.containsKey("blank")); - assert(xformed.containsKey("null")); - assertEquals("aval",xformed.get("akey")); - assertEquals("",xformed.get("blank")); - assertEquals("",xformed.get("null")); + assertThat(xformed, is(expected)); } @Test @@ -56,15 +92,10 @@ public void testTrimMapNullsPrune() throws Exception { m.put("akey","aval"); m.put("blank",""); m.put("null",null); + Map expected = ImmutableMap.of("akey", "aval", "blank", ""); Map pruned = MetaStoreUtils.trimMapNulls(m,false); - assertEquals(2,pruned.size()); - assert(pruned.containsKey("akey")); - assert(pruned.containsKey("blank")); - assert(!pruned.containsKey("null")); - assertEquals("aval",pruned.get("akey")); - assertEquals("",pruned.get("blank")); - assert(!pruned.containsValue(null)); + assertThat(pruned, is(expected)); } @Test @@ -82,6 +113,162 @@ public void testcolumnsIncludedByNameType() { Assert.assertFalse(org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.columnsIncludedByNameType(Arrays.asList(col1, col2), Arrays.asList(col1))); } + /** + * Verify that updateTableStatsSlow really updates table statistics. + * The test does the following: + *
    + *
  1. Create database
  2. + *
  3. Create unpartitioned table
  4. + *
  5. Create unpartitioned table which has params
  6. + *
  7. Call updateTableStatsSlow with arguments which should caue stats calculation
  8. + *
  9. Verify table statistics using mocked warehouse
  10. + *
  11. Create table which already have stats
  12. + *
  13. Call updateTableStatsSlow forcing stats recompute
  14. + *
  15. Verify table statistics using mocked warehouse
  16. + *
  17. Verifies behavior when STATS_GENERATED is set in environment context
  18. + *
+ */ + @Test + public void testUpdateTableStatsSlow_statsUpdated() throws TException { + long fileLength = 5; + + // Create database and table + Table tbl = new TableBuilder() + .setDbName(DB_NAME) + .setTableName(TABLE_NAME) + .addCol("id", "int") + .build(null); + + + // Set up mock warehouse + FileStatus fs1 = new FileStatus(1, true, 2, 3, + 4, new Path("/tmp/0")); + FileStatus fs2 = new FileStatus(fileLength, false, 3, 4, + 5, new Path("/tmp/1")); + FileStatus fs3 = new FileStatus(fileLength, false, 3, 4, + 5, new Path("/tmp/1")); + List fileStatus = Arrays.asList(fs1, fs2, fs3); + Warehouse wh = mock(Warehouse.class); + when(wh.getFileStatusesForUnpartitionedTable(db, tbl)).thenReturn(fileStatus); + + Map expected = ImmutableMap.of(NUM_FILES, "2", + TOTAL_SIZE, String.valueOf(2 * fileLength)); + updateTableStatsSlow(db, tbl, wh, false, false, null); + assertThat(tbl.getParameters(), is(expected)); + + // Verify that when stats are already present and forceRecompute is specified they are recomputed + Table tbl1 = new TableBuilder() + .setDbName(DB_NAME) + .setTableName(TABLE_NAME) + .addCol("id", "int") + .addTableParam(NUM_FILES, "0") + .addTableParam(TOTAL_SIZE, "0") + .build(null); + when(wh.getFileStatusesForUnpartitionedTable(db, tbl1)).thenReturn(fileStatus); + updateTableStatsSlow(db, tbl1, wh, false, true, null); + assertThat(tbl1.getParameters(), is(expected)); + + // Verify that COLUMN_STATS_ACCURATE is removed from params + Table tbl2 = new TableBuilder() + .setDbName(DB_NAME) + .setTableName(TABLE_NAME) + .addCol("id", "int") + .addTableParam(COLUMN_STATS_ACCURATE, "true") + .build(null); + when(wh.getFileStatusesForUnpartitionedTable(db, tbl2)).thenReturn(fileStatus); + updateTableStatsSlow(db, tbl2, wh, false, true, null); + assertThat(tbl2.getParameters(), is(expected)); + EnvironmentContext context = new EnvironmentContext(ImmutableMap.of(STATS_GENERATED, + StatsSetupConst.TASK)); + // Verify that if environment context has STATS_GENERATED set to task, + // COLUMN_STATS_ACCURATE in params is set to correct value + Table tbl3 = new TableBuilder() + .setDbName(DB_NAME) + .setTableName(TABLE_NAME) + .addCol("id", "int") + .addTableParam(COLUMN_STATS_ACCURATE, "foo") // The value doesn't matter + .build(null); + when(wh.getFileStatusesForUnpartitionedTable(db, tbl3)).thenReturn(fileStatus); + updateTableStatsSlow(db, tbl3, wh, false, true, context); + + Map expected1 = ImmutableMap.of(NUM_FILES, "2", + TOTAL_SIZE, String.valueOf(2 * fileLength), + COLUMN_STATS_ACCURATE, "{\"BASIC_STATS\":\"true\"}"); + assertThat(tbl3.getParameters(), is(expected1)); + } + + /** + * Verify that the call to updateTableStatsSlow() removes DO_NOT_UPDATE_STATS from table params. + */ + @Test + public void testUpdateTableStatsSlow_removesDoNotUpdateStats() throws TException { + // Create database and table + Table tbl = new TableBuilder() + .setDbName(DB_NAME) + .setTableName(TABLE_NAME) + .addCol("id", "int") + .addTableParam(StatsSetupConst.DO_NOT_UPDATE_STATS, "true") + .build(null); + Table tbl1 = new TableBuilder() + .setDbName(DB_NAME) + .setTableName(TABLE_NAME) + .addCol("id", "int") + .addTableParam(StatsSetupConst.DO_NOT_UPDATE_STATS, "false") + .build(null); + Warehouse wh = mock(Warehouse.class); + updateTableStatsSlow(db, tbl, wh, false, true, null); + assertThat(tbl.getParameters(), is(Collections.emptyMap())); + verify(wh, never()).getFileStatusesForUnpartitionedTable(db, tbl); + updateTableStatsSlow(db, tbl1, wh, true, false, null); + assertThat(tbl.getParameters(), is(Collections.emptyMap())); + verify(wh, never()).getFileStatusesForUnpartitionedTable(db, tbl1); + } + + /** + * Verify that updateTableStatsSlow() does not calculate tabe statistics when + *
    + *
  1. newDir is true
  2. + *
  3. Table is partitioned
  4. + *
  5. Stats are already present and forceRecompute isn't set
  6. + *
+ */ + @Test + public void testUpdateTableStatsSlow_doesNotUpdateStats() throws TException { + // Create database and table + FieldSchema fs = new FieldSchema("date", "string", "date column"); + List cols = Collections.singletonList(fs); + + Table tbl = new TableBuilder() + .setDbName(DB_NAME) + .setTableName(TABLE_NAME) + .addCol("id", "int") + .build(null); + Warehouse wh = mock(Warehouse.class); + // newDir(true) => stats not updated + updateTableStatsSlow(db, tbl, wh, true, false, null); + verify(wh, never()).getFileStatusesForUnpartitionedTable(db, tbl); + + // partitioned table => stats not updated + Table tbl1 = new TableBuilder() + .setDbName(DB_NAME) + .setTableName(TABLE_NAME) + .addCol("id", "int") + .setPartCols(cols) + .build(null); + updateTableStatsSlow(db, tbl1, wh, false, false, null); + verify(wh, never()).getFileStatusesForUnpartitionedTable(db, tbl1); + + // Already contains stats => stats not updated when forceRecompute isn't set + Table tbl2 = new TableBuilder() + .setDbName(DB_NAME) + .setTableName(TABLE_NAME) + .addCol("id", "int") + .setTableParams(paramsWithStats) + .build(null); + updateTableStatsSlow(db, tbl2, wh, false, false, null); + verify(wh, never()).getFileStatusesForUnpartitionedTable(db, tbl2); + } } + -- 2.16.3