diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
index d3cd45150b..ffe77886f8 100644
--- kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaSerDe.java
@@ -156,8 +156,7 @@
     int firstMetadataColumnIndex = data.size() - MetadataColumn.values().length;
     if (delegateSerializerOI == null) {
       //@TODO check if i can cache this if it is the same.
-      delegateSerializerOI =
-          new SubStructObjectInspector(structObjectInspector, firstMetadataColumnIndex);
+      delegateSerializerOI = new SubStructObjectInspector(structObjectInspector, firstMetadataColumnIndex);
     }
     // We always append the metadata columns to the end of the row.
     final List row = data.subList(0, firstMetadataColumnIndex);
@@ -198,24 +197,30 @@
   }
 
   @Override public Object deserialize(Writable blob) throws SerDeException {
-    return deserializeKWritable((KafkaWritable) blob);
+    Object[] rowBoat = new Object[columnNames.size()];
+    deserializeKWritable((KafkaWritable) blob, rowBoat);
+    return rowBoat;
   }
 
-  ArrayList<Object> deserializeKWritable(KafkaWritable kafkaWritable) throws SerDeException {
-    ArrayList<Object> resultRow = new ArrayList<>(columnNames.size());
+  /**
+   * @param kafkaWritable Kafka writable object containing the row plus Kafka metadata.
+   * @param rowBoat boat sized to the width of the Kafka row plus metadata; carries the row to the upstream operator.
+   *
+   * @throws SerDeException in case of any SerDe issue.
+   */
+  void deserializeKWritable(KafkaWritable kafkaWritable, Object[] rowBoat) throws SerDeException {
     final Object row = delegateSerDe.deserialize(bytesConverter.getWritable(kafkaWritable.getValue()));
     //first add the value payload elements
     for (int i = 0; i < metadataStartIndex; i++) {
-      resultRow.add(delegateDeserializerOI.getStructFieldData(row,
-          delegateDeserializerOI.getStructFieldRef(columnNames.get(i))));
+      rowBoat[i] =
+          delegateDeserializerOI.getStructFieldData(row, delegateDeserializerOI.getStructFieldRef(columnNames.get(i)));
     }
     //add the metadata columns
     for (int i = metadataStartIndex; i < columnNames.size(); i++) {
       final MetadataColumn metadataColumn = MetadataColumn.forName(columnNames.get(i));
-      resultRow.add(kafkaWritable.getHiveWritable(metadataColumn));
+      rowBoat[i] = kafkaWritable.getHiveWritable(metadataColumn);
     }
-    return resultRow;
   }
 
   @Override public ObjectInspector getObjectInspector() {
@@ -233,7 +238,8 @@
    /**
     * Returns a live view of the base Object inspector starting form 0 to {@code toIndex} exclusive.
-    * @param baseOI base Object Inspector.
+    *
+    * @param baseOI base Object Inspector.
     * @param toIndex toIndex.
     */
    private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) {
@@ -250,6 +256,7 @@ private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) {
 
    /**
     * Look up a field.
+    *
     * @param fieldName fieldName to be looked up.
     */
    @SuppressWarnings("OptionalGetWithoutIsPresent") @Override public StructField getStructFieldRef(String fieldName) {
@@ -262,7 +269,8 @@ private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) {
 
    /**
     * returns null for data = null.
-    * @param data input.
+    *
+    * @param data input.
     * @param fieldRef field to extract.
     */
    @Override public Object getStructFieldData(Object data, StructField fieldRef) {
@@ -271,6 +279,7 @@ private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) {
 
    /**
     * returns null for data = null.
+    *
     * @param data input data.
    */
   @Override public List<Object> getStructFieldsDataAsList(Object data) {
@@ -288,7 +297,7 @@ private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) {
    /**
     * Returns the name of the data type that is inspected by this
     * ObjectInspector. This is used to display the type information to the user.
-    * 
+    *
     * For primitive types, the type name is standardized. For other types, the
     * type name can be something like "list<int>", "map<int,string>", java class
     * names, or user-defined type names similar to typedef.
@@ -309,6 +318,7 @@ private SubStructObjectInspector(StructObjectInspector baseOI, int toIndex) {
 
    /**
     * Class that encapsulate the logic of serialize and deserialize bytes array to/from the delegate writable format.
+    *
     * @param delegate writable class.
     */
    private interface BytesConverter {
diff --git kafka-handler/src/java/org/apache/hadoop/hive/kafka/VectorizedKafkaRecordReader.java kafka-handler/src/java/org/apache/hadoop/hive/kafka/VectorizedKafkaRecordReader.java
index 294085610e..5f55bbce20 100644
--- kafka-handler/src/java/org/apache/hadoop/hive/kafka/VectorizedKafkaRecordReader.java
+++ kafka-handler/src/java/org/apache/hadoop/hive/kafka/VectorizedKafkaRecordReader.java
@@ -37,7 +37,6 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Properties;
 
@@ -66,6 +65,7 @@
 
   private final VectorAssignRow vectorAssignRow = new VectorAssignRow();
   private final KafkaWritable kafkaWritable = new KafkaWritable();
+  final Object[] row;
 
   VectorizedKafkaRecordReader(KafkaInputSplit inputSplit, Configuration jobConf) {
     // VectorBatch Context initializing
@@ -105,6 +105,9 @@
         startOffset == endOffset ?
             new KafkaRecordReader.EmptyIterator() :
             new KafkaRecordIterator(consumer, topicPartition, startOffset, endOffset, pollTimeout);
+
+    // the row has to be as wide as the entire Kafka row plus metadata
+    row = new Object[((StructObjectInspector) serDe.getObjectInspector()).getAllStructFieldRefs().size()];
   }
 
   @Override public boolean next(NullWritable nullWritable, VectorizedRowBatch vectorizedRowBatch) throws IOException {
@@ -116,6 +119,12 @@
     }
   }
 
+  private void cleanRowBoat() {
+    for (int i = 0; i < row.length; i++) {
+      row[i] = null;
+    }
+  }
+
   @Override public NullWritable createKey() {
     return NullWritable.get();
   }
@@ -129,11 +138,10 @@
   }
 
   @Override public float getProgress() {
-    if (consumedRecords == 0) {
-      return 0f;
-    }
     if (consumedRecords >= totalNumberRecords) {
       return 1f;
+    } else if (consumedRecords == 0) {
+      return 0f;
     }
     return consumedRecords * 1.0f / totalNumberRecords;
   }
@@ -154,19 +162,20 @@ private int readNextBatch(VectorizedRowBatch vectorizedRowBatch,
       kafkaWritable.set(kRecord);
       readBytes += kRecord.serializedKeySize() + kRecord.serializedValueSize();
       if (projectedColumns.length > 0) {
-        ArrayList<Object> row = serDe.deserializeKWritable(kafkaWritable);
+        serDe.deserializeKWritable(kafkaWritable, row);
        for (int i : projectedColumns) {
-          vectorAssignRow.assignRowColumn(vectorizedRowBatch, rowsCount, i, row.get(i));
+          vectorAssignRow.assignRowColumn(vectorizedRowBatch, rowsCount, i, row[i]);
        }
      }
      rowsCount++;
    }
    vectorizedRowBatch.size = rowsCount;
    consumedRecords += rowsCount;
+    cleanRowBoat();
    return rowsCount;
  }
 
-  private static KafkaSerDe createAndInitializeSerde(Configuration jobConf) {
+  @SuppressWarnings("Duplicates") private static KafkaSerDe createAndInitializeSerde(Configuration jobConf) {
    KafkaSerDe serDe = new KafkaSerDe();
    MapWork mapWork = Preconditions.checkNotNull(Utilities.getMapWork(jobConf), "Map work is null");
    Properties
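
Reviewer note, not part of the patch: the sketch below illustrates the row-boat reuse contract this diff introduces. The caller allocates the Object[] once, sized from the SerDe's struct object inspector, lets deserializeKWritable fill it for every record, and clears it between batches, which is exactly what VectorizedKafkaRecordReader now does. Only KafkaSerDe, KafkaWritable and deserializeKWritable come from the patch; the class, method and field names below are hypothetical, and the sketch assumes it lives in the org.apache.hadoop.hive.kafka package because deserializeKWritable is package-private.

package org.apache.hadoop.hive.kafka;

import java.util.Arrays;

import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

final class RowBoatReuseSketch {

  private final KafkaSerDe serDe;
  // Allocated once, as wide as the Kafka row plus the metadata columns.
  private final Object[] row;

  RowBoatReuseSketch(KafkaSerDe serDe) {
    this.serDe = serDe;
    this.row = new Object[((StructObjectInspector) serDe.getObjectInspector()).getAllStructFieldRefs().size()];
  }

  void consumeBatch(Iterable<KafkaWritable> batch) throws SerDeException {
    for (KafkaWritable writable : batch) {
      // Fills every slot of row; no per-record ArrayList allocation as in the old code path.
      serDe.deserializeKWritable(writable, row);
      // ... hand the populated row off to whatever consumes the batch ...
    }
    // Clear between batches, mirroring cleanRowBoat() in the reader.
    Arrays.fill(row, null);
  }
}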