RCFileCat: buffer stdout, decode columns as lenient UTF-8, add --verbose.

Rows are batched in a StringBuilder and written through a 128KB buffered
System.out; --verbose reports progress on stderr; usage and errors go to
stderr instead of stdout.

NOTE(review): this patch was reconstructed from a copy whose line breaks and
indentation had been collapsed. Whitespace on context and removed lines is a
best guess -- apply with whitespace-insensitive matching (e.g. `patch -l`) if
a hunk is rejected, and confirm against revision 1156839.

Index: cli/src/java/org/apache/hadoop/hive/cli/RCFileCat.java
===================================================================
--- cli/src/java/org/apache/hadoop/hive/cli/RCFileCat.java	(revision 1156839)
+++ cli/src/java/org/apache/hadoop/hive/cli/RCFileCat.java	(working copy)
@@ -18,6 +18,16 @@
 
 package org.apache.hadoop.hive.cli;
 
+import java.io.BufferedOutputStream;
+import java.io.FileDescriptor;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CodingErrorAction;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -31,8 +41,18 @@
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
 public class RCFileCat implements Tool{
-  
+  private static final int STRING_BUFFER_SIZE = 16 * 1024;
+  // lenient UTF-8 decoder: malformed or unmappable input is replaced
+  private final CharsetDecoder decoder;
+
+  public RCFileCat() {
+    super();
+    decoder = Charset.forName("UTF-8").newDecoder().
+      onMalformedInput(CodingErrorAction.REPLACE).
+      onUnmappableCharacter(CodingErrorAction.REPLACE);
+  }
+
   Configuration conf = null;
 
   private static String TAB ="\t";
@@ -42,11 +62,15 @@
   public int run(String[] args) throws Exception {
     long start = 0l;
     long length = -1l;
+    long recordCount = 0;
+    long startT = System.currentTimeMillis();
+    boolean verbose = false;
 
     //get options from arguments
-    if (args.length < 1 || args.length > 3) {
+    if (args.length < 1 || args.length > 4) {
       printUsage(null);
     }
+    setupBufferedOutput();
     Path fileName = null;
     for (int i = 0; i < args.length; i++) {
       String arg = args[i];
@@ -54,6 +78,8 @@
         start = Long.parseLong(arg.substring("--start=".length()));
       } else if (arg.startsWith("--length=")) {
         length = Long.parseLong(arg.substring("--length=".length()));
+      } else if (arg.equals("--verbose")) {
+        verbose = true;
       } else if (fileName == null){
         fileName = new Path(arg);
       } else {
@@ -78,23 +104,55 @@
     RCFileRecordReader recordReader = new RCFileRecordReader(conf, split);
     LongWritable key = new LongWritable();
     BytesRefArrayWritable value = new BytesRefArrayWritable();
-    Text txt = new Text();
+    // extra capacity in case we overrun, to avoid resizing
+    StringBuilder buf = new StringBuilder(STRING_BUFFER_SIZE + 2048);
     while (recordReader.next(key, value)) {
-      txt.clear();
-      for (int i = 0; i < value.size(); i++) {
-        BytesRefWritable v = value.get(i);
-        txt.set(v.getData(), v.getStart(), v.getLength());
-        System.out.print(txt.toString());
-        if (i < value.size() - 1) {
-          // do not put the TAB for the last column
-          System.out.print(RCFileCat.TAB);
-        }
+      printRecord(value, buf);
+      recordCount++;
+      if (verbose && (recordCount % (1024*1024)) == 0) {
+        long now = System.currentTimeMillis();
+        System.err.println("Read " + recordCount/1024/1024 + "M records");
+        System.err.println("Read " + (recordReader.getPos() / (1024L*1024L)) + "MB");
+        System.err.printf("Input scan rate %.2f MB/s\n",
+            (recordReader.getPos() * 1.0 / (now - startT)) / 1024.0);
       }
-      System.out.print(RCFileCat.NEWLINE);
+      if (buf.length() > STRING_BUFFER_SIZE) {
+        System.out.print(buf.toString());
+        buf.setLength(0);
+      }
     }
+    // print out last part of buffer
+    System.out.print(buf.toString());
+    System.out.flush();
     return 0;
   }
 
+  /**
+   * Append one record to buf as TAB-separated columns followed by NEWLINE.
+   * @param value columns of the record to render
+   * @param buf buffer the rendered text is appended to
+   * @throws IOException as declared by CharsetDecoder.decode
+   */
+  private void printRecord(BytesRefArrayWritable value, StringBuilder buf)
+      throws IOException {
+    int n = value.size();
+    if (n > 0) {
+      BytesRefWritable v = value.unCheckedGet(0);
+      ByteBuffer bb = ByteBuffer.wrap(v.getData(), v.getStart(), v.getLength());
+      buf.append(decoder.decode(bb));
+      for (int i = 1; i < n; i++) {
+        // TAB goes between columns only, never after the last one
+        buf.append(RCFileCat.TAB);
+
+        v = value.unCheckedGet(i);
+        bb = ByteBuffer.wrap(v.getData(), v.getStart(), v.getLength());
+        buf.append(decoder.decode(bb));
+      }
+    }
+    // end the row even when it has no columns, as the replaced loop did
+    buf.append(RCFileCat.NEWLINE);
+  }
+
   @Override
   public Configuration getConf() {
     return conf;
@@ -105,23 +163,43 @@
     this.conf = conf;
   }
 
-  private static String Usage = "RCFileCat [--start=start_offet] [--length=len] fileName";
+  private static String Usage = "RCFileCat [--start=start_offset] [--length=len] [--verbose] fileName";
 
   public static void main(String[] args) {
     try {
+
       Configuration conf = new Configuration();
       RCFileCat instance = new RCFileCat();
       instance.setConf(conf);
+
       ToolRunner.run(instance, args);
     } catch (Exception e) {
+      // the buffered System.out is not auto-flushed; emit what was read so far
+      System.out.flush();
+      e.printStackTrace();
+      System.err.println("\n\n\n");
       printUsage(e.getMessage());
     }
   }
 
+  /**
+   * Replace System.out with a PrintStream (autoflush off) over a 128KB
+   * BufferedOutputStream on stdout; output appears only when flushed.
+   */
+  private static void setupBufferedOutput() {
+    FileOutputStream fdout =
+        new FileOutputStream(FileDescriptor.out);
+    BufferedOutputStream bos =
+        new BufferedOutputStream(fdout, 128*1024);
+    PrintStream ps =
+        new PrintStream(bos, false);
+    System.setOut(ps);
+  }
+
   private static void printUsage(String errorMsg) {
-    System.out.println(Usage);
+    System.err.println(Usage);
     if(errorMsg != null) {
-      System.out.println(errorMsg);
+      System.err.println(errorMsg);
     }
     System.exit(1);
   }