diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java index 0e9667a..bae7379 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java @@ -32,9 +32,13 @@ import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO; import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex; @@ -42,6 +46,9 @@ import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.io.ShortWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; @@ -49,6 +56,7 @@ import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONWriter; +import com.google.common.base.Joiner; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -57,6 +65,16 @@ */ public final class FileDump { public static final String UNKNOWN = "UNKNOWN"; + public static final String SEPARATOR = Strings.repeat("_", 120) + "\n"; + public static final int DEFAULT_BLOCK_SIZE = 256 * 1024 * 1024; + public static final String DEFAULT_BACKUP_PATH = "/tmp"; + public static final PathFilter HIDDEN_AND_SIDE_FILE_FILTER = new PathFilter() { + public boolean accept(Path p) { + String name = p.getName(); + return !name.startsWith("_") && !name.startsWith(".") && !name.endsWith( + AcidUtils.DELTA_SIDE_FILE_SUFFIX); + } + }; // not used private FileDump() { @@ -76,6 +94,13 @@ public static void main(String[] args) throws Exception { } boolean dumpData = cli.hasOption('d'); + boolean recover = cli.hasOption("recover"); + boolean skipDump = cli.hasOption("skip-dump"); + String backupPath = DEFAULT_BACKUP_PATH; + if (cli.hasOption("backup-path")) { + backupPath = cli.getOptionValue("backup-path"); + } + if (cli.hasOption("r")) { String[] colStrs = cli.getOptionValue("r").split(","); rowIndexCols = new ArrayList(colStrs.length); @@ -101,24 +126,129 @@ public static void main(String[] args) throws Exception { if (dumpData) { printData(filesInPath, conf); + } else if (recover && skipDump) { + recoverFiles(filesInPath, conf, backupPath); } else { if (jsonFormat) { boolean prettyPrint = cli.hasOption('p'); - JsonFileDump.printJsonMetaData(filesInPath, conf, rowIndexCols, prettyPrint, - printTimeZone); + JsonFileDump.printJsonMetaData(filesInPath, conf, rowIndexCols, prettyPrint, printTimeZone); } else { - printMetaData(filesInPath, conf, rowIndexCols, printTimeZone); + printMetaData(filesInPath, conf, rowIndexCols, printTimeZone, recover, backupPath); } } } - private static Collection getAllFilesInPath(final Path path, + /** + * This method returns an ORC reader object if the specified file is readable. 
If the specified
+   * file has a side file (_flush_length), then the max footer offset will be read from the side
+   * file and an ORC reader will be created from that offset. Since both the data file and the side
+   * file use hflush() for flushing the data, the two files can be inconsistent or out of sync.
+   * Null will be returned in the following cases:
+   *
+   * 1) If the file specified by path is still open for writes
+   * 2) If the *_flush_length file does not contain any footer offset
+   * 3) If the *_flush_length file contains a valid footer offset but the data file is not readable
+   *    at that position (incomplete data file)
+   *
+   * Also, if this method detects any file corruption, it will add the file to the specified
+   * list of corrupted files.
+   *
+   * In all other cases, where the file is readable, this method will return a reader object.
+   *
+   * @param path - file to get reader for
+   * @param conf - configuration object
+   * @param corruptFiles - list that gets filled with all possibly corrupted files
+   * @return - reader for the specified file, or null
+   * @throws IOException
+   */
+  static Reader getReader(final Path path, final Configuration conf,
+      final List<String> corruptFiles) throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+    long dataFileLen = fs.getFileStatus(path).getLen();
+    System.err.println("Processing data file " + path + " [length: " + dataFileLen + "]");
+    boolean isOpen = false;
+    if (fs instanceof DistributedFileSystem) {
+      DistributedFileSystem dfs = (DistributedFileSystem) fs;
+      isOpen = !dfs.isFileClosed(path);
+    }
+
+    if (isOpen) {
+      System.err.println("Unable to perform file dump as " + path + " is still open for writes.");
+      return null;
+    }
+
+    Path sideFile = OrcRecordUpdater.getSideFile(path);
+    final boolean sideFileExists = fs.exists(sideFile);
+    Reader reader = null;
+    if (sideFileExists) {
+      final long maxLen = OrcRawRecordMerger.getLastFlushLength(fs, path);
+      final long sideFileLen = fs.getFileStatus(sideFile).getLen();
+      System.err.println("Found flush length file " + sideFile +
+          " [length: " + sideFileLen + ", maxFooterOffset: " + maxLen + "]");
+      // no offsets read from side file
+      if (maxLen == -1) {
+
+        // if the data file is larger than last flush length,
+        // 1) It could be corrupted
+        // 2) It could have more valid footers
+        if (dataFileLen > maxLen) {
+          checkIfReadableFromEnd(path, conf, corruptFiles);
+        }
+        return reader;
+      }
+
+      try {
+        reader = OrcFile.createReader(path, OrcFile.readerOptions(conf).maxLength(maxLen));
+
+        // if the data file is larger than last flush length,
+        // 1) It could be corrupted
+        // 2) It could have more valid footers
+        if (dataFileLen > maxLen) {
+          checkIfReadableFromEnd(path, conf, corruptFiles);
+        }
+      } catch (Exception e) {
+        corruptFiles.add(path.toUri().toString());
+        System.err.println("Unable to read data from max footer offset." +
+            " Adding data file to recovery list.");
+        return null;
+      }
+    } else {
+      reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+    }
+
+    return reader;
+  }
+
+  private static void checkIfReadableFromEnd(final Path path, final Configuration conf,
+      final List<String> corruptFiles) {
+    Reader reader;
+    try {
+      reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+    } catch (Exception e) {
+      // data file larger but corrupt
+      corruptFiles.add(path.toUri().toString());
+      System.err.println("Data file has more data than max footer offset." +
+          " Adding data file to recovery list.
"); + + // If the file is unreadable then it will throw all sorts of error which is of no use. + // So just ignore the exception. + reader = null; + } + + // data file larger but not corrupt + if (reader != null) { + System.err.println("Data file has more valid data than max footer offset." + + " Adding data file to recovery list."); + } + } + + private static Collection getAllFilesInPath(final Path path, final Configuration conf) throws IOException { List filesInPath = Lists.newArrayList(); FileSystem fs = path.getFileSystem(conf); FileStatus fileStatus = fs.getFileStatus(path); if (fileStatus.isDir()) { - FileStatus[] fileStatuses = fs.listStatus(path, AcidUtils.hiddenFileFilter); + FileStatus[] fileStatuses = fs.listStatus(path, HIDDEN_AND_SIDE_FILE_FILTER); for (FileStatus fileInPath : fileStatuses) { if (fileInPath.isDir()) { filesInPath.addAll(getAllFilesInPath(fileInPath.getPath(), conf)); @@ -133,139 +263,360 @@ public static void main(String[] args) throws Exception { return filesInPath; } - private static void printData(List files, Configuration conf) throws IOException, + private static void printData(List files, + Configuration conf) throws IOException, JSONException { for (String file : files) { try { - printJsonData(conf, file); - if (files.size() > 1) { - System.out.println(Strings.repeat("=", 80) + "\n"); + Path path = new Path(file); + Reader reader = getReader(path, conf, null); + if (reader == null) { + continue; } + printJsonData(reader); + System.out.println(SEPARATOR); } catch (Exception e) { System.err.println("Unable to dump data for file: " + file); - e.printStackTrace(); - System.err.println(Strings.repeat("=", 80) + "\n"); continue; } } } private static void printMetaData(List files, Configuration conf, - List rowIndexCols, boolean printTimeZone) throws IOException { + List rowIndexCols, boolean printTimeZone, final boolean recover, + final String backupPath) + throws IOException { + List corruptFiles = Lists.newArrayList(); for (String filename : files) { - try { - Path path = new Path(filename); - Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf)); - System.out.println("Structure for " + filename); - System.out.println("File Version: " + reader.getFileVersion().getName() + - " with " + reader.getWriterVersion()); - RecordReaderImpl rows = (RecordReaderImpl) reader.rows(); - System.out.println("Rows: " + reader.getNumberOfRows()); - System.out.println("Compression: " + reader.getCompression()); - if (reader.getCompression() != CompressionKind.NONE) { - System.out.println("Compression size: " + reader.getCompressionSize()); + printMetaDataImpl(filename, conf, rowIndexCols, printTimeZone, corruptFiles); + System.out.println(SEPARATOR); + } + + if (!corruptFiles.isEmpty()) { + if (recover) { + recoverFiles(corruptFiles, conf, backupPath); + } else { + System.err.println(corruptFiles.size() + " file(s) are corrupted." 
+ + " Run the following command to recover corrupted files.\n"); + String fileNames = Joiner.on(" ").skipNulls().join(corruptFiles); + System.err.println("hive --orcfiledump --recover --skip-dump " + fileNames); + System.out.println(SEPARATOR); + } + } + } + + private static void printMetaDataImpl(final String filename, + final Configuration conf, final List rowIndexCols, final boolean printTimeZone, + final List corruptFiles) throws IOException { + Path file = new Path(filename); + Reader reader = getReader(file, conf, corruptFiles); + // if we can create reader then footer is not corrupt and file will readable + if (reader == null) { + return; + } + + System.out.println("Structure for " + filename); + System.out.println("File Version: " + reader.getFileVersion().getName() + + " with " + reader.getWriterVersion()); + RecordReaderImpl rows = (RecordReaderImpl) reader.rows(); + System.out.println("Rows: " + reader.getNumberOfRows()); + System.out.println("Compression: " + reader.getCompression()); + if (reader.getCompression() != CompressionKind.NONE) { + System.out.println("Compression size: " + reader.getCompressionSize()); + } + System.out.println("Type: " + reader.getObjectInspector().getTypeName()); + System.out.println("\nStripe Statistics:"); + List stripeStats = reader.getStripeStatistics(); + for (int n = 0; n < stripeStats.size(); n++) { + System.out.println(" Stripe " + (n + 1) + ":"); + StripeStatistics ss = stripeStats.get(n); + for (int i = 0; i < ss.getColumnStatistics().length; ++i) { + System.out.println(" Column " + i + ": " + + ss.getColumnStatistics()[i].toString()); + } + } + ColumnStatistics[] stats = reader.getStatistics(); + int colCount = stats.length; + System.out.println("\nFile Statistics:"); + for (int i = 0; i < stats.length; ++i) { + System.out.println(" Column " + i + ": " + stats[i].toString()); + } + System.out.println("\nStripes:"); + int stripeIx = -1; + for (StripeInformation stripe : reader.getStripes()) { + ++stripeIx; + long stripeStart = stripe.getOffset(); + OrcProto.StripeFooter footer = rows.readStripeFooter(stripe); + if (printTimeZone) { + String tz = footer.getWriterTimezone(); + if (tz == null || tz.isEmpty()) { + tz = UNKNOWN; } - System.out.println("Type: " + reader.getObjectInspector().getTypeName()); - System.out.println("\nStripe Statistics:"); - List stripeStats = reader.getStripeStatistics(); - for (int n = 0; n < stripeStats.size(); n++) { - System.out.println(" Stripe " + (n + 1) + ":"); - StripeStatistics ss = stripeStats.get(n); - for (int i = 0; i < ss.getColumnStatistics().length; ++i) { - System.out.println(" Column " + i + ": " + - ss.getColumnStatistics()[i].toString()); - } + System.out.println(" Stripe: " + stripe.toString() + " timezone: " + tz); + } else { + System.out.println(" Stripe: " + stripe.toString()); + } + long sectionStart = stripeStart; + for (OrcProto.Stream section : footer.getStreamsList()) { + String kind = section.hasKind() ? 
section.getKind().name() : UNKNOWN; + System.out.println(" Stream: column " + section.getColumn() + + " section " + kind + " start: " + sectionStart + + " length " + section.getLength()); + sectionStart += section.getLength(); + } + for (int i = 0; i < footer.getColumnsCount(); ++i) { + OrcProto.ColumnEncoding encoding = footer.getColumns(i); + StringBuilder buf = new StringBuilder(); + buf.append(" Encoding column "); + buf.append(i); + buf.append(": "); + buf.append(encoding.getKind()); + if (encoding.getKind() == OrcProto.ColumnEncoding.Kind.DICTIONARY || + encoding.getKind() == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) { + buf.append("["); + buf.append(encoding.getDictionarySize()); + buf.append("]"); } - ColumnStatistics[] stats = reader.getStatistics(); - int colCount = stats.length; - System.out.println("\nFile Statistics:"); - for (int i = 0; i < stats.length; ++i) { - System.out.println(" Column " + i + ": " + stats[i].toString()); + System.out.println(buf); + } + if (rowIndexCols != null && !rowIndexCols.isEmpty()) { + // include the columns that are specified, only if the columns are included, bloom filter + // will be read + boolean[] sargColumns = new boolean[colCount]; + for (int colIdx : rowIndexCols) { + sargColumns[colIdx] = true; } - System.out.println("\nStripes:"); - int stripeIx = -1; - for (StripeInformation stripe : reader.getStripes()) { - ++stripeIx; - long stripeStart = stripe.getOffset(); - OrcProto.StripeFooter footer = rows.readStripeFooter(stripe); - if (printTimeZone) { - String tz = footer.getWriterTimezone(); - if (tz == null || tz.isEmpty()) { - tz = UNKNOWN; - } - System.out.println(" Stripe: " + stripe.toString() + " timezone: " + tz); - } else { - System.out.println(" Stripe: " + stripe.toString()); - } - long sectionStart = stripeStart; - for (OrcProto.Stream section : footer.getStreamsList()) { - String kind = section.hasKind() ? section.getKind().name() : UNKNOWN; - System.out.println(" Stream: column " + section.getColumn() + - " section " + kind + " start: " + sectionStart + - " length " + section.getLength()); - sectionStart += section.getLength(); - } - for (int i = 0; i < footer.getColumnsCount(); ++i) { - OrcProto.ColumnEncoding encoding = footer.getColumns(i); - StringBuilder buf = new StringBuilder(); - buf.append(" Encoding column "); - buf.append(i); - buf.append(": "); - buf.append(encoding.getKind()); - if (encoding.getKind() == OrcProto.ColumnEncoding.Kind.DICTIONARY || - encoding.getKind() == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) { - buf.append("["); - buf.append(encoding.getDictionarySize()); - buf.append("]"); + RecordReaderImpl.Index indices = rows + .readRowIndex(stripeIx, null, null, null, sargColumns); + for (int col : rowIndexCols) { + StringBuilder buf = new StringBuilder(); + String rowIdxString = getFormattedRowIndices(col, indices.getRowGroupIndex()); + buf.append(rowIdxString); + String bloomFilString = getFormattedBloomFilters(col, indices.getBloomFilterIndex()); + buf.append(bloomFilString); + System.out.println(buf); + } + } + } + + FileSystem fs = file.getFileSystem(conf); + long fileLen = fs.getFileStatus(file).getLen(); + long paddedBytes = getTotalPaddingSize(reader); + // empty ORC file is ~45 bytes. 
Assumption here is file length always >0 + double percentPadding = ((double) paddedBytes / (double) fileLen) * 100; + DecimalFormat format = new DecimalFormat("##.##"); + System.out.println("\nFile length: " + fileLen + " bytes"); + System.out.println("Padding length: " + paddedBytes + " bytes"); + System.out.println("Padding ratio: " + format.format(percentPadding) + "%"); + OrcRecordUpdater.AcidStats acidStats = OrcRecordUpdater.parseAcidStats(reader); + if (acidStats != null) { + System.out.println("ACID stats:" + acidStats); + } + rows.close(); + } + + private static void recoverFiles(final List corruptFiles, final Configuration conf, + final String backup) + throws IOException { + for (String corruptFile : corruptFiles) { + System.err.println("Recovering file " + corruptFile); + Path corruptPath = new Path(corruptFile); + FileSystem fs = corruptPath.getFileSystem(conf); + FSDataInputStream fdis = fs.open(corruptPath); + try { + long corruptFileLen = fs.getFileStatus(corruptPath).getLen(); + long remaining = corruptFileLen; + List footerOffsets = Lists.newArrayList(); + + // start reading the data file form top to bottom and record the valid footers + while (remaining > 0) { + int toRead = (int) Math.min(DEFAULT_BLOCK_SIZE, remaining); + byte[] data = new byte[toRead]; + long startPos = corruptFileLen - remaining; + fdis.readFully(startPos, data, 0, toRead); + + // find all MAGIC string and see if the file is readable from there + int index = indexOf(data, OrcFile.MAGIC.getBytes(), 0); + long nextFooterOffset; + if (index != -1) { + nextFooterOffset = startPos + index + OrcFile.MAGIC.length() + 1; + if (isReadable(corruptPath, conf, nextFooterOffset)) { + footerOffsets.add(nextFooterOffset); } - System.out.println(buf); } - if (rowIndexCols != null && !rowIndexCols.isEmpty()) { - // include the columns that are specified, only if the columns are included, bloom filter - // will be read - boolean[] sargColumns = new boolean[colCount]; - for (int colIdx : rowIndexCols) { - sargColumns[colIdx] = true; - } - RecordReaderImpl.Index indices = rows - .readRowIndex(stripeIx, null, null, null, sargColumns); - for (int col : rowIndexCols) { - StringBuilder buf = new StringBuilder(); - String rowIdxString = getFormattedRowIndices(col, indices.getRowGroupIndex()); - buf.append(rowIdxString); - String bloomFilString = getFormattedBloomFilters(col, indices.getBloomFilterIndex()); - buf.append(bloomFilString); - System.out.println(buf); + + while (index != -1) { + index = indexOf(data, OrcFile.MAGIC.getBytes(), index + 1); + if (index != -1) { + nextFooterOffset = startPos + index + OrcFile.MAGIC.length() + 1; + if (isReadable(corruptPath, conf, nextFooterOffset)) { + footerOffsets.add(nextFooterOffset); + } } } - } - FileSystem fs = path.getFileSystem(conf); - long fileLen = fs.getContentSummary(path).getLength(); - long paddedBytes = getTotalPaddingSize(reader); - // empty ORC file is ~45 bytes. 
Assumption here is file length always >0 - double percentPadding = ((double) paddedBytes / (double) fileLen) * 100; - DecimalFormat format = new DecimalFormat("##.##"); - System.out.println("\nFile length: " + fileLen + " bytes"); - System.out.println("Padding length: " + paddedBytes + " bytes"); - System.out.println("Padding ratio: " + format.format(percentPadding) + "%"); - OrcRecordUpdater.AcidStats acidStats = OrcRecordUpdater.parseAcidStats(reader); - if (acidStats != null) { - System.out.println("ACID stats:" + acidStats); - } - rows.close(); - if (files.size() > 1) { - System.out.println(Strings.repeat("=", 80) + "\n"); + System.err.println("Scanning for valid footers - startPos: " + startPos + + " toRead: " + toRead + " remaining: " + remaining); + remaining = remaining - toRead; } + + System.err.println("Readable footerOffsets: " + footerOffsets); + recoverFile(corruptPath, fs, conf, footerOffsets, backup); } catch (Exception e) { - System.err.println("Unable to dump metadata for file: " + filename); + Path recoveryFile = getRecoveryFile(corruptPath); + if (fs.exists(recoveryFile)) { + fs.delete(recoveryFile, false); + } + System.err.println("Unable to recover file " + corruptFile); e.printStackTrace(); - System.err.println(Strings.repeat("=", 80) + "\n"); + System.err.println(SEPARATOR); continue; + } finally { + fdis.close(); } + System.err.println(corruptFile + " recovered successfully!"); + System.err.println(SEPARATOR); } } + private static void recoverFile(final Path corruptPath, final FileSystem fs, + final Configuration conf, final List footerOffsets, final String backup) + throws IOException { + + // first recover the file to .recovered file and then once successful rename it to actual file + Path recoveredPath = getRecoveryFile(corruptPath); + + // make sure that file does not exist + if (fs.exists(recoveredPath)) { + fs.delete(recoveredPath, false); + } + + // if there are no valid footers, the file should still be readable so create an empty orc file + if (footerOffsets == null || footerOffsets.isEmpty()) { + System.err.println("No readable footers found. 
Creating empty orc file."); + StructObjectInspector soi = ObjectInspectorFactory.getStandardStructObjectInspector( + Lists.newArrayList(), Lists.newArrayList(), + Lists.newArrayList()); + Writer writer = OrcFile + .createWriter(recoveredPath, OrcFile.writerOptions(conf).inspector(soi)); + writer.close(); + } else { + // read corrupt file and copy it to recovered file until last valid footer + FSDataOutputStream fdos = fs.create(recoveredPath, true); + FSDataInputStream fdis = fs.open(corruptPath); + try { + long fileLen = footerOffsets.get(footerOffsets.size() - 1); + long remaining = fileLen; + + while (remaining > 0) { + int toRead = (int) Math.min(DEFAULT_BLOCK_SIZE, remaining); + byte[] data = new byte[toRead]; + long startPos = fileLen - remaining; + fdis.readFully(startPos, data, 0, toRead); + fdos.write(data); + System.err.println("Copying data to recovery file - startPos: " + startPos + + " toRead: " + toRead + " remaining: " + remaining); + remaining = remaining - toRead; + } + } catch (Exception e) { + fs.delete(recoveredPath, false); + throw new IOException(e); + } finally { + fdis.close(); + fdos.close(); + } + } + + // validate the recovered file once again and start moving corrupt files to backup folder + if (isReadable(recoveredPath, conf, Long.MAX_VALUE)) { + Path backupDataPath; + String scheme = corruptPath.toUri().getScheme(); + String authority = corruptPath.toUri().getAuthority(); + String filePath = corruptPath.toUri().getPath(); + + // use the same filesystem as corrupt file if backup-path is not explicitly specified + if (backup.equals(DEFAULT_BACKUP_PATH)) { + backupDataPath = new Path(scheme, authority, DEFAULT_BACKUP_PATH + filePath); + } else { + backupDataPath = new Path(backup + filePath); + } + + // Move data file to backup path + moveFiles(fs, corruptPath, backupDataPath); + + // Move side file to backup path + Path sideFilePath = OrcRecordUpdater.getSideFile(corruptPath); + Path backupSideFilePath = new Path(backupDataPath.getParent(), sideFilePath.getName()); + moveFiles(fs, sideFilePath, backupSideFilePath); + + // finally move recovered file to actual file + moveFiles(fs, recoveredPath, corruptPath); + + // we are done recovering, backing up and validating + System.err.println("Validation of recovered file successful!"); + } + } + + private static void moveFiles(final FileSystem fs, final Path src, final Path dest) + throws IOException { + try { + // create the dest directory if not exist + if (!fs.exists(dest.getParent())) { + fs.mkdirs(dest.getParent()); + } + + // if the destination file exists for some reason delete it + fs.delete(dest, false); + + if (fs.rename(src, dest)) { + System.err.println("Moved " + src + " to " + dest); + } else { + throw new IOException("Unable to move " + src + " to " + dest); + } + + } catch (Exception e) { + throw new IOException("Unable to move " + src + " to " + dest, e); + } + } + + private static Path getRecoveryFile(final Path corruptPath) { + return new Path(corruptPath.getParent(), corruptPath.getName() + ".recovered"); + } + + private static boolean isReadable(final Path corruptPath, final Configuration conf, + final long maxLen) { + try { + OrcFile.createReader(corruptPath, OrcFile.readerOptions(conf).maxLength(maxLen)); + return true; + } catch (Exception e) { + // ignore this exception as maxLen is unreadable + return false; + } + } + + // search for byte pattern in another byte array + private static int indexOf(final byte[] data, final byte[] pattern, final int index) { + if (data == null || data.length == 0 
|| pattern == null || pattern.length == 0 ||
+        index > data.length || index < 0) {
+      return -1;
+    }
+
+    int j = 0;
+    for (int i = index; i < data.length; i++) {
+      if (pattern[j] == data[i]) {
+        j++;
+      } else {
+        j = 0;
+      }
+
+      if (j == pattern.length) {
+        return i - pattern.length + 1;
+      }
+    }
+
+    return -1;
+  }
+
   private static String getFormattedBloomFilters(int col,
       OrcProto.BloomFilterIndex[] bloomFilterIndex) {
     StringBuilder buf = new StringBuilder();
@@ -387,10 +738,25 @@ static Options createOptions() {
         .create('j'));
 
     result.addOption(OptionBuilder
-        .withLongOpt("pretty")
-        .withDescription("Pretty print json metadata output")
-        .create('p'));
+        .withLongOpt("pretty")
+        .withDescription("Pretty print json metadata output")
+        .create('p'));
+
+    result.addOption(OptionBuilder
+        .withLongOpt("recover")
+        .withDescription("recover corrupted orc files generated by streaming")
+        .create());
+    result.addOption(OptionBuilder
+        .withLongOpt("skip-dump")
+        .withDescription("used along with --recover to directly recover files without dumping")
+        .create());
+
+    result.addOption(OptionBuilder
+        .withLongOpt("backup-path")
+        .withDescription("specify a backup path to store the corrupted files (default: /tmp)")
+        .hasArg()
+        .create());
     return result;
   }
@@ -497,10 +863,7 @@ static void printObject(JSONWriter writer,
     }
   }
 
-  static void printJsonData(Configuration conf,
-      String filename) throws IOException, JSONException {
-    Path path = new Path(filename);
-    Reader reader = OrcFile.createReader(path.getFileSystem(conf), path);
+  static void printJsonData(final Reader reader) throws IOException, JSONException {
     PrintStream printStream = System.out;
     OutputStreamWriter out = new OutputStreamWriter(printStream, "UTF-8");
     RecordReader rows = reader.rows(null);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/JsonFileDump.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/JsonFileDump.java
index 7f673dc..f9a6d9e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/JsonFileDump.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/JsonFileDump.java
@@ -19,14 +19,15 @@
 import java.io.IOException;
 import java.util.List;
+import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
+import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
-import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONStringer;
 import org.codehaus.jettison.json.JSONWriter;
 
@@ -35,8 +36,10 @@
  */
 public class JsonFileDump {
 
-  public static void printJsonMetaData(List<String> files, Configuration conf,
-      List<Integer> rowIndexCols, boolean prettyPrint, boolean printTimeZone) throws JSONException, IOException {
+  public static void printJsonMetaData(List<String> files,
+      Configuration conf,
+      List<Integer> rowIndexCols, boolean prettyPrint, boolean printTimeZone)
+      throws JSONException, IOException {
     JSONStringer writer = new JSONStringer();
     boolean multiFile = files.size() > 1;
     if (multiFile) {
@@ -51,7 +54,11 @@ public static void printJsonMetaData(List<String> files, Configuration conf,
       }
       writer.key("fileName").value(filename);
       Path path = new Path(filename);
-      Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+      Reader reader = FileDump.getReader(path, conf, null);
+      if (reader == null) {
+        writer.key("status").value("FAILED");
+        continue;
+      }
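+      // a reader was created, so the footer is readable; the rest of the loop dumps this file's metadata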
writer.key("fileVersion").value(reader.getFileVersion().getName()); writer.key("writerVersion").value(reader.getWriterVersion()); RecordReaderImpl rows = (RecordReaderImpl) reader.rows(); @@ -179,8 +186,6 @@ public static void printJsonMetaData(List files, Configuration conf, writer.endObject(); } catch (Exception e) { writer.key("status").value("FAILED"); - System.err.println("Unable to dump data for file: " + filename); - e.printStackTrace(); throw e; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index ebe1afd..4aa95ba 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -17,9 +17,15 @@ */ package org.apache.hadoop.hive.ql.io.orc; -import com.google.common.annotations.VisibleForTesting; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Deque; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; @@ -36,15 +42,10 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Deque; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; +import com.google.common.annotations.VisibleForTesting; /** * Merges a base and a list of delta files together into a single stream of @@ -533,7 +534,7 @@ private void discoverKeyBounds(Reader reader, * @return the maximum size of the file to use * @throws IOException */ - private static long getLastFlushLength(FileSystem fs, + static long getLastFlushLength(FileSystem fs, Path deltaFile) throws IOException { Path lengths = OrcRecordUpdater.getSideFile(deltaFile); long result = Long.MAX_VALUE; diff --git a/ql/src/test/resources/orc-file-dump-bloomfilter.out b/ql/src/test/resources/orc-file-dump-bloomfilter.out index 90aee15..7c3db78 100644 --- a/ql/src/test/resources/orc-file-dump-bloomfilter.out +++ b/ql/src/test/resources/orc-file-dump-bloomfilter.out @@ -175,3 +175,5 @@ Stripes: File length: 273307 bytes Padding length: 0 bytes Padding ratio: 0% +________________________________________________________________________________________________________________________ + diff --git a/ql/src/test/resources/orc-file-dump-bloomfilter2.out b/ql/src/test/resources/orc-file-dump-bloomfilter2.out index a3a8c18..a4f006b 100644 --- a/ql/src/test/resources/orc-file-dump-bloomfilter2.out +++ b/ql/src/test/resources/orc-file-dump-bloomfilter2.out @@ -175,3 +175,5 @@ Stripes: File length: 298416 bytes Padding length: 0 bytes Padding ratio: 0% +________________________________________________________________________________________________________________________ + diff --git a/ql/src/test/resources/orc-file-dump-dictionary-threshold.out b/ql/src/test/resources/orc-file-dump-dictionary-threshold.out index a521db7..8ad856d 100644 --- a/ql/src/test/resources/orc-file-dump-dictionary-threshold.out +++ b/ql/src/test/resources/orc-file-dump-dictionary-threshold.out @@ -186,3 +186,5 @@ 
Stripes: File length: 2217685 bytes Padding length: 0 bytes Padding ratio: 0% +________________________________________________________________________________________________________________________ + diff --git a/ql/src/test/resources/orc-file-dump.out b/ql/src/test/resources/orc-file-dump.out index d7ee117..5aaa0f3 100644 --- a/ql/src/test/resources/orc-file-dump.out +++ b/ql/src/test/resources/orc-file-dump.out @@ -191,3 +191,5 @@ Stripes: File length: 270923 bytes Padding length: 0 bytes Padding ratio: 0% +________________________________________________________________________________________________________________________ +