commit fa102c5b0cb6f240d954d2e1f88e8bd1cd2200e2 Author: Alan Gates Date: Mon Jul 29 14:35:37 2019 -0700 HIVE-19207 Support avro record writer for streaming ingest. (Alan Gates) diff --git streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java index 03c9fe00a3..e4c9914db3 100644 --- streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java +++ streaming/src/java/org/apache/hive/streaming/AbstractRecordWriter.java @@ -64,7 +64,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class AbstractRecordWriter implements RecordWriter { +/** + * All RecordWriter impelmentations should extend this class. + * + * NOTE: the way this class is written it is not thread safe. It sets the state for each record and then + * has its implementations do multiple operations on it. Callers should use a separate instance for each writing + * thread. + * @param type of the records to write + */ +public abstract class AbstractRecordWriter implements RecordWriter { private static final Logger LOG = LoggerFactory.getLogger(AbstractRecordWriter.class.getName()); private static final String DEFAULT_LINE_DELIMITER_PATTERN = "[\r\n]"; @@ -112,6 +120,14 @@ public AbstractRecordWriter(final String lineDelimiter) { DEFAULT_LINE_DELIMITER_PATTERN : lineDelimiter; } + /** + * Used with implementations that do not operate on text types. + */ + protected AbstractRecordWriter() { + lineDelimiter = ""; + + } + protected static class OrcMemoryPressureMonitor implements HeapMemoryMonitor.Listener { private static final Logger LOG = LoggerFactory.getLogger(OrcMemoryPressureMonitor.class.getName()); private final AtomicBoolean lowMemoryCanary; @@ -299,7 +315,31 @@ protected String getWatermark(String partition) { * @return deserialized record as an Object * @throws SerializationError - any error during serialization or deserialization of record */ - public abstract Object encode(byte[] record) throws SerializationError; + /* + public Object encode(byte[] record) throws SerializationError { + throw new UnsupportedOperationException(); + } + */ + + /** + * This sets the record to be used for subsequent {@link #getEncoded()} and {@link #getRecordLength()} calls. + * @param record record to be encoded and measured. + * @throws SerializationError any error during serialization of the record + */ + protected abstract void encode(T record) throws SerializationError; + + /** + * Encode the record as an Object that Hive can read with the ObjectInspector associated with the + * serde returned by {@link #createSerde}. {@link #encode(Object)} must be called before this. + * @return deserialized record as an Object + */ + protected abstract Object getEncoded(); + + /** + * Get the length of the record. {@link #encode(Object)} must be called before this. + * @return length of the record before encoding. 
+ */ + protected abstract long getRecordLength(); // returns the bucket number to which the record belongs to protected int getBucket(Object row) { @@ -434,13 +474,17 @@ public void close() throws StreamingIOFailure { public void write(final long writeId, final InputStream inputStream) throws StreamingException { try (Scanner scanner = new Scanner(inputStream).useDelimiter(lineDelimiter)) { while (scanner.hasNext()) { - write(writeId, scanner.next().getBytes()); + writeBytes(writeId, scanner.next().getBytes()); } } } - @Override - public void write(final long writeId, final byte[] record) throws StreamingException { + //@Override + // TODO only here for write(InputStream), need to figure out how to get rid of it + private void writeBytes(final long writeId, final byte[] record) throws StreamingException { + // TODO this won't end well if we're not using text, see what I can do to improve this + write(writeId, (T)record); + /* checkAutoFlush(); ingestSizeBytes += record.length; try { @@ -456,6 +500,31 @@ public void write(final long writeId, final byte[] record) throws StreamingExcep throw new StreamingIOFailure("Error writing record in transaction write id (" + writeId + ")", e); } + */ + } + + @Override + public void write(long writeId, T record) throws StreamingException { + checkAutoFlush(); + try { + encode(record); + Object encodedRow = getEncoded(); + long recordSize = getRecordLength(); + int bucket = getBucket(encodedRow); + ingestSizeBytes += recordSize; + List partitionValues = getPartitionValues(encodedRow); + getRecordUpdater(partitionValues, bucket).insert(writeId, encodedRow); + + // ingest size bytes gets resetted on flush() whereas connection stats is not + conn.getConnectionStats().incrementRecordsWritten(); + conn.getConnectionStats().incrementRecordsSize(recordSize); + } catch (IOException e) { + throw new StreamingIOFailure("Error writing record in transaction write id (" + + writeId + ")", e); + } catch (ClassCastException e) { + throw new SerializationError("Column type mismatch when serializing record, usually" + + " this means the record doesn't match the schema the writer expected", e); + } } protected void checkAutoFlush() throws StreamingIOFailure { diff --git streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java index 6ab3ffeb8c..6570a28c5c 100644 --- streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java +++ streaming/src/java/org/apache/hive/streaming/AbstractStreamingTransaction.java @@ -105,6 +105,25 @@ public void write(final InputStream inputStream) throws StreamingException { } } + @Override + public void write(T record) throws StreamingException { + checkIsClosed(); + boolean success = false; + try { + recordWriter.write(getCurrentWriteId(), record); + success = true; + } catch (SerializationError ex) { + //this exception indicates that a {@code record} could not be parsed and the + //caller can decide whether to drop it or send it to dead letter queue. + //rolling back the txn and retrying won't help since the tuple will be exactly the same + //when it's replayed. + success = true; + throw ex; + } finally { + markDead(success); + } + } + /** * A transaction batch opens a single HDFS file and writes multiple transaction to it. If there is any issue * with the write, we can't continue to write to the same file any as it may be corrupted now (at the tail). 
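The reworked AbstractRecordWriter splits the old encode(byte[]) into three calls: encode(T) stashes per-record state, and getEncoded() and getRecordLength() report it back while the base class drives bucketing, partition resolution, and auto-flush. A minimal sketch of a non-text writer against this contract follows; MyRecord, toHiveRow(), and approximateSize() are hypothetical placeholders and do not appear in this patch.

import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hive.streaming.AbstractRecordWriter;
import org.apache.hive.streaming.SerializationError;

// Hypothetical sketch only; MyRecord, toHiveRow(), and approximateSize() are placeholders.
public class SketchRecordWriter extends AbstractRecordWriter<MyRecord> {
  private final AbstractSerDe serde;  // assumed to be built by the caller
  private Object encoded;             // row in the form the serde's ObjectInspector expects
  private long length;                // pre-encoding size, used for auto-flush accounting

  public SketchRecordWriter(AbstractSerDe serde) {
    super();                          // no line delimiter; this writer never parses text
    this.serde = serde;
  }

  @Override
  public AbstractSerDe createSerde() throws SerializationError {
    return serde;
  }

  @Override
  protected void encode(MyRecord record) throws SerializationError {
    encoded = toHiveRow(record);        // hypothetical conversion to ObjectInspector form
    length = record.approximateSize();  // hypothetical size estimate
  }

  @Override
  protected Object getEncoded() {
    return encoded;
  }

  @Override
  protected long getRecordLength() {
    return length;
  }

  // toHiveRow() would convert MyRecord into the Object layout described by createSerde()'s
  // ObjectInspector; its body depends entirely on the application's record type.
  private Object toHiveRow(MyRecord record) {
    throw new UnsupportedOperationException("placeholder for illustration");
  }
}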
diff --git streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java index f2beafea9e..bd830f262c 100644 --- streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java +++ streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java @@ -640,6 +640,12 @@ public void write(final InputStream inputStream) throws StreamingException { currentTransactionBatch.write(inputStream); } + @Override + public void write(T record) throws StreamingException { + checkState(); + currentTransactionBatch.write(record); + } + /** * Close connection */ diff --git streaming/src/java/org/apache/hive/streaming/RecordWriter.java streaming/src/java/org/apache/hive/streaming/RecordWriter.java index 5b027548a9..d69e6d83e9 100644 --- streaming/src/java/org/apache/hive/streaming/RecordWriter.java +++ streaming/src/java/org/apache/hive/streaming/RecordWriter.java @@ -21,13 +21,14 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.io.Writable; import java.io.InputStream; import java.util.List; import java.util.Set; -public interface RecordWriter { +public interface RecordWriter { /** * Initialize record writer. @@ -54,17 +55,19 @@ default void init(StreamingConnection connection, long minWriteId, } /** - * Writes using a hive RecordUpdater. + * Writes using a hive RecordUpdater. This is intended for use with text types, like CSV or JSON. Implementation + * is optional. * * @param writeId - the write ID of the table mapping to Txn in which the write occurs * @param record - the record to be written * @throws StreamingException - thrown when write fails */ - void write(long writeId, byte[] record) throws StreamingException; + //void write(long writeId, byte[] record) throws StreamingException; /** * Writes using a hive RecordUpdater. The specified input stream will be automatically closed - * by the API after reading all the records out of it. + * by the API after reading all the records out of it. This is intended for use with text types, like CSV or + * JSON. Implementation is optional. * * @param writeId - the write ID of the table mapping to Txn in which the write occurs * @param inputStream - the record to be written @@ -72,6 +75,16 @@ default void init(StreamingConnection connection, long minWriteId, */ void write(long writeId, InputStream inputStream) throws StreamingException; + /** + * Writes using a hive RecordUpdater. This is intended for use with non-text types, such as Avro. Implementation + * is optional. + * @param writeId the write ID of the table mapping to Txn in which the write occurs + * @param record the record to be written + * @throws StreamingException if the write fails, serialization fails, or the underlying implementation does not + * support writing this type of record. + */ + void write(long writeId, T record) throws StreamingException; + /** * Flush records from buffer. 
Invoked by TransactionBatch.commitTransaction() * diff --git streaming/src/java/org/apache/hive/streaming/SerializationError.java streaming/src/java/org/apache/hive/streaming/SerializationError.java index 1473ff80a7..f273a79683 100644 --- streaming/src/java/org/apache/hive/streaming/SerializationError.java +++ streaming/src/java/org/apache/hive/streaming/SerializationError.java @@ -20,7 +20,11 @@ public class SerializationError extends StreamingException { - SerializationError(String msg, Exception e) { + public SerializationError(String msg) { + super(msg); + } + + public SerializationError(String msg, Exception e) { super(msg,e); } } diff --git streaming/src/java/org/apache/hive/streaming/StreamingConnection.java streaming/src/java/org/apache/hive/streaming/StreamingConnection.java index eb7c3c68bb..40b5bca3b7 100644 --- streaming/src/java/org/apache/hive/streaming/StreamingConnection.java +++ streaming/src/java/org/apache/hive/streaming/StreamingConnection.java @@ -51,13 +51,22 @@ void write(byte[] record) throws StreamingException; /** - * Write record using RecordWriter. + * Write record using RecordWriter. This can only be used with records in byte format, not object format. * * @param inputStream - input stream of records * @throws StreamingException - if there are errors when writing */ void write(InputStream inputStream) throws StreamingException; + /** + * Write records in object format rather than text. To use this you must use a {@link RecordWriter} that knows + * how to write records of this format. + * @param record record to write. + * @param type of the record to write out + * @throws StreamingException if there are errors writing + */ + void write(T record) throws StreamingException; + /** * Commit a transaction to make the writes visible for readers. * diff --git streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java index 01c1164d58..73050eeab0 100644 --- streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java +++ streaming/src/java/org/apache/hive/streaming/StreamingTransaction.java @@ -80,6 +80,15 @@ void commit(@Nullable Set partitions, @Nullable String key, */ void write(InputStream stream) throws StreamingException; + /** + * Write object based data within a transaction. This expects {@link #beginNextTransaction()} to have been + * called before this and for {@link #commit()} to be called after (perhaps after many calls to this). + * @param record record to write + * @param type of the record to write + * @throws StreamingException if the attempt to serialize and write the record fails. + */ + void write(T record) throws StreamingException; + /** * Free/close resources used by the streaming transaction. * @throws StreamingException diff --git streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java index 13de1d4719..f083045fd0 100644 --- streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java +++ streaming/src/java/org/apache/hive/streaming/StrictDelimitedInputWriter.java @@ -19,9 +19,7 @@ package org.apache.hive.streaming; -import java.io.InputStream; import java.util.Properties; -import java.util.Scanner; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.SerDeException; @@ -39,11 +37,13 @@ * * NOTE: This record writer is NOT thread-safe. Use one record writer per streaming connection. 
*/ -public class StrictDelimitedInputWriter extends AbstractRecordWriter { +public class StrictDelimitedInputWriter extends AbstractRecordWriter { private char fieldDelimiter; private char collectionDelimiter; private char mapKeyDelimiter; private LazySimpleSerDe serde; + private Object encoded; + private long length; private StrictDelimitedInputWriter(Builder builder) { super(builder.lineDelimiter); @@ -88,11 +88,12 @@ public StrictDelimitedInputWriter build() { } @Override - public Object encode(byte[] record) throws SerializationError { + public void encode(byte[] record) throws SerializationError { try { BytesWritable blob = new BytesWritable(); blob.set(record, 0, record.length); - return serde.deserialize(blob); + encoded = serde.deserialize(blob); + length = record.length; } catch (SerDeException e) { throw new SerializationError("Unable to convert byte[] record into Object", e); } @@ -115,4 +116,14 @@ public LazySimpleSerDe createSerde() throws SerializationError { throw new SerializationError("Error initializing serde", e); } } + + @Override + protected Object getEncoded() { + return encoded; + } + + @Override + protected long getRecordLength() { + return length; + } } diff --git streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java index 416bd67cfd..b23fcbfc0e 100644 --- streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java +++ streaming/src/java/org/apache/hive/streaming/StrictJsonWriter.java @@ -34,8 +34,10 @@ * * NOTE: This record writer is NOT thread-safe. Use one record writer per streaming connection. */ -public class StrictJsonWriter extends AbstractRecordWriter { +public class StrictJsonWriter extends AbstractRecordWriter { private JsonSerDe serde; + private Object encoded; + private long length; public StrictJsonWriter(final Builder builder) { super(builder.lineDelimiter); @@ -79,12 +81,23 @@ public JsonSerDe createSerde() throws SerializationError { } @Override - public Object encode(byte[] utf8StrRecord) throws SerializationError { + public void encode(byte[] utf8StrRecord) throws SerializationError { try { Text blob = new Text(utf8StrRecord); - return serde.deserialize(blob); + encoded = serde.deserialize(blob); + length = utf8StrRecord.length; } catch (SerDeException e) { throw new SerializationError("Unable to convert byte[] record into Object", e); } } + + @Override + protected Object getEncoded() { + return encoded; + } + + @Override + protected long getRecordLength() { + return length; + } } diff --git streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java index 12516f5914..e05df087ba 100644 --- streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java +++ streaming/src/java/org/apache/hive/streaming/StrictRegexWriter.java @@ -38,9 +38,11 @@ * * NOTE: This record writer is NOT thread-safe. Use one record writer per streaming connection. 
*/ -public class StrictRegexWriter extends AbstractRecordWriter { +public class StrictRegexWriter extends AbstractRecordWriter { private String regex; private RegexSerDe serde; + private Object encoded; + private long length; private StrictRegexWriter(final Builder builder) { super(builder.lineDelimiter); @@ -108,13 +110,23 @@ public RegexSerDe createSerde() throws SerializationError { * @throws SerializationError - in case of any deserialization error */ @Override - public Object encode(byte[] utf8StrRecord) throws SerializationError { + public void encode(byte[] utf8StrRecord) throws SerializationError { try { Text blob = new Text(utf8StrRecord); - return serde.deserialize(blob); + encoded = serde.deserialize(blob); + length = utf8StrRecord.length; } catch (SerDeException e) { throw new SerializationError("Unable to convert byte[] record into Object", e); } } + @Override + protected Object getEncoded() { + return encoded; + } + + @Override + protected long getRecordLength() { + return length; + } } diff --git streaming/src/java/org/apache/hive/streaming/avro/AvroWriterAvroSerde.java streaming/src/java/org/apache/hive/streaming/avro/AvroWriterAvroSerde.java new file mode 100644 index 0000000000..2acc3c588d --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/avro/AvroWriterAvroSerde.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.streaming.avro; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.io.Writable; +import org.apache.hive.streaming.SerializationError; + +import javax.annotation.Nullable; +import java.util.Properties; + +abstract class AvroWriterAvroSerde extends AbstractSerDe { + + protected TranslatorInfo tInfo; + protected Schema avroSchema; + + protected AvroWriterAvroSerde(Schema avroSchema) { + this.avroSchema = avroSchema; + } + + @Override + public void initialize(@Nullable Configuration conf, Properties tbl) { + // Not necessary, as only the outer class will be constructing it. + throw new UnsupportedOperationException(); + } + + @Override + public Class getSerializedClass() { + // Not necessary, as this SerDe won't serialize + throw new UnsupportedOperationException(); + } + + @Override + public Writable serialize(Object obj, ObjectInspector objInspector) { + // Not necessary, this SerDe is for deserialization only + throw new UnsupportedOperationException(); + } + + @Override + public SerDeStats getSerDeStats() { + // Don't think this is necessary. + throw new UnsupportedOperationException(); + } + + @Override + public Object deserialize(Writable blob) { + // Use deserialize(GenericRecord) instead + throw new UnsupportedOperationException(); + } + + @Override + public ObjectInspector getObjectInspector() { + return tInfo.getObjectInspector(); + } + + /** + * This should write its result into tInfo. + */ + abstract protected void buildTranslator() throws SerializationError; + + abstract protected Object deserialize(GenericRecord record) throws SerializationError; +} diff --git streaming/src/java/org/apache/hive/streaming/avro/DeserializerOutput.java streaming/src/java/org/apache/hive/streaming/avro/DeserializerOutput.java new file mode 100644 index 0000000000..2da4501438 --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/avro/DeserializerOutput.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.streaming.avro; + +class DeserializerOutput { + private final long addedLength; + private final Object deserialized; + + DeserializerOutput(long addedLength, Object deserialized) { + this.addedLength = addedLength; + this.deserialized = deserialized; + } + + long getAddedLength() { + return addedLength; + } + + Object getDeserialized() { + return deserialized; + } +} diff --git streaming/src/java/org/apache/hive/streaming/avro/MappingAvroWriter.java streaming/src/java/org/apache/hive/streaming/avro/MappingAvroWriter.java new file mode 100644 index 0000000000..32c5d7e98b --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/avro/MappingAvroWriter.java @@ -0,0 +1,438 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.streaming.avro; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hive.streaming.AbstractRecordWriter; +import org.apache.hive.streaming.SerializationError; +import org.apache.hive.streaming.StreamingConnection; +import org.apache.hive.streaming.StreamingException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; + +/** + * A writer for Avro that accepts a map describing how Avro columns should be translated to Hive columns. + * This allows ingesting Avro records into Hive tables that do not exactly match the Avro schema. It is + * assumed that every Avro record has all the columns referenced in the mapping. + * + *

+ * <p>If you are writing into a partitioned table, you must provide a value for each partition column
+ * in each record.</p>
+ *
+ * <p>The access into Avro can be into elements of a complex type. For example, an Avro record that has
+ * a field a could be mapped to a Hive top-level column a if the rest of the record is not of interest.
+ * For unions created for nullable types, there is no need to specify the union in the map; the writer
+ * will "see through" the union to the non-nullable type. The syntax for specifying elements of complex
+ * types is a basic path expression:</p>
+ * <ul>
+ *   <li>maps: colname[mapkey]</li>
+ *   <li>records: colname.subcolname</li>
+ *   <li>lists: colname[element_number]</li>
+ *   <li>unions: colname[offset_number]</li>
+ * </ul>
+ *
+ * <p>This can be nested to multiple levels, so if there is a table T with a column a that is a union whose
+ * first element is a record with a column b that is a map with value type integer, then you could map the
+ * total key of that map to an integer column with a[1].b[total].</p>
+ *
+ * <p>At this time, writing into non-top-level columns in Hive (i.e., into a field of a record) is not
+ * supported.</p>
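+ *
+ * <p>For illustration only (the table, schema, and column names below are hypothetical and not part of
+ * this patch), a writer that flattens a nested Avro record into a flat Hive table might be built as:</p>
+ * <pre>{@code
+ * MappingAvroWriter writer = MappingAvroWriter.newBuilder()
+ *     .withAvroSchema(avroSchema)                    // Avro schema with name, an address record, a phones array
+ *     .addMappingColumn("name", "name")              // top-level Avro column
+ *     .addMappingColumn("city", "address.city")      // field of a record
+ *     .addMappingColumn("first_phone", "phones[0]")  // element of a list
+ *     .build();
+ * }</pre>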

+ */ +public class MappingAvroWriter extends AbstractRecordWriter { + private static final Logger LOG = LoggerFactory.getLogger(MappingAvroWriter.class); + + private final MappingAvroWriterAvroSerDe serde; + private final Map schemaMapping; + private Object encoded; + private long length; + private List hiveSchema; + + private MappingAvroWriter(Builder builder) { + Schema avroSchema = builder.avroSchema == null ? new Schema.Parser().parse(builder.avroSchemaStr) : builder.avroSchema; + serde = new MappingAvroWriterAvroSerDe(avroSchema); + schemaMapping = builder.schemaMapping; + } + + public static class Builder { + private Schema avroSchema; // Avro schema + private String avroSchemaStr; // Avro schema as a string + private Map schemaMapping; + + /** + * Build with an Avro schema object. Call this or {@link #withAvroSchema(String)} but not both. + * @param schema Avro schema + * @return this pointer + */ + public Builder withAvroSchema(Schema schema) { + this.avroSchema = schema; + return this; + } + + /** + * Build with an Avro schema as a String. Useful if you are reading the value from a property or a file. + * Call this or {@link #withAvroSchema(Schema)} but not both. + * @param schema Avro schema as a string + * @return this pointer + */ + public Builder withAvroSchema(String schema) { + this.avroSchemaStr = schema; + return this; + } + + /** + * Set the mapping of the Hive schema to the Avro schema. The keys are Hive column names. + * The values are Avro column names (or a path to an element of a complex type). + * See class level comments on syntax for accessing elements of Avro complex types. + * @param mapping schema mapping + * @return this pointer + */ + public Builder withSchemaMapping(Map mapping) { + if (schemaMapping == null) schemaMapping = new HashMap<>(); + for (Map.Entry mapEntry : mapping.entrySet()) { + schemaMapping.put(mapEntry.getKey().toLowerCase(), mapEntry.getValue()); + } + return this; + } + + /** + * Add a column to the mapping of the Avro schema to the Hive schema. + * @param hiveCol Hive column name. + * @param avroCol Avro column name, or path to element of Avro complex type + * @return + */ + public Builder addMappingColumn(String hiveCol, String avroCol) { + if (schemaMapping == null) schemaMapping = new HashMap<>(); + schemaMapping.put(hiveCol.toLowerCase(), avroCol); + return this; + } + + public MappingAvroWriter build() { + if (avroSchemaStr == null && avroSchema == null) { + throw new IllegalStateException("You must provide an Avro schema, either as a string or an object"); + } + + if (avroSchemaStr != null && avroSchema != null) { + throw new IllegalStateException("You have provided two schemas, provide either an object or a string"); + } + + if (schemaMapping == null) { + throw new IllegalStateException("You have not provided a schema mapping"); + } + + return new MappingAvroWriter(this); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + @Override + public AbstractSerDe createSerde() throws SerializationError { + return serde; + } + + @Override + public void init(StreamingConnection conn, long minWriteId, long maxWriteId, int statementId) throws StreamingException { + // Override this so we can set up our translator properly, as it needs the Hive schema, which is only available once + // the RecordWriter has been embedded in a StreamingConection. However, this leads to an extremely intricate dance. + // We have to build the translator before super.init is called because it fetches the ObjectInspector. 
+ // But if we get an exception while building the translator we still need to go ahead and call super.init so it + // can set itself up properly, even though we're just going to tear it all down. Otherwise it fails during the + // connection close. + StreamingException cachedException = null; + try { + hiveSchema = conn.getTable().getAllCols(); + serde.buildTranslator(); + } catch (SerializationError e) { + cachedException = e; + // This is a hack and a half, but I can't think of a better way around it atm. + // Build a quick fake object inspector so the following init call doesn't blow up. All the fields can be + // string as this object inspector shouldn't be used for anything other than fetching the names of + // partition columns + List colNames = new ArrayList<>(hiveSchema.size()); + List ois = new ArrayList<>(hiveSchema.size()); + for (FieldSchema fs : hiveSchema) { + colNames.add(fs.getName()); + ois.add(PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.STRING)); + } + serde.setFakeObjectInspector(ObjectInspectorFactory.getStandardStructObjectInspector(colNames, ois)); + } + super.init(conn, minWriteId, maxWriteId, statementId); + if (cachedException != null) throw cachedException; + } + + @Override + protected void encode(GenericRecord record) throws SerializationError { + try { + encoded = serde.deserialize(record); + } catch (ArrayIndexOutOfBoundsException e) { + // This generally means the schema is screwed up + throw new SerializationError("Unable to serialize record, likely due to record schema not matching schema " + + "passed to writer", e); + } + } + + @Override + protected Object getEncoded() { + return encoded; + } + + @Override + protected long getRecordLength() { + return length; + } + + /** + * This rather specialized SerDe builds a decoder to handle every row. It does make the assumption that + * every row passed to it has the same schema. + */ + private class MappingAvroWriterAvroSerDe extends AvroWriterAvroSerde { + + public MappingAvroWriterAvroSerDe(Schema avroSchema) { + super(avroSchema); + // Can't build the translator here, because it needs the table data, and the table info won't be + // set until initialize has been called. + } + + + + @Override + protected void buildTranslator() throws SerializationError { + // Do a quick sanity to check to assure that all the Hive columns they specified actually exist + // Have to make a copy of the key set to avoid screwing up the map. + Set hiveColsFromSchemaMapping = new HashSet<>(schemaMapping.keySet()); + for (FieldSchema fs : hiveSchema) hiveColsFromSchemaMapping.remove(fs.getName()); + if (hiveColsFromSchemaMapping.size() > 0) { + throw new SerializationError("Unknown Hive columns " + StringUtils.join(hiveColsFromSchemaMapping, ", ") + + " referenced in schema mapping"); + } + final int size = schemaMapping.size(); + List colNames = new ArrayList<>(size); + List ois = new ArrayList<>(size); + // a map of Hive field name to (avro field name, deserializer) + final Map deserializers = new HashMap<>(size); + for (FieldSchema fs : hiveSchema) { + colNames.add(fs.getName()); + String avroCol = schemaMapping.get(fs.getName().toLowerCase()); + if (avroCol == null) { + // This wasn't in the map, so just set it to null + ObjectInspector oi = PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.STRING); + + ois.add(oi); + deserializers.put(fs.getName(), null); + // TODO should I be setting this to a default value? 
+ } else { + String topLevelColName = findTopLevelColName(avroCol); + Schema.Field field = avroSchema.getField(topLevelColName); + // Make sure the mapped to Avro column exists + if (field == null) throw new SerializationError("Mapping to non-existent avro column " + topLevelColName); + MappingTranslatorInfo mtInfo = mapAvroColumn(avroCol, field.schema(), topLevelColName); + ois.add(mtInfo.getObjectInspector()); + deserializers.put(fs.getName(), mtInfo); + } + } + + tInfo = new TranslatorInfo(ObjectInspectorFactory.getStandardStructObjectInspector(colNames, ois), + o -> { + GenericData.Record record = (GenericData.Record) o; + assert record != null; + List row = new ArrayList<>(size); + long length = 0; + for (FieldSchema fs : hiveSchema) { + MappingTranslatorInfo mtInfo = deserializers.get(fs.getName()); + if (mtInfo == null) { + row.add(null); + } else { + DeserializerOutput output = mtInfo.getDeserializer().apply(record.get(mtInfo.getTopLevelColumn())); + if (output == null) { + row.add(null); + } else { + length += output.getAddedLength(); + row.add(output.getDeserialized()); + } + } + } + return new DeserializerOutput(length, row); + }); + } + + protected Object deserialize(GenericRecord record) { + assert tInfo != null; + DeserializerOutput output = tInfo.getDeserializer().apply(record); + length = output.getAddedLength(); + return output.getDeserialized(); + } + + void setFakeObjectInspector(ObjectInspector oi) { + tInfo = new TranslatorInfo(oi, null); + } + + private String findTopLevelColName(String avroCol) { + return avroCol.split("[\\[.]", 2)[0]; + } + + private String findKey(String avroCol) throws SerializationError { + assert avroCol.charAt(0) == '['; + int closeBracketAt = avroCol.indexOf(']'); + if (closeBracketAt < 0) throw new SerializationError("Unmatched [ in " + avroCol); + return avroCol.substring(1, closeBracketAt); + } + + private String findIndexRemainder(String avroCol) { + int closeBracketAt = avroCol.indexOf(']'); + assert closeBracketAt > 0; // Since this should always be called after findKey we should be guaranteed + // that we've already checked for the close bracket. + return closeBracketAt < avroCol.length() - 1 ? avroCol.substring(closeBracketAt + 1) : null; + + } + + private String findRemainder(String avroCol) { + for (int i = 0; i < avroCol.length(); i++) { + if (avroCol.charAt(i) == '.' || avroCol.charAt(i) == '[') return avroCol.substring(i); + } + return null; + } + + private MappingTranslatorInfo mapAvroColumn(String avroCol, Schema schema, String topLevelColName) throws SerializationError { + return new MappingTranslatorInfo(parseAvroColumn(avroCol, schema), topLevelColName); + } + + private TranslatorInfo parseAvroColumn(String avroCol, Schema schema) throws SerializationError { + if (avroCol.charAt(0) == '.') { + // its a record + if (schema.getType() != Schema.Type.RECORD) { + throw new SerializationError("Attempt to dereference '" + avroCol + "' when containing column is not a record"); + } + String fieldName = findTopLevelColName(avroCol.substring(1)); + Schema.Field innerField = schema.getField(fieldName); + if (innerField == null) { + throw new SerializationError("Attempt to reference non-existent record field '" + fieldName + "'"); + } + final TranslatorInfo subInfo = getSubTranslatorInfo(findRemainder(avroCol.substring(1)), innerField.schema()); + return new TranslatorInfo(subInfo.getObjectInspector(), o -> { + GenericData.Record record = (GenericData.Record)o; + Object element = record.get(fieldName); + return element == null ? 
null : subInfo.getDeserializer().apply(element); + }); + } else if (avroCol.charAt(0) == '[') { + // it's a array, map, or union + String key = findKey(avroCol); + String remainder = findIndexRemainder(avroCol); + switch (schema.getType()) { + case ARRAY: + int index; + try { + index = Integer.valueOf(key); + } catch (NumberFormatException e) { + throw new SerializationError("Attempt to dereference array with non-number '" + key + "'", e); + } + final TranslatorInfo arraySubInfo = getSubTranslatorInfo(remainder, schema.getElementType()); + return new TranslatorInfo(arraySubInfo.getObjectInspector(), o -> { + List avroList = (List)o; + Object element = (index >= avroList.size()) ? null : avroList.get(index); + return element == null ? null : arraySubInfo.getDeserializer().apply(element); + }); + + case MAP: + final TranslatorInfo mapSubInfo = getSubTranslatorInfo(remainder, schema.getValueType()); + return new TranslatorInfo(mapSubInfo.getObjectInspector(), o -> { + Map avroMap = (Map)o; + Object val = avroMap.get(key); + return val == null ? null : mapSubInfo.getDeserializer().apply(val); + }); + + case UNION: + int unionTag; + try { + unionTag = Integer.valueOf(key); + } catch (NumberFormatException e) { + throw new SerializationError("Attempt to dereference union with non-number '" + key + "'", e); + } + if (unionTag >= schema.getTypes().size()) { + throw new SerializationError("Attempt to read union element " + unionTag + " in union with only " + + schema.getTypes().size() + " elements"); + } + final TranslatorInfo unionSubInfo = getSubTranslatorInfo(remainder, schema.getTypes().get(unionTag)); + return new TranslatorInfo(unionSubInfo.getObjectInspector(), o -> { + int offset = GenericData.get().resolveUnion(schema, o); + return offset == unionTag ? + unionSubInfo.getDeserializer().apply(o) : + null; // When the unionTag and offset don't match, it means this union has a different element in it. + }); + + default: + throw new SerializationError("Attempt to deference '" + avroCol + + "' when containing column is not an array, map, or union"); + } + } else { + // it's a column name + // If this column name is followed by a dereference symbol (. or [) than we need to parse it. Otherwise take + // this column as is and place it in the map, whether it's simple or complex. + String remainder = findRemainder(avroCol); + return (remainder == null) ? + Translators.buildColTranslatorInfo(schema) : + parseAvroColumn(remainder, schema); + } + } + + private TranslatorInfo getSubTranslatorInfo(String remainder, Schema subSchema) throws SerializationError { + // if split col name has only one element (that is, the name wasn't really split), then just return + // a translator info for that field. If it has a subelement, then parse further down. + return remainder == null ? 
+ Translators.buildColTranslatorInfo(subSchema) : + parseAvroColumn(remainder, subSchema); + } + } + + private static class MappingTranslatorInfo { + private final String topLevelColumn; + private final TranslatorInfo tInfo; + + MappingTranslatorInfo(TranslatorInfo tInfo, String topLevelColumn) { + this.topLevelColumn = topLevelColumn; + this.tInfo = tInfo; + } + + String getTopLevelColumn() { + return topLevelColumn; + } + + ObjectInspector getObjectInspector() { + return tInfo.getObjectInspector(); + } + + Function getDeserializer() { + return tInfo.getDeserializer(); + } + } +} diff --git streaming/src/java/org/apache/hive/streaming/avro/StrictAvroWriter.java streaming/src/java/org/apache/hive/streaming/avro/StrictAvroWriter.java new file mode 100644 index 0000000000..7624a1d8e9 --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/avro/StrictAvroWriter.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.streaming.avro; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hive.streaming.AbstractRecordWriter; +import org.apache.hive.streaming.SerializationError; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A writer for Avro records that assumes the Avro schema and Hive schema match. Matching + * means that they have the same number of columns and compatible types. For partition columns and + * struct types, the name matching has to be exact. It is assumed that + * every Avro record has the same schema, with the Hive partition columns at the end of the row. + *

+ * <p>The Avro to Hive type compatibility is:</p>
+ * <table>
+ *   <tr><th>Avro</th><th>Hive</th></tr>
+ *   <tr><td>array</td><td>list</td></tr>
+ *   <tr><td>boolean</td><td>boolean</td></tr>
+ *   <tr><td>bytes</td><td>binary</td></tr>
+ *   <tr><td>double</td><td>double</td></tr>
+ *   <tr><td>enum</td><td>string</td></tr>
+ *   <tr><td>fixed</td><td>binary</td></tr>
+ *   <tr><td>float</td><td>float</td></tr>
+ *   <tr><td>int</td><td>int</td></tr>
+ *   <tr><td>long</td><td>bigint</td></tr>
+ *   <tr><td>map</td><td>map</td></tr>
+ *   <tr><td>null</td><td>(see below)</td></tr>
+ *   <tr><td>record</td><td>struct</td></tr>
+ *   <tr><td>string</td><td>string</td></tr>
+ *   <tr><td>union</td><td>union*</td></tr>
+ * </table>
+ *
+ * <p>*Notes on null and union:</p>
+ *
+ * <p>Hive does not have a null type or the concept of a nullable type; instead, all Hive types are nullable.
+ * When this writer encounters a union of null and one other type, it will "look through" the union to the
+ * non-null type and declare that to be the type of the column. If it encounters a union of two or more
+ * non-null types, it will map it as a Hive union. So, if it encounters a union(long, null) it will expect to
+ * find a Hive type of bigint for that column, but if it encounters a union(long, double) it will expect to
+ * find a Hive type of union(bigint, double) for that column.</p>
+ *
+ * <p>The null type is not directly supported on its own; that is, a null type cannot appear outside a union.</p>
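+ *
+ * <p>For illustration only (the schema below is hypothetical and not part of this patch), a writer for a
+ * Hive table declared as (name string, visits bigint) might be built as:</p>
+ * <pre>{@code
+ * Schema schema = SchemaBuilder.record("example").fields()
+ *     .requiredString("name")
+ *     .requiredLong("visits")     // long maps to Hive bigint per the table above
+ *     .endRecord();
+ * StrictAvroWriter writer = StrictAvroWriter.newBuilder()
+ *     .withSchema(schema)
+ *     .build();
+ * }</pre>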

+ */ +public class StrictAvroWriter extends AbstractRecordWriter { + private static final Logger LOG = LoggerFactory.getLogger(StrictAvroWriter.class); + + // I cannot use AvroSerDe because it requires AvroGenericRecordWritable. And I can't use + // AvroGenericRecordWritable because it depends on the schema being encoded with every record, which is + // silly. And in general records we pull off Kafka won't have the schema in every record. + private final AvroWriterAvroSerde serde; + private Object encoded; + private long length; + + private StrictAvroWriter(Builder builder) { + Schema avroSchema = builder.avroSchema == null ? new Schema.Parser().parse(builder.avroSchemaStr) : builder.avroSchema; + serde = new StrictAvroWriterAvroSerDe(avroSchema); + } + + public static class Builder { + private Schema avroSchema; // Avro schema + private String avroSchemaStr; // Avro schema as a string + + /** + * Build with an Avro schema object. Call this or {@link #withSchema(String)} but not both. + * @param schema Avro schema + * @return this pointer + */ + public Builder withSchema(Schema schema) { + this.avroSchema = schema; + return this; + } + + /** + * Build with an Avro schema as a String. Useful if you are reading the value from a property or a file. + * Call this or {@link #withSchema(Schema)} but not both. + * @param schema + * @return + */ + public Builder withSchema(String schema) { + this.avroSchemaStr = schema; + return this; + } + + public StrictAvroWriter build() { + if (avroSchemaStr == null && avroSchema == null) { + throw new IllegalStateException("You must provide an Avro schema, either as a string or an object"); + } + + if (avroSchemaStr != null && avroSchema != null) { + throw new IllegalStateException("You have provided two schemas, provide either an object or a string"); + } + + return new StrictAvroWriter(this); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + @Override + public AbstractSerDe createSerde() throws SerializationError { + return serde; + } + + @Override + protected void encode(GenericRecord record) throws SerializationError { + try { + encoded = serde.deserialize(record); + } catch (ArrayIndexOutOfBoundsException e) { + // This generally means the schema is screwed up + throw new SerializationError("Unable to serialize record, likely due to record schema not matching schema " + + "passed to writer", e); + } + } + + @Override + protected Object getEncoded() { + return encoded; + } + + @Override + protected long getRecordLength() { + return length; + } + + /** + * This rather specialized SerDe builds a decoder to handle every row. It does make the assumption that + * every row passed to it has the same schema. + */ + private class StrictAvroWriterAvroSerDe extends AvroWriterAvroSerde { + + public StrictAvroWriterAvroSerDe(Schema avroSchema) { + super(avroSchema); + // Build the translator, since we have all the info we need up front. 
+ buildTranslator(); + } + + @Override + protected void buildTranslator() { + tInfo = Translators.buildRecordTranslatorInfo(avroSchema); + } + + protected Object deserialize(GenericRecord record) throws SerializationError { + DeserializerOutput output = tInfo.getDeserializer().apply(record); + length = output.getAddedLength(); + return output.getDeserialized(); + } + + } + +} diff --git streaming/src/java/org/apache/hive/streaming/avro/TranslatorInfo.java streaming/src/java/org/apache/hive/streaming/avro/TranslatorInfo.java new file mode 100644 index 0000000000..11b32f7cc1 --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/avro/TranslatorInfo.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.streaming.avro; + +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; + +import java.util.function.Function; + +class TranslatorInfo { + private final ObjectInspector oi; + private final Function deserializer; + + TranslatorInfo(ObjectInspector objectInspector, Function deserializer) { + this.oi = objectInspector; + this.deserializer = deserializer; + } + + ObjectInspector getObjectInspector() { + return oi; + } + + Function getDeserializer() { + return deserializer; + } +} diff --git streaming/src/java/org/apache/hive/streaming/avro/Translators.java streaming/src/java/org/apache/hive/streaming/avro/Translators.java new file mode 100644 index 0000000000..3a669e7efc --- /dev/null +++ streaming/src/java/org/apache/hive/streaming/avro/Translators.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.streaming.avro; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +class Translators { + private static final Logger LOG = LoggerFactory.getLogger(Translators.class); + + static TranslatorInfo buildColTranslatorInfo(Schema schema) { + // Look through the nullable type union, as Hive assumes everything is nullable. + + switch (schema.getType()) { + case ARRAY: + return buildListTranslatorInfo(schema); + + case BOOLEAN: + return new TranslatorInfo( + PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.BOOLEAN), + avroVal -> new DeserializerOutput(1, avroVal)); + + case BYTES: + return new TranslatorInfo( + PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.BINARY), + avroVal -> { + ByteBuffer buf = (ByteBuffer)avroVal; + buf.rewind(); + byte[] result = new byte[buf.limit()]; + buf.get(result); + return new DeserializerOutput(result.length, result); + }); + + case DOUBLE: + return new TranslatorInfo( + PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.DOUBLE), + eightByteNumericDeserializer); + + case ENUM: + // Copied this from AvroDeserializer, not 100% sure it works. 
+ return new TranslatorInfo( + PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.STRING), + stringDeserializer); + + case FIXED: + return new TranslatorInfo( + PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.BINARY), + avroVal -> { + GenericData.Fixed fixed = (GenericData.Fixed)avroVal; + return new DeserializerOutput(fixed.bytes().length, fixed.bytes()); + }); + + case FLOAT: + return new TranslatorInfo( + PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.FLOAT), + fourByteNumericDeserializer); + + case INT: + return new TranslatorInfo( + PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.INT), + fourByteNumericDeserializer); + + case LONG: + return new TranslatorInfo( + PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.LONG), + eightByteNumericDeserializer); + + case MAP: + return buildMapTranslatorInfo(schema); + + case NULL: + throw new UnsupportedOperationException("Null type only supported as part of nullable struct"); + + case RECORD: + return buildRecordTranslatorInfo(schema); + + case STRING: + return new TranslatorInfo(PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector( + PrimitiveObjectInspector.PrimitiveCategory.STRING), stringDeserializer); + + case UNION: + return buildUnionTranslatorInfo(schema); + + default: throw new RuntimeException("Unknown Avro type: " + schema.getName()); + } + } + + static TranslatorInfo buildListTranslatorInfo(final Schema schema) { + TranslatorInfo elementTranslatorInfo = + buildColTranslatorInfo(schema.getElementType()); + Function listDeserializer = (o) -> { + List avroList = (List)o; + List hiveList = new ArrayList<>(avroList.size()); + long length = 0; + for (Object avroVal : avroList) { + DeserializerOutput output = elementTranslatorInfo.getDeserializer().apply(avroVal); + hiveList.add(output.getDeserialized()); + length += output.getAddedLength(); + } + return new DeserializerOutput(length, hiveList); + }; + return new TranslatorInfo( + ObjectInspectorFactory.getStandardListObjectInspector(elementTranslatorInfo.getObjectInspector()), listDeserializer); + + } + + static TranslatorInfo buildMapTranslatorInfo(final Schema schema) { + final TranslatorInfo valTranslatorInfo = buildColTranslatorInfo(schema.getValueType()); + ObjectInspector oi = ObjectInspectorFactory.getStandardMapObjectInspector( + PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.STRING), + valTranslatorInfo.getObjectInspector()); + Function deserializer = (o) -> { + Map avroMap = (Map)o; + Map hiveMap = new HashMap<>(avroMap.size()); + long length = 0; + for (Map.Entry e : avroMap.entrySet()) { + DeserializerOutput key = stringDeserializer.apply(e.getKey().toString()); + DeserializerOutput value = valTranslatorInfo.getDeserializer().apply(e.getValue()); + hiveMap.put(key.getDeserialized(), value.getDeserialized()); + length += key.getAddedLength() + value.getAddedLength(); + } + return new DeserializerOutput(length, hiveMap); + }; + return new TranslatorInfo(oi, deserializer); + } + + static TranslatorInfo buildRecordTranslatorInfo(final Schema schema) { + final int size = schema.getFields().size(); + List colNames = new ArrayList<>(size); + List ois = new ArrayList<>(size); + final List> deserializers = new ArrayList<>(size); + 
for (Schema.Field field : schema.getFields()) { + colNames.add(field.name()); + TranslatorInfo tInfo = buildColTranslatorInfo(field.schema()); + ois.add(tInfo.getObjectInspector()); + deserializers.add(tInfo.getDeserializer()); + } + return new TranslatorInfo(ObjectInspectorFactory.getStandardStructObjectInspector(colNames, ois), + (o) -> { + GenericData.Record record = (GenericData.Record)o; + assert record != null; + List row = new ArrayList<>(size); + long length = 0; + for (int i = 0; i < size; i++) { + DeserializerOutput output = deserializers.get(i).apply(record.get(i)); + length += output.getAddedLength(); + row.add(output.getDeserialized()); + } + return new DeserializerOutput(length, row); + }); + } + + static TranslatorInfo buildUnionTranslatorInfo(final Schema schema) { + // If this is just a union to make a type nullable, don't model it as a union, model it as the type. + if (AvroSerdeUtils.isNullableType(schema)) { + LOG.debug("Encountered union of just nullable type, treating it as the type"); + return buildColTranslatorInfo(AvroSerdeUtils.getOtherTypeFromNullableType(schema)); + } + List ois = new ArrayList<>(schema.getTypes().size()); + List> deserializers = new ArrayList<>(schema.getTypes().size()); + for (Schema type : schema.getTypes()) { + TranslatorInfo tInfo = buildColTranslatorInfo(type); + ois.add(tInfo.getObjectInspector()); + deserializers.add(tInfo.getDeserializer()); + + } + return new TranslatorInfo(ObjectInspectorFactory.getStandardUnionObjectInspector(ois), (union) -> { + int offset = GenericData.get().resolveUnion(schema, union); + DeserializerOutput output = deserializers.get(offset).apply(union); + return new DeserializerOutput(output.getAddedLength(), + new StandardUnionObjectInspector.StandardUnion((byte)offset, output.getDeserialized())); + }); + } + + private static Function stringDeserializer = avroVal -> { + String s = avroVal.toString(); + return new DeserializerOutput(s.length(), s); + }; + + private static Function fourByteNumericDeserializer = avroVal -> new DeserializerOutput(4, avroVal); + + private static Function eightByteNumericDeserializer = avroVal -> new DeserializerOutput(8, avroVal); +} diff --git streaming/src/test/org/apache/hive/streaming/TestStreaming.java streaming/src/test/org/apache/hive/streaming/TestStreaming.java index dbff263aed..41f4007d43 100644 --- streaming/src/test/org/apache/hive/streaming/TestStreaming.java +++ streaming/src/test/org/apache/hive/streaming/TestStreaming.java @@ -3220,7 +3220,7 @@ public String toString() { * This is test-only wrapper around the real RecordWriter. * It can simulate faults from lower levels to test error handling logic. */ - private static final class FaultyWriter implements RecordWriter { + private static final class FaultyWriter implements RecordWriter { private final RecordWriter delegate; private boolean shouldThrow = false; diff --git streaming/src/test/org/apache/hive/streaming/avro/AvroTestBase.java streaming/src/test/org/apache/hive/streaming/avro/AvroTestBase.java new file mode 100644 index 0000000000..341a17f9df --- /dev/null +++ streaming/src/test/org/apache/hive/streaming/avro/AvroTestBase.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.streaming.avro; + +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.ql.DriverFactory; +import org.apache.hadoop.hive.ql.IDriver; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hive.streaming.TestStreaming; +import org.junit.After; +import org.junit.Before; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +class AvroTestBase { + private static final Logger LOG = LoggerFactory.getLogger(AvroTestBase.class); + protected static final String dbName = "avrostreamingdb"; + + protected static HiveConf conf = null; + protected IDriver driver; + protected final IMetaStoreClient msClient; + + public AvroTestBase() throws Exception { + conf = new HiveConf(this.getClass()); + conf.set("fs.raw.impl", TestStreaming.RawFileSystem.class.getName()); + conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, + "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); + TxnDbUtil.setConfValues(conf); + conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true); + conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true); + + + //1) Start from a clean slate (metastore) + TxnDbUtil.cleanDb(conf); + TxnDbUtil.prepDb(conf); + + //2) obtain metastore clients + msClient = new HiveMetaStoreClient(conf); + } + + @Before + public void setup() throws IOException { + SessionState.start(new CliSessionState(conf)); + driver = DriverFactory.newDriver(conf); + driver.setMaxRows(200002);//make sure Driver returns all results + // drop and recreate the necessary databases and tables + execSql("drop database if exists " + dbName + " cascade"); + execSql("create database " + dbName); + } + + @After + public void cleanup() { + msClient.close(); + driver.close(); + } + + protected void dropAndCreateTable(String fullTableName, String schema) throws IOException { + dropAndCreateTable(fullTableName, schema, null); + } + + protected void dropAndCreateTable(String fullTableName, String schema, String partCols) throws IOException { + execSql("drop table if exists " + fullTableName); + StringBuilder buf = new StringBuilder("create table ") + .append(fullTableName) + .append(" (") + .append(schema) + .append(")"); + if (partCols != null) { + buf.append(" partitioned by (") + .append(partCols) + .append(')'); + + } + buf.append(" stored as orc TBLPROPERTIES('transactional'='true')"); + execSql(buf.toString()); + } + + protected void execSql(String sql) throws IOException { + runSql(sql, false); + } + + 
protected List querySql(String sql) throws IOException { + return runSql(sql, true); + } + + protected List runSql(String sql, boolean expectResults) throws IOException { + LOG.debug("Going to run: " + sql); + CommandProcessorResponse cpr = driver.run(sql); + if (cpr.getResponseCode() != 0) { + throw new RuntimeException("Failed to run statement <" + sql + ">: " + cpr); + } + if (expectResults) { + List results = new ArrayList<>(); + driver.getResults(results); + return results; + } else { + return Collections.emptyList(); + } + } +} diff --git streaming/src/test/org/apache/hive/streaming/avro/TestMappingAvroWriter.java streaming/src/test/org/apache/hive/streaming/avro/TestMappingAvroWriter.java new file mode 100644 index 0000000000..026d77c703 --- /dev/null +++ streaming/src/test/org/apache/hive/streaming/avro/TestMappingAvroWriter.java @@ -0,0 +1,889 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.streaming.avro; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.hive.streaming.HiveStreamingConnection; +import org.apache.hive.streaming.RecordWriter; +import org.apache.hive.streaming.SerializationError; +import org.apache.hive.streaming.StreamingException; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TestMappingAvroWriter extends AvroTestBase { + public TestMappingAvroWriter() throws Exception { + } + + @Test + public void basic() throws StreamingException, IOException { + Schema schema = SchemaBuilder.record("basic") + .namespace("org.apache.hive.streaming") + .fields() + .requiredString("field1") + .requiredInt("field2") + .endRecord(); + List records = new ArrayList<>(); + GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema); + for (int i = 0; i < 10; i++) { + recordBuilder.set("field1", Integer.toString(i + 10)); + recordBuilder.set("field2", i); + records.add(recordBuilder.build()); + } + + String tableName = "b"; + String fullTableName = dbName + "." 
+ tableName; + dropAndCreateTable(fullTableName, "f1 int, f2 string"); + + RecordWriter writer = MappingAvroWriter.newBuilder() + .withAvroSchema(schema) + .addMappingColumn("f1", "field2") + .addMappingColumn("f2", "field1") + .build(); + + HiveStreamingConnection conn = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tableName) + .withTransactionBatchSize(5) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + + conn.beginTransaction(); + for (GenericRecord r : records) conn.write(r); + conn.commitTransaction(); + conn.close(); + + List results = querySql("select f1, f2 from " + fullTableName); + Assert.assertEquals(10, results.size()); + for (int i = 0; i < 10; i++) Assert.assertEquals(i + "\t" + (i + 10), results.get(i)); + } + + @Test + public void hiveColumnMissing() throws StreamingException, IOException { + Schema schema = SchemaBuilder.record("hiveColMissing") + .namespace("org.apache.hive.streaming") + .fields() + .nullableBoolean("nullableBoolean", false) + .requiredBytes("avroBytes") + .endRecord(); + + List records = new ArrayList<>(); + GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema); + for (int i = 0; i < 10; i++) { + if (i % 2 == 0) recordBuilder.set("nullableBoolean", true); + else if (i % 7 == 0) recordBuilder.set("nullableBoolean", false); + else recordBuilder.set("nullableBoolean", null); + + recordBuilder.set("avroBytes", ByteBuffer.wrap(Integer.toString(i).getBytes())); + + records.add(recordBuilder.build()); + } + String tableName = "hiveColMissing"; + String fullTableName = dbName + "." + tableName; + dropAndCreateTable(fullTableName, "hiveBoolean boolean, hiveMia string, hiveBinary binary"); + + Map schemaMap = new HashMap<>(); + schemaMap.put("hiveBoolean", "nullableBoolean"); + schemaMap.put("hiveBinary", "avroBytes"); + + RecordWriter writer = MappingAvroWriter.newBuilder() + .withAvroSchema(schema.toString()) + .withSchemaMapping(schemaMap) + .build(); + + HiveStreamingConnection conn = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tableName) + .withTransactionBatchSize(5) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + + conn.beginTransaction(); + for (GenericRecord r : records) conn.write(r); + conn.commitTransaction(); + conn.close(); + + List results = querySql("select hiveBoolean, hiveMia, hiveBinary " + + " from " + fullTableName); + Assert.assertEquals(10, results.size()); + for (int i = 0; i < 10; i++) { + StringBuilder buf = new StringBuilder(); + + if (i % 2 == 0) buf.append("true"); + else if (i % 7 == 0) buf.append("false"); + else buf.append("NULL"); + + buf.append("\tNULL"); + + buf.append("\t") + .append(i); + + Assert.assertEquals(buf.toString(), results.get(i)); + } + } + + @Test + public void avroColMissing() throws StreamingException, IOException { + Schema fixedSchema = SchemaBuilder.fixed("avroColMissing").size(10); + Schema schema = SchemaBuilder.record("allTypes") + .namespace("org.apache.hive.streaming") + .fields() + .requiredDouble("avroDouble") + .name("avroEnum") + .type().enumeration("enumType").symbols("apple", "orange", "banana").enumDefault("apple") + .requiredInt("avroInt") + .name("avroFixed") + .type(fixedSchema).noDefault() + .endRecord(); + + List records = new ArrayList<>(); + GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema); + for (int i = 0; i < 10; i++) { + double d = i + (double)i * 0.1; + recordBuilder.set("avroDouble", d); + + if (i % 2 == 0) recordBuilder.set("avroEnum", "apple"); + else if 
(i % 7 == 0) recordBuilder.set("avroEnum", "orange"); + else recordBuilder.set("avroEnum", "banana"); + + recordBuilder.set("avroInt", i); + + StringBuilder buf = new StringBuilder(); + for (int j = 0; j < 10; j++) buf.append(i); + GenericData.Fixed fixed = new GenericData.Fixed(fixedSchema, buf.toString().getBytes()); + recordBuilder.set("avroFixed", fixed); + + records.add(recordBuilder.build()); + } + String tableName = "avroColMissing"; + String fullTableName = dbName + "." + tableName; + dropAndCreateTable(fullTableName, "hiveDouble double, hiveEnum string, hiveFixed binary " + ); + + + RecordWriter writer = MappingAvroWriter.newBuilder() + .withAvroSchema(schema) + .addMappingColumn("hiveDouble", "avroDouble") + .addMappingColumn("hiveEnum", "avroEnum") + .addMappingColumn("hiveFixed", "avroFixed") + .build(); + + HiveStreamingConnection conn = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tableName) + .withTransactionBatchSize(5) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + + conn.beginTransaction(); + for (GenericRecord r : records) conn.write(r); + conn.commitTransaction(); + conn.close(); + + List results = querySql("select hiveDouble, hiveEnum, hiveFixed from " + fullTableName); + Assert.assertEquals(10, results.size()); + for (int i = 0; i < 10; i++) { + StringBuilder buf = new StringBuilder(); + double d = i + (double)i * 0.1; + buf.append(d); + + if (i % 2 == 0) buf.append("\tapple"); + else if (i % 7 == 0) buf.append("\torange"); + else buf.append("\tbanana"); + + buf.append('\t'); + for (int j = 0; j < 10; j++) buf.append(i); + + Assert.assertEquals(buf.toString(), results.get(i)); + } + } + + @Test + public void recordDereference() throws StreamingException, IOException { + Schema innerRecordSchema = SchemaBuilder.record("recordType") + .fields() + .requiredString("innerString") + .requiredInt("innerInt") + .endRecord(); + Schema schema = SchemaBuilder.record("recordDeref") + .namespace("org.apache.hive.streaming") + .fields() + .requiredDouble("avroDouble") + .name("recordField") + .type(innerRecordSchema).noDefault() + .endRecord(); + + List records = new ArrayList<>(); + GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema); + for (int i = 0; i < 10; i++) { + double d = i + (double)i * 0.1; + recordBuilder.set("avroDouble", d); + + GenericRecordBuilder innerBuilder = new GenericRecordBuilder(innerRecordSchema); + innerBuilder.set("innerString", Integer.toString(i*100)); + innerBuilder.set("innerInt", i); + recordBuilder.set("recordField", innerBuilder.build()); + + records.add(recordBuilder.build()); + } + String tableName = "recordderef"; + String fullTableName = dbName + "." 
+ tableName; + dropAndCreateTable(fullTableName, "hiveInt int "); + + + RecordWriter writer = MappingAvroWriter.newBuilder() + .withAvroSchema(schema) + .addMappingColumn("hiveInt", "recordField.innerInt") + .build(); + + HiveStreamingConnection conn = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tableName) + .withTransactionBatchSize(5) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + + conn.beginTransaction(); + for (GenericRecord r : records) conn.write(r); + conn.commitTransaction(); + conn.close(); + + List results = querySql("select hiveInt from " + fullTableName); + Assert.assertEquals(10, results.size()); + for (int i = 0; i < 10; i++) { + Assert.assertEquals(Integer.toString(i), results.get(i)); + } + } + + @Test + public void arrayDereference() throws StreamingException, IOException { + Schema schema = SchemaBuilder.record("arrayderef") + .namespace("org.apache.hive.streaming") + .fields() + .name("avroArray") + .type().array().items().intType().noDefault() + .requiredDouble("avroDouble") + .endRecord(); + + List records = new ArrayList<>(); + GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema); + for (int i = 0; i < 10; i++) { + List list = new ArrayList<>(); + for (int j = i; j < 10; j++) list.add(j); + recordBuilder.set("avroArray", list); + + double d = i + (double)i * 0.1; + recordBuilder.set("avroDouble", d); + + records.add(recordBuilder.build()); + } + String tableName = "arrayderef"; + String fullTableName = dbName + "." + tableName; + dropAndCreateTable(fullTableName, "f1 int, f2 int "); + + RecordWriter writer = MappingAvroWriter.newBuilder() + .withAvroSchema(schema) + .addMappingColumn("f1", "avroArray[0]") + .addMappingColumn("f2", "avroArray[1]") + .build(); + + HiveStreamingConnection conn = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tableName) + .withTransactionBatchSize(5) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + + conn.beginTransaction(); + for (GenericRecord r : records) conn.write(r); + conn.commitTransaction(); + conn.close(); + + List results = querySql("select f1, f2 from " + fullTableName); + Assert.assertEquals(10, results.size()); + for (int i = 0; i < 10; i++) { + StringBuilder buf = new StringBuilder(); + buf.append(i) + .append('\t'); + if (i < 9) buf.append(i + 1); + else buf.append("NULL"); + + Assert.assertEquals(buf.toString(), results.get(i)); + } + } + + @Test + public void mapDereference() throws StreamingException, IOException { + Schema schema = SchemaBuilder.record("mapderef") + .namespace("org.apache.hive.streaming") + .fields() + .requiredLong("avroLong") + .name("avroMap") + .type().map().values().intType().mapDefault(Collections.emptyMap()) + .endRecord(); + + List records = new ArrayList<>(); + GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema); + for (int i = 0; i < 10; i++) { + recordBuilder.set("avroLong", new Long(i)); + + Map m = new HashMap<>(); + m.put("always", i); + m.put("alsoAlways", i * 10); + if (i % 2 == 0) m.put("sometimes", i * 100); + recordBuilder.set("avroMap", m); + + records.add(recordBuilder.build()); + } + String tableName = "mapderef"; + String fullTableName = dbName + "." 
+ tableName; + dropAndCreateTable(fullTableName, "f1 int, f2 int "); + + RecordWriter writer = MappingAvroWriter.newBuilder() + .withAvroSchema(schema) + .addMappingColumn("f1", "avroMap[always]") + .addMappingColumn("f2", "avroMap[sometimes]") + .build(); + + HiveStreamingConnection conn = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tableName) + .withTransactionBatchSize(5) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + + conn.beginTransaction(); + for (GenericRecord r : records) conn.write(r); + conn.commitTransaction(); + conn.close(); + + List results = querySql("select f1, f2 from " + fullTableName); + Assert.assertEquals(10, results.size()); + for (int i = 0; i < 10; i++) { + StringBuilder buf = new StringBuilder(); + buf.append(i) + .append('\t'); + if (i % 2 == 0) buf.append(i * 100); + else buf.append("NULL"); + + Assert.assertEquals(buf.toString(), results.get(i)); + } + } + + @Test + public void unionDereference() throws StreamingException, IOException { + Schema schema = SchemaBuilder.record("unionDeref") + .namespace("org.apache.hive.streaming") + .fields() + .requiredLong("avroLong") + .name("avroUnion") + .type().unionOf().booleanType().and().intType().endUnion().booleanDefault(true) + .endRecord(); + + List records = new ArrayList<>(); + GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema); + for (int i = 0; i < 10; i++) { + recordBuilder.set("avroLong", new Long(i)); + + if (i % 2 == 0) recordBuilder.set("avroUnion", i); + else recordBuilder.set("avroUnion", true); + + records.add(recordBuilder.build()); + } + String tableName = "unionderef"; + String fullTableName = dbName + "." + tableName; + dropAndCreateTable(fullTableName, "f1 int, f2 boolean "); + + RecordWriter writer = MappingAvroWriter.newBuilder() + .withAvroSchema(schema) + .addMappingColumn("f1", "avroUnion[1]") + .addMappingColumn("f2", "avroUnion[0]") + .build(); + + HiveStreamingConnection conn = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tableName) + .withTransactionBatchSize(5) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + + conn.beginTransaction(); + for (GenericRecord r : records) conn.write(r); + conn.commitTransaction(); + conn.close(); + + List results = querySql("select f1, f2 from " + fullTableName); + Assert.assertEquals(10, results.size()); + for (int i = 0; i < 10; i++) { + StringBuilder buf = new StringBuilder(); + + if (i % 2 == 0) buf.append(i).append("\tNULL"); + else buf.append("NULL\t").append(true); + + Assert.assertEquals(buf.toString(), results.get(i)); + } + } + + @Test + public void multiLevelDereference() throws StreamingException, IOException { + Schema innerRecordSchema = SchemaBuilder.record("recordType") + .fields() + .name("avroUnion") + .type().unionOf().array().items().intType().and().intType().endUnion().noDefault() + .requiredString("innerString") + .requiredInt("innerInt") + .endRecord(); + Schema schema = SchemaBuilder.record("multilevel") + .namespace("org.apache.hive.streaming") + .fields() + .name("avroArray").type().array().items() + .map().values().type(innerRecordSchema).noDefault() + .requiredInt("avroInt") + .endRecord(); + + String[] mapKeys = new String[] {"abel", "boaz", "caleb", "david", "eli", "philip", "gad", "hosea", + "ichabod", "jeremiah" }; + + List records = new ArrayList<>(); + GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema); + for (int i = 0; i < 10; i++) { + List> list = new ArrayList<>(); + for (int j = i; j < 10; 
j++) { + Map m = new HashMap<>(); + for (int k = 0; k < i; k++) { + GenericRecordBuilder innerBuilder = new GenericRecordBuilder(innerRecordSchema); + if (i % 2 == 0) { + innerBuilder.set("avroUnion", k); + } else { + List innerList = new ArrayList<>(); + for (int n = 10; n < 20; n++) innerList.add(n); + innerBuilder.set("avroUnion", innerList); + } + innerBuilder.set("innerString", Integer.toString(k * 100)); + innerBuilder.set("innerInt", k); + m.put(mapKeys[k], innerBuilder.build()); + } + list.add(m); + } + recordBuilder.set("avroArray", list); + recordBuilder.set("avroInt", i); + records.add(recordBuilder.build()); + } + String tableName = "multilevelderef"; + String fullTableName = dbName + "." + tableName; + dropAndCreateTable(fullTableName, "toplevelint int, burieddeep int "); + + RecordWriter writer = MappingAvroWriter.newBuilder() + .withAvroSchema(schema) + .addMappingColumn("burieddeep", "avroArray[2][boaz].avroUnion[0][1]") + //.addMappingColumn("burieddeep", "avroArray[2][boaz].avroUnion[0][1]") + .addMappingColumn("toplevelint", "avroInt") + .build(); + + HiveStreamingConnection conn = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tableName) + .withTransactionBatchSize(5) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + + conn.beginTransaction(); + for (GenericRecord r : records) conn.write(r); + conn.commitTransaction(); + conn.close(); + + List results = querySql("select toplevelint, burieddeep from " + fullTableName); + Assert.assertEquals(10, results.size()); + for (int i = 0; i < 10; i++) { + StringBuilder buf = new StringBuilder(); + buf.append(i) + .append('\t'); + if (i < 8 && i >= 2 && i % 2 != 0) { + buf.append(11); + } else { + buf.append("NULL"); + } + Assert.assertEquals(buf.toString(), results.get(i)); + } + } + + @Test + public void nonExistentHiveCol() throws StreamingException, IOException { + Schema schema = SchemaBuilder.record("noHiveCol") + .namespace("org.apache.hive.streaming") + .fields() + .requiredString("field1") + .requiredInt("field2") + .endRecord(); + + String tableName = "nohivecol"; + String fullTableName = dbName + "." + tableName; + dropAndCreateTable(fullTableName, "f1 int, f2 string"); + + RecordWriter writer = MappingAvroWriter.newBuilder() + .withAvroSchema(schema) + .addMappingColumn("f1", "field2") + .addMappingColumn("nosuch", "field1") + .build(); + + try { + HiveStreamingConnection conn = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tableName) + .withTransactionBatchSize(5) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + conn.beginTransaction(); + Assert.fail(); + } catch (SerializationError e) { + Assert.assertEquals("Unknown Hive columns nosuch referenced in schema mapping", e.getMessage()); + } + } + + @Test + public void nonExistentAvroCol() throws StreamingException, IOException { + Schema schema = SchemaBuilder.record("noAvroCol") + .namespace("org.apache.hive.streaming") + .fields() + .requiredString("field1") + .requiredInt("field2") + .endRecord(); + + String tableName = "noavrocol"; + String fullTableName = dbName + "." 
+ tableName; + dropAndCreateTable(fullTableName, "f1 int, f2 string"); + + RecordWriter writer = MappingAvroWriter.newBuilder() + .withAvroSchema(schema) + .addMappingColumn("f1", "field2") + .addMappingColumn("f2", "nosuch") + .build(); + + try { + HiveStreamingConnection conn = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tableName) + .withTransactionBatchSize(5) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + conn.beginTransaction(); + Assert.fail(); + } catch (SerializationError e) { + Assert.assertEquals("Mapping to non-existent avro column nosuch", e.getMessage()); + } + + } + + @Test + public void partitioned() throws StreamingException, IOException { + Schema schema = SchemaBuilder.record("partitioned") + .namespace("org.apache.hive.streaming") + .fields() + .requiredString("field1") + .requiredInt("field2") + .requiredString("field3") + .endRecord(); + List records = new ArrayList<>(); + GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema); + for (int i = 0; i < 10; i++) { + recordBuilder.set("field1", Integer.toString(i)); + recordBuilder.set("field2", i); + recordBuilder.set("field3", "a"); + records.add(recordBuilder.build()); + } + for (int i = 0; i < 10; i++) { + recordBuilder.set("field1", Integer.toString(i)); + recordBuilder.set("field2", i); + recordBuilder.set("field3", "b"); + records.add(recordBuilder.build()); + } + + String tableName = "ptable"; + String fullTableName = dbName + "." + tableName; + dropAndCreateTable(fullTableName, "f1 string, f2 int", "f3 string"); + + RecordWriter writer = MappingAvroWriter.newBuilder() + .withAvroSchema(schema) + .addMappingColumn("f1", "field1") + .addMappingColumn("f2", "field2") + .addMappingColumn("f3", "field3") + .build(); + + HiveStreamingConnection conn = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tableName) + .withTransactionBatchSize(5) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + + conn.beginTransaction(); + for (GenericRecord r : records) conn.write(r); + conn.commitTransaction(); + conn.close(); + + List results = querySql("select f1, f2 from " + fullTableName + " where f3 = 'a'"); + Assert.assertEquals(10, results.size()); + for (int i = 0; i < 10; i++) Assert.assertEquals(i + "\t" + i, results.get(i)); + results = querySql("select f1, f2 from " + fullTableName + " where f3 = 'b'"); + Assert.assertEquals(10, results.size()); + for (int i = 0; i < 10; i++) Assert.assertEquals(i + "\t" + i, results.get(i)); + } + + @Test(expected = IllegalStateException.class) + public void missingMapping() throws StreamingException, IOException { + MappingAvroWriter.newBuilder() + .withAvroSchema("abcdef") + .build(); + } + + @Test + public void unionDerefOutOfRange() throws StreamingException, IOException { + Schema schema = SchemaBuilder.record("unionDerefOOR") + .namespace("org.apache.hive.streaming") + .fields() + .requiredLong("avroLong") + .name("avroUnion") + .type().unionOf().booleanType().and().intType().endUnion().booleanDefault(true) + .endRecord(); + + String tableName = "unionderefoor"; + String fullTableName = dbName + "." 
+ tableName; + dropAndCreateTable(fullTableName, "f1 int, f2 string"); + + RecordWriter writer = MappingAvroWriter.newBuilder() + .withAvroSchema(schema) + .addMappingColumn("f1", "avroUnion[1]") + .addMappingColumn("f2", "avroUnion[3]") + .build(); + + try { + HiveStreamingConnection conn = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tableName) + .withTransactionBatchSize(5) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + conn.beginTransaction(); + Assert.fail(); + } catch (SerializationError e) { + Assert.assertEquals("Attempt to read union element 3 in union with only 2 elements", e.getMessage()); + } + } + + @Test + public void unionDerefNonNumber() throws StreamingException, IOException { + Schema schema = SchemaBuilder.record("unionDerefNan") + .namespace("org.apache.hive.streaming") + .fields() + .requiredLong("avroLong") + .name("avroUnion") + .type().unionOf().booleanType().and().intType().endUnion().booleanDefault(true) + .endRecord(); + + String tableName = "unionderefnan"; + String fullTableName = dbName + "." + tableName; + dropAndCreateTable(fullTableName, "f1 int, f2 string"); + + RecordWriter writer = MappingAvroWriter.newBuilder() + .withAvroSchema(schema) + .addMappingColumn("f1", "avroUnion[1]") + .addMappingColumn("f2", "avroUnion[fred]") + .build(); + + try { + HiveStreamingConnection conn = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tableName) + .withTransactionBatchSize(5) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + conn.beginTransaction(); + Assert.fail(); + } catch (SerializationError e) { + Assert.assertEquals("Attempt to dereference union with non-number 'fred'", e.getMessage()); + } + } + + @Test + public void arrayDerefNonNumber() throws StreamingException, IOException { + Schema schema = SchemaBuilder.record("arrayderefNan") + .namespace("org.apache.hive.streaming") + .fields() + .name("avroArray") + .type().array().items().intType().noDefault() + .requiredDouble("avroDouble") + .endRecord(); + + String tableName = "arrayderefnan"; + String fullTableName = dbName + "." + tableName; + dropAndCreateTable(fullTableName, "f1 int, f2 string"); + + RecordWriter writer = MappingAvroWriter.newBuilder() + .withAvroSchema(schema) + .addMappingColumn("f1", "avroArray[0]") + .addMappingColumn("f2", "avroArray[fred]") + .build(); + + try { + HiveStreamingConnection conn = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tableName) + .withTransactionBatchSize(5) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + conn.beginTransaction(); + Assert.fail(); + } catch (SerializationError e) { + Assert.assertEquals("Attempt to dereference array with non-number 'fred'", e.getMessage()); + } + } + + @Test + public void recordDerefNoSuchField() throws StreamingException, IOException { + Schema innerRecordSchema = SchemaBuilder.record("recordType") + .fields() + .requiredString("innerString") + .requiredInt("innerInt") + .endRecord(); + Schema schema = SchemaBuilder.record("recordDerefNoSuchField") + .namespace("org.apache.hive.streaming") + .fields() + .requiredDouble("avroDouble") + .name("recordField") + .type(innerRecordSchema).noDefault() + .endRecord(); + + String tableName = "recordderefnosuchfield"; + String fullTableName = dbName + "." 
+ tableName; + dropAndCreateTable(fullTableName, "f1 int, f2 string"); + + RecordWriter writer = MappingAvroWriter.newBuilder() + .withAvroSchema(schema) + .addMappingColumn("f1", "recordField.innerInt") + .addMappingColumn("f2", "recordField.nosuch") + .build(); + + try { + HiveStreamingConnection conn = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tableName) + .withTransactionBatchSize(5) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + conn.beginTransaction(); + Assert.fail(); + } catch (SerializationError e) { + Assert.assertEquals("Attempt to reference non-existent record field 'nosuch'", e.getMessage()); + } + } + + @Test + public void derefNotARecord() throws StreamingException, IOException { + Schema schema = SchemaBuilder.record("derefNotARecord") + .namespace("org.apache.hive.streaming") + .fields() + .requiredString("field1") + .requiredInt("field2") + .endRecord(); + + String tableName = "derefnotarecord"; + String fullTableName = dbName + "." + tableName; + dropAndCreateTable(fullTableName, "f1 int, f2 string"); + + RecordWriter writer = MappingAvroWriter.newBuilder() + .withAvroSchema(schema) + .addMappingColumn("f1", "field2.oops") + .addMappingColumn("f2", "field1") + .build(); + + try { + HiveStreamingConnection conn = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tableName) + .withTransactionBatchSize(5) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + conn.beginTransaction(); + Assert.fail(); + } catch (SerializationError e) { + Assert.assertEquals("Attempt to dereference '.oops' when containing column is not a record", e.getMessage()); + } + } + + @Test + public void derefNotIndexable() throws StreamingException, IOException { + Schema schema = SchemaBuilder.record("derefNotIndexable") + .namespace("org.apache.hive.streaming") + .fields() + .requiredString("field1") + .requiredInt("field2") + .endRecord(); + + String tableName = "derefnotindexable"; + String fullTableName = dbName + "." + tableName; + dropAndCreateTable(fullTableName, "f1 int, f2 string"); + + RecordWriter writer = MappingAvroWriter.newBuilder() + .withAvroSchema(schema) + .addMappingColumn("f1", "field2[1]") + .addMappingColumn("f2", "field1") + .build(); + + try { + HiveStreamingConnection conn = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tableName) + .withTransactionBatchSize(5) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + conn.beginTransaction(); + Assert.fail(); + } catch (SerializationError e) { + Assert.assertEquals("Attempt to deference '[1]' when containing column is not an array, map, or union", e.getMessage()); + } + } + // TODO test multi-level dereference +} diff --git streaming/src/test/org/apache/hive/streaming/avro/TestStrictAvroWriter.java streaming/src/test/org/apache/hive/streaming/avro/TestStrictAvroWriter.java new file mode 100644 index 0000000000..18837e0111 --- /dev/null +++ streaming/src/test/org/apache/hive/streaming/avro/TestStrictAvroWriter.java @@ -0,0 +1,604 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.streaming.avro; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.hive.streaming.HiveStreamingConnection; +import org.apache.hive.streaming.RecordWriter; +import org.apache.hive.streaming.SerializationError; +import org.apache.hive.streaming.StreamingException; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TestStrictAvroWriter extends AvroTestBase { + private static final Logger LOG = LoggerFactory.getLogger(TestStrictAvroWriter.class); + + public TestStrictAvroWriter() throws Exception { + } + + @Test + public void withSchemaObject() throws StreamingException, IOException { + Schema schema = SchemaBuilder.record("wsoasw") + .namespace("org.apache.hive.streaming") + .fields() + .requiredString("field1") + .requiredInt("field2") + .endRecord(); + List records = new ArrayList<>(); + GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema); + for (int i = 0; i < 10; i++) { + recordBuilder.set("field1", Integer.toString(i)); + recordBuilder.set("field2", i); + records.add(recordBuilder.build()); + } + + String tableName = "wso"; + String fullTableName = dbName + "." 
+ tableName; + dropAndCreateTable(fullTableName, "f1 string, f2 int"); + + RecordWriter writer = StrictAvroWriter.newBuilder() + .withSchema(schema) + .build(); + + HiveStreamingConnection conn = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tableName) + .withTransactionBatchSize(5) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + + conn.beginTransaction(); + for (GenericRecord r : records) conn.write(r); + conn.commitTransaction(); + conn.close(); + + List results = querySql("select f1, f2 from " + fullTableName); + Assert.assertEquals(10, results.size()); + for (int i = 0; i < 10; i++) Assert.assertEquals(i + "\t" + i, results.get(i)); + } + + @Test + public void allAvroTypes() throws StreamingException, IOException { + Schema fixedSchema = SchemaBuilder.fixed("fixedType").size(10); + Schema innerRecordSchema = SchemaBuilder.record("recordType") + .fields() + .requiredString("innerString") + .requiredInt("innerInt") + .endRecord(); + Schema schema = SchemaBuilder.record("allTypes") + .namespace("org.apache.hive.streaming") + .fields() + .name("avroArray") + .type().array().items().intType().noDefault() + .nullableBoolean("nullableBoolean", false) + .requiredBytes("avroBytes") + .requiredDouble("avroDouble") + .name("avroEnum") + .type().enumeration("enumType").symbols("apple", "orange", "banana").enumDefault("apple") + .name("avroFixed") + .type(fixedSchema).noDefault() + .requiredFloat("avroFloat") + .requiredLong("avroLong") + .name("avroMap") + .type().map().values().intType().mapDefault(Collections.emptyMap()) + .name("recordField") + .type(innerRecordSchema).noDefault() + .name("avroUnion") + .type().unionOf().booleanType().and().intType().endUnion().booleanDefault(true) + .endRecord(); + + LOG.debug("Avro schema is " + schema.toString(true)); + List records = new ArrayList<>(); + GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema); + for (int i = 0; i < 10; i++) { + List list = new ArrayList<>(); + for (int j = i; j < 10; j++) list.add(j); + recordBuilder.set("avroArray", list); + + if (i % 2 == 0) recordBuilder.set("nullableBoolean", true); + else if (i % 7 == 0) recordBuilder.set("nullableBoolean", false); + else recordBuilder.set("nullableBoolean", null); + + recordBuilder.set("avroBytes", ByteBuffer.wrap(Integer.toString(i).getBytes())); + + double d = i + (double)i * 0.1; + recordBuilder.set("avroDouble", d); + + if (i % 2 == 0) recordBuilder.set("avroEnum", "apple"); + else if (i % 7 == 0) recordBuilder.set("avroEnum", "orange"); + else recordBuilder.set("avroEnum", "banana"); + + StringBuilder buf = new StringBuilder(); + for (int j = 0; j < 10; j++) buf.append(i); + GenericData.Fixed fixed = new GenericData.Fixed(fixedSchema, buf.toString().getBytes()); + recordBuilder.set("avroFixed", fixed); + + float f = i + (float)i * 0.1f; + recordBuilder.set("avroFloat", f); + + recordBuilder.set("avroLong", new Long(i)); + + // More than one element in the map causes ordering issues when we + // compare the results. 
+ Map m = new HashMap<>(); + m.put(Integer.toString(i), i); + recordBuilder.set("avroMap", m); + + GenericRecordBuilder innerBuilder = new GenericRecordBuilder(innerRecordSchema); + innerBuilder.set("innerString", Integer.toString(i*100)); + innerBuilder.set("innerInt", i); + recordBuilder.set("recordField", innerBuilder.build()); + + if (i % 2 == 0) recordBuilder.set("avroUnion", i); + else recordBuilder.set("avroUnion", true); + + records.add(recordBuilder.build()); + } + String tableName = "alltypes"; + String fullTableName = dbName + "." + tableName; + dropAndCreateTable(fullTableName, "hiveArray array, hiveBoolean boolean, hiveBinary binary, " + + "hiveDouble double, hiveEnum string, hiveFixed binary, hiveFloat float, " + + "hiveLong bigint, hiveMap map, " + + "hiveRecord struct, " + + "hiveUnion uniontype " + ); + + + RecordWriter writer = StrictAvroWriter.newBuilder() + .withSchema(schema) + .build(); + + HiveStreamingConnection conn = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tableName) + .withTransactionBatchSize(5) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + + conn.beginTransaction(); + for (GenericRecord r : records) conn.write(r); + conn.commitTransaction(); + conn.close(); + + List results = querySql("select hiveArray, hiveBoolean, hiveBinary, hiveDouble, " + + "hiveEnum, hiveFixed, hiveFloat, hiveLong, hiveMap, hiveRecord, hiveUnion " + + " from " + fullTableName); + Assert.assertEquals(10, results.size()); + for (int i = 0; i < 10; i++) { + StringBuilder buf = new StringBuilder(); + buf.append('['); + boolean first = true; + for (int j = i; j < 10; j++) { + if (first) first = false; + else buf.append(','); + buf.append(j); + } + buf.append("]\t"); + + if (i % 2 == 0) buf.append("true"); + else if (i % 7 == 0) buf.append("false"); + else buf.append("NULL"); + + buf.append("\t") + .append(i); + + double d = i + (double)i * 0.1; + buf.append("\t") + .append(d); + + if (i % 2 == 0) buf.append("\tapple"); + else if (i % 7 == 0) buf.append("\torange"); + else buf.append("\tbanana"); + + buf.append('\t'); + for (int j = 0; j < 10; j++) buf.append(i); + + float f = i + (float)i * 0.1f; + buf.append("\t") + .append(f); + + buf.append("\t") + .append(i); + + buf.append("\t{\"") + .append(i) + .append("\":") + .append(i); + buf.append('}'); + + buf.append("\t{\"innerstring\":\"") + .append(i*100) + .append("\",\"innerint\":") + .append(i) + .append('}'); + + buf.append("\t{"); + if (i % 2 == 0) buf.append(1).append(':').append(i); + else buf.append(0).append(':').append(true); + buf.append('}'); + + Assert.assertEquals(buf.toString(), results.get(i)); + } + } + + @Test + public void withSchemaString() throws StreamingException, IOException { + Schema schema = SchemaBuilder.record("wssasw") + .namespace("org.apache.hive.streaming") + .fields() + .requiredString("field1") + .requiredInt("field2") + .endRecord(); + List records = new ArrayList<>(); + GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema); + for (int i = 0; i < 10; i++) { + recordBuilder.set("field1", Integer.toString(i)); + recordBuilder.set("field2", i); + records.add(recordBuilder.build()); + } + + String tableName = "wss"; + String fullTableName = dbName + "." 
+ tableName; + dropAndCreateTable(fullTableName, "f1 string, f2 int"); + + RecordWriter writer = StrictAvroWriter.newBuilder() + .withSchema(schema.toString()) + .build(); + + HiveStreamingConnection conn = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tableName) + .withTransactionBatchSize(5) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + + conn.beginTransaction(); + for (GenericRecord r : records) conn.write(r); + conn.commitTransaction(); + conn.close(); + + List results = querySql("select f1, f2 from " + fullTableName); + Assert.assertEquals(10, results.size()); + for (int i = 0; i < 10; i++) Assert.assertEquals(i + "\t" + i, results.get(i)); + } + + @Test(expected = IllegalStateException.class) + public void noSchema() { + RecordWriter writer = StrictAvroWriter.newBuilder() + .build(); + } + + @Test(expected = IllegalStateException.class) + public void twoSchemas() { + Schema schema = SchemaBuilder.record("twoschemas") + .namespace("org.apache.hive.streaming") + .fields() + .requiredString("field1") + .requiredInt("field2") + .endRecord(); + RecordWriter writer = StrictAvroWriter.newBuilder() + .withSchema(schema) + .withSchema(schema.toString()) + .build(); + } + + // Test where the record has columns that the schema handed to the streaming writer does not + @Test + public void fieldsInStreamingSchemaNotInRecordSchema() throws StreamingException, IOException { + Schema recordSchema = SchemaBuilder.record("rswrecord") + .namespace("org.apache.hive.streaming") + .fields() + .requiredString("field1") + .endRecord(); + + Schema streamingSchema = SchemaBuilder.record("rswstreaming") + .namespace("org.apache.hive.streaming") + .fields() + .requiredString("field1") + .requiredInt("field2") + .endRecord(); + + List records = new ArrayList<>(); + GenericRecordBuilder recordBuilder = new GenericRecordBuilder(recordSchema); + for (int i = 0; i < 10; i++) { + recordBuilder.set("field1", Integer.toString(i)); + records.add(recordBuilder.build()); + } + + String tableName = "rsw"; + String fullTableName = dbName + "." 
+ tableName; + dropAndCreateTable(fullTableName, "f1 string, f2 int"); + + RecordWriter writer = StrictAvroWriter.newBuilder() + .withSchema(streamingSchema) + .build(); + + HiveStreamingConnection conn = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tableName) + .withTransactionBatchSize(5) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + + conn.beginTransaction(); + try { + conn.write(records.get(0)); + Assert.fail(); + } catch (SerializationError e) { + Assert.assertTrue(e.getMessage().startsWith("Unable to serialize record, likely due to record schema not" + + " matching schema passed to writer")); + Assert.assertEquals(ArrayIndexOutOfBoundsException.class, e.getCause().getClass()); + } + } + + @Test + public void fieldsInRecordSchemaNotInStreamingSchema() throws StreamingException, IOException { + Schema recordSchema = SchemaBuilder.record("firsnissr") + .namespace("org.apache.hive.streaming") + .fields() + .requiredString("field1") + .requiredInt("field2") + .endRecord(); + + Schema streamingSchema = SchemaBuilder.record("firsnisss") + .namespace("org.apache.hive.streaming") + .fields() + .requiredString("field1") + .endRecord(); + + List records = new ArrayList<>(); + GenericRecordBuilder recordBuilder = new GenericRecordBuilder(recordSchema); + for (int i = 0; i < 10; i++) { + recordBuilder.set("field1", Integer.toString(i)); + recordBuilder.set("field2", i); + records.add(recordBuilder.build()); + } + + String tableName = "firsniss"; + String fullTableName = dbName + "." + tableName; + dropAndCreateTable(fullTableName, "f1 string"); + + RecordWriter writer = StrictAvroWriter.newBuilder() + .withSchema(streamingSchema) + .build(); + + HiveStreamingConnection conn = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tableName) + .withTransactionBatchSize(5) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + + conn.beginTransaction(); + for (GenericRecord r : records) conn.write(r); + conn.commitTransaction(); + conn.close(); + + List results = querySql("select f1 from " + fullTableName); + Assert.assertEquals(10, results.size()); + for (int i = 0; i < 10; i++) Assert.assertEquals(Integer.toString(i), results.get(i)); + } + + @Test + public void withDifferentTypesInRecordAndStreamingSchemas() throws StreamingException, IOException { + Schema recordSchema = SchemaBuilder.record("wdtirssr") + .namespace("org.apache.hive.streaming") + .fields() + .requiredString("field1") + .requiredString("field2") + .endRecord(); + + Schema streamingSchema = SchemaBuilder.record("wdtirsss") + .namespace("org.apache.hive.streaming") + .fields() + .requiredString("field1") + .requiredInt("field2") + .endRecord(); + + List records = new ArrayList<>(); + GenericRecordBuilder recordBuilder = new GenericRecordBuilder(recordSchema); + for (int i = 0; i < 10; i++) { + recordBuilder.set("field1", Integer.toString(i)); + recordBuilder.set("field2", Integer.toString(i*10)); + records.add(recordBuilder.build()); + } + + String tableName = "wdtirss"; + String fullTableName = dbName + "." 
+ tableName; + dropAndCreateTable(fullTableName, "f1 string, f2 int"); + + RecordWriter writer = StrictAvroWriter.newBuilder() + .withSchema(streamingSchema) + .build(); + + HiveStreamingConnection conn = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tableName) + .withTransactionBatchSize(5) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + + conn.beginTransaction(); + try { + conn.write(records.get(0)); + Assert.fail(); + } catch (SerializationError e) { + Assert.assertTrue(e.getMessage().startsWith("Column type mismatch when serializing record")); + Assert.assertEquals(ClassCastException.class, e.getCause().getClass()); + } + } + + + @Test + public void recordHasColumnsTableDoesnt() throws StreamingException, IOException { + Schema schema = SchemaBuilder.record("tdtr") + .namespace("org.apache.hive.streaming") + .fields() + .requiredString("field1") + .requiredInt("field2") + .endRecord(); + List records = new ArrayList<>(); + GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema); + for (int i = 0; i < 10; i++) { + recordBuilder.set("field1", Integer.toString(i)); + recordBuilder.set("field2", i); + records.add(recordBuilder.build()); + } + + String tableName = "tdtr"; + String fullTableName = dbName + "." + tableName; + dropAndCreateTable(fullTableName, "f1 string"); + + RecordWriter writer = StrictAvroWriter.newBuilder() + .withSchema(schema) + .build(); + + HiveStreamingConnection conn = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tableName) + .withTransactionBatchSize(5) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + + conn.beginTransaction(); + for (GenericRecord r : records) conn.write(r); + conn.commitTransaction(); + conn.close(); + + List results = querySql("select f1 from " + fullTableName); + Assert.assertEquals(10, results.size()); + for (int i = 0; i < 10; i++) Assert.assertEquals(Integer.toString(i), results.get(i)); + } + + @Test + public void partitioned() throws StreamingException, IOException { + Schema schema = SchemaBuilder.record("p") + .namespace("org.apache.hive.streaming") + .fields() + .requiredString("field1") + .requiredInt("field2") + .requiredString("f3") + .endRecord(); + List records = new ArrayList<>(); + GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema); + for (int i = 0; i < 10; i++) { + recordBuilder.set("field1", Integer.toString(i)); + recordBuilder.set("field2", i); + recordBuilder.set("f3", "a"); + records.add(recordBuilder.build()); + } + for (int i = 0; i < 10; i++) { + recordBuilder.set("field1", Integer.toString(i)); + recordBuilder.set("field2", i); + recordBuilder.set("f3", "b"); + records.add(recordBuilder.build()); + } + + String tableName = "p"; + String fullTableName = dbName + "." 
+ tableName; + dropAndCreateTable(fullTableName, "f1 string, f2 int", "f3 string"); + + RecordWriter writer = StrictAvroWriter.newBuilder() + .withSchema(schema) + .build(); + + HiveStreamingConnection conn = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tableName) + .withTransactionBatchSize(5) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + + conn.beginTransaction(); + for (GenericRecord r : records) conn.write(r); + conn.commitTransaction(); + conn.close(); + + List results = querySql("select f1, f2 from " + fullTableName + " where f3 = 'a'"); + Assert.assertEquals(10, results.size()); + for (int i = 0; i < 10; i++) Assert.assertEquals(i + "\t" + i, results.get(i)); + results = querySql("select f1, f2 from " + fullTableName + " where f3 = 'b'"); + Assert.assertEquals(10, results.size()); + for (int i = 0; i < 10; i++) Assert.assertEquals(i + "\t" + i, results.get(i)); + } + + @Test + public void withTypeDescrepncy() throws StreamingException, IOException { + Schema schema = SchemaBuilder.record("inttolong") + .namespace("org.apache.hive.streaming") + .fields() + .requiredString("field1") + .requiredInt("field2") + .endRecord(); + List records = new ArrayList<>(); + GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema); + for (int i = 0; i < 10; i++) { + recordBuilder.set("field1", Integer.toString(i)); + recordBuilder.set("field2", i); + records.add(recordBuilder.build()); + } + + String tableName = "inttolong"; + String fullTableName = dbName + "." + tableName; + dropAndCreateTable(fullTableName, "f1 string, f2 string"); + + RecordWriter writer = StrictAvroWriter.newBuilder() + .withSchema(schema) + .build(); + + HiveStreamingConnection conn = HiveStreamingConnection.newBuilder() + .withDatabase(dbName) + .withTable(tableName) + .withTransactionBatchSize(5) + .withRecordWriter(writer) + .withHiveConf(conf) + .connect(); + + conn.beginTransaction(); + for (GenericRecord r : records) conn.write(r); + conn.commitTransaction(); + conn.close(); + + List results = querySql("select f1, f2 from " + fullTableName); + Assert.assertEquals(10, results.size()); + for (int i = 0; i < 10; i++) Assert.assertEquals(i + "\t" + i, results.get(i)); + } + +}
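
For readers of this patch, below is a minimal usage sketch of the new Avro record writers, assembled only from the builder calls exercised by the tests above (see TestMappingAvroWriter.basic()). The database, table, and column names are hypothetical placeholders, and the RecordWriter<GenericRecord> parameterization is assumed from the new generic RecordWriter interface; it assumes a transactional ORC table already exists whose columns match the mapping. Treat it as an illustration, not part of the patch.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.streaming.HiveStreamingConnection;
import org.apache.hive.streaming.RecordWriter;
import org.apache.hive.streaming.StreamingException;
import org.apache.hive.streaming.avro.MappingAvroWriter;

public class AvroIngestSketch {
  public static void main(String[] args) throws StreamingException {
    // Avro schema of the incoming records (hypothetical field names).
    Schema schema = SchemaBuilder.record("event").namespace("example")
        .fields()
        .requiredString("name")
        .requiredInt("count")
        .endRecord();

    // Map Hive columns to Avro fields; per the tests above, dereference paths
    // such as "recordField.innerInt", "avroArray[0]", or "avroMap[key]" are
    // also accepted by addMappingColumn. StrictAvroWriter.newBuilder()
    // .withSchema(schema) could be used instead when the Avro and Hive
    // schemas line up positionally.
    RecordWriter<GenericRecord> writer = MappingAvroWriter.newBuilder()
        .withAvroSchema(schema)
        .addMappingColumn("f1", "count")   // Hive column <- Avro field
        .addMappingColumn("f2", "name")
        .build();

    HiveStreamingConnection conn = HiveStreamingConnection.newBuilder()
        .withDatabase("mydb")              // assumes mydb.events exists,
        .withTable("events")               // transactional, stored as ORC
        .withRecordWriter(writer)
        .withHiveConf(new HiveConf())
        .connect();

    conn.beginTransaction();
    conn.write(new GenericRecordBuilder(schema).set("name", "a").set("count", 1).build());
    conn.commitTransaction();
    conn.close();
  }
}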