Index: src/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/MultiDelimitSerDe.java
===================================================================
--- src/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/MultiDelimitSerDe.java (revision 0)
+++ src/contrib/src/java/org/apache/hadoop/hive/contrib/serde2/MultiDelimitSerDe.java (revision 22627)
@@ -0,0 +1,368 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.contrib.serde2;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
+import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * SerDe for text rows whose top-level fields are separated by a multi-character
+ * delimiter taken from the "field.delimited" table property.
+ *
+ * Separators inside complex types stay single-byte ("collection.delimited" and
+ * "mapkey.delimited"), and binary columns are written as raw bytes rather than
+ * Base64 text.
+ */
+public class MultiDelimitSerDe implements SerDe {
+
+  public static final Log LOG = LogFactory.getLog(MultiDelimitSerDe.class.getName());
+  public static final byte[] DefaultSeparators = {(byte) 1, (byte) 2, (byte) 3};
+
+  // single character that stands in for the multi-char delimiter inside the row buffer
+  private static final String FIELD_PLACEHOLDER = "\1";
+
+  int numColumns;
+  String fieldDelimited;
+  // we don't support using multiple chars as delimiters within complex types
+  // collection separator
+  byte collSep;
+  // map key separator
+  byte keySep;
+
+  // The object for storing row data
+  LazyStruct cachedLazyStruct;
+  // the lazy struct object inspector
+  ObjectInspector cachedObjectInspector;
+
+  // The wrapper for byte array
+  ByteArrayRef byteArrayRef;
+
+  LazySimpleSerDe.SerDeParameters serdeParams = null;
+  // the output stream of serialized objects
+  ByteStream.Output serializeStream = new ByteStream.Output();
+  // the Writable to return in serialize
+  Text serializeCache = new Text();
+
+  // value of fieldDelimited that the two derived fields below were built from
+  private String preparedDelimiter;
+  // literal matcher for the delimiter, compiled once instead of once per row
+  private Pattern fieldDelimitedPattern;
+  // UTF-8 bytes of the delimiter, the same encoding as the Text returned by serialize
+  private Text fieldDelimitedUtf8;
+
+  /**
+   * Reads the delimiter settings from the table properties and builds the cached
+   * struct inspector and row object.
+   *
+   * A missing "field.delimited" property is not rejected here; serialize and
+   * deserialize throw when they are called without it.
+   *
+   * @param conf configuration handed to LazySimpleSerDe.initSerdeParams
+   * @param tbl table properties holding the delimiter settings
+   * @throws SerDeException if the LazySimpleSerDe parameters cannot be built
+   */
+  @Override
+  public void initialize(Configuration conf, Properties tbl) throws SerDeException {
+    // get the SerDe parameters
+    serdeParams = LazySimpleSerDe.initSerdeParams(conf, tbl, getClass().getName());
+
+    fieldDelimited = tbl.getProperty("field.delimited");
+
+    // get the collection separator and map key separator
+    collSep = LazySimpleSerDe.getByte(tbl.getProperty("collection.delimited"),
+        DefaultSeparators[1]);
+    keySep = LazySimpleSerDe.getByte(tbl.getProperty("mapkey.delimited"),
+        DefaultSeparators[2]);
+    serdeParams.getSeparators()[1] = collSep;
+    serdeParams.getSeparators()[2] = keySep;
+
+    // Create the ObjectInspectors for the fields
+    cachedObjectInspector = LazyFactory.createLazyStructInspector(
+        serdeParams.getColumnNames(), serdeParams.getColumnTypes(),
+        serdeParams.getSeparators(), serdeParams.getNullSequence(),
+        serdeParams.isLastColumnTakesRest(), serdeParams.isEscaped(),
+        serdeParams.getEscapeChar());
+
+    cachedLazyStruct = (LazyStruct) LazyFactory.createLazyObject(cachedObjectInspector);
+
+    assert serdeParams.getColumnNames().size() == serdeParams.getColumnTypes().size();
+    numColumns = serdeParams.getColumnNames().size();
+  }
+
+  /** Returns the struct inspector built by initialize. */
+  @Override
+  public ObjectInspector getObjectInspector() throws SerDeException {
+    return cachedObjectInspector;
+  }
+
+  /** Rows are serialized as Text. */
+  @Override
+  public Class<? extends Writable> getSerializedClass() {
+    return Text.class;
+  }
+
+  // NOTE(review): nothing in this class calls this method; it is kept for package-level callers.
+  long getNextNumberToDisplay(long now) {
+    return now * 10;
+  }
+
+  /**
+   * Turns one raw row into the cached LazyStruct.
+   *
+   * @param blob the row, either a BytesWritable or a Text
+   * @return the cached LazyStruct, re-initialized for this row
+   * @throws SerDeException if "field.delimited" is not set or blob has another type
+   */
+  @Override
+  public Object deserialize(Writable blob) throws SerDeException {
+    prepareFieldDelimiter();
+
+    if (byteArrayRef == null) {
+      byteArrayRef = new ByteArrayRef();
+    }
+
+    String rowStr;
+    if (blob instanceof BytesWritable) {
+      BytesWritable b = (BytesWritable) blob;
+      rowStr = new String(b.getBytes(), 0, b.getLength());
+    } else if (blob instanceof Text) {
+      rowStr = blob.toString();
+    } else {
+      throw new SerDeException(getClass().toString()
+          + ": expects either BytesWritable or Text object!");
+    }
+
+    // we use the default field delimiter('\1') to replace the multiple-char field delimiter
+    // but we cannot use it to parse the row since column data can contain '\1' as well.
+    // The default charset is kept here because LazyStruct.parseMultiDelimit decodes this
+    // buffer with the default charset too and leaves the row unparsed when they differ.
+    byte[] rowBytes =
+        fieldDelimitedPattern.matcher(rowStr).replaceAll(FIELD_PLACEHOLDER).getBytes();
+    byteArrayRef.setData(rowBytes);
+    // the length is in bytes, which differs from String.length() for non-ASCII rows
+    cachedLazyStruct.init(byteArrayRef, 0, rowBytes.length);
+    // use the multi-char delimiter to parse the lazy struct
+    cachedLazyStruct.parseMultiDelimit(rowStr, fieldDelimited);
+    return cachedLazyStruct;
+  }
+
+  /**
+   * Writes the columns of obj separated by the multi-character delimiter.
+   *
+   * @param obj the row object
+   * @param objInspector a StructObjectInspector describing obj
+   * @return the reused Text holding the serialized row
+   * @throws SerDeException if "field.delimited" is not set, the field count does not
+   *           match the table, or writing fails
+   */
+  @Override
+  public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
+    prepareFieldDelimiter();
+
+    StructObjectInspector soi = (StructObjectInspector) objInspector;
+    List<? extends StructField> fields = soi.getAllStructFieldRefs();
+    List<Object> list = soi.getStructFieldsDataAsList(obj);
+    if (fields.size() != numColumns) {
+      throw new SerDeException("Cannot serialize the object because there are "
+          + fields.size() + " fields but the table has " + numColumns
+          + " columns.");
+    }
+
+    serializeStream.reset();
+    // Get all data out.
+    for (int c = 0; c < numColumns; c++) {
+      // write the delimiter to the stream, which means we don't need output.format.string anymore
+      if (c > 0) {
+        // byte count, not fieldDelimited.length(): the two differ for non-ASCII delimiters
+        serializeStream.write(fieldDelimitedUtf8.getBytes(), 0, fieldDelimitedUtf8.getLength());
+      }
+
+      Object field = (list == null ? null : list.get(c));
+      ObjectInspector fieldOI = fields.get(c).getFieldObjectInspector();
+
+      try {
+        serializeNoEncode(serializeStream, field, fieldOI, serdeParams.getSeparators(), 1,
+            serdeParams.getNullSequence(), serdeParams.isEscaped(), serdeParams.getEscapeChar(),
+            serdeParams.getNeedsEscape());
+      } catch (IOException e) {
+        throw new SerDeException(e);
+      }
+    }
+
+    serializeCache.set(serializeStream.getData(), 0, serializeStream.getCount());
+    return serializeCache;
+  }
+
+  // Rejects tables without "field.delimited" and refreshes the values derived from it.
+  private void prepareFieldDelimiter() throws SerDeException {
+    if (fieldDelimited == null || fieldDelimited.length() == 0) {
+      throw new SerDeException("This table does not have serde property \"field.delimited\"!");
+    }
+    if (!fieldDelimited.equals(preparedDelimiter)) {
+      fieldDelimitedPattern = Pattern.compile(Pattern.quote(fieldDelimited));
+      fieldDelimitedUtf8 = new Text(fieldDelimited);
+      preparedDelimiter = fieldDelimited;
+    }
+  }
+
+  /**
+   * Appends obj to out, recursing into lists, maps and structs.
+   *
+   * This is basically the same as LazySimpleSerDe.serialize, except that binary data is not
+   * Base64-encoded because a printable string is used as the delimiter. Consider such a row
+   * "strAQ==\1": str is a string, AQ== is the delimiter and \1 is the binary data.
+   *
+   * @param out stream to append to
+   * @param obj value to write; null is written as nullSequence
+   * @param objInspector inspector describing obj
+   * @param separators separator bytes, indexed by nesting level
+   * @param level index into separators for the current nesting depth
+   * @param nullSequence bytes written in place of a null value
+   * @param escaped passed through to LazyUtils.writePrimitiveUTF8
+   * @param escapeChar passed through to LazyUtils.writePrimitiveUTF8
+   * @param needsEscape passed through to LazyUtils.writePrimitiveUTF8
+   * @throws IOException declared for the writes to out
+   * @throws SerDeException declared for the nested serialization
+   */
+  public static void serializeNoEncode(ByteStream.Output out, Object obj,
+      ObjectInspector objInspector, byte[] separators, int level,
+      Text nullSequence, boolean escaped, byte escapeChar, boolean[] needsEscape)
+      throws IOException, SerDeException {
+
+    if (obj == null) {
+      out.write(nullSequence.getBytes(), 0, nullSequence.getLength());
+      return;
+    }
+
+    char separator;
+    List<?> list;
+    switch (objInspector.getCategory()) {
+    case PRIMITIVE:
+      PrimitiveObjectInspector oi = (PrimitiveObjectInspector) objInspector;
+      if (oi.getPrimitiveCategory() == PrimitiveCategory.BINARY) {
+        // raw bytes, no Base64; only the first getLength() bytes are written
+        BytesWritable bw = ((BinaryObjectInspector) oi).getPrimitiveWritableObject(obj);
+        out.write(bw.getBytes(), 0, bw.getLength());
+      } else {
+        LazyUtils.writePrimitiveUTF8(out, obj, oi, escaped, escapeChar, needsEscape);
+      }
+      return;
+    case LIST:
+      separator = (char) separators[level];
+      ListObjectInspector loi = (ListObjectInspector) objInspector;
+      list = loi.getList(obj);
+      ObjectInspector eoi = loi.getListElementObjectInspector();
+      if (list == null) {
+        out.write(nullSequence.getBytes(), 0, nullSequence.getLength());
+      } else {
+        for (int i = 0; i < list.size(); i++) {
+          if (i > 0) {
+            out.write(separator);
+          }
+          serializeNoEncode(out, list.get(i), eoi, separators, level + 1, nullSequence,
+              escaped, escapeChar, needsEscape);
+        }
+      }
+      return;
+    case MAP:
+      separator = (char) separators[level];
+      char keyValueSeparator = (char) separators[level + 1];
+
+      MapObjectInspector moi = (MapObjectInspector) objInspector;
+      ObjectInspector koi = moi.getMapKeyObjectInspector();
+      ObjectInspector voi = moi.getMapValueObjectInspector();
+      Map<?, ?> map = moi.getMap(obj);
+      if (map == null) {
+        out.write(nullSequence.getBytes(), 0, nullSequence.getLength());
+      } else {
+        boolean first = true;
+        for (Map.Entry<?, ?> entry : map.entrySet()) {
+          if (first) {
+            first = false;
+          } else {
+            out.write(separator);
+          }
+          serializeNoEncode(out, entry.getKey(), koi, separators, level + 2,
+              nullSequence, escaped, escapeChar, needsEscape);
+          out.write(keyValueSeparator);
+          serializeNoEncode(out, entry.getValue(), voi, separators, level + 2,
+              nullSequence, escaped, escapeChar, needsEscape);
+        }
+      }
+      return;
+    case STRUCT:
+      separator = (char) separators[level];
+      StructObjectInspector soi = (StructObjectInspector) objInspector;
+      List<? extends StructField> fields = soi.getAllStructFieldRefs();
+      list = soi.getStructFieldsDataAsList(obj);
+      if (list == null) {
+        out.write(nullSequence.getBytes(), 0, nullSequence.getLength());
+      } else {
+        for (int i = 0; i < list.size(); i++) {
+          if (i > 0) {
+            out.write(separator);
+          }
+          serializeNoEncode(out, list.get(i), fields.get(i).getFieldObjectInspector(),
+              separators, level + 1, nullSequence, escaped, escapeChar,
+              needsEscape);
+        }
+      }
+      return;
+    default:
+      break;
+    }
+
+    throw new RuntimeException("Unknown category type: " + objInspector.getCategory());
+  }
+
+  /** Statistics are not supported; always returns null. */
+  public SerDeStats getSerDeStats() {
+    // no support for statistics
+    return null;
+  }
+
+}
Index: src/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyBinary.java
===================================================================
--- src/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyBinary.java (revision 21711)
+++ src/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyBinary.java (revision 22627)
@@ -18,14 +18,11 @@
 
 package org.apache.hadoop.hive.serde2.lazy;
 
