commit 661174ab4bea7f0c07bd53466a8326bccdf9934d Author: Owen O'Malley Date: Mon Sep 14 11:52:58 2015 -0700 HIVE-11807. Optimize buffer size. diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java index 7aa8d65..2031732 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.io.OutputStream; -import java.lang.management.ManagementFactory; import java.nio.ByteBuffer; import java.sql.Timestamp; import java.util.ArrayList; @@ -211,7 +210,8 @@ public Writer getWriter() { if (allColumns == null) { allColumns = getColumnNamesFromInspector(inspector); } - this.bufferSize = getEstimatedBufferSize(allColumns, bufferSize); + this.bufferSize = getEstimatedBufferSize(defaultStripeSize, + countColumns(inspector), bufferSize); if (version == OrcFile.Version.V_0_11) { /* do not write bloom filters for ORC v11 */ this.bloomFilterColumns = @@ -244,49 +244,58 @@ private String getColumnNamesFromInspector(ObjectInspector inspector) { return joiner.join(fieldNames); } - @VisibleForTesting - int getEstimatedBufferSize(int bs) { - return getEstimatedBufferSize(conf.get(IOConstants.COLUMNS), bs); - } - - int getEstimatedBufferSize(String colNames, int bs) { - long availableMem = getMemoryAvailableForORC(); - if (colNames != null) { - final int numCols = colNames.split(",").length; - if (numCols > COLUMN_COUNT_THRESHOLD) { - // In BufferedStream, there are 3 outstream buffers (compressed, - // uncompressed and overflow) and list of previously compressed buffers. - // Since overflow buffer is rarely used, lets consider only 2 allocation. - // Also, initially, the list of compression buffers will be empty. - final int outStreamBuffers = codec == null ? 1 : 2; - - // max possible streams per column is 5. For string columns, there is - // ROW_INDEX, PRESENT, DATA, LENGTH, DICTIONARY_DATA streams. - final int maxStreams = 5; - - // Lets assume 10% memory for holding dictionary in memory and other - // object allocations - final long miscAllocation = (long) (0.1f * availableMem); - - // compute the available memory - final long remainingMem = availableMem - miscAllocation; - - int estBufferSize = (int) (remainingMem / - (maxStreams * outStreamBuffers * numCols)); - estBufferSize = getClosestBufferSize(estBufferSize, bs); - if (estBufferSize > bs) { - estBufferSize = bs; + static int countColumns(ObjectInspector oi) { + switch (oi.getCategory()) { + case PRIMITIVE: + return 1; + case STRUCT: { + int result = 1; + for(StructField field: + ((StructObjectInspector) oi).getAllStructFieldRefs()) { + result += countColumns(field.getFieldObjectInspector()); } - - LOG.info("WIDE TABLE - Number of columns: " + numCols + - " Chosen compression buffer size: " + estBufferSize); - return estBufferSize; + return result; + } + case UNION: { + int result = 1; + for(ObjectInspector child: + ((UnionObjectInspector) oi).getObjectInspectors()) { + result += countColumns(child); + } + return result; + } + case LIST: + return 1 + countColumns( + ((ListObjectInspector)oi).getListElementObjectInspector()); + case MAP: { + MapObjectInspector moi = (MapObjectInspector) oi; + return 1 + countColumns(moi.getMapKeyObjectInspector()) + + countColumns(moi.getMapValueObjectInspector()); } + default: + throw new IllegalArgumentException("Unknown category " + + oi.getCategory()); } - return bs; } - private int getClosestBufferSize(int estBufferSize, int bs) { + @VisibleForTesting + static int getEstimatedBufferSize(long stripeSize, int numColumns, int bs) { + // The worst case is that there are 2 big streams per a column and + // we want to guarantee that each stream gets ~10 buffers. + // This keeps buffers small enough that we don't get really small stripe + // sizes. + int estBufferSize = (int) (stripeSize / (20 * numColumns)); + estBufferSize = getClosestBufferSize(estBufferSize); + if (estBufferSize > bs) { + estBufferSize = bs; + } else { + LOG.info("WIDE TABLE - Number of columns: " + numColumns + + " Chosen compression buffer size: " + estBufferSize); + } + return estBufferSize; + } + + private static int getClosestBufferSize(int estBufferSize) { final int kb4 = 4 * 1024; final int kb8 = 8 * 1024; final int kb16 = 16 * 1024; @@ -311,15 +320,6 @@ private int getClosestBufferSize(int estBufferSize, int bs) { } } - // the assumption is only one ORC writer open at a time, which holds true for - // most of the cases. HIVE-6455 forces single writer case. - private long getMemoryAvailableForORC() { - double maxLoad = OrcConf.MEMORY_POOL.getDouble(conf); - long totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean(). - getHeapMemoryUsage().getMax() * maxLoad); - return totalMemoryPool; - } - public static CompressionCodec createCodec(CompressionKind kind) { switch (kind) { case NONE: diff --git ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcWideTable.java ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcWideTable.java index a3d3ec5..6b6cb2c 100644 --- ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcWideTable.java +++ ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcWideTable.java @@ -38,241 +38,39 @@ public class TestOrcWideTable { - private static final int MEMORY_FOR_ORC = 512 * 1024 * 1024; - Path workDir = new Path(System.getProperty("test.tmp.dir", "target" + File.separator + "test" - + File.separator + "tmp")); - - Configuration conf; - FileSystem fs; - Path testFilePath; - float memoryPercent; - - @Rule - public TestName testCaseName = new TestName(); - - @Before - public void openFileSystem() throws Exception { - conf = new Configuration(); - fs = FileSystem.getLocal(conf); - testFilePath = new Path(workDir, "TestOrcFile." + testCaseName.getMethodName() + ".orc"); - fs.delete(testFilePath, false); - // make sure constant memory is available for ORC always - memoryPercent = (float) MEMORY_FOR_ORC / (float) ManagementFactory.getMemoryMXBean(). - getHeapMemoryUsage().getMax(); - conf.setFloat(HiveConf.ConfVars.HIVE_ORC_FILE_MEMORY_POOL.varname, memoryPercent); - } - @Test public void testBufferSizeFor1Col() throws IOException { - ObjectInspector inspector; - synchronized (TestOrcFile.class) { - inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class, - ObjectInspectorFactory.ObjectInspectorOptions.JAVA); - } - int bufferSize = 128 * 1024; - Writer writer = OrcFile.createWriter( - testFilePath, - OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000) - .compress(CompressionKind.NONE).bufferSize(bufferSize)); - final int newBufferSize; - if (writer instanceof WriterImpl) { - WriterImpl orcWriter = (WriterImpl) writer; - newBufferSize = orcWriter.getEstimatedBufferSize(bufferSize); - assertEquals(bufferSize, newBufferSize); - } + assertEquals(128 * 1024, WriterImpl.getEstimatedBufferSize(512 * 1024 * 1024, + 1, 128*1024)); } @Test - public void testBufferSizeFor1000Col() throws IOException { - ObjectInspector inspector; - synchronized (TestOrcFile.class) { - inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class, - ObjectInspectorFactory.ObjectInspectorOptions.JAVA); - } - int bufferSize = 128 * 1024; - String columns = getRandomColumnNames(1000); - // just for testing. manually write the column names - conf.set(IOConstants.COLUMNS, columns); - Writer writer = OrcFile.createWriter( - testFilePath, - OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000) - .compress(CompressionKind.NONE).bufferSize(bufferSize)); - final int newBufferSize; - if (writer instanceof WriterImpl) { - WriterImpl orcWriter = (WriterImpl) writer; - newBufferSize = orcWriter.getEstimatedBufferSize(bufferSize); - assertEquals(bufferSize, newBufferSize); - } + public void testBufferSizeFor50Col() throws IOException { + assertEquals(256 * 1024, WriterImpl.getEstimatedBufferSize(256 * 1024 * 1024, + 50, 256*1024)); } @Test - public void testBufferSizeFor2000Col() throws IOException { - ObjectInspector inspector; - synchronized (TestOrcFile.class) { - inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class, - ObjectInspectorFactory.ObjectInspectorOptions.JAVA); - } - int bufferSize = 256 * 1024; - String columns = getRandomColumnNames(2000); - // just for testing. manually write the column names - conf.set(IOConstants.COLUMNS, columns); - Writer writer = OrcFile.createWriter( - testFilePath, - OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000) - .compress(CompressionKind.ZLIB).bufferSize(bufferSize)); - final int newBufferSize; - if (writer instanceof WriterImpl) { - WriterImpl orcWriter = (WriterImpl) writer; - newBufferSize = orcWriter.getEstimatedBufferSize(bufferSize); - assertEquals(32 * 1024, newBufferSize); - } + public void testBufferSizeFor1000Col() throws IOException { + assertEquals(32 * 1024, WriterImpl.getEstimatedBufferSize(512 * 1024 * 1024, + 1000, 128*1024)); } @Test - public void testBufferSizeFor2000ColNoCompression() throws IOException { - ObjectInspector inspector; - synchronized (TestOrcFile.class) { - inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class, - ObjectInspectorFactory.ObjectInspectorOptions.JAVA); - } - int bufferSize = 256 * 1024; - String columns = getRandomColumnNames(2000); - // just for testing. manually write the column names - conf.set(IOConstants.COLUMNS, columns); - Writer writer = OrcFile.createWriter( - testFilePath, - OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000) - .compress(CompressionKind.NONE).bufferSize(bufferSize)); - final int newBufferSize; - if (writer instanceof WriterImpl) { - WriterImpl orcWriter = (WriterImpl) writer; - newBufferSize = orcWriter.getEstimatedBufferSize(bufferSize); - assertEquals(64 * 1024, newBufferSize); - } + public void testBufferSizeFor2000Col() throws IOException { + assertEquals(16 * 1024, WriterImpl.getEstimatedBufferSize(512 * 1024 * 1024, + 2000, 256*1024)); } @Test public void testBufferSizeFor4000Col() throws IOException { - ObjectInspector inspector; - synchronized (TestOrcFile.class) { - inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class, - ObjectInspectorFactory.ObjectInspectorOptions.JAVA); - } - int bufferSize = 256 * 1024; - String columns = getRandomColumnNames(4000); - // just for testing. manually write the column names - conf.set(IOConstants.COLUMNS, columns); - Writer writer = OrcFile.createWriter( - testFilePath, - OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000) - .compress(CompressionKind.ZLIB).bufferSize(bufferSize)); - final int newBufferSize; - if (writer instanceof WriterImpl) { - WriterImpl orcWriter = (WriterImpl) writer; - newBufferSize = orcWriter.getEstimatedBufferSize(bufferSize); - assertEquals(16 * 1024, newBufferSize); - } - } - - @Test - public void testBufferSizeFor4000ColNoCompression() throws IOException { - ObjectInspector inspector; - synchronized (TestOrcFile.class) { - inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class, - ObjectInspectorFactory.ObjectInspectorOptions.JAVA); - } - int bufferSize = 256 * 1024; - String columns = getRandomColumnNames(4000); - // just for testing. manually write the column names - conf.set(IOConstants.COLUMNS, columns); - Writer writer = OrcFile.createWriter( - testFilePath, - OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000) - .compress(CompressionKind.NONE).bufferSize(bufferSize)); - final int newBufferSize; - if (writer instanceof WriterImpl) { - WriterImpl orcWriter = (WriterImpl) writer; - newBufferSize = orcWriter.getEstimatedBufferSize(bufferSize); - assertEquals(32 * 1024, newBufferSize); - } + assertEquals(8 * 1024, WriterImpl.getEstimatedBufferSize(512 * 1024 * 1024, + 4000, 256*1024)); } @Test public void testBufferSizeFor25000Col() throws IOException { - ObjectInspector inspector; - synchronized (TestOrcFile.class) { - inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class, - ObjectInspectorFactory.ObjectInspectorOptions.JAVA); - } - int bufferSize = 256 * 1024; - String columns = getRandomColumnNames(25000); - // just for testing. manually write the column names - conf.set(IOConstants.COLUMNS, columns); - Writer writer = OrcFile.createWriter( - testFilePath, - OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000) - .compress(CompressionKind.NONE).bufferSize(bufferSize)); - final int newBufferSize; - if (writer instanceof WriterImpl) { - WriterImpl orcWriter = (WriterImpl) writer; - newBufferSize = orcWriter.getEstimatedBufferSize(bufferSize); - // 4K is the minimum buffer size - assertEquals(4 * 1024, newBufferSize); - } - } - - @Test - public void testBufferSizeManualOverride1() throws IOException { - ObjectInspector inspector; - synchronized (TestOrcFile.class) { - inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class, - ObjectInspectorFactory.ObjectInspectorOptions.JAVA); - } - int bufferSize = 1024; - String columns = getRandomColumnNames(2000); - // just for testing. manually write the column names - conf.set(IOConstants.COLUMNS, columns); - Writer writer = OrcFile.createWriter( - testFilePath, - OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000) - .compress(CompressionKind.NONE).bufferSize(bufferSize)); - final int newBufferSize; - if (writer instanceof WriterImpl) { - WriterImpl orcWriter = (WriterImpl) writer; - newBufferSize = orcWriter.getEstimatedBufferSize(bufferSize); - assertEquals(bufferSize, newBufferSize); - } - } - - @Test - public void testBufferSizeManualOverride2() throws IOException { - ObjectInspector inspector; - synchronized (TestOrcFile.class) { - inspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class, - ObjectInspectorFactory.ObjectInspectorOptions.JAVA); - } - int bufferSize = 2 * 1024; - String columns = getRandomColumnNames(4000); - // just for testing. manually write the column names - conf.set(IOConstants.COLUMNS, columns); - Writer writer = OrcFile.createWriter( - testFilePath, - OrcFile.writerOptions(conf).inspector(inspector).stripeSize(100000) - .compress(CompressionKind.NONE).bufferSize(bufferSize)); - final int newBufferSize; - if (writer instanceof WriterImpl) { - WriterImpl orcWriter = (WriterImpl) writer; - newBufferSize = orcWriter.getEstimatedBufferSize(bufferSize); - assertEquals(bufferSize, newBufferSize); - } - } - - private String getRandomColumnNames(int n) { - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < n - 1; i++) { - sb.append("col").append(i).append(","); - } - sb.append("col").append(n - 1); - return sb.toString(); + assertEquals(4 * 1024, WriterImpl.getEstimatedBufferSize(512 * 1024 * 1024, + 25000, 256*1024)); } }