Index: cli/src/java/org/apache/hadoop/hive/cli/RCFileCat.java
===================================================================
--- cli/src/java/org/apache/hadoop/hive/cli/RCFileCat.java (revision 1156839)
+++ cli/src/java/org/apache/hadoop/hive/cli/RCFileCat.java (working copy)
@@ -18,6 +18,16 @@
 
 package org.apache.hadoop.hive.cli;
 
+import java.io.BufferedOutputStream;
+import java.io.FileDescriptor;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CodingErrorAction;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -31,10 +41,29 @@
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
+
 public class RCFileCat implements Tool{
-  
+  // Size of string buffer in bytes
+  private static final int STRING_BUFFER_SIZE = 16 * 1024;
+  // The size to flush the string buffer at
+  private static final int STRING_BUFFER_FLUSH_SIZE = 14 * 1024;
+
+  // Size of stdout buffer in bytes
+  private static final int STDOUT_BUFFER_SIZE = 128 * 1024;
+  // In verbose mode, print an update per RECORD_PRINT_INTERVAL records
+  private static final int RECORD_PRINT_INTERVAL = (1024*1024);
+
+  public RCFileCat() {
+    super();
+    decoder = Charset.forName("UTF-8").newDecoder().
+      onMalformedInput(CodingErrorAction.REPLACE).
+      onUnmappableCharacter(CodingErrorAction.REPLACE);
+  }
+
+  private final CharsetDecoder decoder;
+
   Configuration conf = null;
-  
+
   private static String TAB ="\t";
   private static String NEWLINE ="\r\n";
 
@@ -42,7 +71,10 @@
   public int run(String[] args) throws Exception {
     long start = 0l;
     long length = -1l;
-    
+    long recordCount = 0;
+    long startT = System.currentTimeMillis();
+    boolean verbose = false;
+
     //get options from arguments
     if (args.length < 1 || args.length > 3) {
       printUsage(null);
@@ -54,13 +86,16 @@
         start = Long.parseLong(arg.substring("--start=".length()));
       } else if (arg.startsWith("--length=")) {
         length = Long.parseLong(arg.substring("--length=".length()));
+      } else if (arg.equals("--verbose")) {
+        verbose = true;
       } else if (fileName == null){
         fileName = new Path(arg);
       } else {
         printUsage(null);
       }
     }
-    
+
+    setupBufferedOutput();
     FileSystem fs = FileSystem.get(fileName.toUri(), conf);
     long fileLen = fs.getFileStatus(fileName).getLen();
     if (start < 0) {
@@ -72,29 +107,61 @@
     if (length < 0 || (start + length) > fileLen) {
       length = fileLen - start;
     }
-    
+
     //share the code with RecordReader.
     FileSplit split = new FileSplit(fileName,start, length, new JobConf(conf));
     RCFileRecordReader recordReader = new RCFileRecordReader(conf, split);
     LongWritable key = new LongWritable();
     BytesRefArrayWritable value = new BytesRefArrayWritable();
-    Text txt = new Text();
+    StringBuilder buf = new StringBuilder(STRING_BUFFER_SIZE); // extra capacity in case we overrun, to avoid resizing
     while (recordReader.next(key, value)) {
-      txt.clear();
-      for (int i = 0; i < value.size(); i++) {
-        BytesRefWritable v = value.get(i);
-        txt.set(v.getData(), v.getStart(), v.getLength());
-        System.out.print(txt.toString());
-        if (i < value.size() - 1) {
-          // do not put the TAB for the last column
-          System.out.print(RCFileCat.TAB);
-        }
+      printRecord(value, buf);
+      recordCount++;
+      if (verbose && (recordCount % RECORD_PRINT_INTERVAL) == 0) {
+        long now = System.currentTimeMillis();
+        System.err.println("Read " + recordCount/1024 + "k records");
+        System.err.println("Read " + ((recordReader.getPos() / (1024L*1024L))) +
+                                    "MB");
+        System.err.printf("Input scan rate %.2f MB/s\n",
+            (recordReader.getPos() * 1.0 / (now - startT)) / 1024.0);
       }
-      System.out.print(RCFileCat.NEWLINE);
+      if (buf.length() > STRING_BUFFER_FLUSH_SIZE) {
+        System.out.print(buf.toString());
+        buf.setLength(0);
+      }
     }
+    // print out last part of buffer
+    System.out.print(buf.toString());
+    System.out.flush();
     return 0;
   }
 
+
+  /**
+   * Append one record to buf: TAB-separated columns followed by NEWLINE.
+   * @param value the record's columns, decoded as UTF-8 with bad bytes replaced; a record with no columns appends nothing
+   * @param buf the string builder the decoded text is appended to
+   * @throws IOException if decoding a column fails
+   */
+  private void printRecord(BytesRefArrayWritable value, StringBuilder buf)
+      throws IOException {
+    int n = value.size();
+    if (n > 0) {
+      BytesRefWritable v = value.unCheckedGet(0);
+      ByteBuffer bb = ByteBuffer.wrap(v.getData(), v.getStart(), v.getLength());
+      buf.append(decoder.decode(bb));
+      for (int i = 1; i < n; i++) {
+        // TAB goes between columns only, so none trails the last column
+        buf.append(RCFileCat.TAB);
+
+        v = value.unCheckedGet(i);
+        bb = ByteBuffer.wrap(v.getData(), v.getStart(), v.getLength());
+        buf.append(decoder.decode(bb));
+      }
+      buf.append(RCFileCat.NEWLINE);
+    }
+  }
+
   @Override
   public Configuration getConf() {
     return conf;
@@ -104,26 +171,39 @@
   public void setConf(Configuration conf) {
     this.conf = conf;
   }
-  
-  private static String Usage = "RCFileCat [--start=start_offet] [--length=len] fileName";
-  
+
+  private static String Usage = "RCFileCat [--start=start_offset] [--length=len] [--verbose] fileName";
+
   public static void main(String[] args) {
     try {
+
       Configuration conf = new Configuration();
       RCFileCat instance = new RCFileCat();
       instance.setConf(conf);
+
       ToolRunner.run(instance, args);
     } catch (Exception e) {
+      e.printStackTrace();
+      System.err.println("\n\n\n");
       printUsage(e.getMessage());
     }
   }
-  
+
+  private static void setupBufferedOutput() {
+    FileOutputStream fdout =
+        new FileOutputStream(FileDescriptor.out);
+    BufferedOutputStream bos =
+        new BufferedOutputStream(fdout, STDOUT_BUFFER_SIZE);
+    PrintStream ps =
+        new PrintStream(bos, false);
+    System.setOut(ps);
+  }
   private static void printUsage(String errorMsg) {
-    System.out.println(Usage);
+    System.err.println(Usage);
     if(errorMsg != null) {
-      System.out.println(errorMsg);
+      System.err.println(errorMsg);
     }
     System.exit(1);
   }
-  
+
 }