diff --git contrib/src/java/org/apache/hadoop/hive/contrib/serde2/MultiDelimitSerDe.java contrib/src/java/org/apache/hadoop/hive/contrib/serde2/MultiDelimitSerDe.java
new file mode 100644
index 0000000..c96cc52
--- /dev/null
+++ contrib/src/java/org/apache/hadoop/hive/contrib/serde2/MultiDelimitSerDe.java
@@ -0,0 +1,297 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.contrib.serde2;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
+import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * MultiDelimitSerDe allows a table to use a multi-character string as the
+ * field delimiter. Complex types (list/map/struct) still use the single-byte
+ * separators from LazySimpleSerDe.
+ */
+public class MultiDelimitSerDe implements SerDe {
+
+  public static final Log LOG = LogFactory.getLog(MultiDelimitSerDe.class.getName());
+  public static final byte[] DefaultSeparators = {(byte) 1, (byte) 2, (byte) 3};
+
+  int numColumns;
+  // the multi-character field delimiter string
+  String fieldDelimited;
+  // we don't support using multiple chars as delimiters within complex types
+  // collection separator
+  byte collSep;
+  // map key separator
+  byte keySep;
+
+  // The object for storing row data
+  LazyStruct cachedLazyStruct;
+  // the lazy struct object inspector
+  ObjectInspector cachedObjectInspector;
+
+  // The wrapper for byte array
+  ByteArrayRef byteArrayRef;
+
+  LazySimpleSerDe.SerDeParameters serdeParams = null;
+  // the output stream of serialized objects
+  ByteStream.Output serializeStream = new ByteStream.Output();
+  // the Writable to return in serialize
+  Text serializeCache = new Text();
+
+  @Override
+  public void initialize(Configuration conf, Properties tbl) throws SerDeException {
+    // get the SerDe parameters
+    serdeParams = LazySimpleSerDe.initSerdeParams(conf, tbl, getClass().getName());
+
+    fieldDelimited = tbl.getProperty("field.delimited");
+    if (fieldDelimited == null || fieldDelimited.equals("")) {
+      fieldDelimited = tbl.getProperty("input.delimited");
+    }
+
+    // TODO: get the "encoding" parameter, set default charset
+
+    // get the collection separator and map key separator
+    collSep = LazySimpleSerDe.getByte(tbl.getProperty("collection.delimited"), DefaultSeparators[1]);
+    keySep = LazySimpleSerDe.getByte(tbl.getProperty("mapkey.delimited"), DefaultSeparators[2]);
+    serdeParams.getSeparators()[1] = collSep;
+    serdeParams.getSeparators()[2] = keySep;
+
+    // Create the ObjectInspectors for the fields
+    cachedObjectInspector = LazyFactory.createLazyStructInspector(serdeParams
+        .getColumnNames(), serdeParams.getColumnTypes(), serdeParams
+        .getSeparators(), serdeParams.getNullSequence(), serdeParams
+        .isLastColumnTakesRest(), serdeParams.isEscaped(), serdeParams
+        .getEscapeChar());
+
+    cachedLazyStruct = (LazyStruct) LazyFactory.createLazyObject(cachedObjectInspector);
+
+    assert serdeParams.getColumnNames().size() == serdeParams.getColumnTypes().size();
+    numColumns = serdeParams.getColumnNames().size();
+  }
+
+  @Override
+  public ObjectInspector getObjectInspector() throws SerDeException {
+    return cachedObjectInspector;
+  }
+
+  @Override
+  public Class<? extends Writable> getSerializedClass() {
+    return Text.class;
+  }
+
+  @Override
+  public Object deserialize(Writable blob) throws SerDeException {
+    if (fieldDelimited == null || fieldDelimited.equals("")) {
+      throw new SerDeException("This table does not have serde property \"field.delimited\" or \"input.delimited\"!");
+    }
+
+    if (byteArrayRef == null) {
+      byteArrayRef = new ByteArrayRef();
+    }
+
+    // we use the default field delimiter('\1') to replace the multiple-char field delimiter
+    // but we cannot use it to parse the row since column data can contain '\1' as well
+    String rowStr;
+    if (blob instanceof BytesWritable) {
+      BytesWritable b = (BytesWritable) blob;
+      // TODO: use appropriate charset to decode
+      // only the first getLength() bytes are valid; the backing array may be reused
+      rowStr = new String(b.getBytes(), 0, b.getLength());
+    } else if (blob instanceof Text) {
+      Text rowText = (Text) blob;
+      rowStr = rowText.toString();
+    } else {
+      throw new SerDeException(getClass().toString() + ": expects either BytesWritable or Text object!");
+    }
+    // TODO: use appropriate charset to encode
+    byteArrayRef.setData(rowStr.replaceAll(Pattern.quote(fieldDelimited), "\1").getBytes());
+    cachedLazyStruct.init(byteArrayRef, 0, byteArrayRef.getData().length);
+    // use the multi-char delimiter to parse the lazy struct
+    cachedLazyStruct.parseMultiDelimit(rowStr.getBytes(), fieldDelimited.getBytes());
+    return cachedLazyStruct;
+  }
+
+  @Override
+  public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
+    if (fieldDelimited == null || fieldDelimited.equals("")) {
+      throw new SerDeException("This table does not have serde property \"field.delimited\" or \"input.delimited\"!");
+    }
+
+    StructObjectInspector soi = (StructObjectInspector) objInspector;
+    List<? extends StructField> fields = soi.getAllStructFieldRefs();
+    List<Object> list = soi.getStructFieldsDataAsList(obj);
+    if (fields.size() != numColumns) {
+      throw new SerDeException("Cannot serialize the object because there are "
+          + fields.size() + " fields but the table has " + numColumns
+          + " columns.");
+    }
+
+    serializeStream.reset();
+    // compute the delimiter bytes once instead of twice per column
+    byte[] delimBytes = fieldDelimited.getBytes();
+    // Get all data out.
+    for (int c = 0; c < numColumns; c++) {
+      // write the delimiter to the stream, which means we don't need output.format.string anymore
+      if (c > 0) {
+        serializeStream.write(delimBytes, 0, delimBytes.length);
+      }
+
+      Object field = (list == null ? null : list.get(c));
+      ObjectInspector fieldOI = fields.get(c).getFieldObjectInspector();
+
+      try {
+        serializeNoEncode(serializeStream, field, fieldOI, serdeParams.getSeparators(), 1,
+            serdeParams.getNullSequence(), serdeParams.isEscaped(), serdeParams.getEscapeChar(),
+            serdeParams.getNeedsEscape());
+      } catch (IOException e) {
+        throw new SerDeException(e);
+      }
+    }
+
+    serializeCache.set(serializeStream.getData(), 0, serializeStream.getLength());
+    return serializeCache;
+  }
+
+  // this is basically the same as LazySimpleSerDe.serialize, except that we don't use Base64 to
+  // encode binary data because we're using printable string as delimiter. consider such a row
+  // "strAQ==\1": str is a string, AQ== is the delimiter and \1 is the binary data.
+  public static void serializeNoEncode(ByteStream.Output out, Object obj,
+      ObjectInspector objInspector, byte[] separators, int level,
+      Text nullSequence, boolean escaped, byte escapeChar, boolean[] needsEscape)
+      throws IOException, SerDeException {
+    if (obj == null) {
+      out.write(nullSequence.getBytes(), 0, nullSequence.getLength());
+      return;
+    }
+
+    char separator;
+    List<?> list;
+    switch (objInspector.getCategory()) {
+    case PRIMITIVE:
+      PrimitiveObjectInspector oi = (PrimitiveObjectInspector) objInspector;
+      if (oi.getPrimitiveCategory() == PrimitiveCategory.BINARY) {
+        // write the raw binary bytes; no Base64 since the delimiter is a printable string
+        BytesWritable bw = ((BinaryObjectInspector) oi).getPrimitiveWritableObject(obj);
+        byte[] toWrite = new byte[bw.getLength()];
+        System.arraycopy(bw.getBytes(), 0, toWrite, 0, bw.getLength());
+        out.write(toWrite, 0, toWrite.length);
+      } else {
+        LazyUtils.writePrimitiveUTF8(out, obj, oi, escaped, escapeChar, needsEscape);
+      }
+      return;
+    case LIST:
+      separator = (char) separators[level];
+      ListObjectInspector loi = (ListObjectInspector) objInspector;
+      list = loi.getList(obj);
+      ObjectInspector eoi = loi.getListElementObjectInspector();
+      if (list == null) {
+        out.write(nullSequence.getBytes(), 0, nullSequence.getLength());
+      } else {
+        for (int i = 0; i < list.size(); i++) {
+          if (i > 0) {
+            out.write(separator);
+          }
+          serializeNoEncode(out, list.get(i), eoi, separators, level + 1, nullSequence,
+              escaped, escapeChar, needsEscape);
+        }
+      }
+      return;
+    case MAP:
+      separator = (char) separators[level];
+      char keyValueSeparator = (char) separators[level + 1];
+
+      MapObjectInspector moi = (MapObjectInspector) objInspector;
+      ObjectInspector koi = moi.getMapKeyObjectInspector();
+      ObjectInspector voi = moi.getMapValueObjectInspector();
+      Map<?, ?> map = moi.getMap(obj);
+      if (map == null) {
+        out.write(nullSequence.getBytes(), 0, nullSequence.getLength());
+      } else {
+        boolean first = true;
+        for (Map.Entry<?, ?> entry : map.entrySet()) {
+          if (first) {
+            first = false;
+          } else {
+            out.write(separator);
+          }
+          serializeNoEncode(out, entry.getKey(), koi, separators, level + 2,
+              nullSequence, escaped, escapeChar, needsEscape);
+          out.write(keyValueSeparator);
+          serializeNoEncode(out, entry.getValue(), voi, separators, level + 2,
+              nullSequence, escaped, escapeChar, needsEscape);
+        }
+      }
+      return;
+    case STRUCT:
+      separator = (char) separators[level];
+      StructObjectInspector soi = (StructObjectInspector) objInspector;
+      List<? extends StructField> fields = soi.getAllStructFieldRefs();
+      list = soi.getStructFieldsDataAsList(obj);
+      if (list == null) {
+        out.write(nullSequence.getBytes(), 0, nullSequence.getLength());
+      } else {
+        for (int i = 0; i < list.size(); i++) {
+          if (i > 0) {
+            out.write(separator);
+          }
+          serializeNoEncode(out, list.get(i), fields.get(i).getFieldObjectInspector(),
+              separators, level + 1, nullSequence, escaped, escapeChar,
+              needsEscape);
+        }
+      }
+      return;
+    default:
+      break;
+    }
+
+    throw new RuntimeException("Unknown category type: " + objInspector.getCategory());
+  }
+
+  @Override
+  public SerDeStats getSerDeStats() {
+    // no support for statistics
+    return null;
+  }
+}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyBinary.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyBinary.java
index ae12f20..c25a1b8 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyBinary.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyBinary.java
@@ -55,6 +55,8 @@ public void init(ByteArrayRef bytes, int start, int length) {
           "decoded the data.");
     }
     byte[] decoded = arrayByteBase64 ? Base64.decodeBase64(recv) : recv;
