diff --git a/bin/ext/orcfiledump.cmd b/bin/ext/orcfiledump.cmd index e818109..f78ed7f 100644 --- a/bin/ext/orcfiledump.cmd +++ b/bin/ext/orcfiledump.cmd @@ -31,5 +31,5 @@ if [%1]==[orcfiledump_help] goto :orcfiledump_help goto :EOF :orcfiledump_help - echo "usage hive --orcfiledump [-d] [--rowindex _col_ids_] " + echo "usage hive --orcfiledump [-d] [--rowindex=_csv_col_ids_] [-t] [-j] [-p] [--recover] [--skip-dump] [--backup-path=_path_for_backup_] " goto :EOF diff --git a/bin/ext/orcfiledump.sh b/bin/ext/orcfiledump.sh index 6139de2..74f1a1e 100644 --- a/bin/ext/orcfiledump.sh +++ b/bin/ext/orcfiledump.sh @@ -23,12 +23,15 @@ orcfiledump () { } orcfiledump_help () { - echo "usage ./hive orcfiledump [-h] [-j] [-p] [-t] [-d] [-r ] " + echo "usage ./hive orcfiledump [-h] [-j] [-p] [-t] [-d] [-r ] [--recover] [--skip-dump] [--backup-path ] " echo "" echo " --json (-j) Print metadata in JSON format" echo " --pretty (-p) Pretty print json metadata output" echo " --timezone (-t) Print writer's time zone" echo " --data (-d) Should the data be printed" - echo " --rowindex (-r) <_col_ids_> Comma separated list of column ids for which row index should be printed" + echo " --rowindex (-r) Comma separated list of column ids for which row index should be printed" + echo " --recover Recover corrupted orc files generated by streaming" + echo " --skip-dump Used along with --recover to directly recover files without dumping" + echo " --backup-path Specify a backup path to store the corrupted files (default: /tmp)" echo " --help (-h) Print help message" } diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index d79c18f..3458b65 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -18,8 +18,26 @@ package org.apache.hive.hcatalog.streaming; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.PrintStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.fs.permission.FsPermission; @@ -41,8 +59,10 @@ import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.orc.FileDump; import org.apache.hadoop.hive.ql.io.orc.OrcFile; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; import org.apache.hadoop.hive.ql.io.orc.OrcStruct; import org.apache.hadoop.hive.ql.io.orc.Reader; import org.apache.hadoop.hive.ql.io.orc.RecordReader; @@ -52,9 +72,9 @@ import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; @@ -70,16 +90,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.TimeUnit; - public class TestStreaming { private static final Logger LOG = LoggerFactory.getLogger(TestStreaming.class); @@ -1194,7 +1204,445 @@ private void runCmdOnDriver(String cmd) throws QueryFailedException { boolean t = runDDL(driver, cmd); Assert.assertTrue(cmd + " failed", t); } - + + + @Test + public void testFileDump() throws Exception { + dropDB(msClient, dbName3); + dropDB(msClient, dbName4); + + // 1) Create two bucketed tables + String dbLocation = dbFolder.newFolder(dbName3).getCanonicalPath() + ".db"; + dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths + String[] colNames = "key1,key2,data".split(","); + String[] colTypes = "string,int,string".split(","); + String[] bucketNames = "key1,key2".split(","); + int bucketCount = 4; + createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, bucketNames + , null, dbLocation, bucketCount); + + String dbLocation2 = dbFolder.newFolder(dbName4).getCanonicalPath() + ".db"; + dbLocation2 = dbLocation2.replaceAll("\\\\","/"); // for windows paths + String[] colNames2 = "key3,key4,data2".split(","); + String[] colTypes2 = "string,int,string".split(","); + String[] bucketNames2 = "key3,key4".split(","); + createDbAndTable(driver, dbName4, tblName4, null, colNames2, colTypes2, bucketNames2 + , null, dbLocation2, bucketCount); + + + // 2) Insert data into both tables + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null); + DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt); + StreamingConnection connection = endPt.newConnection(false); + + TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer); + txnBatch.beginNextTransaction(); + txnBatch.write("name0,1,Hello streaming".getBytes()); + txnBatch.write("name2,2,Welcome to streaming".getBytes()); + txnBatch.write("name4,2,more Streaming unlimited".getBytes()); + txnBatch.write("name5,2,even more Streaming unlimited".getBytes()); + txnBatch.commit(); + + PrintStream origErr = System.err; + ByteArrayOutputStream myErr = new ByteArrayOutputStream(); + + // replace stderr and run command + System.setErr(new PrintStream(myErr)); + FileDump.main(new String[]{dbLocation}); + System.err.flush(); + System.setErr(origErr); + + String errDump = new String(myErr.toByteArray()); + Assert.assertEquals(false, errDump.contains("file(s) are corrupted")); + // since this test runs on local file system which does not have an API to tell if files or + // open or not, we are testing for negative case even though the bucket files are still open + // for writes (transaction batch not closed yet) + Assert.assertEquals(false, errDump.contains("is still open for writes.")); + + HiveEndPoint endPt2 = new 
HiveEndPoint(metaStoreURI, dbName4, tblName4, null); + DelimitedInputWriter writer2 = new DelimitedInputWriter(colNames2,",", endPt2); + StreamingConnection connection2 = endPt2.newConnection(false); + TransactionBatch txnBatch2 = connection2.fetchTransactionBatch(2, writer2); + txnBatch2.beginNextTransaction(); + + txnBatch2.write("name5,2,fact3".getBytes()); // bucket 0 + txnBatch2.write("name8,2,fact3".getBytes()); // bucket 1 + txnBatch2.write("name0,1,fact1".getBytes()); // bucket 2 + // no data for bucket 3 -- expect 0 length bucket file + + txnBatch2.commit(); + + origErr = System.err; + myErr = new ByteArrayOutputStream(); + + // replace stderr and run command + System.setErr(new PrintStream(myErr)); + FileDump.main(new String[]{dbLocation}); + System.out.flush(); + System.err.flush(); + System.setErr(origErr); + + errDump = new String(myErr.toByteArray()); + Assert.assertEquals(false, errDump.contains("Exception")); + Assert.assertEquals(false, errDump.contains("file(s) are corrupted")); + Assert.assertEquals(false, errDump.contains("is still open for writes.")); + } + + @Test + public void testFileDumpCorruptDataFiles() throws Exception { + dropDB(msClient, dbName3); + + // 1) Create two bucketed tables + String dbLocation = dbFolder.newFolder(dbName3).getCanonicalPath() + ".db"; + dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths + String[] colNames = "key1,key2,data".split(","); + String[] colTypes = "string,int,string".split(","); + String[] bucketNames = "key1,key2".split(","); + int bucketCount = 4; + createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, bucketNames + , null, dbLocation, bucketCount); + + // 2) Insert data into both tables + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null); + DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt); + StreamingConnection connection = endPt.newConnection(false); + + // we need side file for this test, so we create 2 txn batch and test with only one + TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer); + txnBatch.beginNextTransaction(); + txnBatch.write("name0,1,Hello streaming".getBytes()); + txnBatch.write("name2,2,Welcome to streaming".getBytes()); + txnBatch.write("name4,2,more Streaming unlimited".getBytes()); + txnBatch.write("name5,2,even more Streaming unlimited".getBytes()); + txnBatch.commit(); + + // intentionally corrupt some files + Path path = new Path(dbLocation); + Collection files = FileDump.getAllFilesInPath(path, conf); + int readableFooter = -1; + for (String file : files) { + if (file.contains("bucket_00000")) { + // empty out the file + corruptDataFile(file, conf, Integer.MIN_VALUE); + } else if (file.contains("bucket_00001")) { + corruptDataFile(file, conf, -1); + } else if (file.contains("bucket_00002")) { + // since we are adding more bytes we know the length of the file is already readable + Path bPath = new Path(file); + FileSystem fs = bPath.getFileSystem(conf); + FileStatus fileStatus = fs.getFileStatus(bPath); + readableFooter = (int) fileStatus.getLen(); + corruptDataFile(file, conf, 2); + } else if (file.contains("bucket_00003")) { + corruptDataFile(file, conf, 100); + } + } + + PrintStream origErr = System.err; + ByteArrayOutputStream myErr = new ByteArrayOutputStream(); + + // replace stderr and run command + System.setErr(new PrintStream(myErr)); + FileDump.main(new String[]{dbLocation}); + System.err.flush(); + System.setErr(origErr); + + String errDump = new String(myErr.toByteArray()); + 
Assert.assertEquals(false, errDump.contains("Exception")); + Assert.assertEquals(true, errDump.contains("4 file(s) are corrupted")); + Assert.assertEquals(false, errDump.contains("is still open for writes.")); + + origErr = System.err; + myErr = new ByteArrayOutputStream(); + + // replace stderr and run command + System.setErr(new PrintStream(myErr)); + FileDump.main(new String[]{dbLocation, "--recover", "--skip-dump"}); + System.err.flush(); + System.setErr(origErr); + + errDump = new String(myErr.toByteArray()); + Assert.assertEquals(true, errDump.contains("bucket_00000 recovered successfully!")); + Assert.assertEquals(true, errDump.contains("No readable footers found. Creating empty orc file.")); + Assert.assertEquals(true, errDump.contains("bucket_00001 recovered successfully!")); + Assert.assertEquals(true, errDump.contains("bucket_00002 recovered successfully!")); + // check for bucket2's last readable footer offset + Assert.assertEquals(true, errDump.contains("Readable footerOffsets: [" + readableFooter + "]")); + Assert.assertEquals(true, errDump.contains("bucket_00003 recovered successfully!")); + Assert.assertEquals(false, errDump.contains("Exception")); + Assert.assertEquals(false, errDump.contains("is still open for writes.")); + + // test after recovery + origErr = System.err; + myErr = new ByteArrayOutputStream(); + + // replace stdout and run command + System.setErr(new PrintStream(myErr)); + FileDump.main(new String[]{dbLocation}); + System.err.flush(); + System.setErr(origErr); + + errDump = new String(myErr.toByteArray()); + Assert.assertEquals(false, errDump.contains("Exception")); + Assert.assertEquals(false, errDump.contains("file(s) are corrupted")); + Assert.assertEquals(false, errDump.contains("is still open for writes.")); + + // after recovery there shouldn't be any *_flush_length files + files = FileDump.getAllFilesInPath(path, conf); + for (String file : files) { + Assert.assertEquals(false, file.contains("_flush_length")); + } + + txnBatch.close(); + } + + private void corruptDataFile(final String file, final Configuration conf, final int addRemoveBytes) + throws Exception { + Path bPath = new Path(file); + Path cPath = new Path(bPath.getParent(), bPath.getName() + ".corrupt"); + FileSystem fs = bPath.getFileSystem(conf); + FileStatus fileStatus = fs.getFileStatus(bPath); + int len = addRemoveBytes == Integer.MIN_VALUE ? 
0 : (int) fileStatus.getLen() + addRemoveBytes; + byte[] buffer = new byte[len]; + FSDataInputStream fdis = fs.open(bPath); + fdis.readFully(0, buffer, 0, (int) Math.min(fileStatus.getLen(), buffer.length)); + fdis.close(); + FSDataOutputStream fdos = fs.create(cPath, true); + fdos.write(buffer, 0, buffer.length); + fdos.close(); + fs.delete(bPath, false); + fs.rename(cPath, bPath); + } + + @Test + public void testFileDumpCorruptSideFiles() throws Exception { + dropDB(msClient, dbName3); + + // 1) Create two bucketed tables + String dbLocation = dbFolder.newFolder(dbName3).getCanonicalPath() + ".db"; + dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths + String[] colNames = "key1,key2,data".split(","); + String[] colTypes = "string,int,string".split(","); + String[] bucketNames = "key1,key2".split(","); + int bucketCount = 4; + createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, bucketNames + , null, dbLocation, bucketCount); + + // 2) Insert data into both tables + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null); + DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt); + StreamingConnection connection = endPt.newConnection(false); + + TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer); + txnBatch.beginNextTransaction(); + txnBatch.write("name0,1,Hello streaming".getBytes()); + txnBatch.write("name2,2,Welcome to streaming".getBytes()); + txnBatch.write("name4,2,more Streaming unlimited".getBytes()); + txnBatch.write("name5,2,even more Streaming unlimited".getBytes()); + txnBatch.write("name6,3,aHello streaming".getBytes()); + txnBatch.commit(); + + Map> offsetMap = new HashMap>(); + recordOffsets(conf, dbLocation, offsetMap); + + txnBatch.beginNextTransaction(); + txnBatch.write("name01,11,-Hello streaming".getBytes()); + txnBatch.write("name21,21,-Welcome to streaming".getBytes()); + txnBatch.write("name41,21,-more Streaming unlimited".getBytes()); + txnBatch.write("name51,21,-even more Streaming unlimited".getBytes()); + txnBatch.write("name02,12,--Hello streaming".getBytes()); + txnBatch.write("name22,22,--Welcome to streaming".getBytes()); + txnBatch.write("name42,22,--more Streaming unlimited".getBytes()); + txnBatch.write("name52,22,--even more Streaming unlimited".getBytes()); + txnBatch.write("name7,4,aWelcome to streaming".getBytes()); + txnBatch.write("name8,5,amore Streaming unlimited".getBytes()); + txnBatch.write("name9,6,aeven more Streaming unlimited".getBytes()); + txnBatch.write("name10,7,bHello streaming".getBytes()); + txnBatch.write("name11,8,bWelcome to streaming".getBytes()); + txnBatch.write("name12,9,bmore Streaming unlimited".getBytes()); + txnBatch.write("name13,10,beven more Streaming unlimited".getBytes()); + txnBatch.commit(); + + recordOffsets(conf, dbLocation, offsetMap); + + // intentionally corrupt some files + Path path = new Path(dbLocation); + Collection files = FileDump.getAllFilesInPath(path, conf); + for (String file : files) { + if (file.contains("bucket_00000")) { + corruptSideFile(file, conf, offsetMap, "bucket_00000", -1); // corrupt last entry + } else if (file.contains("bucket_00001")) { + corruptSideFile(file, conf, offsetMap, "bucket_00001", 0); // empty out side file + } else if (file.contains("bucket_00002")) { + corruptSideFile(file, conf, offsetMap, "bucket_00002", 3); // total 3 entries (2 valid + 1 fake) + } else if (file.contains("bucket_00003")) { + corruptSideFile(file, conf, offsetMap, "bucket_00003", 10); // total 10 entries 
(2 valid + 8 fake) + } + } + + PrintStream origErr = System.err; + ByteArrayOutputStream myErr = new ByteArrayOutputStream(); + + // replace stderr and run command + System.setErr(new PrintStream(myErr)); + FileDump.main(new String[]{dbLocation}); + System.err.flush(); + System.setErr(origErr); + + String errDump = new String(myErr.toByteArray()); + Assert.assertEquals(true, errDump.contains("bucket_00000_flush_length [length: 11")); + Assert.assertEquals(true, errDump.contains("bucket_00001_flush_length [length: 0")); + Assert.assertEquals(true, errDump.contains("bucket_00002_flush_length [length: 24")); + Assert.assertEquals(true, errDump.contains("bucket_00003_flush_length [length: 80")); + Assert.assertEquals(false, errDump.contains("Exception")); + Assert.assertEquals(true, errDump.contains("4 file(s) are corrupted")); + Assert.assertEquals(false, errDump.contains("is still open for writes.")); + + origErr = System.err; + myErr = new ByteArrayOutputStream(); + + // replace stderr and run command + System.setErr(new PrintStream(myErr)); + FileDump.main(new String[]{dbLocation, "--recover", "--skip-dump"}); + System.err.flush(); + System.setErr(origErr); + + errDump = new String(myErr.toByteArray()); + Assert.assertEquals(true, errDump.contains("bucket_00000 recovered successfully!")); + Assert.assertEquals(true, errDump.contains("bucket_00001 recovered successfully!")); + Assert.assertEquals(true, errDump.contains("bucket_00002 recovered successfully!")); + Assert.assertEquals(true, errDump.contains("bucket_00003 recovered successfully!")); + List offsets = offsetMap.get("bucket_00000"); + Assert.assertEquals(true, errDump.contains("Readable footerOffsets: " + offsets.toString())); + offsets = offsetMap.get("bucket_00001"); + Assert.assertEquals(true, errDump.contains("Readable footerOffsets: " + offsets.toString())); + offsets = offsetMap.get("bucket_00002"); + Assert.assertEquals(true, errDump.contains("Readable footerOffsets: " + offsets.toString())); + offsets = offsetMap.get("bucket_00003"); + Assert.assertEquals(true, errDump.contains("Readable footerOffsets: " + offsets.toString())); + Assert.assertEquals(false, errDump.contains("Exception")); + Assert.assertEquals(false, errDump.contains("is still open for writes.")); + + // test after recovery + origErr = System.err; + myErr = new ByteArrayOutputStream(); + + // replace stdout and run command + System.setErr(new PrintStream(myErr)); + FileDump.main(new String[]{dbLocation}); + System.err.flush(); + System.setErr(origErr); + + errDump = new String(myErr.toByteArray()); + Assert.assertEquals(false, errDump.contains("Exception")); + Assert.assertEquals(false, errDump.contains("file(s) are corrupted")); + Assert.assertEquals(false, errDump.contains("is still open for writes.")); + + // after recovery there shouldn't be any *_flush_length files + files = FileDump.getAllFilesInPath(path, conf); + for (String file : files) { + Assert.assertEquals(false, file.contains("_flush_length")); + } + + txnBatch.close(); + } + + private void corruptSideFile(final String file, final HiveConf conf, + final Map> offsetMap, final String key, final int numEntries) + throws IOException { + Path dataPath = new Path(file); + Path sideFilePath = OrcRecordUpdater.getSideFile(dataPath); + Path cPath = new Path(sideFilePath.getParent(), sideFilePath.getName() + ".corrupt"); + FileSystem fs = sideFilePath.getFileSystem(conf); + List offsets = offsetMap.get(key); + long lastOffset = offsets.get(offsets.size() - 1); + FSDataOutputStream fdos = 
fs.create(cPath, true); + // corrupt last entry + if (numEntries < 0) { + byte[] lastOffsetBytes = longToBytes(lastOffset); + for (int i = 0; i < offsets.size() - 1; i++) { + fdos.writeLong(offsets.get(i)); + } + + fdos.write(lastOffsetBytes, 0, 3); + } else if (numEntries > 0) { + int firstRun = Math.min(offsets.size(), numEntries); + // add original entries + for (int i=0; i < firstRun; i++) { + fdos.writeLong(offsets.get(i)); + } + + // add fake entries + int remaining = numEntries - firstRun; + for (int i = 0; i < remaining; i++) { + fdos.writeLong(lastOffset + ((i + 1) * 100)); + } + } + + fdos.close(); + fs.delete(sideFilePath, false); + fs.rename(cPath, sideFilePath); + } + + private byte[] longToBytes(long x) { + ByteBuffer buffer = ByteBuffer.allocate(8); + buffer.putLong(x); + return buffer.array(); + } + + private void recordOffsets(final HiveConf conf, final String dbLocation, + final Map> offsetMap) throws IOException { + Path path = new Path(dbLocation); + Collection files = FileDump.getAllFilesInPath(path, conf); + for (String file: files) { + Path bPath = new Path(file); + FileSystem fs = bPath.getFileSystem(conf); + FileStatus fileStatus = fs.getFileStatus(bPath); + long len = fileStatus.getLen(); + + if (file.contains("bucket_00000")) { + if (offsetMap.containsKey("bucket_00000")) { + List offsets = offsetMap.get("bucket_00000"); + offsets.add(len); + offsetMap.put("bucket_00000", offsets); + } else { + List offsets = new ArrayList(); + offsets.add(len); + offsetMap.put("bucket_00000", offsets); + } + } else if (file.contains("bucket_00001")) { + if (offsetMap.containsKey("bucket_00001")) { + List offsets = offsetMap.get("bucket_00001"); + offsets.add(len); + offsetMap.put("bucket_00001", offsets); + } else { + List offsets = new ArrayList(); + offsets.add(len); + offsetMap.put("bucket_00001", offsets); + } + } else if (file.contains("bucket_00002")) { + if (offsetMap.containsKey("bucket_00002")) { + List offsets = offsetMap.get("bucket_00002"); + offsets.add(len); + offsetMap.put("bucket_00002", offsets); + } else { + List offsets = new ArrayList(); + offsets.add(len); + offsetMap.put("bucket_00002", offsets); + } + } else if (file.contains("bucket_00003")) { + if (offsetMap.containsKey("bucket_00003")) { + List offsets = offsetMap.get("bucket_00003"); + offsets.add(len); + offsetMap.put("bucket_00003", offsets); + } else { + List offsets = new ArrayList(); + offsets.add(len); + offsetMap.put("bucket_00003", offsets); + } + } + } + } @Test public void testErrorHandling() throws Exception { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java index f3f2279..30dac37 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java @@ -32,9 +32,13 @@ import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO; import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex; @@ -42,6 +46,9 @@ import org.apache.hadoop.hive.serde2.io.ByteWritable; import 
org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.io.ShortWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; @@ -49,6 +56,7 @@ import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONWriter; +import com.google.common.base.Joiner; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -57,6 +65,16 @@ */ public final class FileDump { public static final String UNKNOWN = "UNKNOWN"; + public static final String SEPARATOR = Strings.repeat("_", 120) + "\n"; + public static final int DEFAULT_BLOCK_SIZE = 256 * 1024 * 1024; + public static final String DEFAULT_BACKUP_PATH = "/tmp"; + public static final PathFilter HIDDEN_AND_SIDE_FILE_FILTER = new PathFilter() { + public boolean accept(Path p) { + String name = p.getName(); + return !name.startsWith("_") && !name.startsWith(".") && !name.endsWith( + AcidUtils.DELTA_SIDE_FILE_SUFFIX); + } + }; // not used private FileDump() { @@ -76,6 +94,13 @@ public static void main(String[] args) throws Exception { } boolean dumpData = cli.hasOption('d'); + boolean recover = cli.hasOption("recover"); + boolean skipDump = cli.hasOption("skip-dump"); + String backupPath = DEFAULT_BACKUP_PATH; + if (cli.hasOption("backup-path")) { + backupPath = cli.getOptionValue("backup-path"); + } + if (cli.hasOption("r")) { String[] colStrs = cli.getOptionValue("r").split(","); rowIndexCols = new ArrayList(colStrs.length); @@ -101,24 +126,127 @@ public static void main(String[] args) throws Exception { if (dumpData) { printData(filesInPath, conf); - } else { + } else if (recover && skipDump) { + recoverFiles(filesInPath, conf, backupPath); + } else { if (jsonFormat) { boolean prettyPrint = cli.hasOption('p'); - JsonFileDump.printJsonMetaData(filesInPath, conf, rowIndexCols, prettyPrint, - printTimeZone); + JsonFileDump.printJsonMetaData(filesInPath, conf, rowIndexCols, prettyPrint, printTimeZone); } else { - printMetaData(filesInPath, conf, rowIndexCols, printTimeZone); + printMetaData(filesInPath, conf, rowIndexCols, printTimeZone, recover, backupPath); + } + } + } + + /** + * This method returns an ORC reader object if the specified file is readable. If the specified + * file has side file (_flush_length) file, then max footer offset will be read from the side + * file and orc reader will be created from that offset. Since both data file and side file + * use hflush() for flushing the data, there could be some inconsistencies and both files could be + * out-of-sync. Following are the cases under which null will be returned + * + * 1) If the file specified by path or its side file is still open for writes + * 2) If *_flush_length file does not return any footer offset + * 3) If *_flush_length returns a valid footer offset but the data file is not readable at that + * position (incomplete data file) + * 4) If *_flush_length file length is not a multiple of 8, then reader will be created from + * previous valid footer. 
If there is no such footer (file length > 0 and < 8), then null will + * be returned + * + * Also, if this method detects any file corruption (mismatch between data file and side file) + * then it will add the corresponding file to the specified input list for corrupted files. + * + * In all other cases, where the file is readable this method will return a reader object. + * + * @param path - file to get reader for + * @param conf - configuration object + * @param corruptFiles - fills this list with all possible corrupted files + * @return - reader for the specified file or null + * @throws IOException + */ + static Reader getReader(final Path path, final Configuration conf, + final List corruptFiles) throws IOException { + FileSystem fs = path.getFileSystem(conf); + long dataFileLen = fs.getFileStatus(path).getLen(); + System.err.println("Processing data file " + path + " [length: " + dataFileLen + "]"); + Path sideFile = OrcRecordUpdater.getSideFile(path); + final boolean sideFileExists = fs.exists(sideFile); + boolean openDataFile = false; + boolean openSideFile = false; + if (fs instanceof DistributedFileSystem) { + DistributedFileSystem dfs = (DistributedFileSystem) fs; + openDataFile = !dfs.isFileClosed(path); + openSideFile = sideFileExists && !dfs.isFileClosed(sideFile); + } + + if (openDataFile || openSideFile) { + if (openDataFile && openSideFile) { + System.err.println("Unable to perform file dump as " + path + " and " + sideFile + + " are still open for writes."); + } else if (openSideFile) { + System.err.println("Unable to perform file dump as " + sideFile + + " is still open for writes."); + } else { + System.err.println("Unable to perform file dump as " + path + + " is still open for writes."); + } + + return null; + } + + Reader reader = null; + if (sideFileExists) { + final long maxLen = OrcRawRecordMerger.getLastFlushLength(fs, path); + final long sideFileLen = fs.getFileStatus(sideFile).getLen(); + System.err.println("Found flush length file " + sideFile + + " [length: " + sideFileLen + ", maxFooterOffset: " + maxLen + "]"); + // no offsets read from side file + if (maxLen == -1) { + + // if data file is larger than last flush length, then additional data could be recovered + if (dataFileLen > maxLen) { + System.err.println("Data file has more data than max footer offset:" + maxLen + + ". Adding data file to recovery list."); + if (corruptFiles != null) { + corruptFiles.add(path.toUri().toString()); + } + } + return null; } + + try { + reader = OrcFile.createReader(path, OrcFile.readerOptions(conf).maxLength(maxLen)); + + // if data file is larger than last flush length, then additional data could be recovered + if (dataFileLen > maxLen) { + System.err.println("Data file has more data than max footer offset:" + maxLen + + ". Adding data file to recovery list."); + if (corruptFiles != null) { + corruptFiles.add(path.toUri().toString()); + } + } + } catch (Exception e) { + if (corruptFiles != null) { + corruptFiles.add(path.toUri().toString()); + } + System.err.println("Unable to read data from max footer offset." 
+ + " Adding data file to recovery list."); + return null; + } + } else { + reader = OrcFile.createReader(path, OrcFile.readerOptions(conf)); } + + return reader; } - private static Collection getAllFilesInPath(final Path path, + public static Collection getAllFilesInPath(final Path path, final Configuration conf) throws IOException { List filesInPath = Lists.newArrayList(); FileSystem fs = path.getFileSystem(conf); FileStatus fileStatus = fs.getFileStatus(path); if (fileStatus.isDir()) { - FileStatus[] fileStatuses = fs.listStatus(path, AcidUtils.hiddenFileFilter); + FileStatus[] fileStatuses = fs.listStatus(path, HIDDEN_AND_SIDE_FILE_FILTER); for (FileStatus fileInPath : fileStatuses) { if (fileInPath.isDir()) { filesInPath.addAll(getAllFilesInPath(fileInPath.getPath(), conf)); @@ -137,134 +265,352 @@ private static void printData(List files, Configuration conf) throws IOE JSONException { for (String file : files) { try { - printJsonData(conf, file); - if (files.size() > 1) { - System.out.println(Strings.repeat("=", 80) + "\n"); + Path path = new Path(file); + Reader reader = getReader(path, conf, Lists.newArrayList()); + if (reader == null) { + continue; } + printJsonData(reader); + System.out.println(SEPARATOR); } catch (Exception e) { System.err.println("Unable to dump data for file: " + file); - e.printStackTrace(); - System.err.println(Strings.repeat("=", 80) + "\n"); continue; } } } private static void printMetaData(List files, Configuration conf, - List rowIndexCols, boolean printTimeZone) throws IOException { + List rowIndexCols, boolean printTimeZone, final boolean recover, + final String backupPath) + throws IOException { + List corruptFiles = Lists.newArrayList(); for (String filename : files) { - try { - Path path = new Path(filename); - Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf)); - System.out.println("Structure for " + filename); - System.out.println("File Version: " + reader.getFileVersion().getName() + - " with " + reader.getWriterVersion()); - RecordReaderImpl rows = (RecordReaderImpl) reader.rows(); - System.out.println("Rows: " + reader.getNumberOfRows()); - System.out.println("Compression: " + reader.getCompression()); - if (reader.getCompression() != CompressionKind.NONE) { - System.out.println("Compression size: " + reader.getCompressionSize()); + printMetaDataImpl(filename, conf, rowIndexCols, printTimeZone, corruptFiles); + System.out.println(SEPARATOR); + } + + if (!corruptFiles.isEmpty()) { + if (recover) { + recoverFiles(corruptFiles, conf, backupPath); + } else { + System.err.println(corruptFiles.size() + " file(s) are corrupted." 
+ + " Run the following command to recover corrupted files.\n"); + String fileNames = Joiner.on(" ").skipNulls().join(corruptFiles); + System.err.println("hive --orcfiledump --recover --skip-dump " + fileNames); + System.out.println(SEPARATOR); + } + } + } + + private static void printMetaDataImpl(final String filename, + final Configuration conf, final List rowIndexCols, final boolean printTimeZone, + final List corruptFiles) throws IOException { + Path file = new Path(filename); + Reader reader = getReader(file, conf, corruptFiles); + // if we can create reader then footer is not corrupt and file will readable + if (reader == null) { + return; + } + System.out.println("Structure for " + filename); + System.out.println("File Version: " + reader.getFileVersion().getName() + + " with " + reader.getWriterVersion()); + RecordReaderImpl rows = (RecordReaderImpl) reader.rows(); + System.out.println("Rows: " + reader.getNumberOfRows()); + System.out.println("Compression: " + reader.getCompression()); + if (reader.getCompression() != CompressionKind.NONE) { + System.out.println("Compression size: " + reader.getCompressionSize()); + } + System.out.println("Type: " + reader.getObjectInspector().getTypeName()); + System.out.println("\nStripe Statistics:"); + Metadata metadata = reader.getMetadata(); + for (int n = 0; n < metadata.getStripeStatistics().size(); n++) { + System.out.println(" Stripe " + (n + 1) + ":"); + StripeStatistics ss = metadata.getStripeStatistics().get(n); + for (int i = 0; i < ss.getColumnStatistics().length; ++i) { + System.out.println(" Column " + i + ": " + + ss.getColumnStatistics()[i].toString()); + } + } + ColumnStatistics[] stats = reader.getStatistics(); + int colCount = stats.length; + System.out.println("\nFile Statistics:"); + for (int i = 0; i < stats.length; ++i) { + System.out.println(" Column " + i + ": " + stats[i].toString()); + } + System.out.println("\nStripes:"); + int stripeIx = -1; + for (StripeInformation stripe : reader.getStripes()) { + ++stripeIx; + long stripeStart = stripe.getOffset(); + OrcProto.StripeFooter footer = rows.readStripeFooter(stripe); + if (printTimeZone) { + String tz = footer.getWriterTimezone(); + if (tz == null || tz.isEmpty()) { + tz = UNKNOWN; } - System.out.println("Type: " + reader.getObjectInspector().getTypeName()); - System.out.println("\nStripe Statistics:"); - Metadata metadata = reader.getMetadata(); - for (int n = 0; n < metadata.getStripeStatistics().size(); n++) { - System.out.println(" Stripe " + (n + 1) + ":"); - StripeStatistics ss = metadata.getStripeStatistics().get(n); - for (int i = 0; i < ss.getColumnStatistics().length; ++i) { - System.out.println(" Column " + i + ": " + - ss.getColumnStatistics()[i].toString()); - } + System.out.println(" Stripe: " + stripe.toString() + " timezone: " + tz); + } else { + System.out.println(" Stripe: " + stripe.toString()); + } + long sectionStart = stripeStart; + for (OrcProto.Stream section : footer.getStreamsList()) { + String kind = section.hasKind() ? 
section.getKind().name() : UNKNOWN; + System.out.println(" Stream: column " + section.getColumn() + + " section " + kind + " start: " + sectionStart + + " length " + section.getLength()); + sectionStart += section.getLength(); + } + for (int i = 0; i < footer.getColumnsCount(); ++i) { + OrcProto.ColumnEncoding encoding = footer.getColumns(i); + StringBuilder buf = new StringBuilder(); + buf.append(" Encoding column "); + buf.append(i); + buf.append(": "); + buf.append(encoding.getKind()); + if (encoding.getKind() == OrcProto.ColumnEncoding.Kind.DICTIONARY || + encoding.getKind() == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) { + buf.append("["); + buf.append(encoding.getDictionarySize()); + buf.append("]"); } - ColumnStatistics[] stats = reader.getStatistics(); - int colCount = stats.length; - System.out.println("\nFile Statistics:"); - for (int i = 0; i < stats.length; ++i) { - System.out.println(" Column " + i + ": " + stats[i].toString()); + System.out.println(buf); + } + if (rowIndexCols != null && !rowIndexCols.isEmpty()) { + // include the columns that are specified, only if the columns are included, bloom filter + // will be read + boolean[] sargColumns = new boolean[colCount]; + for (int colIdx : rowIndexCols) { + sargColumns[colIdx] = true; } - System.out.println("\nStripes:"); - int stripeIx = -1; - for (StripeInformation stripe : reader.getStripes()) { - ++stripeIx; - long stripeStart = stripe.getOffset(); - OrcProto.StripeFooter footer = rows.readStripeFooter(stripe); - if (printTimeZone) { - String tz = footer.getWriterTimezone(); - if (tz == null || tz.isEmpty()) { - tz = UNKNOWN; - } - System.out.println(" Stripe: " + stripe.toString() + " timezone: " + tz); - } else { - System.out.println(" Stripe: " + stripe.toString()); - } - long sectionStart = stripeStart; - for (OrcProto.Stream section : footer.getStreamsList()) { - String kind = section.hasKind() ? 
section.getKind().name() : UNKNOWN; - System.out.println(" Stream: column " + section.getColumn() + - " section " + kind + " start: " + sectionStart + - " length " + section.getLength()); - sectionStart += section.getLength(); - } - for (int i = 0; i < footer.getColumnsCount(); ++i) { - OrcProto.ColumnEncoding encoding = footer.getColumns(i); - StringBuilder buf = new StringBuilder(); - buf.append(" Encoding column "); - buf.append(i); - buf.append(": "); - buf.append(encoding.getKind()); - if (encoding.getKind() == OrcProto.ColumnEncoding.Kind.DICTIONARY || - encoding.getKind() == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) { - buf.append("["); - buf.append(encoding.getDictionarySize()); - buf.append("]"); - } - System.out.println(buf); - } - if (rowIndexCols != null && !rowIndexCols.isEmpty()) { - // include the columns that are specified, only if the columns are included, bloom filter - // will be read - boolean[] sargColumns = new boolean[colCount]; - for (int colIdx : rowIndexCols) { - sargColumns[colIdx] = true; - } - RecordReaderImpl.Index indices = rows.readRowIndex(stripeIx, null, sargColumns); - for (int col : rowIndexCols) { - StringBuilder buf = new StringBuilder(); - String rowIdxString = getFormattedRowIndices(col, indices.getRowGroupIndex()); - buf.append(rowIdxString); - String bloomFilString = getFormattedBloomFilters(col, indices.getBloomFilterIndex()); - buf.append(bloomFilString); - System.out.println(buf); + RecordReaderImpl.Index indices = rows.readRowIndex(stripeIx, null, sargColumns); + for (int col : rowIndexCols) { + StringBuilder buf = new StringBuilder(); + String rowIdxString = getFormattedRowIndices(col, indices.getRowGroupIndex()); + buf.append(rowIdxString); + String bloomFilString = getFormattedBloomFilters(col, indices.getBloomFilterIndex()); + buf.append(bloomFilString); + System.out.println(buf); + } + } + } + + FileSystem fs = file.getFileSystem(conf); + long fileLen = fs.getContentSummary(file).getLength(); + long paddedBytes = getTotalPaddingSize(reader); + // empty ORC file is ~45 bytes. 
Assumption here is file length always >0 + double percentPadding = ((double) paddedBytes / (double) fileLen) * 100; + DecimalFormat format = new DecimalFormat("##.##"); + System.out.println("\nFile length: " + fileLen + " bytes"); + System.out.println("Padding length: " + paddedBytes + " bytes"); + System.out.println("Padding ratio: " + format.format(percentPadding) + "%"); + OrcRecordUpdater.AcidStats acidStats = OrcRecordUpdater.parseAcidStats(reader); + if (acidStats != null) { + System.out.println("ACID stats:" + acidStats); + } + rows.close(); + } + + + private static void recoverFiles(final List corruptFiles, final Configuration conf, + final String backup) + throws IOException { + for (String corruptFile : corruptFiles) { + System.err.println("Recovering file " + corruptFile); + Path corruptPath = new Path(corruptFile); + FileSystem fs = corruptPath.getFileSystem(conf); + FSDataInputStream fdis = fs.open(corruptPath); + try { + long corruptFileLen = fs.getFileStatus(corruptPath).getLen(); + long remaining = corruptFileLen; + List footerOffsets = Lists.newArrayList(); + + // start reading the data file form top to bottom and record the valid footers + while (remaining > 0) { + int toRead = (int) Math.min(DEFAULT_BLOCK_SIZE, remaining); + byte[] data = new byte[toRead]; + long startPos = corruptFileLen - remaining; + fdis.readFully(startPos, data, 0, toRead); + + // find all MAGIC string and see if the file is readable from there + int index = 0; + long nextFooterOffset; + + while (index != -1) { + index = indexOf(data, OrcFile.MAGIC.getBytes(), index + 1); + if (index != -1) { + nextFooterOffset = startPos + index + OrcFile.MAGIC.length() + 1; + if (isReadable(corruptPath, conf, nextFooterOffset)) { + footerOffsets.add(nextFooterOffset); + } } } - } - FileSystem fs = path.getFileSystem(conf); - long fileLen = fs.getContentSummary(path).getLength(); - long paddedBytes = getTotalPaddingSize(reader); - // empty ORC file is ~45 bytes. 
Assumption here is file length always >0 - double percentPadding = ((double) paddedBytes / (double) fileLen) * 100; - DecimalFormat format = new DecimalFormat("##.##"); - System.out.println("\nFile length: " + fileLen + " bytes"); - System.out.println("Padding length: " + paddedBytes + " bytes"); - System.out.println("Padding ratio: " + format.format(percentPadding) + "%"); - OrcRecordUpdater.AcidStats acidStats = OrcRecordUpdater.parseAcidStats(reader); - if (acidStats != null) { - System.out.println("ACID stats:" + acidStats); - } - rows.close(); - if (files.size() > 1) { - System.out.println(Strings.repeat("=", 80) + "\n"); + System.err.println("Scanning for valid footers - startPos: " + startPos + + " toRead: " + toRead + " remaining: " + remaining); + remaining = remaining - toRead; } + + System.err.println("Readable footerOffsets: " + footerOffsets); + recoverFile(corruptPath, fs, conf, footerOffsets, backup); } catch (Exception e) { - System.err.println("Unable to dump metadata for file: " + filename); + Path recoveryFile = getRecoveryFile(corruptPath); + if (fs.exists(recoveryFile)) { + fs.delete(recoveryFile, false); + } + System.err.println("Unable to recover file " + corruptFile); e.printStackTrace(); - System.err.println(Strings.repeat("=", 80) + "\n"); + System.err.println(SEPARATOR); continue; + } finally { + fdis.close(); + } + System.err.println(corruptFile + " recovered successfully!"); + System.err.println(SEPARATOR); + } + } + + private static void recoverFile(final Path corruptPath, final FileSystem fs, + final Configuration conf, final List footerOffsets, final String backup) + throws IOException { + + // first recover the file to .recovered file and then once successful rename it to actual file + Path recoveredPath = getRecoveryFile(corruptPath); + + // make sure that file does not exist + if (fs.exists(recoveredPath)) { + fs.delete(recoveredPath, false); + } + + // if there are no valid footers, the file should still be readable so create an empty orc file + if (footerOffsets == null || footerOffsets.isEmpty()) { + System.err.println("No readable footers found. 
Creating empty orc file."); + StructObjectInspector soi = ObjectInspectorFactory.getStandardStructObjectInspector( + Lists.newArrayList(), Lists.newArrayList(), + Lists.newArrayList()); + Writer writer = OrcFile + .createWriter(recoveredPath, OrcFile.writerOptions(conf).inspector(soi)); + writer.close(); + } else { + FSDataInputStream fdis = fs.open(corruptPath); + FileStatus fileStatus = fs.getFileStatus(corruptPath); + // read corrupt file and copy it to recovered file until last valid footer + FSDataOutputStream fdos = fs.create(recoveredPath, true, + conf.getInt("io.file.buffer.size", 4096), + fileStatus.getReplication(), + fileStatus.getBlockSize()); + try { + long fileLen = footerOffsets.get(footerOffsets.size() - 1); + long remaining = fileLen; + + while (remaining > 0) { + int toRead = (int) Math.min(DEFAULT_BLOCK_SIZE, remaining); + byte[] data = new byte[toRead]; + long startPos = fileLen - remaining; + fdis.readFully(startPos, data, 0, toRead); + fdos.write(data); + System.err.println("Copying data to recovery file - startPos: " + startPos + + " toRead: " + toRead + " remaining: " + remaining); + remaining = remaining - toRead; + } + } catch (Exception e) { + fs.delete(recoveredPath, false); + throw new IOException(e); + } finally { + fdis.close(); + fdos.close(); + } + } + + // validate the recovered file once again and start moving corrupt files to backup folder + if (isReadable(recoveredPath, conf, Long.MAX_VALUE)) { + Path backupDataPath; + String scheme = corruptPath.toUri().getScheme(); + String authority = corruptPath.toUri().getAuthority(); + String filePath = corruptPath.toUri().getPath(); + + // use the same filesystem as corrupt file if backup-path is not explicitly specified + if (backup.equals(DEFAULT_BACKUP_PATH)) { + backupDataPath = new Path(scheme, authority, DEFAULT_BACKUP_PATH + filePath); + } else { + backupDataPath = new Path(backup + filePath); + } + + // Move data file to backup path + moveFiles(fs, corruptPath, backupDataPath); + + // Move side file to backup path + Path sideFilePath = OrcRecordUpdater.getSideFile(corruptPath); + Path backupSideFilePath = new Path(backupDataPath.getParent(), sideFilePath.getName()); + moveFiles(fs, sideFilePath, backupSideFilePath); + + // finally move recovered file to actual file + moveFiles(fs, recoveredPath, corruptPath); + + // we are done recovering, backing up and validating + System.err.println("Validation of recovered file successful!"); + } + } + + private static void moveFiles(final FileSystem fs, final Path src, final Path dest) + throws IOException { + try { + // create the dest directory if not exist + if (!fs.exists(dest.getParent())) { + fs.mkdirs(dest.getParent()); + } + + // if the destination file exists for some reason delete it + fs.delete(dest, false); + + if (fs.rename(src, dest)) { + System.err.println("Moved " + src + " to " + dest); + } else { + throw new IOException("Unable to move " + src + " to " + dest); } + + } catch (Exception e) { + throw new IOException("Unable to move " + src + " to " + dest, e); } } + private static Path getRecoveryFile(final Path corruptPath) { + return new Path(corruptPath.getParent(), corruptPath.getName() + ".recovered"); + } + + private static boolean isReadable(final Path corruptPath, final Configuration conf, + final long maxLen) { + try { + OrcFile.createReader(corruptPath, OrcFile.readerOptions(conf).maxLength(maxLen)); + return true; + } catch (Exception e) { + // ignore this exception as maxLen is unreadable + return false; + } + } + + // search for byte 
pattern in another byte array + private static int indexOf(final byte[] data, final byte[] pattern, final int index) { + if (data == null || data.length == 0 || pattern == null || pattern.length == 0 || + index > data.length || index < 0) { + return -1; + } + + int j = 0; + for (int i = index; i < data.length; i++) { + if (pattern[j] == data[i]) { + j++; + } else { + j = 0; + } + + if (j == pattern.length) { + return i - pattern.length + 1; + } + } + + return -1; + } + private static String getFormattedBloomFilters(int col, OrcProto.BloomFilterIndex[] bloomFilterIndex) { StringBuilder buf = new StringBuilder(); @@ -390,6 +736,21 @@ static Options createOptions() { .withDescription("Pretty print json metadata output") .create('p')); + result.addOption(OptionBuilder + .withLongOpt("recover") + .withDescription("recover corrupted orc files generated by streaming") + .create()); + + result.addOption(OptionBuilder + .withLongOpt("skip-dump") + .withDescription("used along with --recover to directly recover files without dumping") + .create()); + + result.addOption(OptionBuilder + .withLongOpt("backup-path") + .withDescription("specify a backup path to store the corrupted files (default: /tmp)") + .hasArg() + .create()); return result; } @@ -496,10 +857,7 @@ static void printObject(JSONWriter writer, } } - static void printJsonData(Configuration conf, - String filename) throws IOException, JSONException { - Path path = new Path(filename); - Reader reader = OrcFile.createReader(path.getFileSystem(conf), path); + static void printJsonData(final Reader reader) throws IOException, JSONException { PrintStream printStream = System.out; OutputStreamWriter out = new OutputStreamWriter(printStream, "UTF-8"); RecordReader rows = reader.rows(null); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/JsonFileDump.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/JsonFileDump.java index 6101b97..2c7399b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/JsonFileDump.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/JsonFileDump.java @@ -19,14 +19,15 @@ import java.io.IOException; import java.util.List; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO; +import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; -import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONStringer; import org.codehaus.jettison.json.JSONWriter; @@ -35,8 +36,10 @@ */ public class JsonFileDump { - public static void printJsonMetaData(List files, Configuration conf, - List rowIndexCols, boolean prettyPrint, boolean printTimeZone) throws JSONException, IOException { + public static void printJsonMetaData(List files, + Configuration conf, + List rowIndexCols, boolean prettyPrint, boolean printTimeZone) + throws JSONException, IOException { JSONStringer writer = new JSONStringer(); boolean multiFile = files.size() > 1; if (multiFile) { @@ -51,7 +54,11 @@ public static void printJsonMetaData(List files, Configuration conf, } writer.key("fileName").value(filename); Path path = new Path(filename); - Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf)); + Reader reader = FileDump.getReader(path, conf, null); + if (reader == null) { + writer.key("status").value("FAILED"); + continue; + } 
writer.key("fileVersion").value(reader.getFileVersion().getName()); writer.key("writerVersion").value(reader.getWriterVersion()); RecordReaderImpl rows = (RecordReaderImpl) reader.rows(); @@ -178,8 +185,6 @@ public static void printJsonMetaData(List files, Configuration conf, writer.endObject(); } catch (Exception e) { writer.key("status").value("FAILED"); - System.err.println("Unable to dump data for file: " + filename); - e.printStackTrace(); throw e; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index 58b85ef..ab0c364 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -533,7 +533,7 @@ private void discoverKeyBounds(Reader reader, * @return the maximum size of the file to use * @throws IOException */ - private static long getLastFlushLength(FileSystem fs, + static long getLastFlushLength(FileSystem fs, Path deltaFile) throws IOException { Path lengths = OrcRecordUpdater.getSideFile(deltaFile); long result = Long.MAX_VALUE; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index c2bfa6e..efe5293 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -139,7 +139,7 @@ public String toString() { } } - static Path getSideFile(Path main) { + public static Path getSideFile(Path main) { return new Path(main + AcidUtils.DELTA_SIDE_FILE_SUFFIX); } diff --git a/ql/src/test/resources/orc-file-dump-bloomfilter.out b/ql/src/test/resources/orc-file-dump-bloomfilter.out index 3420135..ed47156 100644 --- a/ql/src/test/resources/orc-file-dump-bloomfilter.out +++ b/ql/src/test/resources/orc-file-dump-bloomfilter.out @@ -175,3 +175,5 @@ Stripes: File length: 273307 bytes Padding length: 0 bytes Padding ratio: 0% +________________________________________________________________________________________________________________________ + diff --git a/ql/src/test/resources/orc-file-dump-bloomfilter2.out b/ql/src/test/resources/orc-file-dump-bloomfilter2.out index 462d41f..f8a21d0 100644 --- a/ql/src/test/resources/orc-file-dump-bloomfilter2.out +++ b/ql/src/test/resources/orc-file-dump-bloomfilter2.out @@ -175,3 +175,5 @@ Stripes: File length: 298416 bytes Padding length: 0 bytes Padding ratio: 0% +________________________________________________________________________________________________________________________ + diff --git a/ql/src/test/resources/orc-file-dump-dictionary-threshold.out b/ql/src/test/resources/orc-file-dump-dictionary-threshold.out index 2b30be3..1d0d583 100644 --- a/ql/src/test/resources/orc-file-dump-dictionary-threshold.out +++ b/ql/src/test/resources/orc-file-dump-dictionary-threshold.out @@ -186,3 +186,5 @@ Stripes: File length: 2217685 bytes Padding length: 0 bytes Padding ratio: 0% +________________________________________________________________________________________________________________________ + diff --git a/ql/src/test/resources/orc-file-dump.out b/ql/src/test/resources/orc-file-dump.out index 6457018..c6c0955 100644 --- a/ql/src/test/resources/orc-file-dump.out +++ b/ql/src/test/resources/orc-file-dump.out @@ -191,3 +191,5 @@ Stripes: File length: 270923 bytes Padding length: 0 bytes Padding ratio: 0% 
+________________________________________________________________________________________________________________________ + diff --git a/ql/src/test/resources/orc-file-has-null.out b/ql/src/test/resources/orc-file-has-null.out index 2b12ddb..8975c37 100644 --- a/ql/src/test/resources/orc-file-has-null.out +++ b/ql/src/test/resources/orc-file-has-null.out @@ -108,3 +108,5 @@ Stripes: File length: 1823 bytes Padding length: 0 bytes Padding ratio: 0% +________________________________________________________________________________________________________________________ + diff --git a/ql/src/test/results/clientpositive/orc_file_dump.q.out b/ql/src/test/results/clientpositive/orc_file_dump.q.out index 7503c81..c741eda 100644 --- a/ql/src/test/results/clientpositive/orc_file_dump.q.out +++ b/ql/src/test/results/clientpositive/orc_file_dump.q.out @@ -195,6 +195,8 @@ Stripes: File length: 33413 bytes Padding length: 0 bytes Padding ratio: 0% +________________________________________________________________________________________________________________________ + -- END ORC FILE DUMP -- 124 336 65664 4294967435 74.72 42.47 true bob davidson 2013-03-01 09:11:58.703302 45.4 yard duty PREHOOK: query: alter table orc_ppd set tblproperties("orc.bloom.filter.fpp"="0.01") @@ -313,6 +315,8 @@ Stripes: File length: 38565 bytes Padding length: 0 bytes Padding ratio: 0% +________________________________________________________________________________________________________________________ + -- END ORC FILE DUMP -- 124 336 65664 4294967435 74.72 42.47 true bob davidson 2013-03-01 09:11:58.703302 45.4 yard duty PREHOOK: query: CREATE TABLE orc_ppd_part(t tinyint, @@ -443,5 +447,7 @@ Stripes: File length: 33413 bytes Padding length: 0 bytes Padding ratio: 0% +________________________________________________________________________________________________________________________ + -- END ORC FILE DUMP -- 124 336 65664 4294967435 74.72 42.47 true bob davidson 2013-03-01 09:11:58.703302 45.4 yard duty 2015 10 diff --git a/ql/src/test/results/clientpositive/orc_merge10.q.out b/ql/src/test/results/clientpositive/orc_merge10.q.out index 89d0bf6..a415776 100644 --- a/ql/src/test/results/clientpositive/orc_merge10.q.out +++ b/ql/src/test/results/clientpositive/orc_merge10.q.out @@ -568,6 +568,8 @@ Stripes: File length: 2137 bytes Padding length: 0 bytes Padding ratio: 0% +________________________________________________________________________________________________________________________ + -- END ORC FILE DUMP -- 172 val_172 1 0 PREHOOK: query: select * from orcfile_merge1c where ds='1' and part='0' limit 1 @@ -628,6 +630,8 @@ Stripes: File length: 2137 bytes Padding length: 0 bytes Padding ratio: 0% +________________________________________________________________________________________________________________________ + -- END ORC FILE DUMP -- 172 val_172 1 0 PREHOOK: query: DROP TABLE orcfile_merge1 diff --git a/ql/src/test/results/clientpositive/orc_merge11.q.out b/ql/src/test/results/clientpositive/orc_merge11.q.out index 1c4eb0a..0b9d973 100644 --- a/ql/src/test/results/clientpositive/orc_merge11.q.out +++ b/ql/src/test/results/clientpositive/orc_merge11.q.out @@ -128,6 +128,8 @@ Stripes: File length: 6828 bytes Padding length: 0 bytes Padding ratio: 0% +________________________________________________________________________________________________________________________ + -- END ORC FILE DUMP -- -- BEGIN ORC FILE DUMP -- #### A masked pattern was here #### @@ -187,6 +189,8 @@ Stripes: File length: 6828 
bytes Padding length: 0 bytes Padding ratio: 0% +________________________________________________________________________________________________________________________ + -- END ORC FILE DUMP -- 2 foo 0.8 1 1969-12-31 16:00:00 PREHOOK: query: -- concatenate @@ -304,6 +308,8 @@ Stripes: File length: 13348 bytes Padding length: 0 bytes Padding ratio: 0% +________________________________________________________________________________________________________________________ + -- END ORC FILE DUMP -- 2 foo 0.8 1 1969-12-31 16:00:00 PREHOOK: query: DROP TABLE orc_split_elim diff --git a/ql/src/test/results/clientpositive/tez/orc_merge10.q.out b/ql/src/test/results/clientpositive/tez/orc_merge10.q.out index d7ea13a..d41671a 100644 --- a/ql/src/test/results/clientpositive/tez/orc_merge10.q.out +++ b/ql/src/test/results/clientpositive/tez/orc_merge10.q.out @@ -618,6 +618,8 @@ Stripes: File length: 2393 bytes Padding length: 0 bytes Padding ratio: 0% +________________________________________________________________________________________________________________________ + -- END ORC FILE DUMP -- 86 val_86 1 0 PREHOOK: query: select * from orcfile_merge1c where ds='1' and part='0' limit 1 @@ -693,6 +695,8 @@ Stripes: File length: 2393 bytes Padding length: 0 bytes Padding ratio: 0% +________________________________________________________________________________________________________________________ + -- END ORC FILE DUMP -- 86 val_86 1 0 PREHOOK: query: DROP TABLE orcfile_merge1 diff --git a/ql/src/test/results/clientpositive/tez/orc_merge11.q.out b/ql/src/test/results/clientpositive/tez/orc_merge11.q.out index 1c4eb0a..0b9d973 100644 --- a/ql/src/test/results/clientpositive/tez/orc_merge11.q.out +++ b/ql/src/test/results/clientpositive/tez/orc_merge11.q.out @@ -128,6 +128,8 @@ Stripes: File length: 6828 bytes Padding length: 0 bytes Padding ratio: 0% +________________________________________________________________________________________________________________________ + -- END ORC FILE DUMP -- -- BEGIN ORC FILE DUMP -- #### A masked pattern was here #### @@ -187,6 +189,8 @@ Stripes: File length: 6828 bytes Padding length: 0 bytes Padding ratio: 0% +________________________________________________________________________________________________________________________ + -- END ORC FILE DUMP -- 2 foo 0.8 1 1969-12-31 16:00:00 PREHOOK: query: -- concatenate @@ -304,6 +308,8 @@ Stripes: File length: 13348 bytes Padding length: 0 bytes Padding ratio: 0% +________________________________________________________________________________________________________________________ + -- END ORC FILE DUMP -- 2 foo 0.8 1 1969-12-31 16:00:00 PREHOOK: query: DROP TABLE orc_split_elim
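Note on the new FileDump entry points: getAllFilesInPath(Path, Configuration) becomes public, and getReader(Path, Configuration, List) returns null for anything that cannot be read right now (file still open for writes, empty side file, unreadable footer) while adding recovery candidates to the supplied list. The sketch below is illustrative only, not part of the patch; listReadableFiles is a hypothetical name, it assumes the same package as FileDump (getReader is package-private, which is how JsonFileDump calls it), and dbLocation/conf stand in for the values set up in TestStreaming.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    static void listReadableFiles(String dbLocation, Configuration conf) throws IOException {
      List<String> corruptFiles = new ArrayList<String>();
      for (String file : FileDump.getAllFilesInPath(new Path(dbLocation), conf)) {
        Reader reader = FileDump.getReader(new Path(file), conf, corruptFiles);
        if (reader == null) {
          continue; // still open for writes, no readable footer, or queued in corruptFiles
        }
        System.out.println(file + ": " + reader.getNumberOfRows() + " rows");
      }
      System.err.println(corruptFiles.size() + " file(s) are candidates for --recover --skip-dump");
    }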
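Note on the side-file handling in getReader(): the javadoc describes the _flush_length file as a sequence of footer offsets whose last complete entry bounds the readable portion of the data file, with a trailing partial entry (length not a multiple of 8) falling back to the previous valid footer. A minimal sketch of that format, assuming (as the maxLen == -1 check in getReader() implies) that an empty side file yields -1; the authoritative implementation is OrcRawRecordMerger.getLastFlushLength(), which this patch widens to package visibility, and lastCompleteFooterOffset is a hypothetical name.

    import java.io.EOFException;
    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    static long lastCompleteFooterOffset(FileSystem fs, Path sideFile) throws IOException {
      long result = -1;
      FSDataInputStream in = fs.open(sideFile);
      try {
        while (true) {
          result = in.readLong(); // keep the last complete 8-byte entry
        }
      } catch (EOFException expected) {
        // a trailing partial entry is ignored, which is why getReader()
        // can fall back to the previous valid footer offset
      } finally {
        in.close();
      }
      return result;
    }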
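Review note on the footer scan: the indexOf() helper in FileDump resets j to 0 on a mismatch without re-testing the current byte, so a magic string that is immediately preceded by its own first byte (for example "OORC" when scanning for OrcFile.MAGIC "ORC") is not found, which with arbitrary binary stripe data can occasionally hide a recoverable footer. A variant that re-anchors the match at every position is sketched below; indexOfPattern is a hypothetical name and this is offered for comparison, not as part of the patch.

    // Return the first index >= from at which pattern occurs in data, or -1.
    static int indexOfPattern(byte[] data, byte[] pattern, int from) {
      if (data == null || pattern == null || pattern.length == 0 || from < 0) {
        return -1;
      }
      for (int i = from; i + pattern.length <= data.length; i++) {
        int j = 0;
        while (j < pattern.length && data[i + j] == pattern[j]) {
          j++;
        }
        if (j == pattern.length) {
          return i;
        }
      }
      return -1;
    }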
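Review note on the new tests: each test repeats the same save/replace/flush/restore sequence around System.err before and after calling FileDump.main(). A small helper could keep that capture logic in one place; the sketch below is not part of the patch, and CheckedCommand and captureStderr are hypothetical names.

    import java.io.ByteArrayOutputStream;
    import java.io.PrintStream;

    interface CheckedCommand {
      void run() throws Exception;
    }

    static String captureStderr(CheckedCommand command) throws Exception {
      PrintStream origErr = System.err;
      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
      System.setErr(new PrintStream(buffer));
      try {
        command.run();
      } finally {
        System.err.flush();
        System.setErr(origErr);
      }
      return buffer.toString("UTF-8");
    }

    // Possible usage inside a test (anonymous class keeps it Java 7 compatible):
    // String errDump = captureStderr(new CheckedCommand() {
    //   public void run() throws Exception { FileDump.main(new String[]{dbLocation}); }
    // });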
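Review note on recordOffsets() in TestStreaming: the four per-bucket branches are identical except for the map key, so extracting the bucket name from the file path removes the duplication. The helper below is a sketch, not part of the patch; recordOffset is a hypothetical name, and it uses only java.util types the patch already imports.

    private static void recordOffset(Map<String, List<Long>> offsetMap, String bucket, long len) {
      List<Long> offsets = offsetMap.get(bucket);
      if (offsets == null) {
        offsets = new ArrayList<Long>();
        offsetMap.put(bucket, offsets);
      }
      offsets.add(len);
    }

    // Possible usage: derive the "bucket_0000N" substring from the file path and call
    // recordOffset(offsetMap, bucketName, fileStatus.getLen()) once per file.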