diff --git a/data/files/parquet_create.txt b/data/files/parquet_create.txt
new file mode 100644
index 0000000..ccd48ee
--- /dev/null
+++ b/data/files/parquet_create.txt
@@ -0,0 +1,3 @@
+1|foo line1|key11:value11,key12:value12,key13:value13|a,b,c|one,two
+2|bar line2|key21:value21,key22:value22,key23:value23|d,e,f|three,four
+3|baz line3|key31:value31,key32:value32,key33:value33|g,h,i|five,six
diff --git a/pom.xml b/pom.xml
index 41f5337..fbb21df 100644
--- a/pom.xml
+++ b/pom.xml
@@ -127,6 +127,7 @@
requires netty < 3.6.0 we force hadoops version
-->
3.4.0.Final
+ <parquet.version>1.3.2</parquet.version>
0.10.1
2.5.0
1.0.1
@@ -222,6 +223,17 @@
<version>${bonecp.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>parquet-hadoop-bundle</artifactId>
+ <version>${parquet.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>parquet-column</artifactId>
+ <version>${parquet.version}</version>
+ <classifier>tests</classifier>
+ </dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
<version>${jersey.version}</version>
diff --git a/ql/pom.xml b/ql/pom.xml
index 7087a4c..53d0b9e 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -67,6 +67,10 @@
<version>${kryo.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>parquet-hadoop-bundle</artifactId>
+ </dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>${commons-codec.version}</version>
@@ -204,6 +208,12 @@
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>parquet-column</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
@@ -476,6 +486,7 @@
<include>org.apache.hive:hive-exec</include>
<include>org.apache.hive:hive-serde</include>
<include>com.esotericsoftware.kryo:kryo</include>
+ <include>com.twitter:parquet-hadoop-bundle</include>
<include>org.apache.thrift:libthrift</include>
<include>commons-lang:commons-lang</include>
<include>org.json:json</include>
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java
new file mode 100644
index 0000000..2380ba9
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java
@@ -0,0 +1,128 @@
+/**
+ *
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
+import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.JobContext;
+
+import parquet.hadoop.ParquetInputFormat;
+import parquet.hadoop.ParquetInputSplit;
+import parquet.hadoop.util.ContextUtil;
+
+
+/**
+ *
+ * A Parquet InputFormat for Hive (with the deprecated package mapred)
+ *
+ * TODO: Refactor all of the wrappers here. Discussion: https://github.com/Parquet/parquet-mr/pull/28s
+ *
+ */
+public class MapredParquetInputFormat extends FileInputFormat<Void, ArrayWritable> {
+
+ public static final Log LOG = LogFactory.getLog(MapredParquetInputFormat.class);
+
+ private final ParquetInputFormat<ArrayWritable> realInput;
+ private final ProjectionPusher projectionPusher;
+
+ public MapredParquetInputFormat() {
+ this(new ParquetInputFormat<ArrayWritable>(DataWritableReadSupport.class), new ProjectionPusher());
+ }
+
+ protected MapredParquetInputFormat(final ParquetInputFormat<ArrayWritable> inputFormat, ProjectionPusher pusher) {
+ this.realInput = inputFormat;
+ this.projectionPusher = pusher;
+ }
+
+ @Override
+ public org.apache.hadoop.mapred.InputSplit[] getSplits(
+ final org.apache.hadoop.mapred.JobConf job,
+ final int numSplits
+ ) throws IOException {
+ Path[] dirs = getInputPathsForJob(job);
+ if (dirs.length == 0) {
+ throw new IOException("No input paths specified in job");
+ }
+
+ return getSplits(job, numSplits, makeQualifiedPathFromPaths(dirs, job));
+ }
+
+ protected Path makeQualifiedPathFromPaths(
+ Path[] paths,
+ final org.apache.hadoop.mapred.JobConf job
+ ) throws IOException {
+ return new Path((paths[paths.length - 1]).makeQualified(getFsForJob(job)).toUri().getPath());
+ }
+
+ protected FileSystem getFsForJob(org.apache.hadoop.mapred.JobConf job) throws IOException {
+ return FileSystem.get(job);
+ }
+
+ protected Path[] getInputPathsForJob(org.apache.hadoop.mapred.JobConf job) {
+ return FileInputFormat.getInputPaths(job);
+ }
+
+ public org.apache.hadoop.mapred.InputSplit[] getSplits(
+ final org.apache.hadoop.mapred.JobConf job,
+ final int numSplits,
+ Path tmpPath
+ ) throws IOException {
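+ // Push the projected columns and filters into a cloned JobConf, let the mapreduce ParquetInputFormat compute the splits, then wrap each split for the mapred API.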
+ final JobConf cloneJobConf = projectionPusher.pushProjectionsAndFilters(job, tmpPath);
+ final List<org.apache.hadoop.mapreduce.InputSplit> splits = realInput.getSplits(getJobContext(cloneJobConf));
+
+ final InputSplit[] resultSplits = new InputSplit[splits.size()];
+ int i = 0;
+
+ for (final org.apache.hadoop.mapreduce.InputSplit split : splits) {
+ try {
+ resultSplits[i++] = new ParquetInputSplitWrapper((ParquetInputSplit) split);
+ } catch (final InterruptedException e) {
+ throw new RuntimeException("Cannot create an InputSplitWrapper", e);
+ }
+ }
+
+ return resultSplits;
+ }
+
+ protected JobContext getJobContext(JobConf jobConf) {
+ return ContextUtil.newJobContext(jobConf, null);
+ }
+
+ @Override
+ public org.apache.hadoop.mapred.RecordReader<Void, ArrayWritable> getRecordReader(
+ final org.apache.hadoop.mapred.InputSplit split,
+ final org.apache.hadoop.mapred.JobConf job,
+ final org.apache.hadoop.mapred.Reporter reporter
+ ) throws IOException {
+ try {
+ return (RecordReader<Void, ArrayWritable>) new ParquetRecordReaderWrapper(realInput, split, job, reporter, projectionPusher);
+ } catch (final InterruptedException e) {
+ throw new RuntimeException("Cannot create a RecordReaderWrapper", e);
+ }
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
new file mode 100644
index 0000000..aa5768a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java
@@ -0,0 +1,128 @@
+/**
+ *
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
+import org.apache.hadoop.hive.ql.io.parquet.convert.HiveSchemaConverter;
+import org.apache.hadoop.hive.ql.io.parquet.write.DataWritableWriteSupport;
+import org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.util.Progressable;
+
+import parquet.hadoop.ParquetOutputFormat;
+
+/**
+ * A Parquet OutputFormat for Hive (with the deprecated package mapred)
+ */
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class MapredParquetOutputFormat extends FileOutputFormat<Void, ArrayWritable> implements
+HiveOutputFormat<Void, ArrayWritable> {
+
+ protected ParquetOutputFormat<ArrayWritable> realOutputFormat;
+ public static final Log LOG = LogFactory.getLog(MapredParquetOutputFormat.class);
+
+ public MapredParquetOutputFormat() {
+ realOutputFormat = new ParquetOutputFormat<ArrayWritable>(new DataWritableWriteSupport());
+ }
+
+ public MapredParquetOutputFormat(final OutputFormat<Void, ArrayWritable> mapreduceOutputFormat) {
+ realOutputFormat = (ParquetOutputFormat<ArrayWritable>) mapreduceOutputFormat;
+ }
+
+ @Override
+ public void checkOutputSpecs(final FileSystem ignored, final JobConf job) throws IOException {
+ realOutputFormat.checkOutputSpecs(ShimLoader.getHadoopShims().getHCatShim().createJobContext(job, null));
+ }
+
+ @Override
+ public RecordWriter<Void, ArrayWritable> getRecordWriter(
+ final FileSystem ignored,
+ final JobConf job,
+ final String name,
+ final Progressable progress
+ ) throws IOException {
+ throw new RuntimeException("Should never be used");
+ }
+
+ /**
+ *
+ * Create the parquet schema from the hive schema, and return the RecordWriterWrapper which
+ * contains the real output format
+ */
+ @Override
+ public FSRecordWriter getHiveRecordWriter(
+ final JobConf jobConf,
+ final Path finalOutPath,
+ final Class<? extends Writable> valueClass,
+ final boolean isCompressed,
+ final Properties tableProperties,
+ final Progressable progress) throws IOException {
+
+ LOG.info("getHiveRecordWriter " + this);
+ LOG.info("creating new record writer...");
+
+ // Seriously? Hard coded property names?
+ final String columnNameProperty = tableProperties.getProperty("columns");
+ final String columnTypeProperty = tableProperties.getProperty("columns.types");
+ List<String> columnNames;
+ List<TypeInfo> columnTypes;
+
+ if (columnNameProperty.length() == 0) {
+ columnNames = new ArrayList<String>();
+ } else {
+ columnNames = Arrays.asList(columnNameProperty.split(","));
+ }
+
+ if (columnTypeProperty.length() == 0) {
+ columnTypes = new ArrayList<TypeInfo>();
+ } else {
+ columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+ }
+
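+ // Convert the Hive column names/types into a Parquet MessageType and hand it to the write support through the job configuration.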
+ DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(columnNames, columnTypes), jobConf);
+ return getParquetRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath.toString(), progress);
+ }
+
+ protected ParquetRecordWriterWrapper getParquetRecordWriterWrapper(
+ ParquetOutputFormat<ArrayWritable> realOutputFormat,
+ JobConf jobConf,
+ String finalOutPath,
+ Progressable progress
+ ) throws IOException {
+ return new ParquetRecordWriterWrapper(realOutputFormat, jobConf, finalOutPath, progress);
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetInputSplitWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetInputSplitWrapper.java
new file mode 100644
index 0000000..f18695f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetInputSplitWrapper.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import parquet.hadoop.ParquetInputSplit;
+
+public class ParquetInputSplitWrapper extends FileSplit implements InputSplit {
+
+ private ParquetInputSplit realSplit;
+
+ public ParquetInputSplit getRealSplit() {
+ return realSplit;
+ }
+
+ // MapReduce instantiates this.
+ public ParquetInputSplitWrapper() {
+ super((Path) null, 0, 0, (String[]) null);
+ }
+
+ public ParquetInputSplitWrapper(final ParquetInputSplit realSplit) throws IOException, InterruptedException {
+ super(realSplit.getPath(), realSplit.getStart(), realSplit.getLength(), realSplit.getLocations());
+ this.realSplit = realSplit;
+ }
+
+ @Override
+ public long getLength() {
+ if (realSplit == null) {
+ return 0;
+ } else {
+ try {
+ return realSplit.getLength();
+ } catch (IOException ex) {
+ throw new RuntimeException("Cannot get the length of the ParquetInputSplit: " + realSplit, ex);
+ } catch (InterruptedException ex) {
+ throw new RuntimeException("Cannot get the length of the ParquetInputSplit: " + realSplit, ex);
+ }
+ }
+ }
+
+ @Override
+ public String[] getLocations() throws IOException {
+ try {
+ return realSplit.getLocations();
+ } catch (final InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void readFields(final DataInput in) throws IOException {
+ final String className = WritableUtils.readString(in);
+ Class<?> splitClass;
+
+ try {
+ splitClass = Class.forName(className);
+ } catch (final ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+
+ realSplit = (ParquetInputSplit) ReflectionUtils.newInstance(splitClass, null);
+ ((Writable) realSplit).readFields(in);
+ }
+
+ @Override
+ public void write(final DataOutput out) throws IOException {
+ WritableUtils.writeString(out, realSplit.getClass().getName());
+ ((Writable) realSplit).write(out);
+ }
+
+ @Override
+ public Path getPath() {
+ return realSplit.getPath();
+ }
+
+ @Override
+ public long getStart() {
+ return realSplit.getStart();
+ }
+
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
new file mode 100644
index 0000000..0288e38
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.StringUtils;
+
+public class ProjectionPusher {
+
+ public static final Log LOG = LogFactory.getLog(ProjectionPusher.class);
+
+ private final Map<String, PartitionDesc> pathToPartitionInfo =
+ new LinkedHashMap<String, PartitionDesc>();
+ /**
+ * MapWork is the Hive object which describes input files,
+ * column projections, and filters.
+ */
+ private MapWork mapWork;
+
+ private static final List<String> virtualColumns;
+
+ static {
+ List<String> vcols = new ArrayList<String>();
+ vcols.add("INPUT__FILE__NAME");
+ vcols.add("BLOCK__OFFSET__INSIDE__FILE");
+ vcols.add("ROW__OFFSET__INSIDE__BLOCK");
+ vcols.add("RAW__DATA__SIZE");
+ virtualColumns = Collections.unmodifiableList(vcols);
+ }
+
+ public List<String> getColumns(final String columns) {
+ final List<String> result = (List<String>) StringUtils.getStringCollection(columns);
+ result.removeAll(virtualColumns);
+ return result;
+ }
+
+ /**
+ * Sets the mapWork variable based on the current JobConf in order to get all partitions.
+ *
+ * @param job
+ */
+ private void updateMrWork(final JobConf job) {
+ final String plan = HiveConf.getVar(job, HiveConf.ConfVars.PLAN);
+ if (mapWork == null && plan != null && plan.length() > 0) {
+ mapWork = Utilities.getMapWork(job);
+ pathToPartitionInfo.clear();
+ for (final Map.Entry<String, PartitionDesc> entry : mapWork.getPathToPartitionInfo().entrySet()) {
+ pathToPartitionInfo.put(new Path(entry.getKey()).toUri().getPath().toString(), entry.getValue());
+ }
+ }
+ }
+
+ private void pushProjectionsAndFilters(final JobConf jobConf,
+ final String splitPath, final String splitPathWithNoSchema) {
+
+ if (mapWork == null) {
+ //LOG.debug("Not pushing projections and filters because MapredWork is null");
+ return;
+ } else if (mapWork.getPathToAliases() == null) {
+ //LOG.debug("Not pushing projections and filters because pathToAliases is null");
+ return;
+ }
+
+ final ArrayList<String> aliases = new ArrayList<String>();
+ final Iterator<Entry<String, ArrayList<String>>> iterator = mapWork.getPathToAliases().entrySet().iterator();
+
+ while (iterator.hasNext()) {
+ final Entry<String, ArrayList<String>> entry = iterator.next();
+ final String key = new Path(entry.getKey()).toUri().getPath();
+
+ if (splitPath.equals(key) || splitPathWithNoSchema.equals(key)) {
+ final ArrayList<String> list = entry.getValue();
+ for (final String val : list) {
+ aliases.add(val);
+ }
+ }
+ }
+
+ for (final String alias : aliases) {
+ final Operator<? extends Serializable> op = mapWork.getAliasToWork().get(
+ alias);
+ if (op != null && op instanceof TableScanOperator) {
+ final TableScanOperator tableScan = (TableScanOperator) op;
+
+ // push down projections
+ final List<Integer> list = tableScan.getNeededColumnIDs();
+
+ if (list != null) {
+ ColumnProjectionUtils.appendReadColumnIDs(jobConf, list);
+ } else {
+ ColumnProjectionUtils.setFullyReadColumns(jobConf);
+ }
+
+ pushFilters(jobConf, tableScan);
+ }
+ }
+ }
+
+ private void pushFilters(final JobConf jobConf, final TableScanOperator tableScan) {
+
+ final TableScanDesc scanDesc = tableScan.getConf();
+ if (scanDesc == null) {
+ LOG.debug("Not pushing filters because TableScanDesc is null");
+ return;
+ }
+
+ // construct column name list for reference by filter push down
+ Utilities.setColumnNameList(jobConf, tableScan);
+
+ // push down filters
+ final ExprNodeGenericFuncDesc filterExpr = scanDesc.getFilterExpr();
+ if (filterExpr == null) {
+ LOG.debug("Not pushing filters because FilterExpr is null");
+ return;
+ }
+
+ final String filterText = filterExpr.getExprString();
+ final String filterExprSerialized = Utilities.serializeExpression(filterExpr);
+ jobConf.set(
+ TableScanDesc.FILTER_TEXT_CONF_STR,
+ filterText);
+ jobConf.set(
+ TableScanDesc.FILTER_EXPR_CONF_STR,
+ filterExprSerialized);
+ }
+
+
+ public JobConf pushProjectionsAndFilters(JobConf jobConf, Path path)
+ throws IOException {
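+ // Clone the JobConf, copy the matching partition's table properties into it, and push column projections and filters for the aliases of this path.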
+ updateMrWork(jobConf); // TODO: refactor this
+ final JobConf cloneJobConf = new JobConf(jobConf);
+ final PartitionDesc part = pathToPartitionInfo.get(path.toString());
+
+ if ((part != null) && (part.getTableDesc() != null)) {
+ Utilities.copyTableJobPropertiesToConf(part.getTableDesc(), cloneJobConf);
+ }
+
+ pushProjectionsAndFilters(cloneJobConf, path.toString(), path.toUri().toString());
+ return cloneJobConf;
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ArrayWritableGroupConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ArrayWritableGroupConverter.java
new file mode 100644
index 0000000..68fc99e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ArrayWritableGroupConverter.java
@@ -0,0 +1,95 @@
+/**
+ *
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet.convert;
+
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.Writable;
+
+import parquet.io.ParquetDecodingException;
+import parquet.io.api.Converter;
+import parquet.schema.GroupType;
+
+/**
+ *
+ * An ArrayWritableGroupConverter
+ *
+ */
+public class ArrayWritableGroupConverter extends HiveGroupConverter {
+
+ private final Converter[] converters;
+ private final HiveGroupConverter parent;
+ private final int index;
+ private final boolean isMap;
+ private Writable currentValue;
+ private Writable[] mapPairContainer;
+
+ public ArrayWritableGroupConverter(final GroupType groupType, final HiveGroupConverter parent, final int index) {
+ this.parent = parent;
+ this.index = index;
+
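+ // Two child fields mean this repeated group holds map entries (key/value); a single field means it holds list elements.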
+ if (groupType.getFieldCount() == 2) {
+ converters = new Converter[2];
+ converters[0] = getConverterFromDescription(groupType.getType(0), 0, this);
+ converters[1] = getConverterFromDescription(groupType.getType(1), 1, this);
+ isMap = true;
+ } else if (groupType.getFieldCount() == 1) {
+ converters = new Converter[1];
+ converters[0] = getConverterFromDescription(groupType.getType(0), 0, this);
+ isMap = false;
+ } else {
+ throw new RuntimeException("Invalid parquet hive schema: " + groupType);
+ }
+
+ }
+
+ @Override
+ public Converter getConverter(final int fieldIndex) {
+ return converters[fieldIndex];
+ }
+
+ @Override
+ public void start() {
+ if (isMap) {
+ mapPairContainer = new Writable[2];
+ }
+ }
+
+ @Override
+ public void end() {
+ if (isMap) {
+ currentValue = new ArrayWritable(Writable.class, mapPairContainer);
+ }
+ parent.add(index, currentValue);
+ }
+
+ @Override
+ protected void set(final int index, final Writable value) {
+ if (index != 0 && mapPairContainer == null || index > 1) {
+ throw new ParquetDecodingException("Repeated group can only have one or two fields for maps. Not allowed to set for the index : " + index);
+ }
+
+ if (isMap) {
+ mapPairContainer[index] = value;
+ } else {
+ currentValue = value;
+ }
+ }
+
+ @Override
+ protected void add(final int index, final Writable value) {
+ set(index, value);
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableGroupConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableGroupConverter.java
new file mode 100644
index 0000000..700f3b3
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableGroupConverter.java
@@ -0,0 +1,140 @@
+/**
+ *
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet.convert;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.Writable;
+
+import parquet.io.api.Converter;
+import parquet.schema.GroupType;
+import parquet.schema.Type;
+
+/**
+ *
+ * A DataWritableGroupConverter: the real converter between Hive and Parquet types, applied recursively for complex types.
+ *
+ */
+public class DataWritableGroupConverter extends HiveGroupConverter {
+
+ private final Converter[] converters;
+ private final HiveGroupConverter parent;
+ private final int index;
+ private final Object[] currentArr;
+ private Writable[] rootMap;
+
+ public DataWritableGroupConverter(final GroupType requestedSchema, final GroupType tableSchema) {
+ this(requestedSchema, null, 0, tableSchema);
+ final int fieldCount = tableSchema.getFieldCount();
+ this.rootMap = new Writable[fieldCount];
+ }
+
+ public DataWritableGroupConverter(final GroupType groupType, final HiveGroupConverter parent, final int index) {
+ this(groupType, parent, index, groupType);
+ }
+
+ public DataWritableGroupConverter(final GroupType selectedGroupType, final HiveGroupConverter parent, final int index, final GroupType containingGroupType) {
+ this.parent = parent;
+ this.index = index;
+ final int totalFieldCount = containingGroupType.getFieldCount();
+ final int selectedFieldCount = selectedGroupType.getFieldCount();
+
+ currentArr = new Object[totalFieldCount];
+ converters = new Converter[selectedFieldCount];
+
+ int i = 0;
+ for (final Type subtype : selectedGroupType.getFields()) {
+ if (containingGroupType.getFields().contains(subtype)) {
+ converters[i] = getConverterFromDescription(subtype, containingGroupType.getFieldIndex(subtype.getName()), this);
+ } else {
+ throw new RuntimeException("Group type [" + containingGroupType + "] does not contain requested field: " + subtype);
+ }
+ ++i;
+ }
+ }
+
+ final public ArrayWritable getCurrentArray() {
+ final Writable[] writableArr;
+ if (this.rootMap != null) { // We're at the root : we can safely re-use the same map to save perf
+ writableArr = this.rootMap;
+ } else {
+ writableArr = new Writable[currentArr.length];
+ }
+
+ for (int i = 0; i < currentArr.length; i++) {
+ final Object obj = currentArr[i];
+ if (obj instanceof List) {
+ final List<?> objList = (List<?>) obj;
+ final ArrayWritable arr = new ArrayWritable(Writable.class, objList.toArray(new Writable[objList.size()]));
+ writableArr[i] = arr;
+ } else {
+ writableArr[i] = (Writable) obj;
+ }
+ }
+ return new ArrayWritable(Writable.class, writableArr);
+ }
+
+ @Override
+ final protected void set(final int index, final Writable value) {
+ currentArr[index] = value;
+ }
+
+ @Override
+ public Converter getConverter(final int fieldIndex) {
+ return converters[fieldIndex];
+ }
+
+ @Override
+ public void start() {
+ for (int i = 0; i < currentArr.length; i++) {
+ currentArr[i] = null;
+ }
+ }
+
+ @Override
+ public void end() {
+ if (parent != null) {
+ parent.set(index, getCurrentArray());
+ }
+ }
+
+ @Override
+ protected void add(final int index, final Writable value) {
+
+ if (currentArr[index] != null) {
+
+ final Object obj = currentArr[index];
+ if (obj instanceof List) {
+ final List<Writable> list = (List<Writable>) obj;
+ list.add(value);
+ } else {
+ throw new RuntimeException("This should be a List: " + obj);
+ }
+
+ } else {
+ // create a list here because we don't know the final length of the object
+ // and it is more flexible than ArrayWritable.
+ //
+ // converted to ArrayWritable by getCurrentArray().
+ final List<Writable> buffer = new ArrayList<Writable>();
+ buffer.add(value);
+ currentArr[index] = buffer;
+ }
+
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java
new file mode 100644
index 0000000..7087f16
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java
@@ -0,0 +1,46 @@
+/**
+ *
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet.convert;
+
+import org.apache.hadoop.io.ArrayWritable;
+
+import parquet.io.api.GroupConverter;
+import parquet.io.api.RecordMaterializer;
+import parquet.schema.GroupType;
+
+/**
+ *
+ * A DataWritableRecordConverter, materializes each Parquet record as an ArrayWritable tuple
+ *
+ */
+public class DataWritableRecordConverter extends RecordMaterializer<ArrayWritable> {
+
+ private final DataWritableGroupConverter root;
+
+ public DataWritableRecordConverter(final GroupType requestedSchema, final GroupType tableSchema) {
+ this.root = new DataWritableGroupConverter(requestedSchema, tableSchema);
+ }
+
+ @Override
+ public ArrayWritable getCurrentRecord() {
+ return root.getCurrentArray();
+ }
+
+ @Override
+ public GroupConverter getRootConverter() {
+ return root;
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java
new file mode 100644
index 0000000..f977df2
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java
@@ -0,0 +1,163 @@
+/**
+ *
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet.convert;
+
+import java.math.BigDecimal;
+
+import org.apache.hadoop.hive.ql.io.parquet.writable.BinaryWritable;
+import org.apache.hadoop.hive.ql.io.parquet.writable.BinaryWritable.DicBinaryWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import parquet.column.Dictionary;
+import parquet.io.api.Binary;
+import parquet.io.api.Converter;
+import parquet.io.api.PrimitiveConverter;
+
+/**
+ *
+ * ETypeConverter is an easy way to set the converter for the right type.
+ *
+ *
+ */
+public enum ETypeConverter {
+
+ EDOUBLE_CONVERTER(Double.TYPE) {
+ @Override
+ Converter getConverter(final Class<?> type, final int index, final HiveGroupConverter parent) {
+ return new PrimitiveConverter() {
+ @Override
+ final public void addDouble(final double value) {
+ parent.set(index, new DoubleWritable(value));
+ }
+ };
+ }
+ },
+ EBOOLEAN_CONVERTER(Boolean.TYPE) {
+ @Override
+ Converter getConverter(final Class<?> type, final int index, final HiveGroupConverter parent) {
+ return new PrimitiveConverter() {
+ @Override
+ final public void addBoolean(final boolean value) {
+ parent.set(index, new BooleanWritable(value));
+ }
+ };
+ }
+ },
+ EFLOAT_CONVERTER(Float.TYPE) {
+ @Override
+ Converter getConverter(final Class<?> type, final int index, final HiveGroupConverter parent) {
+ return new PrimitiveConverter() {
+ @Override
+ final public void addFloat(final float value) {
+ parent.set(index, new FloatWritable(value));
+ }
+ };
+ }
+ },
+ EINT32_CONVERTER(Integer.TYPE) {
+ @Override
+ Converter getConverter(final Class<?> type, final int index, final HiveGroupConverter parent) {
+ return new PrimitiveConverter() {
+ @Override
+ final public void addInt(final int value) {
+ parent.set(index, new IntWritable(value));
+ }
+ };
+ }
+ },
+ EINT64_CONVERTER(Long.TYPE) {
+ @Override
+ Converter getConverter(final Class<?> type, final int index, final HiveGroupConverter parent) {
+ return new PrimitiveConverter() {
+ @Override
+ final public void addLong(final long value) {
+ parent.set(index, new LongWritable(value));
+ }
+ };
+ }
+ },
+ EINT96_CONVERTER(BigDecimal.class) {
+ @Override
+ Converter getConverter(final Class<?> type, final int index, final HiveGroupConverter parent) {
+ return new PrimitiveConverter() {
+ @Override
+ final public void addDouble(final double value) {
+ parent.set(index, new DoubleWritable(value));
+ }
+ };
+ }
+ },
+ EBINARY_CONVERTER(Binary.class) {
+ @Override
+ Converter getConverter(final Class<?> type, final int index, final HiveGroupConverter parent) {
+ return new PrimitiveConverter() {
+ private Binary[] dictBinary;
+ private String[] dict;
+
+ @Override
+ public boolean hasDictionarySupport() {
+ return true;
+ }
+
+ @Override
+ public void setDictionary(Dictionary dictionary) {
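+ // Decode every dictionary entry to a String once, so repeated values are not re-decoded for each record.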
+ dictBinary = new Binary[dictionary.getMaxId() + 1];
+ dict = new String[dictionary.getMaxId() + 1];
+ for (int i = 0; i <= dictionary.getMaxId(); i++) {
+ Binary binary = dictionary.decodeToBinary(i);
+ dictBinary[i] = binary;
+ dict[i] = binary.toStringUsingUTF8();
+ }
+ }
+
+ @Override
+ public void addValueFromDictionary(int dictionaryId) {
+ parent.set(index, new DicBinaryWritable(dictBinary[dictionaryId], dict[dictionaryId]));
+ }
+
+ @Override
+ final public void addBinary(Binary value) {
+ parent.set(index, new BinaryWritable(value));
+ }
+ };
+ }
+ };
+ final Class<?> _type;
+
+ private ETypeConverter(final Class<?> type) {
+ this._type = type;
+ }
+
+ private Class<?> getType() {
+ return _type;
+ }
+
+ abstract Converter getConverter(final Class<?> type, final int index, final HiveGroupConverter parent);
+
+ static public Converter getNewConverter(final Class<?> type, final int index, final HiveGroupConverter parent) {
+ for (final ETypeConverter eConverter : values()) {
+ if (eConverter.getType() == type) {
+ return eConverter.getConverter(type, index, parent);
+ }
+ }
+ throw new RuntimeException("Converter not found ... for type : " + type);
+ }
+
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java
new file mode 100644
index 0000000..6138424
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java
@@ -0,0 +1,47 @@
+/**
+ *
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet.convert;
+
+import org.apache.hadoop.io.Writable;
+
+import parquet.io.api.Converter;
+import parquet.io.api.GroupConverter;
+import parquet.schema.Type;
+import parquet.schema.Type.Repetition;
+
+public abstract class HiveGroupConverter extends GroupConverter {
+
+ static protected Converter getConverterFromDescription(final Type type, final int index, final HiveGroupConverter parent) {
+ if (type == null) {
+ return null;
+ }
+
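+ // Primitive types map to an ETypeConverter; repeated groups (lists and maps) to an ArrayWritableGroupConverter; other groups (structs) to a DataWritableGroupConverter.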
+ if (type.isPrimitive()) {
+ return ETypeConverter.getNewConverter(type.asPrimitiveType().getPrimitiveTypeName().javaType, index, parent);
+ } else {
+ if (type.asGroupType().getRepetition() == Repetition.REPEATED) {
+ return new ArrayWritableGroupConverter(type.asGroupType(), parent, index);
+ } else {
+ return new DataWritableGroupConverter(type.asGroupType(), parent, index);
+ }
+ }
+ }
+
+ abstract protected void set(int index, Writable value);
+
+ abstract protected void add(int index, Writable value);
+
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java
new file mode 100644
index 0000000..7ec91fe
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java
@@ -0,0 +1,136 @@
+/**
+ *
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet.convert;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import parquet.Log;
+import parquet.schema.GroupType;
+import parquet.schema.MessageType;
+import parquet.schema.OriginalType;
+import parquet.schema.PrimitiveType;
+import parquet.schema.PrimitiveType.PrimitiveTypeName;
+import parquet.schema.Type;
+import parquet.schema.Type.Repetition;
+
+/**
+ *
+ * A HiveSchemaConverter
+ *
+ *
+ */
+public class HiveSchemaConverter {
+
+ private static final Log LOG = Log.getLog(HiveSchemaConverter.class);
+
+ static public MessageType convert(final List<String> columnNames, final List<TypeInfo> columnTypes) {
+ final MessageType schema = new MessageType("hive_schema", convertTypes(columnNames, columnTypes));
+ return schema;
+ }
+
+ static private Type[] convertTypes(final List<String> columnNames, final List<TypeInfo> columnTypes) {
+ if (columnNames.size() != columnTypes.size()) {
+ throw new RuntimeException("Mismatched Hive columns and types. Hive columns names found : " + columnNames
+ + " . And Hive types found : " + columnTypes);
+ }
+
+ final Type[] types = new Type[columnNames.size()];
+
+ for (int i = 0; i < columnNames.size(); ++i) {
+ types[i] = convertType(columnNames.get(i), columnTypes.get(i));
+ }
+
+ return types;
+ }
+
+ static private Type convertType(final String name, final TypeInfo typeInfo) {
+ return convertType(name, typeInfo, Repetition.OPTIONAL);
+ }
+
+ static private Type convertType(final String name, final TypeInfo typeInfo, final Repetition repetition) {
+ if (typeInfo.getCategory().equals(Category.PRIMITIVE)) {
+ if (typeInfo.equals(TypeInfoFactory.stringTypeInfo)) {
+ return new PrimitiveType(repetition, PrimitiveTypeName.BINARY, name);
+ } else if (typeInfo.equals(TypeInfoFactory.intTypeInfo) || typeInfo.equals(TypeInfoFactory.shortTypeInfo) || typeInfo.equals(TypeInfoFactory.byteTypeInfo)) {
+ return new PrimitiveType(repetition, PrimitiveTypeName.INT32, name);
+ } else if (typeInfo.equals(TypeInfoFactory.longTypeInfo)) {
+ return new PrimitiveType(repetition, PrimitiveTypeName.INT64, name);
+ } else if (typeInfo.equals(TypeInfoFactory.doubleTypeInfo)) {
+ return new PrimitiveType(repetition, PrimitiveTypeName.DOUBLE, name);
+ } else if (typeInfo.equals(TypeInfoFactory.floatTypeInfo)) {
+ return new PrimitiveType(repetition, PrimitiveTypeName.FLOAT, name);
+ } else if (typeInfo.equals(TypeInfoFactory.booleanTypeInfo)) {
+ return new PrimitiveType(repetition, PrimitiveTypeName.BOOLEAN, name);
+ } else if (typeInfo.equals(TypeInfoFactory.binaryTypeInfo)) {
+ // TODO : binaryTypeInfo is a byte array. Need to map it
+ throw new UnsupportedOperationException("Binary type not implemented");
+ } else if (typeInfo.equals(TypeInfoFactory.timestampTypeInfo)) {
+ throw new UnsupportedOperationException("Timestamp type not implemented");
+ } else if (typeInfo.equals(TypeInfoFactory.voidTypeInfo)) {
+ throw new UnsupportedOperationException("Void type not implemented");
+ } else if (typeInfo.equals(TypeInfoFactory.unknownTypeInfo)) {
+ throw new UnsupportedOperationException("Unknown type not implemented");
+ } else {
+ throw new RuntimeException("Unknown type: " + typeInfo);
+ }
+ } else if (typeInfo.getCategory().equals(Category.LIST)) {
+ return convertArrayType(name, (ListTypeInfo) typeInfo);
+ } else if (typeInfo.getCategory().equals(Category.STRUCT)) {
+ return convertStructType(name, (StructTypeInfo) typeInfo);
+ } else if (typeInfo.getCategory().equals(Category.MAP)) {
+ return convertMapType(name, (MapTypeInfo) typeInfo);
+ } else if (typeInfo.getCategory().equals(Category.UNION)) {
+ throw new UnsupportedOperationException("Union type not implemented");
+ } else {
+ throw new RuntimeException("Unknown type: " + typeInfo);
+ }
+ }
+
+ // An optional group containing a repeated anonymous group "bag", containing
+ // 1 anonymous element "array_element"
+ static private GroupType convertArrayType(final String name, final ListTypeInfo typeInfo) {
+ final TypeInfo subType = typeInfo.getListElementTypeInfo();
+ return listWrapper(name, OriginalType.LIST, new GroupType(Repetition.REPEATED, ParquetHiveSerDe.ARRAY.toString(), convertType("array_element", subType)));
+ }
+
+ // An optional group containing multiple elements
+ static private GroupType convertStructType(final String name, final StructTypeInfo typeInfo) {
+ final List<String> columnNames = typeInfo.getAllStructFieldNames();
+ final List<TypeInfo> columnTypes = typeInfo.getAllStructFieldTypeInfos();
+ return new GroupType(Repetition.OPTIONAL, name, convertTypes(columnNames, columnTypes));
+
+ }
+
+ // An optional group containing a repeated anonymous group "map", containing
+ // 2 elements: "key", "value"
+ static private GroupType convertMapType(final String name, final MapTypeInfo typeInfo) {
+ final Type keyType = convertType(ParquetHiveSerDe.MAP_KEY.toString(), typeInfo.getMapKeyTypeInfo(), Repetition.REQUIRED);
+ final Type valueType = convertType(ParquetHiveSerDe.MAP_VALUE.toString(), typeInfo.getMapValueTypeInfo());
+ return listWrapper(name, OriginalType.MAP_KEY_VALUE, new GroupType(Repetition.REPEATED, ParquetHiveSerDe.MAP.toString(), keyType, valueType));
+ }
+
+ static private GroupType listWrapper(final String name, final OriginalType originalType, final GroupType groupType) {
+ return new GroupType(Repetition.OPTIONAL, name, originalType, groupType);
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
new file mode 100644
index 0000000..ff16988
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
@@ -0,0 +1,135 @@
+/**
+ *
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet.read;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.parquet.convert.DataWritableRecordConverter;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.util.StringUtils;
+
+import parquet.hadoop.api.ReadSupport;
+import parquet.io.api.RecordMaterializer;
+import parquet.schema.MessageType;
+import parquet.schema.MessageTypeParser;
+import parquet.schema.PrimitiveType;
+import parquet.schema.PrimitiveType.PrimitiveTypeName;
+import parquet.schema.Type;
+import parquet.schema.Type.Repetition;
+
+/**
+ *
+ * A DataWritableReadSupport
+ *
+ * Manages the translation between Hive and Parquet
+ *
+ */
+public class DataWritableReadSupport extends ReadSupport<ArrayWritable> {
+
+ public static final String HIVE_SCHEMA_KEY = "HIVE_TABLE_SCHEMA";
+ private static final List<String> virtualColumns;
+
+ static {
+ List<String> vcols = new ArrayList<String>();
+ vcols.add("INPUT__FILE__NAME");
+ vcols.add("BLOCK__OFFSET__INSIDE__FILE");
+ vcols.add("ROW__OFFSET__INSIDE__BLOCK");
+ vcols.add("RAW__DATA__SIZE");
+ virtualColumns = Collections.unmodifiableList(vcols);
+ }
+
+ /**
+ * From a comma-separated string of column names (which may include Hive virtual columns),
+ * returns the list of column names with the virtual columns removed.
+ *
+ * @param columns comma-separated list of column names
+ * @return list of column names with virtual columns removed
+ */
+ private static List<String> getColumns(final String columns) {
+ final List<String> result = (List<String>) StringUtils.getStringCollection(columns);
+ result.removeAll(virtualColumns);
+ return result;
+ }
+ /**
+ *
+ * It creates the readContext for Parquet side with the requested schema during the init phase.
+ *
+ * @param configuration needed to get the wanted columns
+ * @param keyValueMetaData // unused
+ * @param fileSchema parquet file schema
+ * @return the parquet ReadContext
+ */
+ @Override
+ public parquet.hadoop.api.ReadSupport.ReadContext init(final Configuration configuration, final Map<String, String> keyValueMetaData, final MessageType fileSchema) {
+ final String columns = configuration.get("columns");
+ final Map<String, String> contextMetadata = new HashMap<String, String>();
+ if (columns != null) {
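+ // Rebuild the Hive table schema from the "columns" property; columns missing from the file schema get a dummy optional binary placeholder.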
+ final List<String> listColumns = getColumns(columns);
+
+ final List<Type> typeListTable = new ArrayList<Type>();
+ for (final String col : listColumns) {
+ if (fileSchema.containsField(col)) {
+ typeListTable.add(fileSchema.getType(col));
+ } else { // dummy type, should not be called
+ typeListTable.add(new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, col));
+ }
+ }
+ MessageType tableSchema = new MessageType("table_schema", typeListTable);
+ contextMetadata.put(HIVE_SCHEMA_KEY, tableSchema.toString());
+
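+ // Narrow the schema handed to Parquet down to the column ids Hive actually wants to read.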
+ MessageType requestedSchemaByUser = tableSchema;
+ final List<Integer> indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
+
+ final List<Type> typeListWanted = new ArrayList<Type>();
+ for (final Integer idx : indexColumnsWanted) {
+ typeListWanted.add(tableSchema.getType(listColumns.get(idx)));
+ }
+ requestedSchemaByUser = new MessageType(fileSchema.getName(), typeListWanted);
+
+ return new ReadContext(requestedSchemaByUser, contextMetadata);
+ } else {
+ contextMetadata.put(HIVE_SCHEMA_KEY, fileSchema.toString());
+ return new ReadContext(fileSchema, contextMetadata);
+ }
+ }
+
+ /**
+ *
+ * It creates the hive read support to interpret data from parquet to hive
+ *
+ * @param configuration // unused
+ * @param keyValueMetaData
+ * @param fileSchema // unused
+ * @param readContext containing the requested schema and the schema of the hive table
+ * @return Record Materialize for Hive
+ */
+ @Override
+ public RecordMaterializer<ArrayWritable> prepareForRead(final Configuration configuration, final Map<String, String> keyValueMetaData, final MessageType fileSchema,
+ final parquet.hadoop.api.ReadSupport.ReadContext readContext) {
+ final Map<String, String> metadata = readContext.getReadSupportMetadata();
+ if (metadata == null) {
+ throw new RuntimeException("ReadContext not initialized properly. Don't know the Hive Schema.");
+ }
+ final MessageType tableSchema = MessageTypeParser.parseMessageType(metadata.get(HIVE_SCHEMA_KEY));
+ return new DataWritableRecordConverter(readContext.getRequestedSchema(), tableSchema);
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
new file mode 100644
index 0000000..bbdeca2
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet.read;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.parquet.ParquetInputSplitWrapper;
+import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+import parquet.hadoop.ParquetFileReader;
+import parquet.hadoop.ParquetInputFormat;
+import parquet.hadoop.ParquetInputSplit;
+import parquet.hadoop.api.ReadSupport.ReadContext;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.metadata.FileMetaData;
+import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.hadoop.util.ContextUtil;
+import parquet.schema.MessageTypeParser;
+
+public class ParquetRecordReaderWrapper implements RecordReader<Void, ArrayWritable> {
+ public static final Log LOG = LogFactory.getLog(ParquetRecordReaderWrapper.class);
+
+ private final long splitLen; // for getPos()
+
+ private org.apache.hadoop.mapreduce.RecordReader<Void, ArrayWritable> realReader;
+ // expect realReader to return the same key & value objects (common case);
+ // this avoids extra serialization & deserialization of these objects
+ private ArrayWritable valueObj = null;
+ private boolean firstRecord = false;
+ private boolean eof = false;
+ private int schemaSize;
+
+ private final ProjectionPusher projectionPusher;
+
+ public ParquetRecordReaderWrapper(
+ final ParquetInputFormat<ArrayWritable> newInputFormat,
+ final InputSplit oldSplit,
+ final JobConf oldJobConf,
+ final Reporter reporter,
+ final ProjectionPusher pusher)
+ throws IOException, InterruptedException {
+ this.splitLen = oldSplit.getLength();
+ this.projectionPusher = pusher;
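+ // Convert the mapred split into a ParquetInputSplit, then read the first record eagerly so createValue() can hand out the reader's value object.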
+
+ final ParquetInputSplit split = getSplit(oldSplit, oldJobConf);
+
+ TaskAttemptID taskAttemptID = TaskAttemptID.forName(oldJobConf.get("mapred.task.id"));
+ if (taskAttemptID == null) {
+ taskAttemptID = new TaskAttemptID();
+ }
+
+ // create a TaskInputOutputContext
+ final TaskAttemptContext taskContext = ContextUtil.newTaskAttemptContext(oldJobConf, taskAttemptID);
+
+ if (split != null) {
+ try {
+ realReader = newInputFormat.createRecordReader(split, taskContext);
+ realReader.initialize(split, taskContext);
+
+ // read once to gain access to key and value objects
+ if (realReader.nextKeyValue()) {
+ firstRecord = true;
+ valueObj = realReader.getCurrentValue();
+ } else {
+ eof = true;
+ }
+ } catch (final InterruptedException e) {
+ throw new IOException(e);
+ }
+ } else {
+ realReader = null;
+ eof = true;
+ if (valueObj == null) { // Should initialize the value for createValue
+ valueObj = new ArrayWritable(Writable.class, new Writable[schemaSize]);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (realReader != null) {
+ realReader.close();
+ }
+ }
+
+ @Override
+ public Void createKey() {
+ return null;
+ }
+
+ @Override
+ public ArrayWritable createValue() {
+ return valueObj;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return (long) (splitLen * getProgress());
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ if (realReader == null) {
+ return 1f;
+ } else {
+ try {
+ return realReader.getProgress();
+ } catch (final InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ @Override
+ public boolean next(final Void key, final ArrayWritable value) throws IOException {
+ if (eof) {
+ return false;
+ }
+
+ try {
+ if (firstRecord) { // key & value are already read.
+ firstRecord = false;
+ } else if (!realReader.nextKeyValue()) {
+ eof = true; // strictly not required, just for consistency
+ return false;
+ }
+
+ final ArrayWritable tmpCurValue = realReader.getCurrentValue();
+
+ if (value != tmpCurValue) {
+ final Writable[] arrValue = value.get();
+ final Writable[] arrCurrent = tmpCurValue.get();
+ if (value != null && arrValue.length == arrCurrent.length) {
+ System.arraycopy(arrCurrent, 0, arrValue, 0, arrCurrent.length);
+ } else {
+ if (arrValue.length != arrCurrent.length) {
+ throw new IOException("DeprecatedParquetHiveInput : size of object differs. Value size : " + arrValue.length + ", Current Object size : "
+ + arrCurrent.length);
+ } else {
+ throw new IOException("DeprecatedParquetHiveInput can not support RecordReaders that don't return same key & value & value is null");
+ }
+ }
+ }
+ return true;
+
+ } catch (final InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * gets a ParquetInputSplit corresponding to a split given by Hive
+ *
+ * @param oldSplit The split given by Hive
+ * @param conf The JobConf of the Hive job
+ * @return a ParquetInputSplit corresponding to the oldSplit
+ * @throws IOException if the config cannot be enhanced or if the footer cannot be read from the file
+ */
+ protected ParquetInputSplit getSplit(
+ final InputSplit oldSplit,
+ final JobConf conf
+ ) throws IOException {
+
+ ParquetInputSplit split;
+
+ if (oldSplit instanceof ParquetInputSplitWrapper) {
+ split = ((ParquetInputSplitWrapper) oldSplit).getRealSplit();
+ } else if (oldSplit instanceof FileSplit) {
+ final Path finalPath = ((FileSplit) oldSplit).getPath();
+ final JobConf cloneJob = projectionPusher.pushProjectionsAndFilters(conf, finalPath.getParent());
+
+ final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(cloneJob, finalPath);
+ final List<BlockMetaData> blocks = parquetMetadata.getBlocks();
+ final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
+
+ final ReadContext readContext = new DataWritableReadSupport().init(cloneJob, fileMetaData.getKeyValueMetaData(), fileMetaData.getSchema());
+ schemaSize = MessageTypeParser.parseMessageType(readContext.getReadSupportMetadata().get(DataWritableReadSupport.HIVE_SCHEMA_KEY)).getFieldCount();
+
+ final List<BlockMetaData> splitGroup = new ArrayList<BlockMetaData>();
+ final long splitStart = ((FileSplit) oldSplit).getStart();
+ final long splitLength = ((FileSplit) oldSplit).getLength();
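+ // Keep only the row groups whose first data page offset falls inside this file split.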
+ for (final BlockMetaData block : blocks) {
+ final long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
+ if (firstDataPage >= splitStart && firstDataPage < splitStart + splitLength) {
+ splitGroup.add(block);
+ }
+ }
+
+ if (splitGroup.isEmpty()) {
+ LOG.warn("Skipping split, could not find row group in: " + (FileSplit) oldSplit);
+ split = null;
+ } else {
+ split = new ParquetInputSplit(finalPath,
+ splitStart,
+ splitLength,
+ ((FileSplit) oldSplit).getLocations(),
+ splitGroup,
+ readContext.getRequestedSchema().toString(),
+ fileMetaData.getSchema().toString(),
+ fileMetaData.getKeyValueMetaData(),
+ readContext.getReadSupportMetadata());
+ }
+
+ } else {
+ throw new IllegalArgumentException("Unknown split type: " + oldSplit);
+ }
+
+ return split;
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/AbstractParquetMapInspector.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/AbstractParquetMapInspector.java
new file mode 100644
index 0000000..7e00c4b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/AbstractParquetMapInspector.java
@@ -0,0 +1,162 @@
+/**
+ *
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet.serde;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableMapObjectInspector;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.Writable;
+
+public abstract class AbstractParquetMapInspector implements SettableMapObjectInspector {
+
+ protected final ObjectInspector keyInspector;
+ protected final ObjectInspector valueInspector;
+
+ public AbstractParquetMapInspector(final ObjectInspector keyInspector, final ObjectInspector valueInspector) {
+ this.keyInspector = keyInspector;
+ this.valueInspector = valueInspector;
+ }
+
+ @Override
+ public String getTypeName() {
+ return "map<" + keyInspector.getTypeName() + "," + valueInspector.getTypeName() + ">";
+ }
+
+ @Override
+ public Category getCategory() {
+ return Category.MAP;
+ }
+
+ @Override
+ public ObjectInspector getMapKeyObjectInspector() {
+ return keyInspector;
+ }
+
+ @Override
+ public ObjectInspector getMapValueObjectInspector() {
+ return valueInspector;
+ }
+
+ @Override
+ public Map<?, ?> getMap(final Object data) {
+ if (data == null) {
+ return null;
+ }
+
+ if (data instanceof ArrayWritable) {
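+ // A Hive map arrives here as an ArrayWritable whose single element is an ArrayWritable of key/value pair entries.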
+ final Writable[] mapContainer = ((ArrayWritable) data).get();
+
+ if (mapContainer == null || mapContainer.length == 0) {
+ return null;
+ }
+
+ final Writable[] mapArray = ((ArrayWritable) mapContainer[0]).get();
+ final Map<Writable, Writable> map = new HashMap<Writable, Writable>();
+
+ for (final Writable obj : mapArray) {
+ final ArrayWritable mapObj = (ArrayWritable) obj;
+ final Writable[] arr = mapObj.get();
+ map.put(arr[0], arr[1]);
+ }
+
+ return map;
+ }
+
+ if (data instanceof Map) {
+ return (Map<?, ?>) data;
+ }
+
+ throw new UnsupportedOperationException("Cannot inspect " + data.getClass().getCanonicalName());
+ }
+
+ @Override
+ public int getMapSize(final Object data) {
+ if (data == null) {
+ return -1;
+ }
+
+ if (data instanceof ArrayWritable) {
+ final Writable[] mapContainer = ((ArrayWritable) data).get();
+
+ if (mapContainer == null || mapContainer.length == 0) {
+ return -1;
+ } else {
+ return ((ArrayWritable) mapContainer[0]).get().length;
+ }
+ }
+
+ if (data instanceof Map) {
+ return ((Map) data).size();
+ }
+
+ throw new UnsupportedOperationException("Cannot inspect " + data.getClass().getCanonicalName());
+ }
+
+ @Override
+ public Object create() {
+ Map