diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
index e3c0955..c50d226 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapRowRecordReader.java
@@ -29,6 +29,7 @@
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -48,6 +49,7 @@
 import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
 import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -68,15 +70,17 @@
   private static final Logger LOG = LoggerFactory.getLogger(LlapRowRecordReader.class);
 
   protected final Configuration conf;
-  protected final RecordReader<NullWritable, Text> reader;
+  protected final RecordReader<NullWritable, BytesWritable> reader;
   protected final Schema schema;
   protected final AbstractSerDe serde;
-  protected final Text textData = new Text();
+  protected final BytesWritable data;
 
-  public LlapRowRecordReader(Configuration conf, Schema schema, RecordReader<NullWritable, Text> reader) throws IOException {
+  public LlapRowRecordReader(Configuration conf, Schema schema,
+      RecordReader<NullWritable, BytesWritable> reader) throws IOException {
     this.conf = conf;
     this.schema = schema;
     this.reader = reader;
+    this.data = new BytesWritable();
 
     try {
       serde = initSerDe(conf);
@@ -114,17 +118,17 @@ public float getProgress() throws IOException {
 
   public boolean next(NullWritable key, Row value) throws IOException {
     Preconditions.checkArgument(value != null);
-    boolean hasNext = reader.next(key, textData);
+    boolean hasNext = reader.next(key, data);
     if (hasNext) {
-      // Deserialize Text to column values, and populate the row record
+      // Deserialize data to column values, and populate the row record
       Object rowObj;
       try {
         StructObjectInspector rowOI = (StructObjectInspector) serde.getObjectInspector();
-        rowObj = serde.deserialize(textData);
+        rowObj = serde.deserialize(data);
         setRowFromStruct(value, rowObj, rowOI);
       } catch (SerDeException err) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Error deserializing row from text: " + textData);
+          LOG.debug("Error deserializing row from data: " + data);
         }
         throw new IOException("Error deserializing row data", err);
       }
@@ -246,7 +250,7 @@ protected AbstractSerDe initSerDe(Configuration conf) throws SerDeException {
     props.put(serdeConstants.LIST_COLUMNS, columns);
     props.put(serdeConstants.LIST_COLUMN_TYPES, types);
     props.put(serdeConstants.ESCAPE_CHAR, "\\");
-    AbstractSerDe serde = new LazySimpleSerDe();
+    AbstractSerDe serde = new LazyBinarySerDe();
     serde.initialize(conf, props);
 
     return serde;
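Note on the reader-side change above: rows now arrive as LazyBinary-encoded bytes in a BytesWritable and are decoded with LazyBinarySerDe, instead of being parsed out of delimited Text by LazySimpleSerDe. That removes the dependence on field delimiters and escape handling, which string values containing those characters could otherwise corrupt. The following is a minimal round-trip sketch of the encoding the reader now consumes; it is not part of the patch, and the class name and two-column schema are invented for illustration:

import java.util.Arrays;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class LazyBinaryRoundTrip {
  public static void main(String[] args) throws Exception {
    // Same style of initialization as initSerDe() in the patched reader,
    // but with a made-up two-column schema.
    Properties props = new Properties();
    props.put(serdeConstants.LIST_COLUMNS, "id,name");
    props.put(serdeConstants.LIST_COLUMN_TYPES, "int,string");
    AbstractSerDe serde = new LazyBinarySerDe();
    serde.initialize(new Configuration(), props);

    // Build a row matching the declared schema and serialize it to bytes,
    // standing in for what the LLAP daemon would send over the wire.
    StructObjectInspector rowOI = ObjectInspectorFactory.getStandardStructObjectInspector(
        Arrays.asList("id", "name"),
        Arrays.<ObjectInspector>asList(
            PrimitiveObjectInspectorFactory.writableIntObjectInspector,
            PrimitiveObjectInspectorFactory.writableStringObjectInspector));
    Object row = Arrays.<Object>asList(new IntWritable(1), new Text("x"));
    BytesWritable data = (BytesWritable) serde.serialize(row, rowOI);

    // Decode the bytes back into column values, as next() now does.
    StructObjectInspector serdeOI = (StructObjectInspector) serde.getObjectInspector();
    Object deserialized = serde.deserialize(data);
    System.out.println(serdeOI.getStructFieldsDataAsList(deserialized));
  }
}

A side benefit of the swap: LazyBinaryStruct decodes fields lazily on access, so columns the consumer never touches are not fully materialized.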
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java
index c3001e9..4a6e9b1 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java
@@ -26,6 +26,7 @@
 
 import org.apache.hadoop.hive.llap.Row;
 import org.apache.hadoop.hive.llap.Schema;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -37,7 +38,7 @@
 
 public class LlapRowInputFormat implements InputFormat<NullWritable, Row> {
 
-  private LlapBaseInputFormat<Text> baseInputFormat = new LlapBaseInputFormat<Text>();
+  private LlapBaseInputFormat<BytesWritable> baseInputFormat = new LlapBaseInputFormat<BytesWritable>();
 
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
@@ -48,7 +49,8 @@
   public RecordReader<NullWritable, Row> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
      throws IOException {
     LlapInputSplit llapSplit = (LlapInputSplit) split;
-    LlapBaseRecordReader<Text> reader = (LlapBaseRecordReader<Text>) baseInputFormat.getRecordReader(llapSplit, job, reporter);
+    LlapBaseRecordReader<BytesWritable> reader =
+        (LlapBaseRecordReader<BytesWritable>) baseInputFormat.getRecordReader(llapSplit, job, reporter);
     return new LlapRowRecordReader(job, reader.getSchema(), reader);
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index eb7ef00..35fc68a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -216,6 +216,7 @@
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
 import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
@@ -7138,9 +7139,13 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
           conf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, NoOpFetchFormatter.class.getName());
         } else {
           fileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
 
+          Class<? extends Deserializer> serdeClass = LazySimpleSerDe.class;
+          if (fileFormat.equals(PlanUtils.LLAP_OUTPUT_FORMAT_KEY)) {
+            serdeClass = LazyBinarySerDe.class;
+          }
           table_desc = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat,
-              LazySimpleSerDe.class);
+              serdeClass);
         }
       } else {
         table_desc = PlanUtils.getDefaultTableDesc(qb.getDirectoryDesc(), cols, colTypes);
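The SemanticAnalyzer change is the producer-side half: genFileSinkPlan selects LazyBinarySerDe for the query-result table desc whenever the result file format is the LLAP output format, so the daemons emit exactly the encoding the patched LlapRowRecordReader expects; every other result format keeps LazySimpleSerDe. The serde switch is internal to the reader, so external callers still iterate NullWritable/Row pairs unchanged. For completeness, a hypothetical driver exercising the path end to end; it is not part of the patch, and it assumes the LlapBaseInputFormat configuration constants (URL_KEY, USER_KEY, PWD_KEY, QUERY_KEY) with placeholder connection values:

import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
import org.apache.hadoop.hive.llap.LlapRowInputFormat;
import org.apache.hadoop.hive.llap.Row;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class LlapRowScan {
  public static void main(String[] args) throws Exception {
    // Placeholder HS2 connection and query; adjust to the test cluster.
    JobConf job = new JobConf();
    job.set(LlapBaseInputFormat.URL_KEY, "jdbc:hive2://hs2-host:10000/default");
    job.set(LlapBaseInputFormat.USER_KEY, "hive");
    job.set(LlapBaseInputFormat.PWD_KEY, "");
    job.set(LlapBaseInputFormat.QUERY_KEY, "select id, name from src");

    LlapRowInputFormat inputFormat = new LlapRowInputFormat();
    for (InputSplit split : inputFormat.getSplits(job, 1)) {
      // Each reader deserializes LazyBinary-encoded rows internally;
      // the caller only ever sees Row objects.
      RecordReader<NullWritable, Row> reader =
          inputFormat.getRecordReader(split, job, Reporter.NULL);
      NullWritable key = reader.createKey();
      Row row = reader.createValue();
      while (reader.next(key, row)) {
        System.out.println(row.getValue(0) + "\t" + row.getValue(1));
      }
      reader.close();
    }
  }
}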