diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java
index 8e72a1275a5cdcc2d778080fff6bb82198395f5f..75d55afd69305bcfacdd1cf5a88bfffedab58879 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatUtil.java
@@ -462,12 +462,17 @@ public static HiveStorageHandler getStorageHandler(Configuration conf,
     Map<String, String> jobProperties = new HashMap<String, String>();
     try {
-      tableDesc.getJobProperties().put(
-        HCatConstants.HCAT_KEY_JOB_INFO,
-        HCatUtil.serialize(inputJobInfo));
-      storageHandler.configureInputJobProperties(tableDesc,
-        jobProperties);
+      Map<String, String> properties = tableDesc.getJobProperties();
+      LinkedList<InputJobInfo> inputJobInfos = (LinkedList<InputJobInfo>) HCatUtil.deserialize(
+          properties.get(HCatConstants.HCAT_KEY_JOB_INFO));
+      if (inputJobInfos == null) {
+        inputJobInfos = new LinkedList<>();
+      }
+      inputJobInfos.add(inputJobInfo);
+      properties.put(HCatConstants.HCAT_KEY_JOB_INFO, HCatUtil.serialize(inputJobInfos));
+
+      storageHandler.configureInputJobProperties(tableDesc, jobProperties);
 
     } catch (IOException e) {
       throw new IllegalStateException(
@@ -757,4 +762,35 @@ public static void assertNotNull(Object t, String msg, Logger logger) {
       throw new IllegalArgumentException(msg);
     }
   }
+
+  public static void putInputJobInfoToConf(InputJobInfo inputJobInfo, Configuration conf)
+      throws IOException {
+
+    LinkedList<InputJobInfo> inputJobInfos = (LinkedList<InputJobInfo>) HCatUtil.deserialize(
+        conf.get(HCatConstants.HCAT_KEY_JOB_INFO));
+
+    if (inputJobInfos == null) {
+      inputJobInfos = new LinkedList<>();
+    }
+    inputJobInfos.add(inputJobInfo);
+    conf.set(HCatConstants.HCAT_KEY_JOB_INFO, HCatUtil.serialize(inputJobInfos));
+  }
+
+  public static LinkedList<InputJobInfo> getInputJobInfosFromConf(Configuration conf)
+      throws IOException {
+    LinkedList<InputJobInfo> inputJobInfos = (LinkedList<InputJobInfo>) HCatUtil.deserialize(
+        conf.get(HCatConstants.HCAT_KEY_JOB_INFO));
+    return inputJobInfos;
+  }
+
+  public static InputJobInfo getLastInputJobInfosFromConf(Configuration conf)
+      throws IOException {
+    LinkedList<InputJobInfo> inputJobInfos = getInputJobInfosFromConf(conf);
+    if (inputJobInfos == null || inputJobInfos.isEmpty()) {
+      return null;
+    } else {
+      return getInputJobInfosFromConf(conf).getLast();
+    }
+  }
+
 }
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
index 195eaa367933990e3ef0ef879f34049c65822aee..17e94fd485111ea1247a413bf11a8a12f42f036e 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
@@ -44,6 +44,7 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -113,7 +114,12 @@ public void configureInputJobProperties(TableDesc tableDesc,
     String jobInfoProperty = tableProperties.get(HCatConstants.HCAT_KEY_JOB_INFO);
     if (jobInfoProperty != null) {
 
-      InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(jobInfoProperty);
+      LinkedList<InputJobInfo> inputJobInfos = (LinkedList<InputJobInfo>) HCatUtil.deserialize
+          (jobInfoProperty);
+      if (inputJobInfos == null || inputJobInfos.isEmpty()) {
+        throw new IOException("No InputJobInfo was set in job config");
+      }
+      InputJobInfo inputJobInfo = inputJobInfos.getLast();
 
       HCatTableInfo tableInfo = inputJobInfo.getTableInfo();
       HCatSchema dataColumns = tableInfo.getDataColumns();
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
index 8d7a8f9df9412105ec7d77fad9af0d7dd18f4323..002f63f7bbc75b30bc43e450a748ba65ccb60d42 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
@@ -190,9 +190,8 @@ public static void setOutputSchema(Job job, HCatSchema hcatSchema)
     PartInfo partitionInfo = hcatSplit.getPartitionInfo();
     // Ensure PartInfo's TableInfo is initialized.
     if (partitionInfo.getTableInfo() == null) {
-      partitionInfo.setTableInfo(((InputJobInfo)HCatUtil.deserialize(
-        taskContext.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO)
-      )).getTableInfo());
+      partitionInfo.setTableInfo(
+          HCatUtil.getLastInputJobInfosFromConf(taskContext.getConfiguration()).getTableInfo());
     }
     JobContext jobContext = taskContext;
     Configuration conf = jobContext.getConfiguration();
@@ -281,14 +280,13 @@ public static HCatSchema getTableSchema(Configuration conf)
    */
   private static InputJobInfo getJobInfo(Configuration conf)
     throws IOException {
-    String jobString = conf.get(
-      HCatConstants.HCAT_KEY_JOB_INFO);
-    if (jobString == null) {
+    InputJobInfo inputJobInfo = HCatUtil.getLastInputJobInfosFromConf(conf);
+    if (inputJobInfo == null) {
       throw new IOException("job information not found in JobContext."
         + " HCatInputFormat.setInput() not called?");
     }
 
-    return (InputJobInfo) HCatUtil.deserialize(jobString);
+    return inputJobInfo;
   }
 
   private List setInputPath(JobConf jobConf, String location)
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatInputFormat.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatInputFormat.java
index ad6f3eb9f93338023863c6239d6af0449b20ff9c..a7befa9f0df8562ac2318cc8974e8ce49ca3b79b 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatInputFormat.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatInputFormat.java
@@ -27,7 +27,6 @@
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hive.hcatalog.common.HCatConstants;
 import org.apache.hive.hcatalog.common.HCatUtil;
 import org.apache.hive.hcatalog.data.schema.HCatSchema;
 
@@ -145,13 +144,13 @@ public HCatInputFormat setProperties(Properties properties) throws IOException {
   }
 
   /**
-   * Return partitioning columns for this input, can only be called after setInput is called.
+   * Return partitioning columns for this input, can only be called after setInput is called,
+   * since that takes care of adding a populated InputJobInfo object to its list in this job conf.
    * @return partitioning columns of the table specified by the job.
    * @throws IOException
    */
   public static HCatSchema getPartitionColumns(Configuration conf) throws IOException {
-    InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(
-      conf.get(HCatConstants.HCAT_KEY_JOB_INFO));
+    InputJobInfo inputInfo = HCatUtil.getLastInputJobInfosFromConf(conf);
     Preconditions.checkNotNull(inputInfo,
       "inputJobInfo is null, setInput has not yet been called to save job into conf supplied.");
     return inputInfo.getTableInfo().getPartitionColumns();
@@ -159,13 +158,13 @@ public static HCatSchema getPartitionColumns(Configuration conf) throws IOExcept
   }
 
   /**
-   * Return data columns for this input, can only be called after setInput is called.
+   * Return data columns for this input, can only be called after setInput is called,
+   * since that takes care of adding a populated InputJobInfo object to its list in this job conf.
    * @return data columns of the table specified by the job.
    * @throws IOException
    */
   public static HCatSchema getDataColumns(Configuration conf) throws IOException {
-    InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(
-      conf.get(HCatConstants.HCAT_KEY_JOB_INFO));
+    InputJobInfo inputInfo = HCatUtil.getLastInputJobInfosFromConf(conf);
     Preconditions.checkNotNull(inputInfo,
       "inputJobInfo is null, setInput has not yet been called to save job into conf supplied.");
     return inputInfo.getTableInfo().getDataColumns();
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InitializeInput.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InitializeInput.java
index 364382d9ccf6eb9fc29689b0eb5f973f422051b4..2c8e7064c21fdea4485b8a3239f3515574b8e9e9 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InitializeInput.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InitializeInput.java
@@ -34,7 +34,6 @@
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hive.hcatalog.common.ErrorType;
-import org.apache.hive.hcatalog.common.HCatConstants;
 import org.apache.hive.hcatalog.common.HCatException;
 import org.apache.hive.hcatalog.common.HCatUtil;
 import org.apache.hive.hcatalog.data.schema.HCatSchema;
@@ -68,8 +67,8 @@ public static void setInput(Job job, InputJobInfo theirInputJobInfo) throws Exce
    *
    * After calling setInput, InputJobInfo can be retrieved from the job configuration as follows:
    * {code}
-   * InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(
-   *   job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO));
+   * LinkedList<InputJobInfo> inputInfo = (LinkedList<InputJobInfo>) HCatUtil
+   *     .deserialize(job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO));
    * {code}
    *
    * @param conf the job Configuration object
@@ -83,15 +82,15 @@ public static void setInput(Configuration conf,
       theirInputJobInfo.getTableName(),
       theirInputJobInfo.getFilter(),
       theirInputJobInfo.getProperties());
-    conf.set(
-      HCatConstants.HCAT_KEY_JOB_INFO,
-      HCatUtil.serialize(getInputJobInfo(conf, inputJobInfo, null)));
+
+    populateInputJobInfo(conf, inputJobInfo, null);
+    HCatUtil.putInputJobInfoToConf(inputJobInfo, conf);
   }
 
   /**
    * Returns the given InputJobInfo after populating with data queried from the metadata service.
    */
-  private static InputJobInfo getInputJobInfo(
+  private static void populateInputJobInfo(
     Configuration conf, InputJobInfo inputJobInfo, String locationFilter) throws Exception {
     IMetaStoreClient client = null;
     HiveConf hiveConf = null;
@@ -141,7 +140,6 @@ private static InputJobInfo getInputJobInfo(
       }
 
       inputJobInfo.setPartitions(partInfoList);
-      return inputJobInfo;
     } finally {
       HCatUtil.closeHiveClientQuietly(client);
     }
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
index ac1dd54be821d32aa008d41514df05a41f16223c..cd759cbe87859b07ce839b5d1ae992a636f76e1b 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java
@@ -22,6 +22,8 @@
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hive.metastore.Warehouse;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
@@ -159,15 +161,25 @@ public Properties getProperties() {
    * Serialize this object, compressing the partitions which can exceed the
    * allowed jobConf size.
    * @see HCATALOG-453
+   * Partitions will be compressed and put into a byte array which then we have to write into the
+   * ObjectOutputStream this method is given.
    */
   private void writeObject(ObjectOutputStream oos)
     throws IOException {
     oos.defaultWriteObject();
+
+    ByteArrayOutputStream serialObj = new ByteArrayOutputStream();
+    ObjectOutputStream objStream = new ObjectOutputStream(serialObj);
     Deflater def = new Deflater(Deflater.BEST_COMPRESSION);
     ObjectOutputStream partInfoWriter =
-      new ObjectOutputStream(new DeflaterOutputStream(oos, def));
+      new ObjectOutputStream(new DeflaterOutputStream(objStream, def));
     partInfoWriter.writeObject(partitions);
+
+    //Closing only the writer used for compression byte stream
     partInfoWriter.close();
+
+    //Appending the compressed partition information
+    oos.writeObject(serialObj.toByteArray());
   }
 
   /**
@@ -179,8 +191,12 @@ private void readObject(ObjectInputStream ois)
     throws IOException, ClassNotFoundException {
     ois.defaultReadObject();
+
+    //Next object in the stream will be a byte array of partition information which is compressed
+    ObjectInputStream pis = new ObjectInputStream(new ByteArrayInputStream(
+        (byte[])ois.readObject()));
     ObjectInputStream partInfoReader =
-      new ObjectInputStream(new InflaterInputStream(ois));
+      new ObjectInputStream(new InflaterInputStream(pis));
     partitions = (List<PartInfo>)partInfoReader.readObject();
     if (partitions != null) {
       for (PartInfo partInfo : partitions) {
@@ -189,5 +205,7 @@ private void readObject(ObjectInputStream ois)
         }
       }
     }
+    //Closing only the reader used for decompression byte stream
+    partInfoReader.close();
   }
 }
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/common/TestHCatUtil.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/common/TestHCatUtil.java
index 91aa4fa2693e0b0bd65c1667210af340619f552d..8b0a77af74fa2442d8682916c6fcba61488e8c3f 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/common/TestHCatUtil.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/common/TestHCatUtil.java
@@ -23,7 +23,12 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.metastore.TableType;
@@ -35,12 +40,11 @@
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.apache.hive.hcatalog.mapreduce.InputJobInfo;
+
 import org.junit.Assert;
 import org.junit.Test;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
 public class TestHCatUtil {
 
   @Test
@@ -181,4 +185,29 @@ public void testGetTableSchemaWithPtnColsSerDeReportedFields() throws IOExceptio
     Assert.assertEquals(new HCatSchema(expectedHCatSchema),
       HCatUtil.getTableSchemaWithPtnCols(table));
   }
+
+  @Test
+  public void testInputJobInfoInConf() throws Exception {
+    Configuration conf = new Configuration(false);
+
+    InputJobInfo inputJobInfo = HCatUtil.getLastInputJobInfosFromConf(conf);
+    Assert.assertNull(inputJobInfo);
+    List<InputJobInfo> inputJobInfos = HCatUtil.getInputJobInfosFromConf(conf);
+    Assert.assertNull(inputJobInfos);
+
+    InputJobInfo inputJobInfo0 = InputJobInfo.create("db", "table", "", new Properties());
+    InputJobInfo inputJobInfo1 = InputJobInfo.create("db", "table2", "", new Properties());
+
+    HCatUtil.putInputJobInfoToConf(inputJobInfo0, conf);
+    HCatUtil.putInputJobInfoToConf(inputJobInfo1, conf);
+
+    inputJobInfo = HCatUtil.getLastInputJobInfosFromConf(conf);
+    inputJobInfos = HCatUtil.getInputJobInfosFromConf(conf);
+
+    Assert.assertEquals(inputJobInfo1.getDatabaseName(), inputJobInfo.getDatabaseName());
+    Assert.assertEquals(inputJobInfo1.getTableName(), inputJobInfo.getTableName());
+    Assert.assertEquals(inputJobInfo0.getDatabaseName(), inputJobInfos.get(0).getDatabaseName());
+    Assert.assertEquals(inputJobInfo0.getTableName(), inputJobInfos.get(0).getTableName());
+  }
 }
diff --git a/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java b/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java
index c3bde2d2a3cbd09fb0b1ed758bf4f2b1041a23cb..696e081d03eb0ccc92a01a145fa2b68cb523f852 100644
--- a/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java
+++ b/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java
@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -132,8 +133,7 @@ public void setLocation(String location, Job job) throws IOException {
     Job clone = new Job(job.getConfiguration());
     HCatInputFormat.setInput(job, dbName, tableName, getPartitionFilterString());
 
-    InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(
-      job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO));
+    InputJobInfo inputJobInfo = HCatUtil.getLastInputJobInfosFromConf(job.getConfiguration());
 
     SpecialCases.addSpecialCasesParametersForHCatLoader(job.getConfiguration(),
       inputJobInfo.getTableInfo());
@@ -261,9 +261,22 @@ public void setPartitionFilter(Expression partitionFilter) throws IOException {
   public ResourceStatistics getStatistics(String location, Job job) throws IOException {
     try {
       ResourceStatistics stats = new ResourceStatistics();
-      InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(
-        job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO));
-      stats.setSizeInBytes(getSizeInBytes(inputJobInfo));
+      long inputSize = -1;
+
+      LinkedList<InputJobInfo> inputJobInfos = HCatUtil.getInputJobInfosFromConf(
+          job.getConfiguration());
+
+      for (InputJobInfo inputJobInfo : inputJobInfos) {
+        if (location.equals(inputJobInfo.getTableName())) {
+          inputSize = getSizeInBytes(inputJobInfo);
+          break;
+        }
+      }
+
+      if (inputSize == -1) {
+        throw new IOException("Could not calculate input size for location (table) " + location);
+      }
+      stats.setSizeInBytes(inputSize);
       return stats;
     } catch (Exception e) {
       throw new IOException(e);
diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/AbstractHCatLoaderTest.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/AbstractHCatLoaderTest.java
index 58981f88ef6abfbf7a4b7ffc3116c53d47e86fde..d64d31f46ec725cf37e2eebcf00f681dff2aa94a 100644
--- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/AbstractHCatLoaderTest.java
+++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/AbstractHCatLoaderTest.java
@@ -78,6 +78,7 @@
   private static final String COMPLEX_TABLE = "junit_unparted_complex";
   private static final String PARTITIONED_TABLE = "junit_parted_basic";
   private static final String SPECIFIC_SIZE_TABLE = "junit_specific_size";
+  private static final String SPECIFIC_SIZE_TABLE_2 = "junit_specific_size2";
   private static final String PARTITIONED_DATE_TABLE = "junit_parted_date";
 
   private Map<Integer, Pair<Integer, String>> basicInputData;
@@ -149,6 +150,7 @@ public void setUpTest() throws Exception {
     createTable(PARTITIONED_TABLE, "a int, b string", "bkt string");
     createTable(SPECIFIC_SIZE_TABLE, "a int, b string");
+    createTable(SPECIFIC_SIZE_TABLE_2, "a int, b string");
     createTable(PARTITIONED_DATE_TABLE, "b string", "dt date");
 
     AllTypesTable.setupAllTypesTable(driver);
@@ -211,6 +213,7 @@ public void tearDown() throws Exception {
     dropTable(COMPLEX_TABLE);
     dropTable(PARTITIONED_TABLE);
     dropTable(SPECIFIC_SIZE_TABLE);
+    dropTable(SPECIFIC_SIZE_TABLE_2);
     dropTable(PARTITIONED_DATE_TABLE);
     dropTable(AllTypesTable.ALL_PRIMITIVE_TYPES_TABLE);
   }
@@ -560,10 +563,63 @@ public void testGetInputBytes() throws Exception {
     HCatLoader hCatLoader = new HCatLoader();
     hCatLoader.setUDFContextSignature("testGetInputBytes");
     hCatLoader.setLocation(SPECIFIC_SIZE_TABLE, job);
-    ResourceStatistics statistics = hCatLoader.getStatistics(file.getAbsolutePath(), job);
+    ResourceStatistics statistics = hCatLoader.getStatistics(SPECIFIC_SIZE_TABLE, job);
     assertEquals(2048, (long) statistics.getmBytes());
   }
 
+  /**
+   * Simulates Pig relying on HCatLoader to inform about input size of multiple tables
+   * @throws Exception
+   */
+  @Test
+  public void testGetInputBytesMultipleTables() throws Exception {
+    File file = new File(TEST_WAREHOUSE_DIR + "/" + SPECIFIC_SIZE_TABLE + "/part-m-00000");
+    file.deleteOnExit();
+    RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
+    randomAccessFile.setLength(987654321L);
+    randomAccessFile.close();
+    file = new File(TEST_WAREHOUSE_DIR + "/" + SPECIFIC_SIZE_TABLE_2 + "/part-m-00000");
+    file.deleteOnExit();
+    randomAccessFile = new RandomAccessFile(file, "rw");
+    randomAccessFile.setLength(12345678L);
+    randomAccessFile.close();
+    Job job = new Job();
+    HCatLoader hCatLoader = new HCatLoader();
+
+    //Mocking that Pig would assign different signature for each POLoad operator
+    hCatLoader.setUDFContextSignature("testGetInputBytesMultipleTables" + SPECIFIC_SIZE_TABLE);
+    hCatLoader.setLocation(SPECIFIC_SIZE_TABLE, job);
+
+    hCatLoader.setUDFContextSignature("testGetInputBytesMultipleTables" + SPECIFIC_SIZE_TABLE_2);
+    hCatLoader.setLocation(SPECIFIC_SIZE_TABLE_2, job);
+
+    hCatLoader.setUDFContextSignature("testGetInputBytesMultipleTables" + PARTITIONED_TABLE);
+    hCatLoader.setLocation(PARTITIONED_TABLE, job);
+
+    long specificTableSize = -1;
+    long specificTableSize2 = -1;
+    long partitionedTableSize = -1;
+
+    ResourceStatistics statistics = hCatLoader.getStatistics(SPECIFIC_SIZE_TABLE, job);
+    specificTableSize = statistics.getSizeInBytes();
+    assertEquals(987654321, specificTableSize);
+
+    statistics = hCatLoader.getStatistics(SPECIFIC_SIZE_TABLE_2, job);
+    specificTableSize2 = statistics.getSizeInBytes();
+    assertEquals(12345678, specificTableSize2);
+
+    statistics = hCatLoader.getStatistics(PARTITIONED_TABLE, job);
+    partitionedTableSize = statistics.getSizeInBytes();
+    //Partitioned table size here is dependent on underlying storage format, it's ~ 20
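
Reviewer note: below is a minimal, illustrative usage sketch (not part of the patch) showing how the HCatUtil helpers introduced above are intended to work together: each put appends an InputJobInfo to the LinkedList serialized under HCAT_KEY_JOB_INFO, and readers either iterate the whole list or take the last entry. The database/table names are made up, and the snippet assumes the patched HCatUtil and InputJobInfo are on the classpath.

{code}
import java.util.LinkedList;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hive.hcatalog.common.HCatUtil;
import org.apache.hive.hcatalog.mapreduce.InputJobInfo;

public class MultiInputJobInfoSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration(false);

    // Each call appends one InputJobInfo to the list stored under HCAT_KEY_JOB_INFO.
    HCatUtil.putInputJobInfoToConf(InputJobInfo.create("db", "orders", "", new Properties()), conf);
    HCatUtil.putInputJobInfoToConf(InputJobInfo.create("db", "customers", "", new Properties()), conf);

    // All inputs registered so far, in insertion order - this is what HCatLoader.getStatistics
    // scans to find the entry matching a given table/location.
    LinkedList<InputJobInfo> inputs = HCatUtil.getInputJobInfosFromConf(conf);
    for (InputJobInfo info : inputs) {
      System.out.println(info.getDatabaseName() + "." + info.getTableName());
    }

    // The most recently added input - this is what HCatBaseInputFormat.getJobInfo now reads.
    InputJobInfo last = HCatUtil.getLastInputJobInfosFromConf(conf);
    System.out.println("last input: " + last.getTableName());
  }
}
{code}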