diff --git data/files/multilined1.txt data/files/multilined1.txt new file mode 100644 index 0000000..7285290 --- /dev/null +++ data/files/multilined1.txt @@ -0,0 +1,26 @@ +Apache Hive (TM) @VERSION@ +Hive defines a simple SQL-like query language, called QL, that enables +users familiar with SQL to query the data. +At the same time, this +language also allows programmers who are familiar with the MapReduce +framework to be able to plug in their custom mappers and reducers to +perform more sophisticated analysis that may not be supported by the +built-in capabilities of the language. +QL can also be extended with +custom scalar functions (UDF's), aggregations (UDAF's), and table +functions (UDTF's). + +Please note that Hadoop is a batch processing system and Hadoop jobs +tend to have high latency and incur substantial overheads in job +submission and scheduling. +Consequently the average latency for Hive +queries is generally very high (minutes) even when data sets involved +are very small (say a few hundred megabytes). +As a result it cannot be +compared with systems such as Oracle where analyses are conducted on a +significantly smaller amount of data but the analyses proceed much +more iteratively with the response times between iterations being less +than a few minutes. +Hive aims to provide acceptable (but not optimal) +latency for interactive data browsing, queries over small data sets or +test queries. 
diff --git data/files/multilined2.txt data/files/multilined2.txt new file mode 100644 index 0000000..396a37a --- /dev/null +++ data/files/multilined2.txt @@ -0,0 +1,26 @@ +1Apache Hive (TM) @VERSION@10 +2Hive defines a simple SQL-like query language, called QL, that enables +users familiar with SQL to query the data.20 +3At the same time, this +language also allows programmers who are familiar with the MapReduce +framework to be able to plug in their custom mappers and reducers to +perform more sophisticated analysis that may not be supported by the +built-in capabilities of the language.30 +4QL can also be extended with +custom scalar functions (UDF's), aggregations (UDAF's), and table +functions (UDTF's).40 +550 +6Please note that Hadoop is a batch processing system and Hadoop jobs +tend to have high latency and incur substantial overheads in job +submission and scheduling.60 +7Consequently the average latency for Hive +queries is generally very high (minutes) even when data sets involved +are very small (say a few hundred megabytes).70 +8As a result it cannot be +compared with systems such as Oracle where analyses are conducted on a +significantly smaller amount of data but the analyses proceed much +more iteratively with the response times between iterations being less +than a few minutes.80 +9Hive aims to provide acceptable (but not optimal) +latency for interactive data browsing, queries over small data sets or +test queries.90 diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java index f19a5c0..663c9f8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.serde2.Deserializer; +import 
org.apache.hadoop.hive.serde2.MultiLinedDeserializer; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.DelegatedObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; @@ -92,6 +93,8 @@ public class FetchOperator implements Serializable { private transient Writable value; private transient Writable[] vcValues; private transient Deserializer serde; + private transient MultiLinedDeserializer mserde; + private transient Iterator iterPath; private transient Iterator iterPartDesc; private transient Path currPath; @@ -379,6 +382,9 @@ public class FetchOperator implements Serializable { splitNum = 0; serde = tmp.getDeserializerClass().newInstance(); serde.initialize(job, tmp.getProperties()); + if (serde instanceof MultiLinedDeserializer) { + mserde = (MultiLinedDeserializer) serde; + } if (LOG.isDebugEnabled()) { LOG.debug("Creating fetchTask with deserializer typeinfo: " @@ -495,6 +501,9 @@ public class FetchOperator implements Serializable { throw new IOException(e); } } + if (mserde != null && !mserde.isCompleted(value)) { + continue; + } if (hasVC) { vcValues = MapOperator.populateVirtualColumnValues(context, vcCols, vcValues, serde); row[isPartitioned ? 
2 : 1] = vcValues; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java index b6dbf3d..b04e80a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.MultiLinedDeserializer; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.SerDeUtils; @@ -73,6 +74,7 @@ public class MapOperator extends Operator implements Serializable, C private final transient LongWritable deserialize_error_count = new LongWritable(); private transient Deserializer deserializer; + private transient MultiLinedDeserializer mdeserializer; private transient Object[] rowWithPart; private transient Writable[] vcValues; @@ -316,6 +318,10 @@ public class MapOperator extends Operator implements Serializable, C return; } + if (deserializer instanceof MultiLinedDeserializer) { + mdeserializer = (MultiLinedDeserializer) deserializer; + } + listInputPaths.add(inp); if (op instanceof TableScanOperator) { @@ -512,6 +518,10 @@ public class MapOperator extends Operator implements Serializable, C Object row = null; try { + if (mdeserializer != null && !mdeserializer.isCompleted(value)) { + return; + } + if (this.hasVC) { this.rowWithPartAndVC[0] = deserializer.deserialize(value); int vcPos = isPartitioned ? 
2 : 1; diff --git ql/src/test/queries/clientpositive/multilined_column.q ql/src/test/queries/clientpositive/multilined_column.q new file mode 100644 index 0000000..7f1291c --- /dev/null +++ ql/src/test/queries/clientpositive/multilined_column.q @@ -0,0 +1,21 @@ +CREATE TABLE multilined1(text STRING, dummy STRING) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' +WITH SERDEPROPERTIES ( + 'serialization.multilined.row.index'='0' +); + +load data local inpath '../data/files/multilined1.txt' overwrite into table multilined1; + +select * from multilined1; + +CREATE TABLE multilined2(linenum int, text STRING, length int) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' +WITH SERDEPROPERTIES ( + 'serialization.multilined.row.index'='1' +); + +load data local inpath '../data/files/multilined2.txt' overwrite into table multilined2; + +select * from multilined2; diff --git ql/src/test/results/clientpositive/multilined_column.q.out ql/src/test/results/clientpositive/multilined_column.q.out new file mode 100644 index 0000000..95024a2 --- /dev/null +++ ql/src/test/results/clientpositive/multilined_column.q.out @@ -0,0 +1,110 @@ +PREHOOK: query: CREATE TABLE multilined1(text STRING, dummy STRING) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' +WITH SERDEPROPERTIES ( + 'serialization.multilined.row.index'='0' +) +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE TABLE multilined1(text STRING, dummy STRING) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' +WITH SERDEPROPERTIES ( + 'serialization.multilined.row.index'='0' +) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@multilined1 +PREHOOK: query: load data local inpath '../data/files/multilined1.txt' overwrite into table multilined1 +PREHOOK: type: LOAD +PREHOOK: Output: default@multilined1 +POSTHOOK: query: load data local inpath '../data/files/multilined1.txt' overwrite into table multilined1 +POSTHOOK: type: LOAD 
+POSTHOOK: Output: default@multilined1 +PREHOOK: query: select * from multilined1 +PREHOOK: type: QUERY +PREHOOK: Input: default@multilined1 +#### A masked pattern was here #### +POSTHOOK: query: select * from multilined1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@multilined1 +#### A masked pattern was here #### +Apache Hive (TM) @VERSION@ +Hive defines a simple SQL-like query language, called QL, that enables +users familiar with SQL to query the data. +At the same time, this +language also allows programmers who are familiar with the MapReduce +framework to be able to plug in their custom mappers and reducers to +perform more sophisticated analysis that may not be supported by the +built-in capabilities of the language. +QL can also be extended with +custom scalar functions (UDF's), aggregations (UDAF's), and table +functions (UDTF's). + +Please note that Hadoop is a batch processing system and Hadoop jobs +tend to have high latency and incur substantial overheads in job +submission and scheduling. +Consequently the average latency for Hive +queries is generally very high (minutes) even when data sets involved +are very small (say a few hundred megabytes). +As a result it cannot be +compared with systems such as Oracle where analyses are conducted on a +significantly smaller amount of data but the analyses proceed much +more iteratively with the response times between iterations being less +than a few minutes. +Hive aims to provide acceptable (but not optimal) +latency for interactive data browsing, queries over small data sets or +test queries. 
+PREHOOK: query: CREATE TABLE multilined2(linenum int, text STRING, length int) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' +WITH SERDEPROPERTIES ( + 'serialization.multilined.row.index'='1' +) +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE TABLE multilined2(linenum int, text STRING, length int) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' +WITH SERDEPROPERTIES ( + 'serialization.multilined.row.index'='1' +) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@multilined2 +PREHOOK: query: load data local inpath '../data/files/multilined2.txt' overwrite into table multilined2 +PREHOOK: type: LOAD +PREHOOK: Output: default@multilined2 +POSTHOOK: query: load data local inpath '../data/files/multilined2.txt' overwrite into table multilined2 +POSTHOOK: type: LOAD +POSTHOOK: Output: default@multilined2 +PREHOOK: query: select * from multilined2 +PREHOOK: type: QUERY +PREHOOK: Input: default@multilined2 +#### A masked pattern was here #### +POSTHOOK: query: select * from multilined2 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@multilined2 +#### A masked pattern was here #### +1 Apache Hive (TM) @VERSION@ 10 +2 Hive defines a simple SQL-like query language, called QL, that enables +users familiar with SQL to query the data. 20 +3 At the same time, this +language also allows programmers who are familiar with the MapReduce +framework to be able to plug in their custom mappers and reducers to +perform more sophisticated analysis that may not be supported by the +built-in capabilities of the language. 30 +4 QL can also be extended with +custom scalar functions (UDF's), aggregations (UDAF's), and table +functions (UDTF's). 40 +5 50 +6 Please note that Hadoop is a batch processing system and Hadoop jobs +tend to have high latency and incur substantial overheads in job +submission and scheduling. 
60 +7 Consequently the average latency for Hive +queries is generally very high (minutes) even when data sets involved +are very small (say a few hundred megabytes). 70 +8 As a result it cannot be +compared with systems such as Oracle where analyses are conducted on a +significantly smaller amount of data but the analyses proceed much +more iteratively with the response times between iterations being less +than a few minutes. 80 +9 Hive aims to provide acceptable (but not optimal) +latency for interactive data browsing, queries over small data sets or +test queries. 90 diff --git serde/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/serde/Constants.java serde/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/serde/Constants.java index 2c32999..5f088af 100644 --- serde/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/serde/Constants.java +++ serde/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/serde/Constants.java @@ -5,21 +5,8 @@ */ package org.apache.hadoop.hive.serde; -import org.apache.commons.lang.builder.HashCodeBuilder; -import java.util.List; -import java.util.ArrayList; -import java.util.Map; -import java.util.HashMap; -import java.util.EnumMap; -import java.util.Set; import java.util.HashSet; -import java.util.EnumSet; -import java.util.Collections; -import java.util.BitSet; -import java.nio.ByteBuffer; -import java.util.Arrays; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.Set; public class Constants { @@ -33,7 +20,11 @@ public class Constants { public static final String SERIALIZATION_NULL_FORMAT = "serialization.null.format"; - public static final String SERIALIZATION_LAST_COLUMN_TAKES_REST = "serialization.last.column.takes.rest"; + public static final String SERIALIZATION_LAST_COLUMN_TAKES_REST + = "serialization.last.column.takes.rest"; + + public static final String SERIALIZATION_MULTILINED_ROW_INDEX + = "serialization.multilined.row.index"; public static final String SERIALIZATION_SORT_ORDER = 
"serialization.sort.order"; diff --git serde/src/java/org/apache/hadoop/hive/serde2/MultiLinedDeserializer.java serde/src/java/org/apache/hadoop/hive/serde2/MultiLinedDeserializer.java new file mode 100644 index 0000000..cb4c2bb --- /dev/null +++ serde/src/java/org/apache/hadoop/hive/serde2/MultiLinedDeserializer.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.serde2; + +import org.apache.hadoop.io.Writable; + +public interface MultiLinedDeserializer extends Deserializer { + + boolean isCompleted(Writable value) throws SerDeException; +} diff --git serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java index 0036a8e..ff80560 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java +++ serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java @@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.serde.Constants; import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.hive.serde2.MultiLinedDeserializer; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeStats; @@ -59,7 +60,7 @@ import org.apache.hadoop.io.Writable; * Also LazySimpleSerDe outputs typed columns instead of treating all columns as * String like MetadataTypedColumnsetSerDe. 
*/ -public class LazySimpleSerDe implements SerDe { +public class LazySimpleSerDe implements SerDe, MultiLinedDeserializer { public static final Log LOG = LogFactory.getLog(LazySimpleSerDe.class .getName()); @@ -108,6 +109,63 @@ public class LazySimpleSerDe implements SerDe { return defaultVal; } + private transient int columnCount; + private transient byte[] buffer; + private transient int bufferLen; + + private static final byte[] NEWLINE = new byte[] {'\n'}; + + /** + * Buffers one input line and returns true once the buffered lines form a complete + * row. Only a line ending inside column multiLinedColIndex keeps the row open. + */ + public boolean isCompleted(Writable value) throws SerDeException { + if (serdeParams.multiLinedColIndex < 0) { + return true; + } + byte[] data; + int length; + if (value instanceof BytesWritable) { + BytesWritable b = (BytesWritable) value; + // For backward-compatibility with hadoop 0.17 + data = b.getBytes(); + length = b.getLength(); + } else if (value instanceof Text) { + Text t = (Text) value; + data = t.getBytes(); + length = t.getLength(); + } else { + throw new SerDeException(getClass().toString() + + ": expects either BytesWritable or Text object!"); + } + appendData(data, length); + for (int i = 0; i < length; i++) { + if (data[i] == serdeParams.separators[0]) { + columnCount++; + } + } + if (columnCount != serdeParams.multiLinedColIndex) { + // The line break falls outside the multi-lined column, so it ends the row. + columnCount = 0; + return true; + } + // The line break belongs to the multi-lined value; keep it in the buffer. + appendData(NEWLINE, 1); + return false; + } + + /** Appends data[0, length) to the row buffer, at least doubling it when full. */ + private void appendData(byte[] data, int length) { + int required = bufferLen + length; + if (buffer == null) { + buffer = new byte[Math.max(required << 1, 16)]; + } else if (required > buffer.length) { + buffer = Arrays.copyOf(buffer, Math.max(required, buffer.length << 1)); + } + System.arraycopy(data, 0, buffer, bufferLen, length); + bufferLen = required; + } + /** * SerDeParameters.
* @@ -118,6 +176,7 @@ public class LazySimpleSerDe implements SerDe { Text nullSequence; TypeInfo rowTypeInfo; boolean lastColumnTakesRest; + int multiLinedColIndex; List columnNames; List columnTypes; @@ -231,6 +290,11 @@ public class LazySimpleSerDe implements SerDe { serdeParams.lastColumnTakesRest = (lastColumnTakesRestString != null && lastColumnTakesRestString .equalsIgnoreCase("true")); + String multiLinedColIndex = tbl + .getProperty(Constants.SERIALIZATION_MULTILINED_ROW_INDEX); + serdeParams.multiLinedColIndex = multiLinedColIndex != null + ? Integer.parseInt(multiLinedColIndex) : -1; + LazyUtils.extractColumnInfo(tbl, serdeParams, serdeName); // Create the LazyObject for storing the rows @@ -275,19 +339,28 @@ public class LazySimpleSerDe implements SerDe { if (byteArrayRef == null) { byteArrayRef = new ByteArrayRef(); } - if (field instanceof BytesWritable) { + byte[] data; + int length; + if (serdeParams.multiLinedColIndex >= 0) { + data = Arrays.copyOfRange(buffer, 0, bufferLen); + length = bufferLen; + bufferLen = 0; + } else if (field instanceof BytesWritable) { BytesWritable b = (BytesWritable) field; // For backward-compatibility with hadoop 0.17 - byteArrayRef.setData(b.getBytes()); - cachedLazyStruct.init(byteArrayRef, 0, b.getLength()); + data = b.getBytes(); + length = b.getLength(); } else if (field instanceof Text) { Text t = (Text) field; - byteArrayRef.setData(t.getBytes()); - cachedLazyStruct.init(byteArrayRef, 0, t.getLength()); + data = t.getBytes(); + length = t.getLength(); } else { throw new SerDeException(getClass().toString() + ": expects either BytesWritable or Text object!"); } + byteArrayRef.setData(data); + cachedLazyStruct.init(byteArrayRef, 0, length); + lastOperationSerialize = false; lastOperationDeserialize = true; return cachedLazyStruct;