commit 8fcf5067f7ee58da288eaefff71c51017de5f425 Author: Alan Gates Date: Tue Nov 17 14:07:35 2015 -0800 HIVE-12443 Hive Streaming should expose encoding and serdes for testing diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java index c20e04c..5c15675 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java @@ -48,7 +48,7 @@ import java.util.List; -abstract class AbstractRecordWriter implements RecordWriter { +public abstract class AbstractRecordWriter implements RecordWriter { static final private Logger LOG = LoggerFactory.getLogger(AbstractRecordWriter.class.getName()); final HiveConf conf; @@ -110,7 +110,22 @@ protected AbstractRecordWriter(HiveEndPoint endPoint, HiveConf conf) return result; } - abstract SerDe getSerde() throws SerializationError; + /** + * Get the SerDe for the Objects created by {@link #encode}. This is public so that test + * frameworks can use it. + * @return serde + * @throws SerializationError + */ + public abstract SerDe getSerde() throws SerializationError; + + /** + * Encode a record as an Object that Hive can read with the ObjectInspector associated with the + * serde returned by {@link #getSerde}. This is public so that test frameworks can use it. + * @param record record to be deserialized + * @return deserialized record as an Object + * @throws SerializationError + */ + public abstract Object encode(byte[] record) throws SerializationError; protected abstract ObjectInspector[] getBucketObjectInspectors(); protected abstract StructObjectInspector getRecordObjectInspector(); diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java index b4d94e3..4f1154e 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java @@ -243,7 +243,7 @@ public void write(long transactionId, byte[] record) } @Override - SerDe getSerde() throws SerializationError { + public SerDe getSerde() throws SerializationError { return serde; } @@ -260,7 +260,8 @@ protected LazySimpleStructObjectInspector getRecordObjectInspector() { return bucketObjInspectors; } - private Object encode(byte[] record) throws SerializationError { + @Override + public Object encode(byte[] record) throws SerializationError { try { BytesWritable blob = new BytesWritable(); blob.set(record, 0, record.length); diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java index 6ab21eb..28ea7d6 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java @@ -86,7 +86,7 @@ public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf) } @Override - SerDe getSerde() throws SerializationError { + public SerDe getSerde() throws SerializationError { return serde; } @@ -137,13 +137,8 @@ private static JsonSerDe createSerde(Table tbl, HiveConf conf) } } - /** - * Encode Utf8 encoded string bytes using JsonSerde - * @param utf8StrRecord - * @return The encoded object - * @throws SerializationError - */ - private Object encode(byte[] utf8StrRecord) throws SerializationError { + @Override + public Object encode(byte[] utf8StrRecord) throws SerializationError { try { Text blob = new Text(utf8StrRecord); return serde.deserialize(blob);