-import java.nio.charset.CharacterCodingException;
-
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.serde2.lazy.objectinspector.primitive.LazyBinaryObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
 
 public class LazyBinary extends LazyPrimitive {
 
@@ -55,6 +52,8 @@
           "decoded the data.");
     }
     byte[] decoded = arrayByteBase64 ? 
Base64.decodeBase64(recv) : recv; + //use the original bytes in case decoding should fail + decoded = decoded.length > 0 ? decoded : recv; data.set(decoded, 0, decoded.length); } Index: src/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java =================================================================== --- src/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java (revision 21711) +++ src/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java (revision 22627) @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -275,4 +276,62 @@ public long getRawDataSerializedSize() { return serializedSize; } + + //parse the struct using multi-char delimiter + public void parseMultiDelimit(String rawRow,String fieldDelimit){ + if(rawRow==null || fieldDelimit==null) { + return; + } + if (!rawRow.replaceAll(Pattern.quote(fieldDelimit), String.valueOf((char) oi.getSeparator())) + .equals(new String(bytes.getData(), 0, length))) { + return; + } + if (fields == null) { + List fieldRefs = ((StructObjectInspector) oi) + .getAllStructFieldRefs(); + fields = new LazyObject[fieldRefs.size()]; + for (int i = 0; i < fields.length; i++) { + fields[i] = LazyFactory.createLazyObject(fieldRefs.get(i) + .getFieldObjectInspector()); + } + fieldInited = new boolean[fields.length]; + // Extra element to make sure we have the same formula to compute the + // length of each element of the array. + startPosition = new int[fields.length + 1]; + } + //the indexes of the delimiters + int[] delimitIndexes=findIndexes(rawRow, fieldDelimit); + int diff=fieldDelimit.length()-1; + //first field always starts from 0, even when missing + startPosition[0]=0; + for(int i=1;i