diff --git a/serde/pom.xml b/serde/pom.xml index f8bcc83..c988aa6 100644 --- a/serde/pom.xml +++ b/serde/pom.xml @@ -70,7 +70,13 @@ libthrift ${libthrift.version} - + + net.sf.opencsv + opencsv + 2.3 + + + junit junit diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/CSVSerde.java b/serde/src/java/org/apache/hadoop/hive/serde2/CSVSerde.java new file mode 100644 index 0000000..d88e09e --- /dev/null +++ b/serde/src/java/org/apache/hadoop/hive/serde2/CSVSerde.java @@ -0,0 +1,181 @@ +package org.apache.hadoop.hive.serde2; + + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +import java.io.CharArrayReader; +import java.io.IOException; +import java.io.Reader; +import java.io.StringWriter; +import java.io.Writer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import au.com.bytecode.opencsv.CSVReader; +import au.com.bytecode.opencsv.CSVWriter; + +/** + * CSVSerde. + * + */ +public final class CSVSerde implements SerDe { + + private ObjectInspector inspector; + private String[] outputFields; + private int numCols; + private List row; + + private char separatorChar; + private char quoteChar; + private char escapeChar; + + + @Override + public void initialize(final Configuration conf, final Properties tbl) throws SerDeException { + + final List columnNames = Arrays.asList(tbl.getProperty(Constants.LIST_COLUMNS).split(",")); + final List columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(tbl.getProperty(Constants.LIST_COLUMN_TYPES)); + + numCols = columnNames.size(); + + final List columnOIs = new ArrayList(numCols); + + for (int i=0; i< numCols; i++) { + columnOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); + } + + this.inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, columnOIs); + this.outputFields = new String[numCols]; + row = new ArrayList(numCols); + + for (int i=0; i< numCols; i++) { + row.add(null); + } + + separatorChar = getProperty(tbl, "separatorChar", CSVWriter.DEFAULT_SEPARATOR); + quoteChar = getProperty(tbl, "quoteChar", CSVWriter.DEFAULT_QUOTE_CHARACTER); + escapeChar = getProperty(tbl, "escapeChar", CSVWriter.DEFAULT_ESCAPE_CHARACTER); + } + + private final char getProperty(final Properties tbl, final String property, final char def) { + final String val = tbl.getProperty(property); + + if (val != null) { + return val.charAt(0); + } + + return def; + } + + @Override + public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { + final StructObjectInspector outputRowOI = (StructObjectInspector) objInspector; + final List outputFieldRefs = outputRowOI.getAllStructFieldRefs(); + + if (outputFieldRefs.size() != numCols) { + throw new SerDeException("Cannot serialize the object because there are " + + outputFieldRefs.size() + " fields but the table has " + numCols + " columns."); + } + + // Get all data out. + for (int c = 0; c < numCols; c++) { + final Object field = outputRowOI.getStructFieldData(obj, outputFieldRefs.get(c)); + final ObjectInspector fieldOI = outputFieldRefs.get(c).getFieldObjectInspector(); + + // The data must be of type String + final StringObjectInspector fieldStringOI = (StringObjectInspector) fieldOI; + + // Convert the field to Java class String, because objects of String type + // can be stored in String, Text, or some other classes. + outputFields[c] = fieldStringOI.getPrimitiveJavaObject(field); + } + + final StringWriter writer = new StringWriter(); + final CSVWriter csv = newWriter(writer, separatorChar, quoteChar, escapeChar); + + try { + csv.writeNext(outputFields); + csv.close(); + + return new Text(writer.toString()); + } catch (final IOException ioe) { + throw new SerDeException(ioe); + } + } + + @Override + public Object deserialize(final Writable blob) throws SerDeException { + Text rowText = (Text) blob; + + CSVReader csv = null; + try { + csv = newReader(new CharArrayReader(rowText.toString().toCharArray()), separatorChar, quoteChar, escapeChar); + final String[] read = csv.readNext(); + + for (int i=0; i< numCols; i++) { + if (read != null && i < read.length) { + row.set(i, read[i]); + } else { + row.set(i, null); + } + } + + return row; + } catch (final Exception e) { + throw new SerDeException(e); + } finally { + if (csv != null) { + try { + csv.close(); + } catch (final Exception e) { + // ignore + } + } + } + } + + private CSVReader newReader(final Reader reader, char separator, char quote, char escape) { + // CSVReader will throw an exception if any of separator, quote, or escape is the same, but + // the CSV format specifies that the escape character and quote char are the same... very weird + if (CSVWriter.DEFAULT_ESCAPE_CHARACTER == escape) { + return new CSVReader(reader, separator, quote); + } else { + return new CSVReader(reader, separator, quote, escape); + } + } + + private CSVWriter newWriter(final Writer writer, char separator, char quote, char escape) { + if (CSVWriter.DEFAULT_ESCAPE_CHARACTER == escape) { + return new CSVWriter(writer, separator, quote, ""); + } else { + return new CSVWriter(writer, separator, quote, escape, ""); + } + } + + @Override + public ObjectInspector getObjectInspector() throws SerDeException { + return inspector; + } + + @Override + public Class getSerializedClass() { + return Text.class; + } + + public SerDeStats getSerDeStats() { + return null; + } +} diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/TestCSVSerde.java b/serde/src/test/org/apache/hadoop/hive/serde2/TestCSVSerde.java new file mode 100644 index 0000000..806e02c --- /dev/null +++ b/serde/src/test/org/apache/hadoop/hive/serde2/TestCSVSerde.java @@ -0,0 +1,82 @@ +package org.apache.hadoop.hive.serde2; + +import java.util.List; +import java.util.Properties; + +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.io.Text; +import org.junit.Before; +import org.junit.Test; +import static org.junit.Assert.assertEquals; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +public class TestCSVSerde { + private final CSVSerde csv = new CSVSerde(); + final Properties props = new Properties(); + + @Before + public void setup() throws Exception { + props.put(Constants.LIST_COLUMNS, "a,b,c"); + props.put(Constants.LIST_COLUMN_TYPES, "string,string,string"); + } + + @Test + public void testDeserialize() throws Exception { + csv.initialize(null, props); + final Text in = new Text("hello,\"yes, okay\",1"); + + final List row = (List) csv.deserialize(in); + + assertEquals("hello", row.get(0)); + assertEquals("yes, okay", row.get(1)); + assertEquals("1", row.get(2)); + } + + + @Test + public void testDeserializeCustomSeparators() throws Exception { + props.put("separatorChar", "\t"); + props.put("quoteChar", "'"); + + csv.initialize(null, props); + + final Text in = new Text("hello\t'yes\tokay'\t1"); + final List row = (List) csv.deserialize(in); + + assertEquals("hello", row.get(0)); + assertEquals("yes\tokay", row.get(1)); + assertEquals("1", row.get(2)); + } + + @Test + public void testDeserializeCustomEscape() throws Exception { + props.put("quoteChar", "'"); + props.put("escapeChar", "\\"); + + csv.initialize(null, props); + + final Text in = new Text("hello,'yes\\'okay',1"); + final List row = (List) csv.deserialize(in); + + assertEquals("hello", row.get(0)); + assertEquals("yes'okay", row.get(1)); + assertEquals("1", row.get(2)); + } +}