+    // use the original bytes in case decoding should fail
+    decoded = decoded.length > 0 ? decoded : recv;
     data.set(decoded, 0, decoded.length);
   }
 
diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java
index a01cd66..c74af4b 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java
@@ -21,6 +21,7 @@
 import java.util.Arrays;
 import java.util.List;
 
+import com.google.common.primitives.Bytes;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -28,6 +29,7 @@
 import org.apache.hadoop.hive.serde2.StructObject;
 import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.Text;
 
 /**
@@ -285,4 +287,59 @@
   public long getRawDataSerializedSize() {
     return serializedSize;
   }
+
+  // parse the struct using multi-char delimiter
+  public void parseMultiDelimit(byte[] rawRow, byte[] fieldDelimit) {
+    if (rawRow == null || fieldDelimit == null) {
+      return;
+    }
+    if (fields == null) {
+      List<? extends StructField> fieldRefs = ((StructObjectInspector) oi).getAllStructFieldRefs();
+      fields = new LazyObject[fieldRefs.size()];
+      for (int i = 0; i < fields.length; i++) {
+        fields[i] = LazyFactory.createLazyObject(fieldRefs.get(i).getFieldObjectInspector());
+      }
+      fieldInited = new boolean[fields.length];
+      startPosition = new int[fields.length + 1];
+    }
+    // the indexes of the delimiters
+    int[] delimitIndexes = findIndexes(rawRow, fieldDelimit);
+    int diff = fieldDelimit.length - 1;
+    // first field always starts from 0, even when missing
+    startPosition[0] = 0;
+    for (int i = 1; i < fields.length; i++) {
+      if (delimitIndexes[i - 1] != -1) {
+        int start = delimitIndexes[i - 1] + fieldDelimit.length;
+        startPosition[i] = start - i * diff;
+      } else {
+        startPosition[i] = length + 1;
+      }
+    }
+    startPosition[fields.length] = length + 1;
+    Arrays.fill(fieldInited, false);
+    parsed = true;
+  }
+
+  // find all the indexes of the sub byte[]
+  private int[] findIndexes(byte[] array, byte[] target) {
+    if (fields.length <= 1) {
+      return new int[0];
+    }
+    int[] indexes = new int[fields.length - 1];
+    Arrays.fill(indexes, -1);
+    indexes[0] = Bytes.indexOf(array, target);
+    if (indexes[0] == -1) {
+      return indexes;
+    }
+    int indexInNewArray = indexes[0];
+    for (int i = 1; i < indexes.length; i++) {
+      array = Arrays.copyOfRange(array, indexInNewArray + target.length, array.length);
+      indexInNewArray = Bytes.indexOf(array, target);
+      if (indexInNewArray == -1) {
+        break;
+      }
+      indexes[i] = indexInNewArray + indexes[i - 1] + target.length;
+    }
+    return indexes;
+  }
 }