Index: contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example/UDAFExampleGroupConcat.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example/UDAFExampleGroupConcat.java (revision 901519) +++ contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example/UDAFExampleGroupConcat.java (working copy) @@ -24,61 +24,59 @@ import org.apache.hadoop.hive.ql.exec.UDAF; import org.apache.hadoop.hive.ql.exec.UDAFEvaluator; - /** - * This is a simple UDAF that concatenates all arguments from - * different rows into a single string. + * This is a simple UDAF that concatenates all arguments from different rows + * into a single string. * - * It should be very easy to follow and can be used as an example - * for writing new UDAFs. - * - * Note that Hive internally uses a different mechanism (called - * GenericUDAF) to implement built-in aggregation functions, which - * are harder to program but more efficient. + * It should be very easy to follow and can be used as an example for writing + * new UDAFs. + * + * Note that Hive internally uses a different mechanism (called GenericUDAF) to + * implement built-in aggregation functions, which are harder to program but + * more efficient. */ public class UDAFExampleGroupConcat extends UDAF { - + /** - * The actual class for doing the aggregation. - * Hive will automatically look for all internal classes of the UDAF - * that implements UDAFEvaluator. + * The actual class for doing the aggregation. Hive will automatically look + * for all internal classes of the UDAF that implements UDAFEvaluator. */ public static class UDAFExampleGroupConcatEvaluator implements UDAFEvaluator { - + ArrayList data; - + public UDAFExampleGroupConcatEvaluator() { super(); data = new ArrayList(); } - + /** * Reset the state of the aggregation. */ public void init() { data.clear(); } - + /** * Iterate through one row of original data. * * This UDF accepts arbitrary number of String arguments, so we use - * String[]. If it only accepts a single String, then we should use - * a single String argument. + * String[]. If it only accepts a single String, then we should use a single + * String argument. * * This function should always return true. */ public boolean iterate(String[] o) { if (o != null) { StringBuilder sb = new StringBuilder(); - for (int i = 0; i < o.length; i++) { - sb.append(o[i]); + for (String element : o) { + sb.append(element); } data.add(sb.toString()); } return true; } - + /** * Terminate a partial aggregation and return the state. */ @@ -89,8 +87,8 @@ /** * Merge with a partial aggregation. * - * This function should always have a single argument which has - * the same type as the return value of terminatePartial(). + * This function should always have a single argument which has the same + * type as the return value of terminatePartial(). * * This function should always return true. */ @@ -100,14 +98,14 @@ } return true; } - + /** * Terminates the aggregation and return the final result. */ public String terminate() { Collections.sort(data); StringBuilder sb = new StringBuilder(); - for (int i=0; i and HashMap if needed. + * ArrayList and HashMap if needed. */ public static class UDAFAvgState { private long mCount; private double mSum; } - + /** - * The actual class for doing the aggregation. - * Hive will automatically look for all internal classes of the UDAF - * that implements UDAFEvaluator. + * The actual class for doing the aggregation. 
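For readers following the UDAF contract illustrated by UDAFExampleGroupConcat above, the evaluator lifecycle (init, iterate, terminatePartial, merge, terminate) can be exercised directly outside of Hive. A minimal sketch, assuming the contrib classes are on the classpath; the wrapper class name and the sample rows are made up for illustration:

    import org.apache.hadoop.hive.contrib.udaf.example.UDAFExampleGroupConcat;

    public class GroupConcatSketch {
      public static void main(String[] args) {
        // Simulate the map-side evaluator: one iterate() call per input row.
        UDAFExampleGroupConcat.UDAFExampleGroupConcatEvaluator eval =
            new UDAFExampleGroupConcat.UDAFExampleGroupConcatEvaluator();
        eval.init();
        eval.iterate(new String[] {"a", "b"});   // row 1 contributes "ab"
        eval.iterate(new String[] {"c"});        // row 2 contributes "c"
        // terminate() sorts the per-row strings and concatenates them.
        System.out.println(eval.terminate());    // prints "abc"
      }
    }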
Hive will automatically look + * for all internal classes of the UDAF that implements UDAFEvaluator. */ public static class UDAFExampleAvgEvaluator implements UDAFEvaluator { - + UDAFAvgState state; - + public UDAFExampleAvgEvaluator() { super(); state = new UDAFAvgState(); init(); } - + /** * Reset the state of the aggregation. */ @@ -71,27 +69,26 @@ state.mSum = 0; state.mCount = 0; } - + /** * Iterate through one row of original data. * - * The number and type of arguments need to the same as we call - * this UDAF from Hive command line. + * The number and type of arguments need to the same as we call this UDAF + * from Hive command line. * * This function should always return true. */ public boolean iterate(Double o) { if (o != null) { state.mSum += o; - state.mCount ++; + state.mCount++; } return true; } - + /** - * Terminate a partial aggregation and return the state. - * If the state is a primitive, just return primitive Java classes - * like Integer or String. + * Terminate a partial aggregation and return the state. If the state is a + * primitive, just return primitive Java classes like Integer or String. */ public UDAFAvgState terminatePartial() { // This is SQL standard - average of zero items should be null. @@ -101,8 +98,8 @@ /** * Merge with a partial aggregation. * - * This function should always have a single argument which has - * the same type as the return value of terminatePartial(). + * This function should always have a single argument which has the same + * type as the return value of terminatePartial(). */ public boolean merge(UDAFAvgState o) { if (o != null) { @@ -111,14 +108,15 @@ } return true; } - + /** * Terminates the aggregation and return the final result. */ public Double terminate() { // This is SQL standard - average of zero items should be null. - return state.mCount == 0 ? null : Double.valueOf(state.mSum / state.mCount); + return state.mCount == 0 ? 
null : Double.valueOf(state.mSum + / state.mCount); } } - + } Index: contrib/src/java/org/apache/hadoop/hive/contrib/udtf/example/GenericUDTFExplode2.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/udtf/example/GenericUDTFExplode2.java (revision 901519) +++ contrib/src/java/org/apache/hadoop/hive/contrib/udtf/example/GenericUDTFExplode2.java (working copy) @@ -30,51 +30,49 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -@description( - name = "explode2", - value = "_FUNC_(a) - like explode, but outputs two identical columns (for "+ - "testing purposes)" -) +@description(name = "explode2", value = "_FUNC_(a) - like explode, but outputs two identical columns (for " + + "testing purposes)") public class GenericUDTFExplode2 extends GenericUDTF { ListObjectInspector listOI = null; - + @Override - public void close() throws HiveException{ + public void close() throws HiveException { } - + @Override - public StructObjectInspector initialize(ObjectInspector [] args) - throws UDFArgumentException { - + public StructObjectInspector initialize(ObjectInspector[] args) + throws UDFArgumentException { + if (args.length != 1) { throw new UDFArgumentException("explode() takes only one argument"); } - + if (args[0].getCategory() != ObjectInspector.Category.LIST) { throw new UDFArgumentException("explode() takes an array as a parameter"); } - listOI = (ListObjectInspector)args[0]; - + listOI = (ListObjectInspector) args[0]; + ArrayList fieldNames = new ArrayList(); ArrayList fieldOIs = new ArrayList(); fieldNames.add("col1"); fieldNames.add("col2"); fieldOIs.add(listOI.getListElementObjectInspector()); fieldOIs.add(listOI.getListElementObjectInspector()); - return ObjectInspectorFactory.getStandardStructObjectInspector( - fieldNames, fieldOIs); + return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, + fieldOIs); } Object forwardObj[] = new Object[2]; + @Override - public void process(Object [] o) throws HiveException { - + public void process(Object[] o) throws HiveException { + List list = listOI.getList(o[0]); for (Object r : list) { forwardObj[0] = r; forwardObj[1] = r; - this.forward(forwardObj); + forward(forwardObj); } } Index: contrib/src/java/org/apache/hadoop/hive/contrib/serde2/TypedBytesSerDe.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/serde2/TypedBytesSerDe.java (revision 901519) +++ contrib/src/java/org/apache/hadoop/hive/contrib/serde2/TypedBytesSerDe.java (working copy) @@ -29,6 +29,8 @@ import org.apache.hadoop.hive.contrib.util.typedbytes.Type; import org.apache.hadoop.hive.contrib.util.typedbytes.TypedBytesWritableInput; import org.apache.hadoop.hive.contrib.util.typedbytes.TypedBytesWritableOutput; +import org.apache.hadoop.hive.ql.io.NonSyncDataInputBuffer; +import org.apache.hadoop.hive.ql.io.NonSyncDataOutputBuffer; import org.apache.hadoop.hive.serde.Constants; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; @@ -60,20 +62,19 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.hive.ql.io.NonSyncDataOutputBuffer; -import org.apache.hadoop.hive.ql.io.NonSyncDataInputBuffer; /** * TypedBytesSerDe uses typed bytes to serialize/deserialize. 
- * + * * More info on the typedbytes stuff that Dumbo uses. - * http://issues.apache.org/jira/browse/HADOOP-1722 - * A fast python decoder for this, which is apparently 25% faster than the python version is available at + * http://issues.apache.org/jira/browse/HADOOP-1722 A fast python decoder for + * this, which is apparently 25% faster than the python version is available at * http://github.com/klbostee/ctypedbytes/tree/master */ public class TypedBytesSerDe implements SerDe { - public static final Log LOG = LogFactory.getLog(TypedBytesSerDe.class.getName()); + public static final Log LOG = LogFactory.getLog(TypedBytesSerDe.class + .getName()); int numColumns; StructObjectInspector rowOI; @@ -86,7 +87,7 @@ NonSyncDataInputBuffer inBarrStr; TypedBytesWritableInput tbIn; - List columnNames; + List columnNames; List columnTypes; @Override @@ -110,8 +111,7 @@ List columnTypeProps = Arrays.asList(columnTypeProperty.split(",")); for (String colType : columnTypeProps) { - columnTypes.add(TypeInfoUtils - .getTypeInfoFromTypeString(colType)); + columnTypes.add(TypeInfoUtils.getTypeInfoFromTypeString(colType)); } assert columnNames.size() == columnTypes.size(); @@ -121,8 +121,8 @@ for (int c = 0; c < numColumns; c++) { if (columnTypes.get(c).getCategory() != Category.PRIMITIVE) { throw new SerDeException(getClass().getName() - + " only accepts primitive columns, but column[" + c - + "] named " + columnNames.get(c) + " has category " + + " only accepts primitive columns, but column[" + c + "] named " + + columnNames.get(c) + " has category " + columnTypes.get(c).getCategory()); } } @@ -130,13 +130,16 @@ // Constructing the row ObjectInspector: // The row consists of some string columns, each column will be a java // String object. - List columnOIs = new ArrayList(columnNames.size()); + List columnOIs = new ArrayList( + columnNames.size()); for (int c = 0; c < numColumns; c++) { - columnOIs.add(TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(columnTypes.get(c))); + columnOIs.add(TypeInfoUtils + .getStandardWritableObjectInspectorFromTypeInfo(columnTypes.get(c))); } // StandardStruct uses ArrayList to store the row. - rowOI = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, columnOIs); + rowOI = ObjectInspectorFactory.getStandardStructObjectInspector( + columnNames, columnOIs); // Constructing the row object, etc, which will be reused for all rows. row = new ArrayList(numColumns); @@ -158,12 +161,12 @@ @Override public Object deserialize(Writable blob) throws SerDeException { - BytesWritable data = (BytesWritable)blob; + BytesWritable data = (BytesWritable) blob; inBarrStr.reset(data.get(), 0, data.getSize()); try { - for (int i=0; i < columnNames.size(); i++) { + for (int i = 0; i < columnNames.size(); i++) { row.set(i, deserializeField(tbIn, columnTypes.get(i), row.get(i))); } @@ -177,83 +180,89 @@ return row; } - static Object deserializeField(TypedBytesWritableInput in, TypeInfo type, Object reuse) throws IOException { + static Object deserializeField(TypedBytesWritableInput in, TypeInfo type, + Object reuse) throws IOException { // read the type in.readType(); switch (type.getCategory()) { - case PRIMITIVE: { - PrimitiveTypeInfo ptype = (PrimitiveTypeInfo)type; - switch (ptype.getPrimitiveCategory()) { + case PRIMITIVE: { + PrimitiveTypeInfo ptype = (PrimitiveTypeInfo) type; + switch (ptype.getPrimitiveCategory()) { - case VOID: { - return null; - } + case VOID: { + return null; + } - case BOOLEAN: { - BooleanWritable r = reuse == null ? 
new BooleanWritable() : (BooleanWritable)reuse; - r = (BooleanWritable)in.readBoolean(r); - return r; - } - case BYTE: { - ByteWritable r = reuse == null ? new ByteWritable() : (ByteWritable)reuse; - r = (ByteWritable)in.readByte(r); - return r; - } - case SHORT: { - ShortWritable r = reuse == null ? new ShortWritable() : (ShortWritable)reuse; - r = (ShortWritable)in.readShort(r); - return r; - } - case INT: { - IntWritable r = reuse == null ? new IntWritable() : (IntWritable)reuse; - r = (IntWritable)in.readInt(r); - return r; - } - case LONG: { - LongWritable r = reuse == null ? new LongWritable() : (LongWritable)reuse; - r = (LongWritable)in.readLong(r); - return r; - } - case FLOAT: { - FloatWritable r = reuse == null ? new FloatWritable() : (FloatWritable)reuse; - r = (FloatWritable)in.readFloat(r); - return r; - } - case DOUBLE: { - DoubleWritable r = reuse == null ? new DoubleWritable() : (DoubleWritable)reuse; - r = (DoubleWritable)in.readDouble(r); - return r; - } - case STRING: { - Text r = reuse == null ? new Text() : (Text)reuse; - r = (Text)in.readText(r); - return r; - } - default: { - throw new RuntimeException("Unrecognized type: " + ptype.getPrimitiveCategory()); - } - } + case BOOLEAN: { + BooleanWritable r = reuse == null ? new BooleanWritable() + : (BooleanWritable) reuse; + r = in.readBoolean(r); + return r; } - // Currently, deserialization of complex types is not supported - case LIST: - case MAP: - case STRUCT: + case BYTE: { + ByteWritable r = reuse == null ? new ByteWritable() + : (ByteWritable) reuse; + r = in.readByte(r); + return r; + } + case SHORT: { + ShortWritable r = reuse == null ? new ShortWritable() + : (ShortWritable) reuse; + r = in.readShort(r); + return r; + } + case INT: { + IntWritable r = reuse == null ? new IntWritable() : (IntWritable) reuse; + r = in.readInt(r); + return r; + } + case LONG: { + LongWritable r = reuse == null ? new LongWritable() + : (LongWritable) reuse; + r = in.readLong(r); + return r; + } + case FLOAT: { + FloatWritable r = reuse == null ? new FloatWritable() + : (FloatWritable) reuse; + r = in.readFloat(r); + return r; + } + case DOUBLE: { + DoubleWritable r = reuse == null ? new DoubleWritable() + : (DoubleWritable) reuse; + r = in.readDouble(r); + return r; + } + case STRING: { + Text r = reuse == null ? 
new Text() : (Text) reuse; + r = in.readText(r); + return r; + } default: { - throw new RuntimeException("Unsupported category: " + type.getCategory()); + throw new RuntimeException("Unrecognized type: " + + ptype.getPrimitiveCategory()); } + } } + // Currently, deserialization of complex types is not supported + case LIST: + case MAP: + case STRUCT: + default: { + throw new RuntimeException("Unsupported category: " + type.getCategory()); + } + } } - - @Override public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { try { barrStr.reset(); - StructObjectInspector soi = (StructObjectInspector)objInspector; + StructObjectInspector soi = (StructObjectInspector) objInspector; List fields = soi.getAllStructFieldRefs(); for (int i = 0; i < numColumns; i++) { @@ -272,88 +281,96 @@ return serializeBytesWritable; } - private void serializeField(Object o, ObjectInspector oi, Object reuse) throws IOException { + private void serializeField(Object o, ObjectInspector oi, Object reuse) + throws IOException { switch (oi.getCategory()) { - case PRIMITIVE: { - PrimitiveObjectInspector poi = (PrimitiveObjectInspector)oi; - switch (poi.getPrimitiveCategory()) { - case VOID: { - return; - } - case BOOLEAN: { - BooleanObjectInspector boi = (BooleanObjectInspector)poi; - BooleanWritable r = reuse == null ? new BooleanWritable() : (BooleanWritable)reuse; - r.set(boi.get(o)); - tbOut.write(r); - return; - } - case BYTE: { - ByteObjectInspector boi = (ByteObjectInspector)poi; - ByteWritable r = reuse == null ? new ByteWritable() : (ByteWritable)reuse; - r.set(boi.get(o)); - tbOut.write(r); - return; - } - case SHORT: { - ShortObjectInspector spoi = (ShortObjectInspector)poi; - ShortWritable r = reuse == null ? new ShortWritable() : (ShortWritable)reuse; - r.set(spoi.get(o)); - tbOut.write(r); - return; - } - case INT: { - IntObjectInspector ioi = (IntObjectInspector)poi; - IntWritable r = reuse == null ? new IntWritable() : (IntWritable)reuse; - r.set(ioi.get(o)); - tbOut.write(r); - return; - } - case LONG: { - LongObjectInspector loi = (LongObjectInspector)poi; - LongWritable r = reuse == null ? new LongWritable() : (LongWritable)reuse; - r.set(loi.get(o)); - tbOut.write(r); - return; - } - case FLOAT: { - FloatObjectInspector foi = (FloatObjectInspector)poi; - FloatWritable r = reuse == null ? new FloatWritable() : (FloatWritable)reuse; - r.set(foi.get(o)); - tbOut.write(r); - return; - } - case DOUBLE: { - DoubleObjectInspector doi = (DoubleObjectInspector)poi; - DoubleWritable r = reuse == null ? new DoubleWritable() : (DoubleWritable)reuse; - r.set(doi.get(o)); - tbOut.write(r); - return; - } - case STRING: { - StringObjectInspector soi = (StringObjectInspector)poi; - Text t = soi.getPrimitiveWritableObject(o); - tbOut.write(t); - return; - } - default: { - throw new RuntimeException("Unrecognized type: " + poi.getPrimitiveCategory()); - } - } + case PRIMITIVE: { + PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi; + switch (poi.getPrimitiveCategory()) { + case VOID: { + return; } - case LIST: - case MAP: - case STRUCT: { - // For complex object, serialize to JSON format - String s = SerDeUtils.getJSONString(o, oi); - Text t = reuse == null ? new Text() : (Text)reuse; - - // convert to Text and write it - t.set(s); + case BOOLEAN: { + BooleanObjectInspector boi = (BooleanObjectInspector) poi; + BooleanWritable r = reuse == null ? 
new BooleanWritable() + : (BooleanWritable) reuse; + r.set(boi.get(o)); + tbOut.write(r); + return; + } + case BYTE: { + ByteObjectInspector boi = (ByteObjectInspector) poi; + ByteWritable r = reuse == null ? new ByteWritable() + : (ByteWritable) reuse; + r.set(boi.get(o)); + tbOut.write(r); + return; + } + case SHORT: { + ShortObjectInspector spoi = (ShortObjectInspector) poi; + ShortWritable r = reuse == null ? new ShortWritable() + : (ShortWritable) reuse; + r.set(spoi.get(o)); + tbOut.write(r); + return; + } + case INT: { + IntObjectInspector ioi = (IntObjectInspector) poi; + IntWritable r = reuse == null ? new IntWritable() : (IntWritable) reuse; + r.set(ioi.get(o)); + tbOut.write(r); + return; + } + case LONG: { + LongObjectInspector loi = (LongObjectInspector) poi; + LongWritable r = reuse == null ? new LongWritable() + : (LongWritable) reuse; + r.set(loi.get(o)); + tbOut.write(r); + return; + } + case FLOAT: { + FloatObjectInspector foi = (FloatObjectInspector) poi; + FloatWritable r = reuse == null ? new FloatWritable() + : (FloatWritable) reuse; + r.set(foi.get(o)); + tbOut.write(r); + return; + } + case DOUBLE: { + DoubleObjectInspector doi = (DoubleObjectInspector) poi; + DoubleWritable r = reuse == null ? new DoubleWritable() + : (DoubleWritable) reuse; + r.set(doi.get(o)); + tbOut.write(r); + return; + } + case STRING: { + StringObjectInspector soi = (StringObjectInspector) poi; + Text t = soi.getPrimitiveWritableObject(o); tbOut.write(t); + return; } default: { - throw new RuntimeException("Unrecognized type: " + oi.getCategory()); + throw new RuntimeException("Unrecognized type: " + + poi.getPrimitiveCategory()); } + } } + case LIST: + case MAP: + case STRUCT: { + // For complex object, serialize to JSON format + String s = SerDeUtils.getJSONString(o, oi); + Text t = reuse == null ? 
new Text() : (Text) reuse; + + // convert to Text and write it + t.set(s); + tbOut.write(t); + } + default: { + throw new RuntimeException("Unrecognized type: " + oi.getCategory()); + } + } } } Index: contrib/src/java/org/apache/hadoop/hive/contrib/serde2/s3/S3LogDeserializer.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/serde2/s3/S3LogDeserializer.java (revision 901519) +++ contrib/src/java/org/apache/hadoop/hive/contrib/serde2/s3/S3LogDeserializer.java (working copy) @@ -18,17 +18,15 @@ package org.apache.hadoop.hive.contrib.serde2.s3; - import java.nio.charset.CharacterCodingException; -import java.util.Date; import java.util.List; import java.util.Properties; -import java.util.regex.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.contrib.serde2.s3.S3LogStruct; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -38,20 +36,20 @@ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; -import java.text.SimpleDateFormat; - public class S3LogDeserializer implements Deserializer { - public static final Log LOG = LogFactory.getLog(S3LogDeserializer.class.getName()); + public static final Log LOG = LogFactory.getLog(S3LogDeserializer.class + .getName()); static { StackTraceElement[] sTrace = new Exception().getStackTrace(); - String className = sTrace[0].getClassName(); + sTrace[0].getClassName(); } - + private ObjectInspector cachedObjectInspector; - + + @Override public String toString() { return "S3ZemantaDeserializer[]"; } @@ -59,60 +57,64 @@ public S3LogDeserializer() throws SerDeException { } - // This regex is a bit lax in order to compensate for lack of any escaping done by Amazon S3 ... for example useragent string can have double quotes inside! - static Pattern regexpat = Pattern.compile( "(\\S+) (\\S+) \\[(.*?)\\] (\\S+) (\\S+) (\\S+) (\\S+) (\\S+) \"(.+)\" (\\S+) (\\S+) (\\S+) (\\S+) (\\S+) (\\S+) \"(.*)\" \"(.*)\""); - //static Pattern regexrid = Pattern.compile("x-id=([-0-9a-f]{36})"); - //static SimpleDateFormat dateparser = new SimpleDateFormat("dd/MMM/yyyy:hh:mm:ss ZZZZZ"); - + // This regex is a bit lax in order to compensate for lack of any escaping + // done by Amazon S3 ... for example useragent string can have double quotes + // inside! 
+ static Pattern regexpat = Pattern + .compile("(\\S+) (\\S+) \\[(.*?)\\] (\\S+) (\\S+) (\\S+) (\\S+) (\\S+) \"(.+)\" (\\S+) (\\S+) (\\S+) (\\S+) (\\S+) (\\S+) \"(.*)\" \"(.*)\""); + // static Pattern regexrid = Pattern.compile("x-id=([-0-9a-f]{36})"); + // static SimpleDateFormat dateparser = new + // SimpleDateFormat("dd/MMM/yyyy:hh:mm:ss ZZZZZ"); + S3LogStruct deserializeCache = new S3LogStruct(); - public void initialize(Configuration job, Properties tbl) throws SerDeException { - cachedObjectInspector = ObjectInspectorFactory.getReflectionObjectInspector( - S3LogStruct.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); - + public void initialize(Configuration job, Properties tbl) + throws SerDeException { + + cachedObjectInspector = ObjectInspectorFactory + .getReflectionObjectInspector(S3LogStruct.class, + ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + LOG.debug(getClass().getName() + ": initialized"); } - - public static Integer toInt(String s) - { - if (s.compareTo("-") == 0) + + public static Integer toInt(String s) { + if (s.compareTo("-") == 0) { return null; - else + } else { return Integer.valueOf(s); + } } - + public static Object deserialize(S3LogStruct c, String row) throws Exception { Matcher match = regexpat.matcher(row); int t = 1; try { match.matches(); - c.bucketowner = match.group(t++); - c.bucketname = match.group(t++); + c.bucketowner = match.group(t++); + c.bucketname = match.group(t++); } catch (Exception e) { throw new SerDeException("S3 Log Regex did not match:" + row, e); - } - c.rdatetime = match.group(t++); - - // Should we convert the datetime to the format Hive understands by default - either yyyy-mm-dd HH:MM:SS or seconds since epoch? - //Date d = dateparser.parse(c.rdatetime); - //c.rdatetimeepoch = d.getTime() / 1000; - + } + c.rdatetime = match.group(t++); + + // Should we convert the datetime to the format Hive understands by default + // - either yyyy-mm-dd HH:MM:SS or seconds since epoch? 
+ // Date d = dateparser.parse(c.rdatetime); + // c.rdatetimeepoch = d.getTime() / 1000; + c.rip = match.group(t++); c.requester = match.group(t++); c.requestid = match.group(t++); c.operation = match.group(t++); c.rkey = match.group(t++); - c.requesturi= match.group(t++); -// System.err.println(c.requesturi); -/*// Zemanta specific data extractor - try { - Matcher m2 = regexrid.matcher(c.requesturi); - m2.find(); - c.rid = m2.group(1); - } catch (Exception e) { - c.rid = null; - } - */ + c.requesturi = match.group(t++); + // System.err.println(c.requesturi); + /* + * // Zemanta specific data extractor try { Matcher m2 = + * regexrid.matcher(c.requesturi); m2.find(); c.rid = m2.group(1); } catch + * (Exception e) { c.rid = null; } + */ c.httpstatus = toInt(match.group(t++)); c.errorcode = match.group(t++); c.bytessent = toInt(match.group(t++)); @@ -121,15 +123,14 @@ c.turnaroundtime = toInt(match.group(t++)); c.referer = match.group(t++); c.useragent = match.group(t++); - - + return (c); } - + public Object deserialize(Writable field) throws SerDeException { String row = null; if (field instanceof BytesWritable) { - BytesWritable b = (BytesWritable)field; + BytesWritable b = (BytesWritable) field; try { row = Text.decode(b.get(), 0, b.getSize()); } catch (CharacterCodingException e) { @@ -142,19 +143,17 @@ deserialize(deserializeCache, row); return deserializeCache; } catch (ClassCastException e) { - throw new SerDeException( this.getClass().getName() + " expects Text or BytesWritable", e); + throw new SerDeException(this.getClass().getName() + + " expects Text or BytesWritable", e); } catch (Exception e) { throw new SerDeException(e); } } - - + public ObjectInspector getObjectInspector() throws SerDeException { return cachedObjectInspector; } - - /** * @param args */ @@ -164,33 +163,38 @@ S3LogDeserializer serDe = new S3LogDeserializer(); Configuration conf = new Configuration(); Properties tbl = new Properties(); - // Some nasty examples that show how S3 log format is broken ... and to test the regex + // Some nasty examples that show how S3 log format is broken ... 
and to + // test the regex // These are all sourced from genuine S3 logs - //Text sample = new Text("04ff331638adc13885d6c42059584deabbdeabcd55bf0bee491172a79a87b196 img.zemanta.com [09/Apr/2009:22:00:07 +0000] 190.225.84.114 65a011a29cdf8ec533ec3d1ccaae921c F4FC3FEAD8C00024 REST.GET.OBJECT pixy.gif \"GET /pixy.gif?x-id=23d25db1-160b-48bb-a932-e7dc1e88c321 HTTP/1.1\" 304 - - 828 3 - \"http://www.viamujer.com/2009/03/horoscopo-acuario-abril-mayo-y-junio-2009/\" \"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)\""); - //Text sample = new Text("04ff331638adc13885d6c42059584deabbdeabcd55bf0bee491172a79a87b196 img.zemanta.com [09/Apr/2009:22:19:49 +0000] 60.28.204.7 65a011a29cdf8ec533ec3d1ccaae921c 7D87B6835125671E REST.GET.OBJECT pixy.gif \"GET /pixy.gif?x-id=b50a4544-938b-4a63-992c-721d1a644b28 HTTP/1.1\" 200 - 828 828 4 3 \"\" \"ZhuaXia.com\""); - //Text sample = new Text("04ff331638adc13885d6c42059584deabbdeabcd55bf0bee491172a79a87b196 static.zemanta.com [09/Apr/2009:23:12:39 +0000] 65.94.12.181 65a011a29cdf8ec533ec3d1ccaae921c EEE6FFE9B9F9EA29 REST.HEAD.OBJECT readside/loader.js%22+defer%3D%22defer \"HEAD /readside/loader.js\"+defer=\"defer HTTP/1.0\" 403 AccessDenied 231 - 7 - \"-\" \"Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.0)\""); - Text sample = new Text("04ff331638adc13885d6c42059584deabbdeabcd55bf0bee491172a79a87b196 img.zemanta.com [10/Apr/2009:05:34:01 +0000] 70.32.81.92 65a011a29cdf8ec533ec3d1ccaae921c F939A7D698D27C63 REST.GET.OBJECT reblog_b.png \"GET /reblog_b.png?x-id=79ca9376-6326-41b7-9257-eea43d112eb2 HTTP/1.0\" 200 - 1250 1250 160 159 \"-\" \"Firefox 0.8 (Linux)\" useragent=\"Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.6) Gecko/20040614 Firefox/0.8\""); + // Text sample = new + // Text("04ff331638adc13885d6c42059584deabbdeabcd55bf0bee491172a79a87b196 img.zemanta.com [09/Apr/2009:22:00:07 +0000] 190.225.84.114 65a011a29cdf8ec533ec3d1ccaae921c F4FC3FEAD8C00024 REST.GET.OBJECT pixy.gif \"GET /pixy.gif?x-id=23d25db1-160b-48bb-a932-e7dc1e88c321 HTTP/1.1\" 304 - - 828 3 - \"http://www.viamujer.com/2009/03/horoscopo-acuario-abril-mayo-y-junio-2009/\" \"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)\""); + // Text sample = new + // Text("04ff331638adc13885d6c42059584deabbdeabcd55bf0bee491172a79a87b196 img.zemanta.com [09/Apr/2009:22:19:49 +0000] 60.28.204.7 65a011a29cdf8ec533ec3d1ccaae921c 7D87B6835125671E REST.GET.OBJECT pixy.gif \"GET /pixy.gif?x-id=b50a4544-938b-4a63-992c-721d1a644b28 HTTP/1.1\" 200 - 828 828 4 3 \"\" \"ZhuaXia.com\""); + // Text sample = new + // Text("04ff331638adc13885d6c42059584deabbdeabcd55bf0bee491172a79a87b196 static.zemanta.com [09/Apr/2009:23:12:39 +0000] 65.94.12.181 65a011a29cdf8ec533ec3d1ccaae921c EEE6FFE9B9F9EA29 REST.HEAD.OBJECT readside/loader.js%22+defer%3D%22defer \"HEAD /readside/loader.js\"+defer=\"defer HTTP/1.0\" 403 AccessDenied 231 - 7 - \"-\" \"Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.0)\""); + Text sample = new Text( + "04ff331638adc13885d6c42059584deabbdeabcd55bf0bee491172a79a87b196 img.zemanta.com [10/Apr/2009:05:34:01 +0000] 70.32.81.92 65a011a29cdf8ec533ec3d1ccaae921c F939A7D698D27C63 REST.GET.OBJECT reblog_b.png \"GET /reblog_b.png?x-id=79ca9376-6326-41b7-9257-eea43d112eb2 HTTP/1.0\" 200 - 1250 1250 160 159 \"-\" \"Firefox 0.8 (Linux)\" useragent=\"Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.6) Gecko/20040614 Firefox/0.8\""); serDe.initialize(conf, tbl); Object row = serDe.deserialize(sample); 
System.err.println(serDe.getObjectInspector().getClass().toString()); - ReflectionStructObjectInspector oi = (ReflectionStructObjectInspector)serDe.getObjectInspector(); + ReflectionStructObjectInspector oi = (ReflectionStructObjectInspector) serDe + .getObjectInspector(); List fieldRefs = oi.getAllStructFieldRefs(); for (int i = 0; i < fieldRefs.size(); i++) { System.err.println(fieldRefs.get(i).toString()); Object fieldData = oi.getStructFieldData(row, fieldRefs.get(i)); - if (fieldData == null) + if (fieldData == null) { System.err.println("null"); - else + } else { System.err.println(fieldData.toString()); + } } } catch (Exception e) { System.err.println("Caught: " + e); e.printStackTrace(); } - + } - - } Index: contrib/src/java/org/apache/hadoop/hive/contrib/serde2/s3/S3LogStruct.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/serde2/s3/S3LogStruct.java (revision 901519) +++ contrib/src/java/org/apache/hadoop/hive/contrib/serde2/s3/S3LogStruct.java (working copy) @@ -5,7 +5,8 @@ public String bucketowner; public String bucketname; public String rdatetime; -// public Long rdatetimeepoch; // The format Hive understands by default, should we convert? + // public Long rdatetimeepoch; // The format Hive understands by default, + // should we convert? public String rip; public String requester; public String requestid; @@ -20,5 +21,5 @@ public Integer turnaroundtime; public String referer; public String useragent; -// public String rid; // Specific Zemanta use + // public String rid; // Specific Zemanta use } Index: contrib/src/java/org/apache/hadoop/hive/contrib/serde2/RegexSerDe.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/serde2/RegexSerDe.java (revision 901519) +++ contrib/src/java/org/apache/hadoop/hive/contrib/serde2/RegexSerDe.java (working copy) @@ -46,56 +46,55 @@ /** * RegexSerDe uses regular expression (regex) to serialize/deserialize. * - * It can deserialize the data using regex and extracts groups as columns. - * It can also serialize the row object using a format string. + * It can deserialize the data using regex and extracts groups as columns. It + * can also serialize the row object using a format string. * - * In deserialization stage, if a row does not match the regex, then all - * columns in the row will be NULL. If a row matches the regex but has - * less than expected groups, the missing groups will be NULL. If a row - * matches the regex but has more than expected groups, the additional - * groups are just ignored. + * In deserialization stage, if a row does not match the regex, then all columns + * in the row will be NULL. If a row matches the regex but has less than + * expected groups, the missing groups will be NULL. If a row matches the regex + * but has more than expected groups, the additional groups are just ignored. * - * In serialization stage, it uses java string formatter to format the - * columns into a row. If the output type of the column in a query is - * not a string, it will be automatically converted to String by Hive. + * In serialization stage, it uses java string formatter to format the columns + * into a row. If the output type of the column in a query is not a string, it + * will be automatically converted to String by Hive. 
* - * For the format of the format String, please refer to - * {@link http://java.sun.com/j2se/1.5.0/docs/api/java/util/Formatter.html#syntax} + * For the format of the format String, please refer to {@link http + * ://java.sun.com/j2se/1.5.0/docs/api/java/util/Formatter.html#syntax} * - * NOTE: Obviously, all columns have to be strings. - * Users can use "CAST(a AS INT)" to convert columns to other types. + * NOTE: Obviously, all columns have to be strings. Users can use + * "CAST(a AS INT)" to convert columns to other types. * - * NOTE: This implementation is using String, and javaStringObjectInspector. - * A more efficient implementation should use UTF-8 encoded Text and - * writableStringObjectInspector. We should switch to that when we have a - * UTF-8 based Regex library. + * NOTE: This implementation is using String, and javaStringObjectInspector. A + * more efficient implementation should use UTF-8 encoded Text and + * writableStringObjectInspector. We should switch to that when we have a UTF-8 + * based Regex library. */ public class RegexSerDe implements SerDe { public static final Log LOG = LogFactory.getLog(RegexSerDe.class.getName()); - + int numColumns; String inputRegex; String outputFormatString; - + Pattern inputPattern; - + StructObjectInspector rowOI; ArrayList row; - + @Override public void initialize(Configuration conf, Properties tbl) throws SerDeException { - + // We can get the table definition from tbl. - + // Read the configuration parameters inputRegex = tbl.getProperty("input.regex"); outputFormatString = tbl.getProperty("output.format.string"); String columnNameProperty = tbl.getProperty(Constants.LIST_COLUMNS); String columnTypeProperty = tbl.getProperty(Constants.LIST_COLUMN_TYPES); - boolean inputRegexIgnoreCase = - "true".equalsIgnoreCase(tbl.getProperty("input.regex.case.insensitive")); + boolean inputRegexIgnoreCase = "true".equalsIgnoreCase(tbl + .getProperty("input.regex.case.insensitive")); // Parse the configuration parameters if (inputRegex != null) { @@ -108,28 +107,29 @@ List columnTypes = TypeInfoUtils .getTypeInfosFromTypeString(columnTypeProperty); assert columnNames.size() == columnTypes.size(); - numColumns = columnNames.size(); - + numColumns = columnNames.size(); + // All columns have to be of type STRING. for (int c = 0; c < numColumns; c++) { if (!columnTypes.get(c).equals(TypeInfoFactory.stringTypeInfo)) { - throw new SerDeException(getClass().getName() - + " only accepts string columns, but column[" + c - + "] named " + columnNames.get(c) + " has type " - + columnTypes.get(c)); + throw new SerDeException(getClass().getName() + + " only accepts string columns, but column[" + c + "] named " + + columnNames.get(c) + " has type " + columnTypes.get(c)); } } - + // Constructing the row ObjectInspector: - // The row consists of some string columns, each column will be a java + // The row consists of some string columns, each column will be a java // String object. - List columnOIs = new ArrayList(columnNames.size()); + List columnOIs = new ArrayList( + columnNames.size()); for (int c = 0; c < numColumns; c++) { columnOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); } - // StandardStruct uses ArrayList to store the row. - rowOI = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, columnOIs); - + // StandardStruct uses ArrayList to store the row. 
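As a point of reference, the RegexSerDe described above can be driven standalone in the same style as the S3LogDeserializer main() shown earlier. A rough sketch; the regex, format string, column names, and input line below are made up for illustration:

    import java.util.Properties;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.contrib.serde2.RegexSerDe;
    import org.apache.hadoop.hive.serde.Constants;
    import org.apache.hadoop.io.Text;

    public class RegexSerDeSketch {
      public static void main(String[] args) throws Exception {
        Properties tbl = new Properties();
        // Two capturing groups map to two string columns named host and status.
        tbl.setProperty("input.regex", "(\\S+) (\\S+)");
        tbl.setProperty("output.format.string", "%1$s %2$s");
        tbl.setProperty(Constants.LIST_COLUMNS, "host,status");
        tbl.setProperty(Constants.LIST_COLUMN_TYPES, "string,string");

        RegexSerDe serDe = new RegexSerDe();
        serDe.initialize(new Configuration(), tbl);
        Object row = serDe.deserialize(new Text("example.com 200"));
        System.out.println(row);   // prints [example.com, 200]
      }
    }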
+ rowOI = ObjectInspectorFactory.getStandardStructObjectInspector( + columnNames, columnOIs); + // Constructing the row object, etc, which will be reused for all rows. row = new ArrayList(numColumns); for (int c = 0; c < numColumns; c++) { @@ -149,7 +149,7 @@ return Text.class; } - // Number of rows not matching the regex + // Number of rows not matching the regex long unmatchedRows = 0; long nextUnmatchedRows = 1; // Number of rows that match the regex but have missing groups. @@ -157,44 +157,45 @@ long nextPartialMatchedRows = 1; long getNextNumberToDisplay(long now) { - return now*10; + return now * 10; } - + @Override public Object deserialize(Writable blob) throws SerDeException { if (inputPattern == null) { - throw new SerDeException("This table does not have serde property \"input.regex\"!"); + throw new SerDeException( + "This table does not have serde property \"input.regex\"!"); } - Text rowText = (Text)blob; - + Text rowText = (Text) blob; + Matcher m = inputPattern.matcher(rowText.toString()); - + // If do not match, ignore the line, return a row with all nulls. if (!m.matches()) { - unmatchedRows ++; + unmatchedRows++; if (unmatchedRows >= nextUnmatchedRows) { nextUnmatchedRows = getNextNumberToDisplay(nextUnmatchedRows); // Report the row - LOG.warn("" + unmatchedRows + " unmatched rows are found: " - + rowText); + LOG.warn("" + unmatchedRows + " unmatched rows are found: " + rowText); } return null; } - + // Otherwise, return the row. for (int c = 0; c < numColumns; c++) { try { row.set(c, m.group(c + 1)); } catch (RuntimeException e) { - partialMatchedRows ++; + partialMatchedRows++; if (partialMatchedRows >= nextPartialMatchedRows) { nextPartialMatchedRows = getNextNumberToDisplay(nextPartialMatchedRows); // Report the row - LOG.warn("" + partialMatchedRows + " partially unmatched rows are found, " - + " cannot find group " + c + ": " + rowText); + LOG.warn("" + partialMatchedRows + + " partially unmatched rows are found, " + " cannot find group " + + c + ": " + rowText); } - row.set(c, null); + row.set(c, null); } } return row; @@ -202,50 +203,58 @@ Object[] outputFields; Text outputRowText; - + @Override public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { - + if (outputFormatString == null) { - throw new SerDeException("Cannot write data into table because \"output.format.string\"" - + " is not specified in serde properties of the table."); + throw new SerDeException( + "Cannot write data into table because \"output.format.string\"" + + " is not specified in serde properties of the table."); } - + // Get all the fields out. - // NOTE: The correct way to get fields out of the row is to use objInspector. - // The obj can be a Java ArrayList, or a Java class, or a byte[] or whatever. - // The only way to access the data inside the obj is through ObjectInspector. - - StructObjectInspector outputRowOI = (StructObjectInspector)objInspector; - List outputFieldRefs = outputRowOI.getAllStructFieldRefs(); + // NOTE: The correct way to get fields out of the row is to use + // objInspector. + // The obj can be a Java ArrayList, or a Java class, or a byte[] or + // whatever. + // The only way to access the data inside the obj is through + // ObjectInspector. 
+ + StructObjectInspector outputRowOI = (StructObjectInspector) objInspector; + List outputFieldRefs = outputRowOI + .getAllStructFieldRefs(); if (outputFieldRefs.size() != numColumns) { throw new SerDeException("Cannot serialize the object because there are " - + outputFieldRefs.size() + " fields but the table has " + numColumns + - " columns."); + + outputFieldRefs.size() + " fields but the table has " + numColumns + + " columns."); } - + // Get all data out. for (int c = 0; c < numColumns; c++) { - Object field = outputRowOI.getStructFieldData(obj, outputFieldRefs.get(c)); - ObjectInspector fieldOI = outputFieldRefs.get(c).getFieldObjectInspector(); + Object field = outputRowOI + .getStructFieldData(obj, outputFieldRefs.get(c)); + ObjectInspector fieldOI = outputFieldRefs.get(c) + .getFieldObjectInspector(); // The data must be of type String - StringObjectInspector fieldStringOI = (StringObjectInspector)fieldOI; - // Convert the field to Java class String, because objects of String type can be - // stored in String, Text, or some other classes. - outputFields[c] = fieldStringOI.getPrimitiveJavaObject(field); + StringObjectInspector fieldStringOI = (StringObjectInspector) fieldOI; + // Convert the field to Java class String, because objects of String type + // can be + // stored in String, Text, or some other classes. + outputFields[c] = fieldStringOI.getPrimitiveJavaObject(field); } - + // Format the String String outputRowString = null; try { outputRowString = String.format(outputFormatString, outputFields); } catch (MissingFormatArgumentException e) { - throw new SerDeException("The table contains " + numColumns + throw new SerDeException("The table contains " + numColumns + " columns, but the outputFormatString is asking for more.", e); } outputRowText.set(outputRowString); return outputRowText; } - + } Index: contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64/Base64TextInputFormat.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64/Base64TextInputFormat.java (revision 901519) +++ contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64/Base64TextInputFormat.java (working copy) @@ -22,6 +22,7 @@ import java.io.UnsupportedEncodingException; import java.util.Arrays; +import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; @@ -35,29 +36,26 @@ import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextInputFormat; -import org.apache.commons.codec.binary.Base64; /** * FileInputFormat for base64 encoded text files. * - * Each line is a base64-encoded record. - * The key is a LongWritable which is the offset. - * The value is a BytesWritable containing the base64-decoded bytes. + * Each line is a base64-encoded record. The key is a LongWritable which is the + * offset. The value is a BytesWritable containing the base64-decoded bytes. * * This class accepts a configurable parameter: * "base64.text.input.format.signature" * - * The UTF-8 encoded signature will be compared with the beginning - * of each decoded bytes. If they don't match, the record is discarded. - * If they match, the signature is stripped off the data. + * The UTF-8 encoded signature will be compared with the beginning of each + * decoded bytes. If they don't match, the record is discarded. 
If they match, + * the signature is stripped off the data. */ -public class Base64TextInputFormat - implements InputFormat, JobConfigurable { - - - public static class Base64LineRecordReader - implements RecordReader, JobConfigurable { +public class Base64TextInputFormat implements + InputFormat, JobConfigurable { + public static class Base64LineRecordReader implements + RecordReader, JobConfigurable { + LineRecordReader reader; Text text; @@ -65,7 +63,7 @@ this.reader = reader; text = reader.createValue(); } - + @Override public void close() throws IOException { reader.close(); @@ -98,7 +96,7 @@ // text -> byte[] -> value byte[] textBytes = text.getBytes(); int length = text.getLength(); - + // Trim additional bytes if (length != textBytes.length) { textBytes = Arrays.copyOf(textBytes, length); @@ -107,22 +105,24 @@ // compare data header with signature int i; - for (i = 0; i < binaryData.length && i < signature.length && - binaryData[i] == signature[i]; ++i); + for (i = 0; i < binaryData.length && i < signature.length + && binaryData[i] == signature[i]; ++i) { + ; + } // return the row only if it's not corrupted if (i == signature.length) { - value.set(binaryData, signature.length, - binaryData.length - signature.length); + value.set(binaryData, signature.length, binaryData.length + - signature.length); return true; } } // no more data return false; } - + private byte[] signature; - private Base64 base64 = new Base64(); + private final Base64 base64 = new Base64(); @Override public void configure(JobConf job) { @@ -135,26 +135,26 @@ } } catch (UnsupportedEncodingException e) { e.printStackTrace(); - } + } } } - + TextInputFormat format; JobConf job; - + public Base64TextInputFormat() { format = new TextInputFormat(); } - + @Override public void configure(JobConf job) { this.job = job; format.configure(job); } - + public RecordReader getRecordReader( - InputSplit genericSplit, JobConf job, - Reporter reporter) throws IOException { + InputSplit genericSplit, JobConf job, Reporter reporter) + throws IOException { reporter.setStatus(genericSplit.toString()); Base64LineRecordReader reader = new Base64LineRecordReader( new LineRecordReader(job, (FileSplit) genericSplit)); Index: contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64/Base64TextOutputFormat.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64/Base64TextOutputFormat.java (revision 901519) +++ contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64/Base64TextOutputFormat.java (working copy) @@ -37,69 +37,67 @@ /** * FileOutputFormat for base64 encoded text files. * - * Each line is a base64-encoded record. - * The key is a LongWritable which is the offset. - * The value is a BytesWritable containing the base64-decoded bytes. + * Each line is a base64-encoded record. The key is a LongWritable which is the + * offset. The value is a BytesWritable containing the base64-decoded bytes. * * This class accepts a configurable parameter: * "base64.text.output.format.signature" * - * The UTF-8 encoded signature will be prepended to each BytesWritable - * before we do base64 encoding. + * The UTF-8 encoded signature will be prepended to each BytesWritable before we + * do base64 encoding. 
*/ public class Base64TextOutputFormat - extends HiveIgnoreKeyTextOutputFormat { + extends HiveIgnoreKeyTextOutputFormat { - - public static class Base64RecordWriter implements RecordWriter, - JobConfigurable{ + public static class Base64RecordWriter implements RecordWriter, + JobConfigurable { RecordWriter writer; BytesWritable bytesWritable; - + public Base64RecordWriter(RecordWriter writer) { this.writer = writer; bytesWritable = new BytesWritable(); } - + @Override public void write(Writable w) throws IOException { - + // Get input data byte[] input; int inputLength; if (w instanceof Text) { - input = ((Text)w).getBytes(); - inputLength = ((Text)w).getLength(); + input = ((Text) w).getBytes(); + inputLength = ((Text) w).getLength(); } else { - assert(w instanceof BytesWritable); - input = ((BytesWritable)w).get(); - inputLength = ((BytesWritable)w).getSize(); + assert (w instanceof BytesWritable); + input = ((BytesWritable) w).get(); + inputLength = ((BytesWritable) w).getSize(); } - + // Add signature byte[] wrapped = new byte[signature.length + inputLength]; - for (int i=0; i valueClass, boolean isCompressed, Properties tableProperties, Progressable progress) throws IOException { - - Base64RecordWriter writer = new Base64RecordWriter( - super.getHiveRecordWriter(jc, finalOutPath, BytesWritable.class, + + Base64RecordWriter writer = new Base64RecordWriter(super + .getHiveRecordWriter(jc, finalOutPath, BytesWritable.class, isCompressed, tableProperties, progress)); writer.configure(jc); return writer; } - } Index: contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFDBOutput.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFDBOutput.java (revision 901519) +++ contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFDBOutput.java (working copy) @@ -21,16 +21,15 @@ import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; -import org.apache.hadoop.hive.ql.udf.UDFType; -import org.apache.hadoop.hive.ql.exec.UDF; -import org.apache.hadoop.hive.ql.exec.description; -import org.apache.hadoop.hive.ql.udf.generic.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException; +import org.apache.hadoop.hive.ql.exec.description; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject; +import org.apache.hadoop.hive.ql.udf.UDFType; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils; import org.apache.hadoop.hive.serde.Constants; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; @@ -39,31 +38,28 @@ import org.apache.hadoop.io.IntWritable; /** -* GenericUDFDBOutput is designed to output data directly from Hive to a JDBC datastore. -* This UDF is useful for exporting small to medium summaries that have a unique key. -* -* Due to the nature of hadoop, individual mappers, reducers or entire jobs can fail. -* If a failure occurs a mapper or reducer may be retried. This UDF has no way of -* detecting failures or rolling back a transaction. Consequently, you should only -* only use this to export to a table with a unique key. The unique key should safeguard -* against duplicate data. 
-* -* Use hive's ADD JAR feature to add your JDBC Driver to the distributed cache, -* otherwise GenericUDFDBoutput will fail. -*/ -@description( - name = "dboutput", - value = "_FUNC_(jdbcstring,username,password,preparedstatement,[arguments]) - sends data to a jdbc driver", - extended = - "argument 0 is the JDBC connection string\n"+ - "argument 1 is the user name\n"+ - "argument 2 is the password\n"+ - "argument 3 is an SQL query to be used in the PreparedStatement\n"+ - "argument (4-n) The remaining arguments must be primitive and are passed to the PreparedStatement object\n" -) -@UDFType(deterministic=false) + * GenericUDFDBOutput is designed to output data directly from Hive to a JDBC + * datastore. This UDF is useful for exporting small to medium summaries that + * have a unique key. + * + * Due to the nature of hadoop, individual mappers, reducers or entire jobs can + * fail. If a failure occurs a mapper or reducer may be retried. This UDF has no + * way of detecting failures or rolling back a transaction. Consequently, you + * should only only use this to export to a table with a unique key. The unique + * key should safeguard against duplicate data. + * + * Use hive's ADD JAR feature to add your JDBC Driver to the distributed cache, + * otherwise GenericUDFDBoutput will fail. + */ +@description(name = "dboutput", value = "_FUNC_(jdbcstring,username,password,preparedstatement,[arguments]) - sends data to a jdbc driver", extended = "argument 0 is the JDBC connection string\n" + + "argument 1 is the user name\n" + + "argument 2 is the password\n" + + "argument 3 is an SQL query to be used in the PreparedStatement\n" + + "argument (4-n) The remaining arguments must be primitive and are passed to the PreparedStatement object\n") +@UDFType(deterministic = false) public class GenericUDFDBOutput extends GenericUDF { - private static Log LOG = LogFactory.getLog(GenericUDFDBOutput.class.getName()); + private static Log LOG = LogFactory + .getLog(GenericUDFDBOutput.class.getName()); ObjectInspector[] argumentOI; GenericUDFUtils.ReturnObjectInspectorResolver returnOIResolver; @@ -71,63 +67,71 @@ private String url; private String user; private String pass; - private IntWritable result = new IntWritable(-1); + private final IntWritable result = new IntWritable(-1); + /** - * @param arguments - * argument 0 is the JDBC connection string - * argument 1 is the user name - * argument 2 is the password - * argument 3 is an SQL query to be used in the PreparedStatement - * argument (4-n) The remaining arguments must be primitive and are passed to the PreparedStatement object - */ + * @param arguments + * argument 0 is the JDBC connection string argument 1 is the user + * name argument 2 is the password argument 3 is an SQL query to be + * used in the PreparedStatement argument (4-n) The remaining + * arguments must be primitive and are passed to the + * PreparedStatement object + */ + @Override public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentTypeException { - this.argumentOI = arguments; + argumentOI = arguments; - //this should be connection url,username,password,query,column1[,columnn]* - for (int i=0;i<4;i++){ - if ( arguments[i].getCategory() == ObjectInspector.Category.PRIMITIVE) { - PrimitiveObjectInspector poi = ((PrimitiveObjectInspector)arguments[i]); - - if (! 
(poi.getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING)){ + // this should be connection url,username,password,query,column1[,columnn]* + for (int i = 0; i < 4; i++) { + if (arguments[i].getCategory() == ObjectInspector.Category.PRIMITIVE) { + PrimitiveObjectInspector poi = ((PrimitiveObjectInspector) arguments[i]); + + if (!(poi.getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING)) { throw new UDFArgumentTypeException(i, - "The argument of function should be \"" + Constants.STRING_TYPE_NAME - + "\", but \"" + arguments[i].getTypeName() + "\" is found"); + "The argument of function should be \"" + + Constants.STRING_TYPE_NAME + "\", but \"" + + arguments[i].getTypeName() + "\" is found"); } } } - for (int i=4;i 0) { sb.append(children[0]); - for(int i=1; i cls = - conf.getClassByName(className).asSubclass(Writable.class); + Class cls = conf.getClassByName(className) + .asSubclass(Writable.class); writable = (Writable) ReflectionUtils.newInstance(cls, conf); } catch (ClassNotFoundException e) { throw new IOException(e); Index: contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesWritableOutput.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesWritableOutput.java (revision 901519) +++ contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesWritableOutput.java (working copy) @@ -53,13 +53,15 @@ private TypedBytesOutput out; - private TypedBytesWritableOutput() {} + private TypedBytesWritableOutput() { + } private void setTypedBytesOutput(TypedBytesOutput out) { this.out = out; } private static ThreadLocal tbOut = new ThreadLocal() { + @Override protected synchronized Object initialValue() { return new TypedBytesWritableOutput(); } @@ -69,7 +71,8 @@ * Get a thread-local typed bytes writable input for the supplied * {@link TypedBytesOutput}. * - * @param out typed bytes output object + * @param out + * typed bytes output object * @return typed bytes writable output corresponding to the supplied * {@link TypedBytesOutput}. */ @@ -83,7 +86,8 @@ * Get a thread-local typed bytes writable output for the supplied * {@link DataOutput}. * - * @param out data output object + * @param out + * data output object * @return typed bytes writable output corresponding to the supplied * {@link DataOutput}. 
*/ @@ -126,7 +130,7 @@ } else if (w instanceof Text) { writeText((Text) w); } else if (w instanceof ShortWritable) { - writeShort((ShortWritable)w); + writeShort((ShortWritable) w); } else if (w instanceof ArrayWritable) { writeArray((ArrayWritable) w); } else if (w instanceof MapWritable) { Index: contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesRecordWriter.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesRecordWriter.java (revision 901519) +++ contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesRecordWriter.java (working copy) @@ -22,20 +22,21 @@ import java.io.OutputStream; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.RecordWriter; +import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.hive.ql.exec.RecordWriter; public class TypedBytesRecordWriter implements RecordWriter { private OutputStream out; - public void initialize(OutputStream out, Configuration conf) throws IOException { + public void initialize(OutputStream out, Configuration conf) + throws IOException { this.out = out; } public void write(Writable row) throws IOException { - BytesWritable brow = (BytesWritable)row; + BytesWritable brow = (BytesWritable) row; out.write(brow.get(), 0, brow.getSize()); } Index: contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesRecordInput.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesRecordInput.java (revision 901519) +++ contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesRecordInput.java (working copy) @@ -32,13 +32,15 @@ private TypedBytesInput in; - private TypedBytesRecordInput() {} + private TypedBytesRecordInput() { + } private void setTypedBytesInput(TypedBytesInput in) { this.in = in; } private static ThreadLocal tbIn = new ThreadLocal() { + @Override protected synchronized Object initialValue() { return new TypedBytesRecordInput(); } @@ -48,7 +50,8 @@ * Get a thread-local typed bytes record input for the supplied * {@link TypedBytesInput}. * - * @param in typed bytes input object + * @param in + * typed bytes input object * @return typed bytes record input corresponding to the supplied * {@link TypedBytesInput}. */ @@ -62,7 +65,8 @@ * Get a thread-local typed bytes record input for the supplied * {@link DataInput}. * - * @param in data input object + * @param in + * data input object * @return typed bytes record input corresponding to the supplied * {@link DataInput}. 
*/ @@ -134,13 +138,16 @@ return new TypedBytesIndex(in.readMapHeader()); } - public void endRecord(String tag) throws IOException {} + public void endRecord(String tag) throws IOException { + } - public void endVector(String tag) throws IOException {} + public void endVector(String tag) throws IOException { + } - public void endMap(String tag) throws IOException {} + public void endMap(String tag) throws IOException { + } - private static final class TypedBytesIndex implements Index { + private static final class TypedBytesIndex implements Index { private int nelems; private TypedBytesIndex(int nelems) { Index: contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesInput.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesInput.java (revision 901519) +++ contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesInput.java (working copy) @@ -35,13 +35,15 @@ private DataInput in; - private TypedBytesInput() {} + private TypedBytesInput() { + } private void setDataInput(DataInput in) { this.in = in; } private static ThreadLocal tbIn = new ThreadLocal() { + @Override protected synchronized Object initialValue() { return new TypedBytesInput(); } @@ -49,7 +51,9 @@ /** * Get a thread-local typed bytes input for the supplied {@link DataInput}. - * @param in data input object + * + * @param in + * data input object * @return typed bytes input corresponding to the supplied {@link DataInput}. */ public static TypedBytesInput get(DataInput in) { @@ -64,9 +68,10 @@ } /** - * Reads a typed bytes sequence and converts it to a Java object. The first - * byte is interpreted as a type code, and then the right number of - * subsequent bytes are read depending on the obtained type. + * Reads a typed bytes sequence and converts it to a Java object. The first + * byte is interpreted as a type code, and then the right number of subsequent + * bytes are read depending on the obtained type. + * * @return the obtained object or null when the end of the file is reached * @throws IOException */ @@ -159,6 +164,7 @@ /** * Reads a type byte and returns the corresponding {@link Type}. + * * @return the obtained Type or null when the end of the file is reached * @throws IOException */ @@ -179,6 +185,7 @@ /** * Skips a type byte. + * * @return true iff the end of the file was not reached * @throws IOException */ @@ -193,6 +200,7 @@ /** * Reads the bytes following a Type.BYTES code. + * * @return the obtained bytes sequence * @throws IOException */ @@ -205,6 +213,7 @@ /** * Reads the raw bytes following a Type.BYTES code. + * * @return the obtained bytes sequence * @throws IOException */ @@ -222,6 +231,7 @@ /** * Reads the byte following a Type.BYTE code. + * * @return the obtained byte * @throws IOException */ @@ -231,6 +241,7 @@ /** * Reads the raw byte following a Type.BYTE code. + * * @return the obtained byte * @throws IOException */ @@ -243,6 +254,7 @@ /** * Reads the boolean following a Type.BOOL code. + * * @return the obtained boolean * @throws IOException */ @@ -252,6 +264,7 @@ /** * Reads the raw bytes following a Type.BOOL code. + * * @return the obtained bytes sequence * @throws IOException */ @@ -264,6 +277,7 @@ /** * Reads the integer following a Type.INT code. + * * @return the obtained integer * @throws IOException */ @@ -273,6 +287,7 @@ /** * Reads the short following a Type.SHORT code. 
+ * * @return the obtained short * @throws IOException */ @@ -282,6 +297,7 @@ /** * Reads the raw bytes following a Type.INT code. + * * @return the obtained bytes sequence * @throws IOException */ @@ -294,6 +310,7 @@ /** * Reads the long following a Type.LONG code. + * * @return the obtained long * @throws IOException */ @@ -303,6 +320,7 @@ /** * Reads the raw bytes following a Type.LONG code. + * * @return the obtained bytes sequence * @throws IOException */ @@ -315,6 +333,7 @@ /** * Reads the float following a Type.FLOAT code. + * * @return the obtained float * @throws IOException */ @@ -324,6 +343,7 @@ /** * Reads the raw bytes following a Type.FLOAT code. + * * @return the obtained bytes sequence * @throws IOException */ @@ -336,6 +356,7 @@ /** * Reads the double following a Type.DOUBLE code. + * * @return the obtained double * @throws IOException */ @@ -345,6 +366,7 @@ /** * Reads the raw bytes following a Type.DOUBLE code. + * * @return the obtained bytes sequence * @throws IOException */ @@ -357,6 +379,7 @@ /** * Reads the string following a Type.STRING code. + * * @return the obtained string * @throws IOException */ @@ -366,6 +389,7 @@ /** * Reads the raw bytes following a Type.STRING code. + * * @return the obtained bytes sequence * @throws IOException */ @@ -383,6 +407,7 @@ /** * Reads the vector following a Type.VECTOR code. + * * @return the obtained vector * @throws IOException */ @@ -398,17 +423,16 @@ /** * Reads the raw bytes following a Type.VECTOR code. + * * @return the obtained bytes sequence * @throws IOException */ public byte[] readRawVector() throws IOException { Buffer buffer = new Buffer(); int length = readVectorHeader(); - buffer.append(new byte[] { - (byte) Type.VECTOR.code, - (byte) (0xff & (length >> 24)), (byte) (0xff & (length >> 16)), - (byte) (0xff & (length >> 8)), (byte) (0xff & length) - }); + buffer.append(new byte[] { (byte) Type.VECTOR.code, + (byte) (0xff & (length >> 24)), (byte) (0xff & (length >> 16)), + (byte) (0xff & (length >> 8)), (byte) (0xff & length) }); for (int i = 0; i < length; i++) { buffer.append(readRaw()); } @@ -417,6 +441,7 @@ /** * Reads the header following a Type.VECTOR code. + * * @return the number of elements in the vector * @throws IOException */ @@ -426,6 +451,7 @@ /** * Reads the list following a Type.LIST code. + * * @return the obtained list * @throws IOException */ @@ -442,6 +468,7 @@ /** * Reads the raw bytes following a Type.LIST code. + * * @return the obtained bytes sequence * @throws IOException */ @@ -458,6 +485,7 @@ /** * Reads the map following a Type.MAP code. + * * @return the obtained map * @throws IOException */ @@ -475,17 +503,16 @@ /** * Reads the raw bytes following a Type.MAP code. + * * @return the obtained bytes sequence * @throws IOException */ public byte[] readRawMap() throws IOException { Buffer buffer = new Buffer(); int length = readMapHeader(); - buffer.append(new byte[] { - (byte) Type.MAP.code, - (byte) (0xff & (length >> 24)), (byte) (0xff & (length >> 16)), - (byte) (0xff & (length >> 8)), (byte) (0xff & length) - }); + buffer.append(new byte[] { (byte) Type.MAP.code, + (byte) (0xff & (length >> 24)), (byte) (0xff & (length >> 16)), + (byte) (0xff & (length >> 8)), (byte) (0xff & length) }); for (int i = 0; i < length; i++) { buffer.append(readRaw()); buffer.append(readRaw()); @@ -495,6 +522,7 @@ /** * Reads the header following a Type.MAP code. 
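// Hedged read-side sketch: per the javadoc above, TypedBytesInput.read()
// decodes one typed bytes sequence into a Java object and returns null at end
// of stream; readRawVector/readRawMap additionally show the framing of one
// type-code byte followed by a four-byte big-endian element count. The byte[]
// source here is illustrative.
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

import org.apache.hadoop.hive.contrib.util.typedbytes.TypedBytesInput;

public class TypedBytesInputSketch {

  public static void readAll(byte[] data) throws IOException {
    TypedBytesInput in =
        TypedBytesInput.get(new DataInputStream(new ByteArrayInputStream(data)));
    Object obj;
    while ((obj = in.read()) != null) {
      // Each iteration consumed one type code plus the bytes it implies.
      System.out.println(obj.getClass().getSimpleName() + ": " + obj);
    }
  }
}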
+ * * @return the number of key-value pairs in the map * @throws IOException */ Index: contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesRecordOutput.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesRecordOutput.java (revision 901519) +++ contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesRecordOutput.java (working copy) @@ -34,13 +34,15 @@ private TypedBytesOutput out; - private TypedBytesRecordOutput() {} + private TypedBytesRecordOutput() { + } private void setTypedBytesOutput(TypedBytesOutput out) { this.out = out; } private static ThreadLocal tbOut = new ThreadLocal() { + @Override protected synchronized Object initialValue() { return new TypedBytesRecordOutput(); } @@ -50,7 +52,8 @@ * Get a thread-local typed bytes record input for the supplied * {@link TypedBytesOutput}. * - * @param out typed bytes output object + * @param out + * typed bytes output object * @return typed bytes record output corresponding to the supplied * {@link TypedBytesOutput}. */ @@ -64,7 +67,8 @@ * Get a thread-local typed bytes record output for the supplied * {@link DataOutput}. * - * @param out data output object + * @param out + * data output object * @return typed bytes record output corresponding to the supplied * {@link DataOutput}. */ @@ -130,8 +134,10 @@ out.writeListFooter(); } - public void endVector(ArrayList v, String tag) throws IOException {} + public void endVector(ArrayList v, String tag) throws IOException { + } - public void endMap(TreeMap m, String tag) throws IOException {} + public void endMap(TreeMap m, String tag) throws IOException { + } } Index: contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesOutput.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesOutput.java (revision 901519) +++ contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesOutput.java (working copy) @@ -36,13 +36,15 @@ private DataOutput out; - private TypedBytesOutput() {} + private TypedBytesOutput() { + } private void setDataOutput(DataOutput out) { this.out = out; } private static ThreadLocal tbOut = new ThreadLocal() { + @Override protected synchronized Object initialValue() { return new TypedBytesOutput(); } @@ -51,9 +53,10 @@ /** * Get a thread-local typed bytes output for the supplied {@link DataOutput}. * - * @param out data output object - * @return typed bytes output corresponding to the supplied - * {@link DataOutput}. + * @param out + * data output object + * @return typed bytes output corresponding to the supplied {@link DataOutput} + * . */ public static TypedBytesOutput get(DataOutput out) { TypedBytesOutput bout = (TypedBytesOutput) tbOut.get(); @@ -65,11 +68,12 @@ public TypedBytesOutput(DataOutput out) { this.out = out; } - + /** * Writes a Java object as a typed bytes sequence. * - * @param obj the object to be written + * @param obj + * the object to be written * @throws IOException */ public void write(Object obj) throws IOException { @@ -103,7 +107,8 @@ /** * Writes a raw sequence of typed bytes. * - * @param bytes the bytes to be written + * @param bytes + * the bytes to be written * @throws IOException */ public void writeRaw(byte[] bytes) throws IOException { @@ -113,21 +118,25 @@ /** * Writes a raw sequence of typed bytes. 
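// Hedged sketch for TypedBytesOutput.write(Object). The hunk above only shows
// its javadoc ("Writes a Java object as a typed bytes sequence"); the
// assumption that it dispatches on the runtime type of its argument is mine,
// and the sample values are illustrative.
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;

import org.apache.hadoop.hive.contrib.util.typedbytes.TypedBytesOutput;

public class TypedBytesObjectSketch {

  public static void main(String[] args) throws IOException {
    TypedBytesOutput out =
        TypedBytesOutput.get(new DataOutputStream(new ByteArrayOutputStream()));
    out.write("hello");                       // presumably the STRING encoding
    out.write(Integer.valueOf(7));            // presumably the INT encoding
    out.write(new ArrayList<Object>());       // presumably a vector/list
    out.write(new HashMap<Object, Object>()); // presumably a map
  }
}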
* - * @param bytes the bytes to be written - * @param offset an offset in the given array - * @param length number of bytes from the given array to write + * @param bytes + * the bytes to be written + * @param offset + * an offset in the given array + * @param length + * number of bytes from the given array to write * @throws IOException */ - public void writeRaw(byte[] bytes, int offset, int length) - throws IOException { + public void writeRaw(byte[] bytes, int offset, int length) throws IOException { out.write(bytes, offset, length); } /** * Writes a bytes array as a typed bytes sequence, using a given typecode. * - * @param bytes the bytes array to be written - * @param code the typecode to use + * @param bytes + * the bytes array to be written + * @param code + * the typecode to use * @throws IOException */ public void writeBytes(byte[] bytes, int code) throws IOException { @@ -135,11 +144,12 @@ out.writeInt(bytes.length); out.write(bytes); } - + /** * Writes a bytes array as a typed bytes sequence. * - * @param bytes the bytes array to be written + * @param bytes + * the bytes array to be written * @throws IOException */ public void writeBytes(byte[] bytes) throws IOException { @@ -149,7 +159,8 @@ /** * Writes a byte as a typed bytes sequence. * - * @param b the byte to be written + * @param b + * the byte to be written * @throws IOException */ public void writeByte(byte b) throws IOException { @@ -160,7 +171,8 @@ /** * Writes a boolean as a typed bytes sequence. * - * @param b the boolean to be written + * @param b + * the boolean to be written * @throws IOException */ public void writeBool(boolean b) throws IOException { @@ -171,7 +183,8 @@ /** * Writes an integer as a typed bytes sequence. * - * @param i the integer to be written + * @param i + * the integer to be written * @throws IOException */ public void writeInt(int i) throws IOException { @@ -182,7 +195,8 @@ /** * Writes a long as a typed bytes sequence. * - * @param l the long to be written + * @param l + * the long to be written * @throws IOException */ public void writeLong(long l) throws IOException { @@ -193,7 +207,8 @@ /** * Writes a float as a typed bytes sequence. * - * @param f the float to be written + * @param f + * the float to be written * @throws IOException */ public void writeFloat(float f) throws IOException { @@ -204,18 +219,20 @@ /** * Writes a double as a typed bytes sequence. * - * @param d the double to be written + * @param d + * the double to be written * @throws IOException */ public void writeDouble(double d) throws IOException { out.write(Type.DOUBLE.code); out.writeDouble(d); } - + /** * Writes a short as a typed bytes sequence. * - * @param s the short to be written + * @param s + * the short to be written * @throws IOException */ public void writeShort(short s) throws IOException { @@ -226,7 +243,8 @@ /** * Writes a string as a typed bytes sequence. * - * @param s the string to be written + * @param s + * the string to be written * @throws IOException */ public void writeString(String s) throws IOException { @@ -237,7 +255,8 @@ /** * Writes a vector as a typed bytes sequence. * - * @param vector the vector to be written + * @param vector + * the vector to be written * @throws IOException */ public void writeVector(ArrayList vector) throws IOException { @@ -250,7 +269,8 @@ /** * Writes a vector header. 
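// Hedged sketch of the typed, per-field writers shown above: each call emits
// its one-byte type code followed by the value, and writeVectorHeader(n)
// emits the vector code plus the element count, after which exactly n
// elements are expected to follow. The stream and values are illustrative.
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hive.contrib.util.typedbytes.TypedBytesOutput;

public class TypedBytesFieldSketch {

  public static void main(String[] args) throws IOException {
    TypedBytesOutput out =
        new TypedBytesOutput(new DataOutputStream(new ByteArrayOutputStream()));
    out.writeVectorHeader(3); // VECTOR type code + int length (3)
    out.writeInt(1);
    out.writeDouble(2.5);
    out.writeString("three");
  }
}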
* - * @param length the number of elements in the vector + * @param length + * the number of elements in the vector * @throws IOException */ public void writeVectorHeader(int length) throws IOException { @@ -261,7 +281,8 @@ /** * Writes a list as a typed bytes sequence. * - * @param list the list to be written + * @param list + * the list to be written * @throws IOException */ public void writeList(List list) throws IOException { @@ -293,7 +314,8 @@ /** * Writes a map as a typed bytes sequence. * - * @param map the map to be written + * @param map + * the map to be written * @throws IOException */ @SuppressWarnings("unchecked") @@ -309,14 +331,15 @@ /** * Writes a map header. * - * @param length the number of key-value pairs in the map + * @param length + * the number of key-value pairs in the map * @throws IOException */ public void writeMapHeader(int length) throws IOException { out.write(Type.MAP.code); out.writeInt(length); } - + public void writeEndOfRecord() throws IOException { out.write(Type.ENDOFRECORD.code); } Index: contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesRecordReader.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesRecordReader.java (revision 901519) +++ contrib/src/java/org/apache/hadoop/hive/contrib/util/typedbytes/TypedBytesRecordReader.java (working copy) @@ -18,20 +18,29 @@ package org.apache.hadoop.hive.contrib.util.typedbytes; +import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; -import java.io.DataInputStream; import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; -import java.util.Arrays; +import java.util.Map; import java.util.Properties; -import java.util.Map; -import java.util.HashMap; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.RecordReader; +import org.apache.hadoop.hive.ql.io.NonSyncDataOutputBuffer; +import org.apache.hadoop.hive.serde.Constants; import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.io.ShortWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.PrimitiveTypeEntry; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.FloatWritable; @@ -39,15 +48,6 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.hive.ql.io.NonSyncDataOutputBuffer; -import org.apache.hadoop.hive.ql.exec.RecordReader; -import org.apache.hadoop.hive.serde.Constants; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils; -import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.PrimitiveTypeEntry; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; public class TypedBytesRecordReader implements RecordReader { @@ -58,12 +58,12 @@ TypedBytesWritableOutput tbOut; ArrayList row = new ArrayList(0); - ArrayList rowTypeName = new ArrayList(0); - List columnTypes; + ArrayList rowTypeName = new ArrayList(0); + List columnTypes; ArrayList srcOIns = new ArrayList(); ArrayList dstOIns = new ArrayList(); - ArrayList converters = new ArrayList(); + ArrayList converters = new ArrayList(); static private Map typedBytesToTypeName = new HashMap(); static { @@ -77,15 +77,18 @@ typedBytesToTypeName.put(getType(11), Constants.SMALLINT_TYPE_NAME); } - public void initialize(InputStream in, Configuration conf, Properties tbl) throws IOException { + public void initialize(InputStream in, Configuration conf, Properties tbl) + throws IOException { din = new DataInputStream(in); tbIn = new TypedBytesWritableInput(din); tbOut = new TypedBytesWritableOutput(barrStr); String columnTypeProperty = tbl.getProperty(Constants.LIST_COLUMN_TYPES); columnTypes = Arrays.asList(columnTypeProperty.split(",")); - for (String columnType:columnTypes) { - PrimitiveTypeEntry dstTypeEntry = PrimitiveObjectInspectorUtils.getTypeEntryFromTypeName(columnType); - dstOIns.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(dstTypeEntry.primitiveCategory)); + for (String columnType : columnTypes) { + PrimitiveTypeEntry dstTypeEntry = PrimitiveObjectInspectorUtils + .getTypeEntryFromTypeName(columnType); + dstOIns.add(PrimitiveObjectInspectorFactory + .getPrimitiveWritableObjectInspector(dstTypeEntry.primitiveCategory)); } } @@ -112,8 +115,8 @@ return new DoubleWritable(); case STRING: return new Text(); - default: - assert false; // not supported + default: + assert false; // not supported } return null; } @@ -126,13 +129,15 @@ Type type = tbIn.readTypeCode(); // it was a empty stream - if (type == null) + if (type == null) { return -1; + } if (type == Type.ENDOFRECORD) { tbOut.writeEndOfRecord(); - if (barrStr.getLength() > 0) - ((BytesWritable)data).set(barrStr.getData(), 0, barrStr.getLength()); + if (barrStr.getLength() > 0) { + ((BytesWritable) data).set(barrStr.getData(), 0, barrStr.getLength()); + } return barrStr.getLength(); } @@ -143,53 +148,57 @@ row.add(wrt); rowTypeName.add(type.name()); String typeName = typedBytesToTypeName.get(type); - PrimitiveTypeEntry srcTypeEntry = PrimitiveObjectInspectorUtils.getTypeEntryFromTypeName(typeName); - srcOIns.add(PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(srcTypeEntry.primitiveCategory)); - converters.add(ObjectInspectorConverters.getConverter(srcOIns.get(pos), dstOIns.get(pos))); + PrimitiveTypeEntry srcTypeEntry = PrimitiveObjectInspectorUtils + .getTypeEntryFromTypeName(typeName); + srcOIns + .add(PrimitiveObjectInspectorFactory + .getPrimitiveWritableObjectInspector(srcTypeEntry.primitiveCategory)); + converters.add(ObjectInspectorConverters.getConverter(srcOIns.get(pos), + dstOIns.get(pos))); + } else { + if (!rowTypeName.get(pos).equals(type.name())) { + throw new RuntimeException("datatype of row changed from " + + rowTypeName.get(pos) + " to " + type.name()); + } } - else { - if (!rowTypeName.get(pos).equals(type.name())) - throw new RuntimeException("datatype of row changed from " + - rowTypeName.get(pos) + " to " 
+ type.name()); - } - Writable w = row.get(pos); + Writable w = row.get(pos); switch (type) { - case BYTE: { - tbIn.readByte((ByteWritable)w); - break; - } - case BOOL: { - tbIn.readBoolean((BooleanWritable)w); - break; - } - case INT: { - tbIn.readInt((IntWritable)w); - break; - } - case SHORT: { - tbIn.readShort((ShortWritable)w); - break; - } - case LONG: { - tbIn.readLong((LongWritable)w); - break; - } - case FLOAT: { - tbIn.readFloat((FloatWritable)w); - break; - } - case DOUBLE: { - tbIn.readDouble((DoubleWritable)w); - break; - } - case STRING: { - tbIn.readText((Text)w); - break; - } - default: - assert false; // should never come here + case BYTE: { + tbIn.readByte((ByteWritable) w); + break; } + case BOOL: { + tbIn.readBoolean((BooleanWritable) w); + break; + } + case INT: { + tbIn.readInt((IntWritable) w); + break; + } + case SHORT: { + tbIn.readShort((ShortWritable) w); + break; + } + case LONG: { + tbIn.readLong((LongWritable) w); + break; + } + case FLOAT: { + tbIn.readFloat((FloatWritable) w); + break; + } + case DOUBLE: { + tbIn.readDouble((DoubleWritable) w); + break; + } + case STRING: { + tbIn.readText((Text) w); + break; + } + default: + assert false; // should never come here + } write(pos, w); pos++; @@ -199,31 +208,33 @@ private void write(int pos, Writable inpw) throws IOException { String typ = columnTypes.get(pos); - Writable w = (Writable)converters.get(pos).convert(inpw); + Writable w = (Writable) converters.get(pos).convert(inpw); - if (typ.equalsIgnoreCase(Constants.BOOLEAN_TYPE_NAME)) - tbOut.writeBoolean((BooleanWritable)w); - else if (typ.equalsIgnoreCase(Constants.TINYINT_TYPE_NAME)) - tbOut.writeByte((ByteWritable)w); - else if (typ.equalsIgnoreCase(Constants.SMALLINT_TYPE_NAME)) - tbOut.writeShort((ShortWritable)w); - else if (typ.equalsIgnoreCase(Constants.INT_TYPE_NAME)) - tbOut.writeInt((IntWritable)w); - else if (typ.equalsIgnoreCase(Constants.BIGINT_TYPE_NAME)) - tbOut.writeLong((LongWritable)w); - else if (typ.equalsIgnoreCase(Constants.FLOAT_TYPE_NAME)) - tbOut.writeFloat((FloatWritable)w); - else if (typ.equalsIgnoreCase(Constants.DOUBLE_TYPE_NAME)) - tbOut.writeDouble((DoubleWritable)w); - else if (typ.equalsIgnoreCase(Constants.STRING_TYPE_NAME)) - tbOut.writeText((Text)w); - else + if (typ.equalsIgnoreCase(Constants.BOOLEAN_TYPE_NAME)) { + tbOut.writeBoolean((BooleanWritable) w); + } else if (typ.equalsIgnoreCase(Constants.TINYINT_TYPE_NAME)) { + tbOut.writeByte((ByteWritable) w); + } else if (typ.equalsIgnoreCase(Constants.SMALLINT_TYPE_NAME)) { + tbOut.writeShort((ShortWritable) w); + } else if (typ.equalsIgnoreCase(Constants.INT_TYPE_NAME)) { + tbOut.writeInt((IntWritable) w); + } else if (typ.equalsIgnoreCase(Constants.BIGINT_TYPE_NAME)) { + tbOut.writeLong((LongWritable) w); + } else if (typ.equalsIgnoreCase(Constants.FLOAT_TYPE_NAME)) { + tbOut.writeFloat((FloatWritable) w); + } else if (typ.equalsIgnoreCase(Constants.DOUBLE_TYPE_NAME)) { + tbOut.writeDouble((DoubleWritable) w); + } else if (typ.equalsIgnoreCase(Constants.STRING_TYPE_NAME)) { + tbOut.writeText((Text) w); + } else { assert false; + } } public void close() throws IOException { - if (din != null) + if (din != null) { din.close(); + } } static public Type getType(int code) { Index: contrib/src/java/org/apache/hadoop/hive/contrib/udf/example/UDFExampleMapConcat.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/udf/example/UDFExampleMapConcat.java (revision 901519) +++ 
contrib/src/java/org/apache/hadoop/hive/contrib/udf/example/UDFExampleMapConcat.java (working copy) @@ -30,16 +30,16 @@ return null; } ArrayList r = new ArrayList(a.size()); - for (Map.Entry entry: a.entrySet()) { + for (Map.Entry entry : a.entrySet()) { r.add("(" + entry.getKey() + ":" + entry.getValue() + ")"); } Collections.sort(r); - + StringBuilder sb = new StringBuilder(); for (int i = 0; i < r.size(); i++) { sb.append(r.get(i)); } return sb.toString(); } - + } Index: contrib/src/java/org/apache/hadoop/hive/contrib/udf/example/UDFExampleAdd.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/udf/example/UDFExampleAdd.java (revision 901519) +++ contrib/src/java/org/apache/hadoop/hive/contrib/udf/example/UDFExampleAdd.java (working copy) @@ -23,18 +23,22 @@ public Integer evaluate(Integer... a) { int total = 0; - for (int i=0; i s = (List)a; - + List s = (List) a; + StringBuilder sb = new StringBuilder(); - for (int i=0; i records, Output output) throws Exception; + void reduce(String key, Iterator records, Output output) + throws Exception; } Index: contrib/src/java/org/apache/hadoop/hive/contrib/mr/example/WordCountReduce.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/mr/example/WordCountReduce.java (revision 901519) +++ contrib/src/java/org/apache/hadoop/hive/contrib/mr/example/WordCountReduce.java (working copy) @@ -24,19 +24,20 @@ import org.apache.hadoop.hive.contrib.mr.Reducer; /** - * Example Reducer (WordCount). + * Example Reducer (WordCount). */ public final class WordCountReduce { public static void main(final String[] args) throws Exception { new GenericMR().reduce(System.in, System.out, new Reducer() { - public void reduce(String key, Iterator records, Output output) throws Exception { + public void reduce(String key, Iterator records, Output output) + throws Exception { int count = 0; - + while (records.hasNext()) { // note we use col[1] -- the key is provided again as col[0] count += Integer.parseInt(records.next()[1]); } - + output.collect(new String[] { key, String.valueOf(count) }); } }); Index: contrib/src/java/org/apache/hadoop/hive/contrib/mr/example/IdentityMapper.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/mr/example/IdentityMapper.java (revision 901519) +++ contrib/src/java/org/apache/hadoop/hive/contrib/mr/example/IdentityMapper.java (working copy) @@ -22,13 +22,14 @@ import org.apache.hadoop.hive.contrib.mr.Output; /** - * Example Mapper (Identity). + * Example Mapper (Identity). 
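// Hedged companion to WordCountReduce above: a word-splitting mapper written
// against the same Mapper/Output contract (map(String[], Output) and
// Output.collect(String[])). The class name and the assumption that the text
// to count sits in the first column are mine, not part of the patch.
import org.apache.hadoop.hive.contrib.mr.GenericMR;
import org.apache.hadoop.hive.contrib.mr.Mapper;
import org.apache.hadoop.hive.contrib.mr.Output;

public final class WordSplitMapper {
  public static void main(final String[] args) throws Exception {
    new GenericMR().map(System.in, System.out, new Mapper() {
      @Override
      public void map(final String[] record, final Output output)
          throws Exception {
        // Emit one (word, 1) pair per whitespace-separated token.
        for (final String word : record[0].split("\\s+")) {
          output.collect(new String[] { word, "1" });
        }
      }
    });
  }
}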
*/ public final class IdentityMapper { public static void main(final String[] args) throws Exception { new GenericMR().map(System.in, System.out, new Mapper() { @Override - public void map(final String[] record, final Output output) throws Exception { + public void map(final String[] record, final Output output) + throws Exception { output.collect(record); } }); Index: contrib/src/java/org/apache/hadoop/hive/contrib/mr/GenericMR.java =================================================================== --- contrib/src/java/org/apache/hadoop/hive/contrib/mr/GenericMR.java (revision 901519) +++ contrib/src/java/org/apache/hadoop/hive/contrib/mr/GenericMR.java (working copy) @@ -40,45 +40,50 @@ * * As an example, here's the wordcount reduce: * - * new GenericMR().reduce(System.in, System.out, new Reducer() { - * public void reduce(String key, Iterator records, Output output) throws Exception { - * int count = 0; + * new GenericMR().reduce(System.in, System.out, new Reducer() { public void + * reduce(String key, Iterator records, Output output) throws + * Exception { int count = 0; * - * while (records.hasNext()) { - * count += Integer.parseInt(records.next()[1]); - * } + * while (records.hasNext()) { count += Integer.parseInt(records.next()[1]); } * - * output.collect(new String[] { key, String.valueOf(count) }); - * }}); + * output.collect(new String[] { key, String.valueOf(count) }); }}); */ public final class GenericMR { - public void map(final InputStream in, final OutputStream out, final Mapper mapper) throws Exception { + public void map(final InputStream in, final OutputStream out, + final Mapper mapper) throws Exception { map(new InputStreamReader(in), new OutputStreamWriter(out), mapper); } - public void map(final Reader in, final Writer out, final Mapper mapper) throws Exception { + public void map(final Reader in, final Writer out, final Mapper mapper) + throws Exception { handle(in, out, new RecordProcessor() { @Override - public void processNext(RecordReader reader, Output output) throws Exception { + public void processNext(RecordReader reader, Output output) + throws Exception { mapper.map(reader.next(), output); } }); } - public void reduce(final InputStream in, final OutputStream out, final Reducer reducer) throws Exception { + public void reduce(final InputStream in, final OutputStream out, + final Reducer reducer) throws Exception { reduce(new InputStreamReader(in), new OutputStreamWriter(out), reducer); } - public void reduce(final Reader in, final Writer out, final Reducer reducer) throws Exception { + public void reduce(final Reader in, final Writer out, final Reducer reducer) + throws Exception { handle(in, out, new RecordProcessor() { @Override - public void processNext(RecordReader reader, Output output) throws Exception { - reducer.reduce(reader.peek()[0], new KeyRecordIterator(reader.peek()[0], reader), output); + public void processNext(RecordReader reader, Output output) + throws Exception { + reducer.reduce(reader.peek()[0], new KeyRecordIterator( + reader.peek()[0], reader), output); } }); } - private void handle(final Reader in, final Writer out, final RecordProcessor processor) throws Exception { + private void handle(final Reader in, final Writer out, + final RecordProcessor processor) throws Exception { final RecordReader reader = new RecordReader(in); final OutputStreamOutput output = new OutputStreamOutput(out); @@ -96,7 +101,8 @@ } private static interface RecordProcessor { - void processNext(final RecordReader reader, final Output output) throws Exception; 
+ void processNext(final RecordReader reader, final Output output) + throws Exception; } private static final class KeyRecordIterator implements Iterator { @@ -110,7 +116,7 @@ @Override public boolean hasNext() { - return (this.reader.hasNext() && this.key.equals(this.reader.peek()[0])); + return (reader.hasNext() && key.equals(reader.peek()[0])); } @Override @@ -119,7 +125,7 @@ throw new NoSuchElementException(); } - return this.reader.next(); + return reader.next(); } @Override @@ -137,21 +143,21 @@ } private RecordReader(final Reader in) { - this.reader = new BufferedReader(in); - this.next = readNext(); + reader = new BufferedReader(in); + next = readNext(); } private String[] next() { final String[] ret = next; - this.next = readNext(); + next = readNext(); return ret; } private String[] readNext() { try { - final String line = this.reader.readLine(); + final String line = reader.readLine(); return (line == null ? null : line.split("\t")); } catch (final Exception e) { throw new RuntimeException(e); @@ -167,7 +173,7 @@ } private void close() throws Exception { - this.reader.close(); + reader.close(); } } @@ -190,13 +196,13 @@ public void collect(String[] record) throws Exception { out.println(_join(record, "\t")); } - + private static String _join(final String[] record, final String separator) { if (record == null || record.length == 0) { return ""; } final StringBuilder sb = new StringBuilder(); - for (int i=0; i< record.length; i++) { + for (int i = 0; i < record.length; i++) { if (i > 0) { sb.append(separator); }
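// Hedged sketch of GenericMR.reduce(Reader, Writer, Reducer) from the hunks
// above: input is tab-separated and must arrive grouped (sorted) by key,
// since KeyRecordIterator only keeps iterating while consecutive records
// share reader.peek()[0]. The in-memory input/output and the key/count layout
// are illustrative.
import java.io.StringReader;
import java.io.StringWriter;
import java.util.Iterator;

import org.apache.hadoop.hive.contrib.mr.GenericMR;
import org.apache.hadoop.hive.contrib.mr.Output;
import org.apache.hadoop.hive.contrib.mr.Reducer;

public final class InMemoryReduceSketch {
  public static void main(final String[] args) throws Exception {
    final StringReader in = new StringReader("a\t1\na\t2\nb\t5\n");
    final StringWriter out = new StringWriter();
    new GenericMR().reduce(in, out, new Reducer() {
      public void reduce(String key, Iterator<String[]> records, Output output)
          throws Exception {
        int count = 0;
        while (records.hasNext()) {
          // col[0] repeats the key; col[1] carries the value to sum.
          count += Integer.parseInt(records.next()[1]);
        }
        output.collect(new String[] { key, String.valueOf(count) });
      }
    });
    System.out.print(out); // expected: a<TAB>3 and b<TAB>5, one line each
  }
}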