diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index d38cdc0..06d1daa 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -18,8 +18,28 @@ package org.apache.hive.hcatalog.streaming; +import static org.junit.Assert.assertEquals; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.PrintStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.fs.permission.FsPermission; @@ -41,8 +61,10 @@ import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.orc.FileDump; import org.apache.hadoop.hive.ql.io.orc.OrcFile; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; import org.apache.hadoop.hive.ql.io.orc.OrcStruct; import org.apache.hadoop.hive.ql.io.orc.Reader; import org.apache.hadoop.hive.ql.io.orc.RecordReader; @@ -52,9 +74,9 @@ import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; @@ -70,16 +92,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.TimeUnit; - public class TestStreaming { private static final Logger LOG = LoggerFactory.getLogger(TestStreaming.class); @@ -235,7 +247,8 @@ private void createStoreSales(String dbName, String loc) throws Exception { String dbUri = "raw://" + new Path(loc).toUri().toString(); String tableLoc = dbUri + Path.SEPARATOR + "store_sales"; - boolean success = runDDL(driver, "create database IF NOT EXISTS " + dbName + " location '" + dbUri + "'"); + boolean success = runDDL(driver, + "create database IF NOT EXISTS " + dbName + " location '" + dbUri + "'"); Assert.assertTrue(success); success = runDDL(driver, "use " + dbName); Assert.assertTrue(success); @@ -405,7 +418,8 @@ public void 
testTableValidation() throws Exception { runDDL(driver, "create database testBucketing3"); runDDL(driver, "use testBucketing3"); - runDDL(driver, "create table " + tbl1 + " ( key1 string, data string ) clustered by ( key1 ) into " + runDDL(driver, + "create table " + tbl1 + " ( key1 string, data string ) clustered by ( key1 ) into " + bucketCount + " buckets stored as orc location " + tableLoc) ; runDDL(driver, "create table " + tbl2 + " ( key1 string, data string ) clustered by ( key1 ) into " @@ -644,7 +658,7 @@ public void testHeartbeat() throws Exception { lock = response.getLocks().get(0); Assert.assertEquals("Acquired timestamp didn't match", acquiredAt, lock.getAcquiredat()); Assert.assertTrue("Expected new heartbeat (" + lock.getLastheartbeat() + - ") == old heartbeat(" + heartbeatAt +")", lock.getLastheartbeat() == heartbeatAt); + ") > old heartbeat(" + heartbeatAt + ")", lock.getLastheartbeat() > heartbeatAt); } @Test public void testTransactionBatchEmptyAbort() throws Exception { @@ -712,7 +726,7 @@ public void testTransactionBatchCommit_Delimited() throws Exception { txnBatch.close(); Assert.assertEquals(TransactionBatch.TxnState.INACTIVE - , txnBatch.getCurrentTransactionState()); + , txnBatch.getCurrentTransactionState()); connection.close(); @@ -727,12 +741,12 @@ public void testTransactionBatchCommit_Delimited() throws Exception { txnBatch = connection.fetchTransactionBatch(10, writer); txnBatch.beginNextTransaction(); Assert.assertEquals(TransactionBatch.TxnState.OPEN - , txnBatch.getCurrentTransactionState()); + , txnBatch.getCurrentTransactionState()); txnBatch.write("1,Hello streaming".getBytes()); txnBatch.commit(); Assert.assertEquals(TransactionBatch.TxnState.COMMITTED - , txnBatch.getCurrentTransactionState()); + , txnBatch.getCurrentTransactionState()); connection.close(); } @@ -815,7 +829,7 @@ public void testRemainingTransactions() throws Exception { txnBatch.close(); Assert.assertEquals(TransactionBatch.TxnState.INACTIVE - , txnBatch.getCurrentTransactionState()); + , txnBatch.getCurrentTransactionState()); connection.close(); } @@ -873,7 +887,7 @@ public void testTransactionBatchAbortAndCommit() throws Exception { txnBatch.commit(); checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}", - "{2, Welcome to streaming}"); + "{2, Welcome to streaming}"); txnBatch.close(); connection.close(); @@ -909,18 +923,18 @@ public void testMultipleTransactionBatchCommits() throws Exception { txnBatch.commit(); checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", - "{2, Welcome to streaming}", "{3, Hello streaming - once again}"); + "{2, Welcome to streaming}", "{3, Hello streaming - once again}"); txnBatch.beginNextTransaction(); txnBatch.write("4,Welcome to streaming - once again".getBytes()); txnBatch.commit(); checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", - "{2, Welcome to streaming}", "{3, Hello streaming - once again}", - "{4, Welcome to streaming - once again}"); + "{2, Welcome to streaming}", "{3, Hello streaming - once again}", + "{4, Welcome to streaming - once again}"); Assert.assertEquals(TransactionBatch.TxnState.COMMITTED - , txnBatch.getCurrentTransactionState()); + , txnBatch.getCurrentTransactionState()); txnBatch.close(); @@ -1307,7 +1321,445 @@ public void testErrorHandling() throws Exception { txnBatch.abort(); } - // assumes un partitioned table + @Test + public void testFileDump() throws Exception { + dropDB(msClient, dbName3); + dropDB(msClient, dbName4); + + // 1) Create two bucketed tables + String dbLocation = 
dbFolder.newFolder(dbName3).getCanonicalPath() + ".db"; + dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths + String[] colNames = "key1,key2,data".split(","); + String[] colTypes = "string,int,string".split(","); + String[] bucketNames = "key1,key2".split(","); + int bucketCount = 4; + createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, bucketNames + , null, dbLocation, bucketCount); + + String dbLocation2 = dbFolder.newFolder(dbName4).getCanonicalPath() + ".db"; + dbLocation2 = dbLocation2.replaceAll("\\\\","/"); // for windows paths + String[] colNames2 = "key3,key4,data2".split(","); + String[] colTypes2 = "string,int,string".split(","); + String[] bucketNames2 = "key3,key4".split(","); + createDbAndTable(driver, dbName4, tblName4, null, colNames2, colTypes2, bucketNames2 + , null, dbLocation2, bucketCount); + + + // 2) Insert data into both tables + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null); + DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt); + StreamingConnection connection = endPt.newConnection(false); + + TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer); + txnBatch.beginNextTransaction(); + txnBatch.write("name0,1,Hello streaming".getBytes()); + txnBatch.write("name2,2,Welcome to streaming".getBytes()); + txnBatch.write("name4,2,more Streaming unlimited".getBytes()); + txnBatch.write("name5,2,even more Streaming unlimited".getBytes()); + txnBatch.commit(); + + PrintStream origErr = System.err; + ByteArrayOutputStream myErr = new ByteArrayOutputStream(); + + // replace stderr and run command + System.setErr(new PrintStream(myErr)); + FileDump.main(new String[]{dbLocation}); + System.err.flush(); + System.setErr(origErr); + + String errDump = new String(myErr.toByteArray()); + assertEquals(false, errDump.contains("file(s) are corrupted")); + // since this test runs on local file system which does not have an API to tell if files or + // open or not, we are testing for negative case even though the bucket files are still open + // for writes (transaction batch not closed yet) + assertEquals(false, errDump.contains("is still open for writes.")); + + HiveEndPoint endPt2 = new HiveEndPoint(metaStoreURI, dbName4, tblName4, null); + DelimitedInputWriter writer2 = new DelimitedInputWriter(colNames2,",", endPt2); + StreamingConnection connection2 = endPt2.newConnection(false); + TransactionBatch txnBatch2 = connection2.fetchTransactionBatch(2, writer2); + txnBatch2.beginNextTransaction(); + + txnBatch2.write("name5,2,fact3".getBytes()); // bucket 0 + txnBatch2.write("name8,2,fact3".getBytes()); // bucket 1 + txnBatch2.write("name0,1,fact1".getBytes()); // bucket 2 + // no data for bucket 3 -- expect 0 length bucket file + + txnBatch2.commit(); + + origErr = System.err; + myErr = new ByteArrayOutputStream(); + + // replace stderr and run command + System.setErr(new PrintStream(myErr)); + FileDump.main(new String[]{dbLocation}); + System.out.flush(); + System.err.flush(); + System.setErr(origErr); + + errDump = new String(myErr.toByteArray()); + assertEquals(false, errDump.contains("Exception")); + assertEquals(false, errDump.contains("file(s) are corrupted")); + assertEquals(false, errDump.contains("is still open for writes.")); + } + + @Test + public void testFileDumpCorruptDataFiles() throws Exception { + dropDB(msClient, dbName3); + + // 1) Create two bucketed tables + String dbLocation = dbFolder.newFolder(dbName3).getCanonicalPath() + ".db"; + dbLocation = 
dbLocation.replaceAll("\\\\","/"); // for windows paths + String[] colNames = "key1,key2,data".split(","); + String[] colTypes = "string,int,string".split(","); + String[] bucketNames = "key1,key2".split(","); + int bucketCount = 4; + createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, bucketNames + , null, dbLocation, bucketCount); + + // 2) Insert data into both tables + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null); + DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt); + StreamingConnection connection = endPt.newConnection(false); + + // we need side file for this test, so we create 2 txn batch and test with only one + TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer); + txnBatch.beginNextTransaction(); + txnBatch.write("name0,1,Hello streaming".getBytes()); + txnBatch.write("name2,2,Welcome to streaming".getBytes()); + txnBatch.write("name4,2,more Streaming unlimited".getBytes()); + txnBatch.write("name5,2,even more Streaming unlimited".getBytes()); + txnBatch.commit(); + + // intentionally corrupt some files + Path path = new Path(dbLocation); + Collection files = FileDump.getAllFilesInPath(path, conf); + int readableFooter = -1; + for (String file : files) { + if (file.contains("bucket_00000")) { + // empty out the file + corruptDataFile(file, conf, Integer.MIN_VALUE); + } else if (file.contains("bucket_00001")) { + corruptDataFile(file, conf, -1); + } else if (file.contains("bucket_00002")) { + // since we are adding more bytes we know the length of the file is already readable + Path bPath = new Path(file); + FileSystem fs = bPath.getFileSystem(conf); + FileStatus fileStatus = fs.getFileStatus(bPath); + readableFooter = (int) fileStatus.getLen(); + corruptDataFile(file, conf, 2); + } else if (file.contains("bucket_00003")) { + corruptDataFile(file, conf, 100); + } + } + + PrintStream origErr = System.err; + ByteArrayOutputStream myErr = new ByteArrayOutputStream(); + + // replace stderr and run command + System.setErr(new PrintStream(myErr)); + FileDump.main(new String[]{dbLocation}); + System.err.flush(); + System.setErr(origErr); + + String errDump = new String(myErr.toByteArray()); + assertEquals(false, errDump.contains("Exception")); + assertEquals(true, errDump.contains("4 file(s) are corrupted")); + assertEquals(false, errDump.contains("is still open for writes.")); + + origErr = System.err; + myErr = new ByteArrayOutputStream(); + + // replace stderr and run command + System.setErr(new PrintStream(myErr)); + FileDump.main(new String[]{dbLocation, "--recover", "--skip-dump"}); + System.err.flush(); + System.setErr(origErr); + + errDump = new String(myErr.toByteArray()); + assertEquals(true, errDump.contains("bucket_00000 recovered successfully!")); + assertEquals(true, errDump.contains("No readable footers found. 
Creating empty orc file.")); + assertEquals(true, errDump.contains("bucket_00001 recovered successfully!")); + assertEquals(true, errDump.contains("bucket_00002 recovered successfully!")); + // check for bucket2's last readable footer offset + assertEquals(true, errDump.contains("Readable footerOffsets: [" + readableFooter + "]")); + assertEquals(true, errDump.contains("bucket_00003 recovered successfully!")); + assertEquals(false, errDump.contains("Exception")); + assertEquals(false, errDump.contains("is still open for writes.")); + + // test after recovery + origErr = System.err; + myErr = new ByteArrayOutputStream(); + + // replace stdout and run command + System.setErr(new PrintStream(myErr)); + FileDump.main(new String[]{dbLocation}); + System.err.flush(); + System.setErr(origErr); + + errDump = new String(myErr.toByteArray()); + assertEquals(false, errDump.contains("Exception")); + assertEquals(false, errDump.contains("file(s) are corrupted")); + assertEquals(false, errDump.contains("is still open for writes.")); + + // after recovery there shouldn't be any *_flush_length files + files = FileDump.getAllFilesInPath(path, conf); + for (String file : files) { + assertEquals(false, file.contains("_flush_length")); + } + + txnBatch.close(); + } + + private void corruptDataFile(final String file, final Configuration conf, final int addRemoveBytes) + throws Exception { + Path bPath = new Path(file); + Path cPath = new Path(bPath.getParent(), bPath.getName() + ".corrupt"); + FileSystem fs = bPath.getFileSystem(conf); + FileStatus fileStatus = fs.getFileStatus(bPath); + int len = addRemoveBytes == Integer.MIN_VALUE ? 0 : (int) fileStatus.getLen() + addRemoveBytes; + byte[] buffer = new byte[len]; + FSDataInputStream fdis = fs.open(bPath); + fdis.readFully(0, buffer, 0, (int) Math.min(fileStatus.getLen(), buffer.length)); + fdis.close(); + FSDataOutputStream fdos = fs.create(cPath, true); + fdos.write(buffer, 0, buffer.length); + fdos.close(); + fs.delete(bPath, false); + fs.rename(cPath, bPath); + } + + @Test + public void testFileDumpCorruptSideFiles() throws Exception { + dropDB(msClient, dbName3); + + // 1) Create two bucketed tables + String dbLocation = dbFolder.newFolder(dbName3).getCanonicalPath() + ".db"; + dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths + String[] colNames = "key1,key2,data".split(","); + String[] colTypes = "string,int,string".split(","); + String[] bucketNames = "key1,key2".split(","); + int bucketCount = 4; + createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, bucketNames + , null, dbLocation, bucketCount); + + // 2) Insert data into both tables + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null); + DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt); + StreamingConnection connection = endPt.newConnection(false); + + TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer); + txnBatch.beginNextTransaction(); + txnBatch.write("name0,1,Hello streaming".getBytes()); + txnBatch.write("name2,2,Welcome to streaming".getBytes()); + txnBatch.write("name4,2,more Streaming unlimited".getBytes()); + txnBatch.write("name5,2,even more Streaming unlimited".getBytes()); + txnBatch.write("name6,3,aHello streaming".getBytes()); + txnBatch.commit(); + + Map> offsetMap = new HashMap>(); + recordOffsets(conf, dbLocation, offsetMap); + + txnBatch.beginNextTransaction(); + txnBatch.write("name01,11,-Hello streaming".getBytes()); + txnBatch.write("name21,21,-Welcome to 
streaming".getBytes()); + txnBatch.write("name41,21,-more Streaming unlimited".getBytes()); + txnBatch.write("name51,21,-even more Streaming unlimited".getBytes()); + txnBatch.write("name02,12,--Hello streaming".getBytes()); + txnBatch.write("name22,22,--Welcome to streaming".getBytes()); + txnBatch.write("name42,22,--more Streaming unlimited".getBytes()); + txnBatch.write("name52,22,--even more Streaming unlimited".getBytes()); + txnBatch.write("name7,4,aWelcome to streaming".getBytes()); + txnBatch.write("name8,5,amore Streaming unlimited".getBytes()); + txnBatch.write("name9,6,aeven more Streaming unlimited".getBytes()); + txnBatch.write("name10,7,bHello streaming".getBytes()); + txnBatch.write("name11,8,bWelcome to streaming".getBytes()); + txnBatch.write("name12,9,bmore Streaming unlimited".getBytes()); + txnBatch.write("name13,10,beven more Streaming unlimited".getBytes()); + txnBatch.commit(); + + recordOffsets(conf, dbLocation, offsetMap); + + // intentionally corrupt some files + Path path = new Path(dbLocation); + Collection files = FileDump.getAllFilesInPath(path, conf); + for (String file : files) { + if (file.contains("bucket_00000")) { + corruptSideFile(file, conf, offsetMap, "bucket_00000", -1); // corrupt last entry + } else if (file.contains("bucket_00001")) { + corruptSideFile(file, conf, offsetMap, "bucket_00001", 0); // empty out side file + } else if (file.contains("bucket_00002")) { + corruptSideFile(file, conf, offsetMap, "bucket_00002", 3); // total 3 entries (2 valid + 1 fake) + } else if (file.contains("bucket_00003")) { + corruptSideFile(file, conf, offsetMap, "bucket_00003", 10); // total 10 entries (2 valid + 8 fake) + } + } + + PrintStream origErr = System.err; + ByteArrayOutputStream myErr = new ByteArrayOutputStream(); + + // replace stderr and run command + System.setErr(new PrintStream(myErr)); + FileDump.main(new String[]{dbLocation}); + System.err.flush(); + System.setErr(origErr); + + String errDump = new String(myErr.toByteArray()); + assertEquals(true, errDump.contains("bucket_00000_flush_length [length: 11")); + assertEquals(true, errDump.contains("bucket_00001_flush_length [length: 0")); + assertEquals(true, errDump.contains("bucket_00002_flush_length [length: 24")); + assertEquals(true, errDump.contains("bucket_00003_flush_length [length: 80")); + assertEquals(false, errDump.contains("Exception")); + assertEquals(true, errDump.contains("4 file(s) are corrupted")); + assertEquals(false, errDump.contains("is still open for writes.")); + + origErr = System.err; + myErr = new ByteArrayOutputStream(); + + // replace stderr and run command + System.setErr(new PrintStream(myErr)); + FileDump.main(new String[]{dbLocation, "--recover", "--skip-dump"}); + System.err.flush(); + System.setErr(origErr); + + errDump = new String(myErr.toByteArray()); + assertEquals(true, errDump.contains("bucket_00000 recovered successfully!")); + assertEquals(true, errDump.contains("bucket_00001 recovered successfully!")); + assertEquals(true, errDump.contains("bucket_00002 recovered successfully!")); + assertEquals(true, errDump.contains("bucket_00003 recovered successfully!")); + List offsets = offsetMap.get("bucket_00000"); + assertEquals(true, errDump.contains("Readable footerOffsets: " + offsets.toString())); + offsets = offsetMap.get("bucket_00001"); + assertEquals(true, errDump.contains("Readable footerOffsets: " + offsets.toString())); + offsets = offsetMap.get("bucket_00002"); + assertEquals(true, errDump.contains("Readable footerOffsets: " + offsets.toString())); + 
offsets = offsetMap.get("bucket_00003"); + assertEquals(true, errDump.contains("Readable footerOffsets: " + offsets.toString())); + assertEquals(false, errDump.contains("Exception")); + assertEquals(false, errDump.contains("is still open for writes.")); + + // test after recovery + origErr = System.err; + myErr = new ByteArrayOutputStream(); + + // replace stdout and run command + System.setErr(new PrintStream(myErr)); + FileDump.main(new String[]{dbLocation}); + System.err.flush(); + System.setErr(origErr); + + errDump = new String(myErr.toByteArray()); + assertEquals(false, errDump.contains("Exception")); + assertEquals(false, errDump.contains("file(s) are corrupted")); + assertEquals(false, errDump.contains("is still open for writes.")); + + // after recovery there shouldn't be any *_flush_length files + files = FileDump.getAllFilesInPath(path, conf); + for (String file : files) { + assertEquals(false, file.contains("_flush_length")); + } + + txnBatch.close(); + } + + private void corruptSideFile(final String file, final HiveConf conf, + final Map> offsetMap, final String key, final int numEntries) + throws IOException { + Path dataPath = new Path(file); + Path sideFilePath = OrcRecordUpdater.getSideFile(dataPath); + Path cPath = new Path(sideFilePath.getParent(), sideFilePath.getName() + ".corrupt"); + FileSystem fs = sideFilePath.getFileSystem(conf); + List offsets = offsetMap.get(key); + long lastOffset = offsets.get(offsets.size() - 1); + FSDataOutputStream fdos = fs.create(cPath, true); + // corrupt last entry + if (numEntries < 0) { + byte[] lastOffsetBytes = longToBytes(lastOffset); + for (int i = 0; i < offsets.size() - 1; i++) { + fdos.writeLong(offsets.get(i)); + } + + fdos.write(lastOffsetBytes, 0, 3); + } else if (numEntries > 0) { + int firstRun = Math.min(offsets.size(), numEntries); + // add original entries + for (int i=0; i < firstRun; i++) { + fdos.writeLong(offsets.get(i)); + } + + // add fake entries + int remaining = numEntries - firstRun; + for (int i = 0; i < remaining; i++) { + fdos.writeLong(lastOffset + ((i + 1) * 100)); + } + } + + fdos.close(); + fs.delete(sideFilePath, false); + fs.rename(cPath, sideFilePath); + } + + private byte[] longToBytes(long x) { + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); + buffer.putLong(x); + return buffer.array(); + } + + private void recordOffsets(final HiveConf conf, final String dbLocation, + final Map> offsetMap) throws IOException { + Path path = new Path(dbLocation); + Collection files = FileDump.getAllFilesInPath(path, conf); + for (String file: files) { + Path bPath = new Path(file); + FileSystem fs = bPath.getFileSystem(conf); + FileStatus fileStatus = fs.getFileStatus(bPath); + long len = fileStatus.getLen(); + + if (file.contains("bucket_00000")) { + if (offsetMap.containsKey("bucket_00000")) { + List offsets = offsetMap.get("bucket_00000"); + offsets.add(len); + offsetMap.put("bucket_00000", offsets); + } else { + List offsets = new ArrayList(); + offsets.add(len); + offsetMap.put("bucket_00000", offsets); + } + } else if (file.contains("bucket_00001")) { + if (offsetMap.containsKey("bucket_00001")) { + List offsets = offsetMap.get("bucket_00001"); + offsets.add(len); + offsetMap.put("bucket_00001", offsets); + } else { + List offsets = new ArrayList(); + offsets.add(len); + offsetMap.put("bucket_00001", offsets); + } + } else if (file.contains("bucket_00002")) { + if (offsetMap.containsKey("bucket_00002")) { + List offsets = offsetMap.get("bucket_00002"); + offsets.add(len); + offsetMap.put("bucket_00002", 
offsets); + } else { + List offsets = new ArrayList(); + offsets.add(len); + offsetMap.put("bucket_00002", offsets); + } + } else if (file.contains("bucket_00003")) { + if (offsetMap.containsKey("bucket_00003")) { + List offsets = offsetMap.get("bucket_00003"); + offsets.add(len); + offsetMap.put("bucket_00003", offsets); + } else { + List offsets = new ArrayList(); + offsets.add(len); + offsetMap.put("bucket_00003", offsets); + } + } + } + } + + // assumes un partitioned table // returns a map > private HashMap> dumpAllBuckets(String dbLocation, String tableName) throws IOException { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java index 0e9667a..6b1dfa0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java @@ -32,9 +32,13 @@ import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO; import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex; @@ -49,6 +53,7 @@ import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONWriter; +import com.google.common.base.Joiner; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -57,6 +62,16 @@ */ public final class FileDump { public static final String UNKNOWN = "UNKNOWN"; + public static final String SEPARATOR = Strings.repeat("_", 120) + "\n"; + public static final int DEFAULT_BLOCK_SIZE = 256 * 1024 * 1024; + public static final String DEFAULT_BACKUP_PATH = "/tmp"; + public static final PathFilter HIDDEN_AND_SIDE_FILE_FILTER = new PathFilter() { + public boolean accept(Path p) { + String name = p.getName(); + return !name.startsWith("_") && !name.startsWith(".") && !name.endsWith( + AcidUtils.DELTA_SIDE_FILE_SUFFIX); + } + }; // not used private FileDump() { @@ -76,6 +91,13 @@ public static void main(String[] args) throws Exception { } boolean dumpData = cli.hasOption('d'); + boolean recover = cli.hasOption("recover"); + boolean skipDump = cli.hasOption("skip-dump"); + String backupPath = DEFAULT_BACKUP_PATH; + if (cli.hasOption("backup-path")) { + backupPath = cli.getOptionValue("backup-path"); + } + if (cli.hasOption("r")) { String[] colStrs = cli.getOptionValue("r").split(","); rowIndexCols = new ArrayList(colStrs.length); @@ -101,24 +123,121 @@ public static void main(String[] args) throws Exception { if (dumpData) { printData(filesInPath, conf); + } else if (recover && skipDump) { + recoverFiles(filesInPath, conf, backupPath); } else { if (jsonFormat) { boolean prettyPrint = cli.hasOption('p'); - JsonFileDump.printJsonMetaData(filesInPath, conf, rowIndexCols, prettyPrint, - printTimeZone); + JsonFileDump.printJsonMetaData(filesInPath, conf, rowIndexCols, prettyPrint, printTimeZone); } else { - printMetaData(filesInPath, conf, rowIndexCols, printTimeZone); + printMetaData(filesInPath, conf, rowIndexCols, printTimeZone, recover, backupPath); } } } - private static Collection getAllFilesInPath(final Path path, + /** + * This method returns an ORC reader 
object if the specified file is readable. If the specified + * file has side file (_flush_length) file, then max footer offset will be read from the side + * file and orc reader will be created from that offset. Since both data file and side file + * use hflush() for flushing the data, there could be some inconsistencies and both files could be + * out-of-sync. Following are the cases under which null will be returned + * + * 1) If the file specified by path or its side file is still open for writes + * 2) If *_flush_length file does not return any footer offset + * 3) If *_flush_length returns a valid footer offset but the data file is not readable at that + * position (incomplete data file) + * 4) If *_flush_length file length is not a multiple of 8, then reader will be created from + * previous valid footer. If there is no such footer (file length > 0 and < 8), then null will + * be returned + * + * Also, if this method detects any file corruption (mismatch between data file and side file) + * then it will add the corresponding file to the specified input list for corrupted files. + * + * In all other cases, where the file is readable this method will return a reader object. + * + * @param path - file to get reader for + * @param conf - configuration object + * @param corruptFiles - fills this list with all possible corrupted files + * @return - reader for the specified file or null + * @throws IOException + */ + static Reader getReader(final Path path, final Configuration conf, + final List corruptFiles) throws IOException { + FileSystem fs = path.getFileSystem(conf); + long dataFileLen = fs.getFileStatus(path).getLen(); + System.err.println("Processing data file " + path + " [length: " + dataFileLen + "]"); + Path sideFile = OrcRecordUpdater.getSideFile(path); + final boolean sideFileExists = fs.exists(sideFile); + boolean openDataFile = false; + boolean openSideFile = false; + if (fs instanceof DistributedFileSystem) { + DistributedFileSystem dfs = (DistributedFileSystem) fs; + openDataFile = !dfs.isFileClosed(path); + openSideFile = sideFileExists && !dfs.isFileClosed(sideFile); + } + + if (openDataFile || openSideFile) { + if (openDataFile && openSideFile) { + System.err.println("Unable to perform file dump as " + path + " and " + sideFile + + " are still open for writes."); + } else if (openSideFile) { + System.err.println("Unable to perform file dump as " + sideFile + + " is still open for writes."); + } else { + System.err.println("Unable to perform file dump as " + path + + " is still open for writes."); + } + + return null; + } + + Reader reader = null; + if (sideFileExists) { + final long maxLen = OrcRawRecordMerger.getLastFlushLength(fs, path); + final long sideFileLen = fs.getFileStatus(sideFile).getLen(); + System.err.println("Found flush length file " + sideFile + + " [length: " + sideFileLen + ", maxFooterOffset: " + maxLen + "]"); + // no offsets read from side file + if (maxLen == -1) { + + // if data file is larger than last flush length, then additional data could be recovered + if (dataFileLen > maxLen) { + System.err.println("Data file has more data than max footer offset:" + maxLen + + ". 
Adding data file to recovery list."); + corruptFiles.add(path.toUri().toString()); + } + return null; + } + + try { + reader = OrcFile.createReader(path, OrcFile.readerOptions(conf).maxLength(maxLen)); + + // if data file is larger than last flush length, then additional data could be recovered + if (dataFileLen > maxLen) { + System.err.println("Data file has more data than max footer offset:" + maxLen + + ". Adding data file to recovery list."); + corruptFiles.add(path.toUri().toString()); + } + } catch (Exception e) { + corruptFiles.add(path.toUri().toString()); + System.err.println("Unable to read data from max footer offset." + + " Adding data file to recovery list."); + return null; + } + } else { + reader = OrcFile.createReader(path, OrcFile.readerOptions(conf)); + } + + return reader; + } + + public static Collection getAllFilesInPath(final Path path, final Configuration conf) throws IOException { List filesInPath = Lists.newArrayList(); FileSystem fs = path.getFileSystem(conf); FileStatus fileStatus = fs.getFileStatus(path); if (fileStatus.isDir()) { - FileStatus[] fileStatuses = fs.listStatus(path, AcidUtils.hiddenFileFilter); + FileStatus[] fileStatuses = fs.listStatus(path, HIDDEN_AND_SIDE_FILE_FILTER); for (FileStatus fileInPath : fileStatuses) { if (fileInPath.isDir()) { filesInPath.addAll(getAllFilesInPath(fileInPath.getPath(), conf)); @@ -133,139 +252,356 @@ public static void main(String[] args) throws Exception { return filesInPath; } - private static void printData(List files, Configuration conf) throws IOException, + private static void printData(List files, + Configuration conf) throws IOException, JSONException { for (String file : files) { try { - printJsonData(conf, file); - if (files.size() > 1) { - System.out.println(Strings.repeat("=", 80) + "\n"); + Path path = new Path(file); + Reader reader = getReader(path, conf, null); + if (reader == null) { + continue; } + printJsonData(reader); + System.out.println(SEPARATOR); } catch (Exception e) { System.err.println("Unable to dump data for file: " + file); - e.printStackTrace(); - System.err.println(Strings.repeat("=", 80) + "\n"); continue; } } } private static void printMetaData(List files, Configuration conf, - List rowIndexCols, boolean printTimeZone) throws IOException { + List rowIndexCols, boolean printTimeZone, final boolean recover, + final String backupPath) + throws IOException { + List corruptFiles = Lists.newArrayList(); for (String filename : files) { - try { - Path path = new Path(filename); - Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf)); - System.out.println("Structure for " + filename); - System.out.println("File Version: " + reader.getFileVersion().getName() + - " with " + reader.getWriterVersion()); - RecordReaderImpl rows = (RecordReaderImpl) reader.rows(); - System.out.println("Rows: " + reader.getNumberOfRows()); - System.out.println("Compression: " + reader.getCompression()); - if (reader.getCompression() != CompressionKind.NONE) { - System.out.println("Compression size: " + reader.getCompressionSize()); + printMetaDataImpl(filename, conf, rowIndexCols, printTimeZone, corruptFiles); + System.out.println(SEPARATOR); + } + + if (!corruptFiles.isEmpty()) { + if (recover) { + recoverFiles(corruptFiles, conf, backupPath); + } else { + System.err.println(corruptFiles.size() + " file(s) are corrupted." 
+ + " Run the following command to recover corrupted files.\n"); + String fileNames = Joiner.on(" ").skipNulls().join(corruptFiles); + System.err.println("hive --orcfiledump --recover --skip-dump " + fileNames); + System.out.println(SEPARATOR); + } + } + } + + private static void printMetaDataImpl(final String filename, + final Configuration conf, final List rowIndexCols, final boolean printTimeZone, + final List corruptFiles) throws IOException { + Path file = new Path(filename); + Reader reader = getReader(file, conf, corruptFiles); + // if we can create reader then footer is not corrupt and file will readable + if (reader == null) { + return; + } + + System.out.println("Structure for " + filename); + System.out.println("File Version: " + reader.getFileVersion().getName() + + " with " + reader.getWriterVersion()); + RecordReaderImpl rows = (RecordReaderImpl) reader.rows(); + System.out.println("Rows: " + reader.getNumberOfRows()); + System.out.println("Compression: " + reader.getCompression()); + if (reader.getCompression() != CompressionKind.NONE) { + System.out.println("Compression size: " + reader.getCompressionSize()); + } + System.out.println("Type: " + reader.getObjectInspector().getTypeName()); + System.out.println("\nStripe Statistics:"); + List stripeStats = reader.getStripeStatistics(); + for (int n = 0; n < stripeStats.size(); n++) { + System.out.println(" Stripe " + (n + 1) + ":"); + StripeStatistics ss = stripeStats.get(n); + for (int i = 0; i < ss.getColumnStatistics().length; ++i) { + System.out.println(" Column " + i + ": " + + ss.getColumnStatistics()[i].toString()); + } + } + ColumnStatistics[] stats = reader.getStatistics(); + int colCount = stats.length; + System.out.println("\nFile Statistics:"); + for (int i = 0; i < stats.length; ++i) { + System.out.println(" Column " + i + ": " + stats[i].toString()); + } + System.out.println("\nStripes:"); + int stripeIx = -1; + for (StripeInformation stripe : reader.getStripes()) { + ++stripeIx; + long stripeStart = stripe.getOffset(); + OrcProto.StripeFooter footer = rows.readStripeFooter(stripe); + if (printTimeZone) { + String tz = footer.getWriterTimezone(); + if (tz == null || tz.isEmpty()) { + tz = UNKNOWN; } - System.out.println("Type: " + reader.getObjectInspector().getTypeName()); - System.out.println("\nStripe Statistics:"); - List stripeStats = reader.getStripeStatistics(); - for (int n = 0; n < stripeStats.size(); n++) { - System.out.println(" Stripe " + (n + 1) + ":"); - StripeStatistics ss = stripeStats.get(n); - for (int i = 0; i < ss.getColumnStatistics().length; ++i) { - System.out.println(" Column " + i + ": " + - ss.getColumnStatistics()[i].toString()); - } + System.out.println(" Stripe: " + stripe.toString() + " timezone: " + tz); + } else { + System.out.println(" Stripe: " + stripe.toString()); + } + long sectionStart = stripeStart; + for (OrcProto.Stream section : footer.getStreamsList()) { + String kind = section.hasKind() ? 
section.getKind().name() : UNKNOWN; + System.out.println(" Stream: column " + section.getColumn() + + " section " + kind + " start: " + sectionStart + + " length " + section.getLength()); + sectionStart += section.getLength(); + } + for (int i = 0; i < footer.getColumnsCount(); ++i) { + OrcProto.ColumnEncoding encoding = footer.getColumns(i); + StringBuilder buf = new StringBuilder(); + buf.append(" Encoding column "); + buf.append(i); + buf.append(": "); + buf.append(encoding.getKind()); + if (encoding.getKind() == OrcProto.ColumnEncoding.Kind.DICTIONARY || + encoding.getKind() == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) { + buf.append("["); + buf.append(encoding.getDictionarySize()); + buf.append("]"); } - ColumnStatistics[] stats = reader.getStatistics(); - int colCount = stats.length; - System.out.println("\nFile Statistics:"); - for (int i = 0; i < stats.length; ++i) { - System.out.println(" Column " + i + ": " + stats[i].toString()); + System.out.println(buf); + } + if (rowIndexCols != null && !rowIndexCols.isEmpty()) { + // include the columns that are specified, only if the columns are included, bloom filter + // will be read + boolean[] sargColumns = new boolean[colCount]; + for (int colIdx : rowIndexCols) { + sargColumns[colIdx] = true; } - System.out.println("\nStripes:"); - int stripeIx = -1; - for (StripeInformation stripe : reader.getStripes()) { - ++stripeIx; - long stripeStart = stripe.getOffset(); - OrcProto.StripeFooter footer = rows.readStripeFooter(stripe); - if (printTimeZone) { - String tz = footer.getWriterTimezone(); - if (tz == null || tz.isEmpty()) { - tz = UNKNOWN; - } - System.out.println(" Stripe: " + stripe.toString() + " timezone: " + tz); - } else { - System.out.println(" Stripe: " + stripe.toString()); - } - long sectionStart = stripeStart; - for (OrcProto.Stream section : footer.getStreamsList()) { - String kind = section.hasKind() ? 
section.getKind().name() : UNKNOWN; - System.out.println(" Stream: column " + section.getColumn() + - " section " + kind + " start: " + sectionStart + - " length " + section.getLength()); - sectionStart += section.getLength(); - } - for (int i = 0; i < footer.getColumnsCount(); ++i) { - OrcProto.ColumnEncoding encoding = footer.getColumns(i); - StringBuilder buf = new StringBuilder(); - buf.append(" Encoding column "); - buf.append(i); - buf.append(": "); - buf.append(encoding.getKind()); - if (encoding.getKind() == OrcProto.ColumnEncoding.Kind.DICTIONARY || - encoding.getKind() == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) { - buf.append("["); - buf.append(encoding.getDictionarySize()); - buf.append("]"); - } - System.out.println(buf); - } - if (rowIndexCols != null && !rowIndexCols.isEmpty()) { - // include the columns that are specified, only if the columns are included, bloom filter - // will be read - boolean[] sargColumns = new boolean[colCount]; - for (int colIdx : rowIndexCols) { - sargColumns[colIdx] = true; - } - RecordReaderImpl.Index indices = rows - .readRowIndex(stripeIx, null, null, null, sargColumns); - for (int col : rowIndexCols) { - StringBuilder buf = new StringBuilder(); - String rowIdxString = getFormattedRowIndices(col, indices.getRowGroupIndex()); - buf.append(rowIdxString); - String bloomFilString = getFormattedBloomFilters(col, indices.getBloomFilterIndex()); - buf.append(bloomFilString); - System.out.println(buf); + RecordReaderImpl.Index indices = rows + .readRowIndex(stripeIx, null, null, null, sargColumns); + for (int col : rowIndexCols) { + StringBuilder buf = new StringBuilder(); + String rowIdxString = getFormattedRowIndices(col, indices.getRowGroupIndex()); + buf.append(rowIdxString); + String bloomFilString = getFormattedBloomFilters(col, indices.getBloomFilterIndex()); + buf.append(bloomFilString); + System.out.println(buf); + } + } + } + + FileSystem fs = file.getFileSystem(conf); + long fileLen = fs.getFileStatus(file).getLen(); + long paddedBytes = getTotalPaddingSize(reader); + // empty ORC file is ~45 bytes. 
Assumption here is file length always >0 + double percentPadding = ((double) paddedBytes / (double) fileLen) * 100; + DecimalFormat format = new DecimalFormat("##.##"); + System.out.println("\nFile length: " + fileLen + " bytes"); + System.out.println("Padding length: " + paddedBytes + " bytes"); + System.out.println("Padding ratio: " + format.format(percentPadding) + "%"); + OrcRecordUpdater.AcidStats acidStats = OrcRecordUpdater.parseAcidStats(reader); + if (acidStats != null) { + System.out.println("ACID stats:" + acidStats); + } + rows.close(); + } + + private static void recoverFiles(final List<String> corruptFiles, final Configuration conf, + final String backup) + throws IOException { + for (String corruptFile : corruptFiles) { + System.err.println("Recovering file " + corruptFile); + Path corruptPath = new Path(corruptFile); + FileSystem fs = corruptPath.getFileSystem(conf); + FSDataInputStream fdis = fs.open(corruptPath); + try { + long corruptFileLen = fs.getFileStatus(corruptPath).getLen(); + long remaining = corruptFileLen; + List<Long> footerOffsets = Lists.newArrayList(); + + // start reading the data file from top to bottom and record the valid footers + while (remaining > 0) { + int toRead = (int) Math.min(DEFAULT_BLOCK_SIZE, remaining); + byte[] data = new byte[toRead]; + long startPos = corruptFileLen - remaining; + fdis.readFully(startPos, data, 0, toRead); + + // find all MAGIC string and see if the file is readable from there + int index = 0; + long nextFooterOffset; + + while (index != -1) { + index = indexOf(data, OrcFile.MAGIC.getBytes(), index + 1); + if (index != -1) { + nextFooterOffset = startPos + index + OrcFile.MAGIC.length() + 1; + if (isReadable(corruptPath, conf, nextFooterOffset)) { + footerOffsets.add(nextFooterOffset); + } } } - } - FileSystem fs = path.getFileSystem(conf); - long fileLen = fs.getContentSummary(path).getLength(); - long paddedBytes = getTotalPaddingSize(reader); - // empty ORC file is ~45 bytes.
Assumption here is file length always >0 - double percentPadding = ((double) paddedBytes / (double) fileLen) * 100; - DecimalFormat format = new DecimalFormat("##.##"); - System.out.println("\nFile length: " + fileLen + " bytes"); - System.out.println("Padding length: " + paddedBytes + " bytes"); - System.out.println("Padding ratio: " + format.format(percentPadding) + "%"); - OrcRecordUpdater.AcidStats acidStats = OrcRecordUpdater.parseAcidStats(reader); - if (acidStats != null) { - System.out.println("ACID stats:" + acidStats); - } - rows.close(); - if (files.size() > 1) { - System.out.println(Strings.repeat("=", 80) + "\n"); + System.err.println("Scanning for valid footers - startPos: " + startPos + + " toRead: " + toRead + " remaining: " + remaining); + remaining = remaining - toRead; } + + System.err.println("Readable footerOffsets: " + footerOffsets); + recoverFile(corruptPath, fs, conf, footerOffsets, backup); } catch (Exception e) { - System.err.println("Unable to dump metadata for file: " + filename); + Path recoveryFile = getRecoveryFile(corruptPath); + if (fs.exists(recoveryFile)) { + fs.delete(recoveryFile, false); + } + System.err.println("Unable to recover file " + corruptFile); e.printStackTrace(); - System.err.println(Strings.repeat("=", 80) + "\n"); + System.err.println(SEPARATOR); continue; + } finally { + fdis.close(); } + System.err.println(corruptFile + " recovered successfully!"); + System.err.println(SEPARATOR); } } + private static void recoverFile(final Path corruptPath, final FileSystem fs, + final Configuration conf, final List footerOffsets, final String backup) + throws IOException { + + // first recover the file to .recovered file and then once successful rename it to actual file + Path recoveredPath = getRecoveryFile(corruptPath); + + // make sure that file does not exist + if (fs.exists(recoveredPath)) { + fs.delete(recoveredPath, false); + } + + // if there are no valid footers, the file should still be readable so create an empty orc file + if (footerOffsets == null || footerOffsets.isEmpty()) { + System.err.println("No readable footers found. 
Creating empty orc file."); + TypeDescription schema = TypeDescription.createStruct(); + Writer writer = OrcFile.createWriter(recoveredPath, + OrcFile.writerOptions(conf).setSchema(schema)); + writer.close(); + } else { + FSDataInputStream fdis = fs.open(corruptPath); + FileStatus fileStatus = fs.getFileStatus(corruptPath); + // read corrupt file and copy it to recovered file until last valid footer + FSDataOutputStream fdos = fs.create(recoveredPath, true, + conf.getInt("io.file.buffer.size", 4096), + fileStatus.getReplication(), + fileStatus.getBlockSize()); + try { + long fileLen = footerOffsets.get(footerOffsets.size() - 1); + long remaining = fileLen; + + while (remaining > 0) { + int toRead = (int) Math.min(DEFAULT_BLOCK_SIZE, remaining); + byte[] data = new byte[toRead]; + long startPos = fileLen - remaining; + fdis.readFully(startPos, data, 0, toRead); + fdos.write(data); + System.err.println("Copying data to recovery file - startPos: " + startPos + + " toRead: " + toRead + " remaining: " + remaining); + remaining = remaining - toRead; + } + } catch (Exception e) { + fs.delete(recoveredPath, false); + throw new IOException(e); + } finally { + fdis.close(); + fdos.close(); + } + } + + // validate the recovered file once again and start moving corrupt files to backup folder + if (isReadable(recoveredPath, conf, Long.MAX_VALUE)) { + Path backupDataPath; + String scheme = corruptPath.toUri().getScheme(); + String authority = corruptPath.toUri().getAuthority(); + String filePath = corruptPath.toUri().getPath(); + + // use the same filesystem as corrupt file if backup-path is not explicitly specified + if (backup.equals(DEFAULT_BACKUP_PATH)) { + backupDataPath = new Path(scheme, authority, DEFAULT_BACKUP_PATH + filePath); + } else { + backupDataPath = new Path(backup + filePath); + } + + // Move data file to backup path + moveFiles(fs, corruptPath, backupDataPath); + + // Move side file to backup path + Path sideFilePath = OrcRecordUpdater.getSideFile(corruptPath); + Path backupSideFilePath = new Path(backupDataPath.getParent(), sideFilePath.getName()); + moveFiles(fs, sideFilePath, backupSideFilePath); + + // finally move recovered file to actual file + moveFiles(fs, recoveredPath, corruptPath); + + // we are done recovering, backing up and validating + System.err.println("Validation of recovered file successful!"); + } + } + + private static void moveFiles(final FileSystem fs, final Path src, final Path dest) + throws IOException { + try { + // create the dest directory if not exist + if (!fs.exists(dest.getParent())) { + fs.mkdirs(dest.getParent()); + } + + // if the destination file exists for some reason delete it + fs.delete(dest, false); + + if (fs.rename(src, dest)) { + System.err.println("Moved " + src + " to " + dest); + } else { + throw new IOException("Unable to move " + src + " to " + dest); + } + + } catch (Exception e) { + throw new IOException("Unable to move " + src + " to " + dest, e); + } + } + + private static Path getRecoveryFile(final Path corruptPath) { + return new Path(corruptPath.getParent(), corruptPath.getName() + ".recovered"); + } + + private static boolean isReadable(final Path corruptPath, final Configuration conf, + final long maxLen) { + try { + OrcFile.createReader(corruptPath, OrcFile.readerOptions(conf).maxLength(maxLen)); + return true; + } catch (Exception e) { + // ignore this exception as maxLen is unreadable + return false; + } + } + + // search for byte pattern in another byte array + private static int indexOf(final byte[] data, final byte[] 
pattern, final int index) { + if (data == null || data.length == 0 || pattern == null || pattern.length == 0 || + index > data.length || index < 0) { + return -1; + } + + int j = 0; + for (int i = index; i < data.length; i++) { + if (pattern[j] == data[i]) { + j++; + } else { + j = 0; + } + + if (j == pattern.length) { + return i - pattern.length + 1; + } + } + + return -1; + } + private static String getFormattedBloomFilters(int col, OrcProto.BloomFilterIndex[] bloomFilterIndex) { StringBuilder buf = new StringBuilder(); @@ -387,10 +723,25 @@ static Options createOptions() { .create('j')); result.addOption(OptionBuilder - .withLongOpt("pretty") - .withDescription("Pretty print json metadata output") - .create('p')); + .withLongOpt("pretty") + .withDescription("Pretty print json metadata output") + .create('p')); + + result.addOption(OptionBuilder + .withLongOpt("recover") + .withDescription("recover corrupted orc files generated by streaming") + .create()); + result.addOption(OptionBuilder + .withLongOpt("skip-dump") + .withDescription("used along with --recover to directly recover files without dumping") + .create()); + + result.addOption(OptionBuilder + .withLongOpt("backup-path") + .withDescription("specify a backup path to store the corrupted files (default: /tmp)") + .hasArg() + .create()); return result; } @@ -497,10 +848,7 @@ static void printObject(JSONWriter writer, } } - static void printJsonData(Configuration conf, - String filename) throws IOException, JSONException { - Path path = new Path(filename); - Reader reader = OrcFile.createReader(path.getFileSystem(conf), path); + static void printJsonData(final Reader reader) throws IOException, JSONException { PrintStream printStream = System.out; OutputStreamWriter out = new OutputStreamWriter(printStream, "UTF-8"); RecordReader rows = reader.rows(null); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/JsonFileDump.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/JsonFileDump.java index 7f673dc..f9a6d9e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/JsonFileDump.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/JsonFileDump.java @@ -19,14 +19,15 @@ import java.io.IOException; import java.util.List; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO; +import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; -import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONStringer; import org.codehaus.jettison.json.JSONWriter; @@ -35,8 +36,10 @@ */ public class JsonFileDump { - public static void printJsonMetaData(List<String> files, Configuration conf, - List<Integer> rowIndexCols, boolean prettyPrint, boolean printTimeZone) throws JSONException, IOException { + public static void printJsonMetaData(List<String> files, + Configuration conf, + List<Integer> rowIndexCols, boolean prettyPrint, boolean printTimeZone) + throws JSONException, IOException { JSONStringer writer = new JSONStringer(); boolean multiFile = files.size() > 1; if (multiFile) { @@ -51,7 +54,11 @@ public static void printJsonMetaData(List<String> files, Configuration conf, } writer.key("fileName").value(filename); Path path = new Path(filename); - Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf)); + Reader reader = FileDump.getReader(path, conf, null); + if (reader == null) { +
writer.key("status").value("FAILED"); + continue; + } writer.key("fileVersion").value(reader.getFileVersion().getName()); writer.key("writerVersion").value(reader.getWriterVersion()); RecordReaderImpl rows = (RecordReaderImpl) reader.rows(); @@ -179,8 +186,6 @@ public static void printJsonMetaData(List files, Configuration conf, writer.endObject(); } catch (Exception e) { writer.key("status").value("FAILED"); - System.err.println("Unable to dump data for file: " + filename); - e.printStackTrace(); throw e; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index bc4d2ee..090d63b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -17,10 +17,15 @@ */ package org.apache.hadoop.hive.ql.io.orc; -import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Deque; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; @@ -33,12 +38,10 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; +import com.google.common.annotations.VisibleForTesting; /** * Merges a base and a list of delta files together into a single stream of @@ -537,7 +540,7 @@ private void discoverKeyBounds(Reader reader, * @return the maximum size of the file to use * @throws IOException */ - private static long getLastFlushLength(FileSystem fs, + static long getLastFlushLength(FileSystem fs, Path deltaFile) throws IOException { Path lengths = OrcRecordUpdater.getSideFile(deltaFile); long result = Long.MAX_VALUE; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index 9098e84..d085c58 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -138,7 +138,7 @@ public String toString() { } } - static Path getSideFile(Path main) { + public static Path getSideFile(Path main) { return new Path(main + AcidUtils.DELTA_SIDE_FILE_SUFFIX); } diff --git a/ql/src/test/resources/orc-file-dump-bloomfilter.out b/ql/src/test/resources/orc-file-dump-bloomfilter.out index 90aee15..7c3db78 100644 --- a/ql/src/test/resources/orc-file-dump-bloomfilter.out +++ b/ql/src/test/resources/orc-file-dump-bloomfilter.out @@ -175,3 +175,5 @@ Stripes: File length: 273307 bytes Padding length: 0 bytes Padding ratio: 0% +________________________________________________________________________________________________________________________ + diff --git a/ql/src/test/resources/orc-file-dump-bloomfilter2.out b/ql/src/test/resources/orc-file-dump-bloomfilter2.out index a3a8c18..a4f006b 100644 --- a/ql/src/test/resources/orc-file-dump-bloomfilter2.out +++ b/ql/src/test/resources/orc-file-dump-bloomfilter2.out @@ -175,3 +175,5 @@ Stripes: File length: 
298416 bytes Padding length: 0 bytes Padding ratio: 0% +________________________________________________________________________________________________________________________ + diff --git a/ql/src/test/resources/orc-file-dump-dictionary-threshold.out b/ql/src/test/resources/orc-file-dump-dictionary-threshold.out index a521db7..8ad856d 100644 --- a/ql/src/test/resources/orc-file-dump-dictionary-threshold.out +++ b/ql/src/test/resources/orc-file-dump-dictionary-threshold.out @@ -186,3 +186,5 @@ Stripes: File length: 2217685 bytes Padding length: 0 bytes Padding ratio: 0% +________________________________________________________________________________________________________________________ + diff --git a/ql/src/test/resources/orc-file-dump.out b/ql/src/test/resources/orc-file-dump.out index d7ee117..5aaa0f3 100644 --- a/ql/src/test/resources/orc-file-dump.out +++ b/ql/src/test/resources/orc-file-dump.out @@ -191,3 +191,5 @@ Stripes: File length: 270923 bytes Padding length: 0 bytes Padding ratio: 0% +________________________________________________________________________________________________________________________ + diff --git a/ql/src/test/resources/orc-file-has-null.out b/ql/src/test/resources/orc-file-has-null.out index bef44a5..6dccf31 100644 --- a/ql/src/test/resources/orc-file-has-null.out +++ b/ql/src/test/resources/orc-file-has-null.out @@ -108,3 +108,5 @@ Stripes: File length: 1940 bytes Padding length: 0 bytes Padding ratio: 0% +________________________________________________________________________________________________________________________ + diff --git a/ql/src/test/results/clientpositive/orc_file_dump.q.out b/ql/src/test/results/clientpositive/orc_file_dump.q.out index 50d5701..c7821d6 100644 --- a/ql/src/test/results/clientpositive/orc_file_dump.q.out +++ b/ql/src/test/results/clientpositive/orc_file_dump.q.out @@ -195,6 +195,8 @@ Stripes: File length: 33458 bytes Padding length: 0 bytes Padding ratio: 0% +________________________________________________________________________________________________________________________ + -- END ORC FILE DUMP -- 124 336 65664 4294967435 74.72 42.47 true bob davidson 2013-03-01 09:11:58.703302 45.40 yard duty PREHOOK: query: alter table orc_ppd set tblproperties("orc.bloom.filter.fpp"="0.01") @@ -313,6 +315,8 @@ Stripes: File length: 38613 bytes Padding length: 0 bytes Padding ratio: 0% +________________________________________________________________________________________________________________________ + -- END ORC FILE DUMP -- 124 336 65664 4294967435 74.72 42.47 true bob davidson 2013-03-01 09:11:58.703302 45.40 yard duty PREHOOK: query: CREATE TABLE orc_ppd_part(t tinyint, @@ -443,5 +447,7 @@ Stripes: File length: 33458 bytes Padding length: 0 bytes Padding ratio: 0% +________________________________________________________________________________________________________________________ + -- END ORC FILE DUMP -- 124 336 65664 4294967435 74.72 42.47 true bob davidson 2013-03-01 09:11:58.703302 45.40 yard duty 2015 10
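
The tests above drive the new recovery mode by calling FileDump.main directly, and printMetaData prints the equivalent shell hint ("hive --orcfiledump --recover --skip-dump <files>") whenever it finds corrupt files. A minimal sketch of the same calls from Java follows; the delta directory path is hypothetical, while the --recover, --skip-dump and --backup-path options are the ones added by this patch.

import org.apache.hadoop.hive.ql.io.orc.FileDump;

public class OrcStreamingRecoveryExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical ACID delta directory written by the streaming ingest API.
    String deltaDir = "/tmp/testing.db/store_sales/delta_0000001_0000002";

    // Metadata dump: reports "N file(s) are corrupted" on stderr when footers cannot be read.
    FileDump.main(new String[]{deltaDir});

    // Recovery without dumping: copies each corrupt bucket file up to its last readable footer,
    // swaps the recovered copy in, and moves the original data and _flush_length files under
    // the backup path (DEFAULT_BACKUP_PATH, i.e. "/tmp", when --backup-path is omitted).
    FileDump.main(new String[]{deltaDir, "--recover", "--skip-dump", "--backup-path", "/tmp"});
  }
}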
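
The recovery logic relies on the layout of the _flush_length side file: the tests corrupt it with plain writeLong calls, getReader treats a length that is not a multiple of 8 as containing a partially written entry, and OrcRawRecordMerger.getLastFlushLength (made package-visible in this patch) returns the last complete value, or -1 when no complete entry exists. Below is a small sketch under that assumption; the class and method names are illustrative and not part of the patch.

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class FlushLengthSketch {

  // Reads a _flush_length side file as a sequence of 8-byte longs and returns the last
  // complete entry (the maximum committed footer offset), or -1 if no complete entry exists.
  static long lastFlushedFooterOffset(FileSystem fs, Path sideFile) throws IOException {
    long completeEntries = fs.getFileStatus(sideFile).getLen() / 8;
    if (completeEntries == 0) {
      return -1;
    }
    FSDataInputStream in = fs.open(sideFile);
    try {
      in.seek((completeEntries - 1) * 8);  // jump to the last complete 8-byte entry
      return in.readLong();                // any trailing partial entry is ignored
    } finally {
      in.close();
    }
  }
}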
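
recoverFile keeps the backed-up originals on the same filesystem as the corrupt file by rebuilding the path from its scheme and authority and prefixing DEFAULT_BACKUP_PATH. A short illustration of the resulting layout, using a hypothetical HDFS path:

import org.apache.hadoop.fs.Path;

public class BackupPathExample {
  public static void main(String[] args) {
    // Hypothetical corrupt bucket file.
    Path corrupt = new Path("hdfs://nn:8020/warehouse/store_sales/delta_0000001_0000002/bucket_00000");

    String scheme = corrupt.toUri().getScheme();       // "hdfs"
    String authority = corrupt.toUri().getAuthority(); // "nn:8020"
    String filePath = corrupt.toUri().getPath();       // "/warehouse/store_sales/delta_0000001_0000002/bucket_00000"

    // Same construction recoverFile uses when --backup-path is left at the default "/tmp".
    Path backup = new Path(scheme, authority, "/tmp" + filePath);

    // Prints hdfs://nn:8020/tmp/warehouse/store_sales/delta_0000001_0000002/bucket_00000
    System.out.println(backup);
  }
}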