diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
index d850062..6e11d7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
@@ -293,7 +293,12 @@ static StructObjectInspector createEventSchema(ObjectInspector rowInspector) {
       int ratio = (int) OrcConf.BASE_DELTA_RATIO.getLong(options.getConfiguration());
       writerOptions.bufferSize(baseBufferSizeValue / ratio);
       writerOptions.stripeSize(baseStripeSizeValue / ratio);
-      writerOptions.blockPadding(false);
+      writerOptions.blockPadding(true);
+      // write delta files without compression, row indexes or dictionary encoding
+      writerOptions.compress(CompressionKind.NONE);
+      writerOptions.encodingStrategy(org.apache.orc.OrcFile.EncodingStrategy.SPEED);
+      writerOptions.rowIndexStride(0);
+      writerOptions.getConfiguration().set(OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getAttribute(), "-1.0");
     }
     writerOptions.fileSystem(fs).callback(indexBuilder);
     rowInspector = (StructObjectInspector)options.getInspector();
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index 6f63bfb..52e81e8 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -1579,6 +1579,73 @@ public void testFileDump() throws Exception {
   }
 
   @Test
+  public void testFileDumpDeltaFiles() throws Exception {
+    String agentInfo = "UT_" + Thread.currentThread().getName();
+    dropDB(msClient, dbName3);
+    dropDB(msClient, dbName4);
+
+    // 1) Create a bucketed table
+    String dbLocation = dbFolder.newFolder(dbName3).getCanonicalPath() + ".db";
+    dbLocation = dbLocation.replaceAll("\\\\", "/"); // for Windows paths
+    String[] colNames = "key1,key2,data".split(",");
+    String[] colTypes = "string,int,string".split(",");
+    String[] bucketNames = "key1,key2".split(",");
+    int bucketCount = 4;
+    createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, bucketNames,
+        null, dbLocation, bucketCount);
+
+    // 2) Insert data into the table
+    HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
+    StreamingConnection connection = endPt.newConnection(false, agentInfo);
+    DelimitedInputWriter writer = new DelimitedInputWriter(colNames, ",", endPt, connection);
+
+    TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("name0,1,streaming".getBytes());
+    txnBatch.write("name2,2,streaming".getBytes());
+    txnBatch.write("name4,2,unlimited".getBytes());
+    txnBatch.write("name5,2,unlimited".getBytes());
+    for (int i = 0; i < 6000; i++) {
+      if (i % 2 == 0) {
+        txnBatch.write(("name" + i + "," + i + "," + "streaming").getBytes());
+      } else {
+        txnBatch.write(("name" + i + "," + i + "," + "unlimited").getBytes());
+      }
+    }
+    txnBatch.commit();
+    txnBatch.close();
+    connection.close();
+
+    PrintStream origOut = System.out;
+    ByteArrayOutputStream myOut = new ByteArrayOutputStream();
+
+    // redirect stdout and run the file dump
+    System.setOut(new PrintStream(myOut));
+    FileDump.main(new String[]{dbLocation});
+    System.out.flush();
+    System.setOut(origOut);
+
+    String outDump = new String(myOut.toByteArray());
+    // make sure delta files are written with no indexes, no compression and no dictionary
+    // no compression
+    Assert.assertTrue(outDump.contains("Compression: NONE"));
+    // no stats/indexes
+    Assert.assertTrue(outDump.contains("Column 0: count: 0 hasNull: false"));
+    Assert.assertTrue(outDump.contains("Column 1: count: 0 hasNull: false sum: 0"));
+    Assert.assertTrue(outDump.contains("Column 2: count: 0 hasNull: false sum: 0"));
+    Assert.assertTrue(outDump.contains("Column 3: count: 0 hasNull: false sum: 0"));
+    Assert.assertTrue(outDump.contains("Column 4: count: 0 hasNull: false sum: 0"));
+    Assert.assertTrue(outDump.contains("Column 5: count: 0 hasNull: false sum: 0"));
+    Assert.assertTrue(outDump.contains("Column 6: count: 0 hasNull: false"));
+    Assert.assertTrue(outDump.contains("Column 7: count: 0 hasNull: false"));
+    Assert.assertTrue(outDump.contains("Column 8: count: 0 hasNull: false sum: 0"));
+    Assert.assertTrue(outDump.contains("Column 9: count: 0 hasNull: false"));
+    // no dictionary
+    Assert.assertTrue(outDump.contains("Encoding column 7: DIRECT_V2"));
+    Assert.assertTrue(outDump.contains("Encoding column 9: DIRECT_V2"));
+  }
+
+  @Test
   public void testFileDumpCorruptDataFiles() throws Exception {
     dropDB(msClient, dbName3);