diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 5308e2c..2a19ab0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1459,8 +1459,12 @@ public TaskResult pollTasks(Set results) {
     }
   }
 
+  public boolean isFetchingTable() {
+    return plan != null && plan.getFetchTask() != null;
+  }
+
   public boolean getResults(ArrayList<String> res) throws IOException, CommandNeedRetryException {
-    if (plan != null && plan.getFetchTask() != null) {
+    if (isFetchingTable()) {
       FetchTask ft = plan.getFetchTask();
       ft.setMaxRows(maxRows);
       return ft.fetch(res);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultFetchFormatter.java ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultFetchFormatter.java
new file mode 100644
index 0000000..25385ba
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultFetchFormatter.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT;
+import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_NULL_FORMAT;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.DelimitedJSONSerDe;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * serialize row by user specified serde and call toString() to make string type result
+ */
+public class DefaultFetchFormatter implements FetchFormatter<String> {
+
+  private SerDe mSerde;
+
+  @Override
+  public void initialize(Configuration hconf, Properties props) throws HiveException {
+    try {
+      mSerde = initializeSerde(hconf, props);
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  private SerDe initializeSerde(Configuration conf, Properties props) throws Exception {
+    String serdeName = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE);
+    Class<? extends SerDe> serdeClass = Class.forName(serdeName, true,
+        JavaUtils.getClassLoader()).asSubclass(SerDe.class);
+    // cast only needed for Hadoop 0.17 compatibility
+    SerDe serde = ReflectionUtils.newInstance(serdeClass, null);
+
+    Properties serdeProps = new Properties();
+    if (serde instanceof DelimitedJSONSerDe) {
+      serdeProps.put(SERIALIZATION_FORMAT, props.getProperty(SERIALIZATION_FORMAT));
+      serdeProps.put(SERIALIZATION_NULL_FORMAT, props.getProperty(SERIALIZATION_NULL_FORMAT));
+    }
+    serde.initialize(conf, serdeProps);
+    return serde;
+  }
+
+  @Override
+  public String convert(Object row, ObjectInspector rowOI) throws Exception {
+    return mSerde.serialize(row, rowOI).toString();
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchFormatter.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchFormatter.java
new file mode 100644
index 0000000..ff095b2
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchFormatter.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+/**
+ * internal-use only
+ *
+ * Used in ListSinkOperator for formatting final output
+ */
+public interface FetchFormatter<T> extends Closeable {
+
+  void initialize(Configuration hconf, Properties props) throws Exception;
+
+  T convert(Object row, ObjectInspector rowOI) throws Exception;
+
+  public static class AsIs implements FetchFormatter<Object> {
+
+    @Override
+    public void initialize(Configuration hconf, Properties props) throws Exception {
+    }
+
+    @Override
+    public Object convert(Object row, ObjectInspector rowOI) throws Exception {
+      return row;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
index 343f760..f4c2c3a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
@@ -121,14 +121,32 @@ public void setMaxRows(int maxRows) {
     this.maxRows = maxRows;
   }
 
-  @Override
+  private final ArrayList convey = new ArrayList(1);
+
+  public Object fetchRow() throws IOException, CommandNeedRetryException {
+    convey.clear();
+    sink.reset(convey);
+    if (fetch(1)) {
+      return convey.get(0);
+    }
+    return null;
+  }
+
+  public ObjectInspector getOutputOI() {
+    return sink.getOutputObjInspector();
+  }
+
   public boolean fetch(ArrayList<String> res) throws IOException, CommandNeedRetryException {
     sink.reset(res);
+    int rowsRet = work.getLeastNumRows();
+    if (rowsRet <= 0) {
+      rowsRet = work.getLimit() >= 0 ? Math.min(work.getLimit() - totalRows, maxRows) : maxRows;
+    }
+    return fetch(rowsRet);
+  }
+
+  private boolean fetch(int rowsRet) throws IOException, CommandNeedRetryException {
     try {
-      int rowsRet = work.getLeastNumRows();
-      if (rowsRet <= 0) {
-        rowsRet = work.getLimit() >= 0 ? Math.min(work.getLimit() - totalRows, maxRows) : maxRows;
-      }
       if (rowsRet <= 0) {
         fetch.clearFetchContext();
         return false;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
index 0174bcf..6cc2a7f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
@@ -23,14 +23,10 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ListSinkDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.DelimitedJSONSerDe;
-import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /**
@@ -39,45 +35,43 @@
  */
 public class ListSinkOperator extends Operator<ListSinkDesc> {
 
-  private transient SerDe mSerde;
+  public static final String OUTPUT_FORMATTER = "output.formatter";
 
-  private transient ArrayList<String> res;
+  private transient ArrayList res;
+  private transient FetchFormatter fetcher;
   private transient int numRows;
 
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     try {
-      mSerde = initializeSerde(hconf);
+      fetcher = initializefetcher(hconf);
       initializeChildren(hconf);
     } catch (Exception e) {
       throw new HiveException(e);
     }
   }
 
-  private SerDe initializeSerde(Configuration conf) throws Exception {
-    String serdeName = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE);
-    Class<? extends SerDe> serdeClass = Class.forName(serdeName, true,
-        JavaUtils.getClassLoader()).asSubclass(SerDe.class);
-    // cast only needed for Hadoop 0.17 compatibility
-    SerDe serde = ReflectionUtils.newInstance(serdeClass, null);
-
-    Properties serdeProp = new Properties();
-
-    // this is the default serialization format
-    if (serde instanceof DelimitedJSONSerDe) {
-      serdeProp.put(serdeConstants.SERIALIZATION_FORMAT, "" + Utilities.tabCode);
-      serdeProp.put(serdeConstants.SERIALIZATION_NULL_FORMAT, getConf().getSerializationNullFormat());
+  private FetchFormatter initializefetcher(Configuration conf) throws Exception {
+    String converterName = conf.get(OUTPUT_FORMATTER);
+    FetchFormatter fetcher;
+    if (converterName != null) {
+      Class<? extends FetchFormatter> fetcherClass = Class.forName(converterName, true,
+          JavaUtils.getClassLoader()).asSubclass(FetchFormatter.class);
+      fetcher = ReflectionUtils.newInstance(fetcherClass, null);
+    } else {
+      fetcher = new DefaultFetchFormatter();
     }
-    serde.initialize(conf, serdeProp);
-    return serde;
-  }
 
-  public ListSinkOperator initialize(SerDe mSerde) {
-    this.mSerde = mSerde;
-    return this;
+    // selectively used by fetch formatter
+    Properties props = new Properties();
+    props.put(serdeConstants.SERIALIZATION_FORMAT, "" + Utilities.tabCode);
+    props.put(serdeConstants.SERIALIZATION_NULL_FORMAT, getConf().getSerializationNullFormat());
+
+    fetcher.initialize(conf, props);
+    return fetcher;
   }
 
-  public void reset(ArrayList<String> res) {
+  public void reset(ArrayList res) {
     this.res = res;
     this.numRows = 0;
   }
@@ -88,9 +82,9 @@ public int getNumRows() {
 
   public void processOp(Object row, int tag) throws HiveException {
     try {
-      res.add(mSerde.serialize(row, outputObjInspector).toString());
+      res.add(fetcher.convert(row, outputObjInspector));
       numRows++;
-    } catch (SerDeException e) {
+    } catch (Exception e) {
       throw new HiveException(e);
     }
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 3b15667..9ebc14c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -461,6 +461,10 @@ public void setInputObjInspectors(ObjectInspector[] inputObjInspectors) {
     this.inputObjInspectors = inputObjInspectors;
   }
 
+  public ObjectInspector getOutputObjInspector() {
+    return outputObjInspector;
+  }
+
   /**
    * Process the row.
    *
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index 128ce77..58b32b0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -166,12 +166,6 @@ public int executeTask() {
    */
   protected abstract int execute(DriverContext driverContext);
 
-  // dummy method - FetchTask overwrites this
-  public boolean fetch(ArrayList<String> res) throws IOException, CommandNeedRetryException {
-    assert false;
-    return false;
-  }
-
   public void setChildTasks(List<Task<? extends Serializable>> childTasks) {
     this.childTasks = childTasks;
   }
diff --git service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index bb0f711..7fe4f48 100644
--- service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -34,6 +34,9 @@
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.exec.ExplainTask;
+import org.apache.hadoop.hive.ql.exec.FetchFormatter;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.ListSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.parse.VariableSubstitution;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
@@ -45,6 +48,7 @@
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
@@ -69,14 +73,17 @@
   private final boolean runAsync;
   private Future backgroundHandle;
 
+  private HiveConf hiveConf;
+
   public SQLOperation(HiveSession parentSession, String statement, Map<String, String> confOverlay, boolean runInBackground) {
     // TODO: call setRemoteUser in ExecuteStatementOperation or higher.
     super(parentSession, statement, confOverlay);
     this.runAsync = runInBackground;
+    this.hiveConf = new HiveConf(getParentSession().getHiveConf());
+    hiveConf.set(ListSinkOperator.OUTPUT_FORMATTER, FetchFormatter.AsIs.class.getName());
   }
-
   public void prepare() throws HiveSQLException {
   }
 
@@ -91,13 +98,13 @@ private void runInternal() throws HiveSQLException {
     String SQLState = null;
 
     try {
-      driver = new Driver(getParentSession().getHiveConf());
+      driver = new Driver(hiveConf);
 
      // In Hive server mode, we are not able to retry in the FetchTask
      // case, when calling fetch queries since execute() has returned.
      // For now, we disable the test attempts.
      driver.setTryCount(Integer.MAX_VALUE);
 
-      String subStatement = new VariableSubstitution().substitute(getParentSession().getHiveConf(), statement);
+      String subStatement = new VariableSubstitution().substitute(hiveConf, statement);
       response = driver.run(subStatement);
       if (0 != response.getResponseCode()) {
@@ -210,31 +217,12 @@ public TableSchema getResultSetSchema() throws HiveSQLException {
   @Override
   public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
     assertState(OperationState.FINISHED);
-    ArrayList<String> rows = new ArrayList<String>();
-    driver.setMaxRows((int)maxRows);
-
     try {
-      driver.getResults(rows);
-
-      getSerDe();
-      StructObjectInspector soi = (StructObjectInspector) serde.getObjectInspector();
-      List<? extends StructField> fieldRefs = soi.getAllStructFieldRefs();
-      RowSet rowSet = new RowSet();
-
-      Object[] deserializedFields = new Object[fieldRefs.size()];
-      Object rowObj;
-      ObjectInspector fieldOI;
-
-      for (String rowString : rows) {
-        rowObj = serde.deserialize(new BytesWritable(rowString.getBytes()));
-        for (int i = 0; i < fieldRefs.size(); i++) {
-          StructField fieldRef = fieldRefs.get(i);
-          fieldOI = fieldRef.getFieldObjectInspector();
-          deserializedFields[i] = convertLazyToJava(soi.getStructFieldData(rowObj, fieldRef), fieldOI);
-        }
-        rowSet.addRow(resultSchema, deserializedFields);
+      if (driver.isFetchingTable()) {
+        FetchTask fetchTask = driver.getPlan().getFetchTask();
+        return prepareFromRow(fetchTask, maxRows);
       }
-      return rowSet;
+      return prepareFromString(maxRows);
     } catch (IOException e) {
       throw new HiveSQLException(e);
     } catch (CommandNeedRetryException e) {
@@ -244,27 +232,78 @@ public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws H
     }
   }
 
+  private transient StructObjectInspector soi;
+  private transient List<? extends StructField> fieldRefs;
+
+  private RowSet prepareFromRow(FetchTask fetchTask, long maxRows) throws Exception {
+    if (fieldRefs == null) {
+      soi = (StructObjectInspector) fetchTask.getOutputOI();
+      fieldRefs = soi.getAllStructFieldRefs();
+    }
+    RowSet rowSet = new RowSet();
+    Object[] deserializedFields = new Object[fieldRefs.size()];
+    while (maxRows-- > 0) {
+      Object rowObj = fetchTask.fetchRow();
+      if (rowObj == null) {
+        break;
+      }
+      for (int i = 0; i < fieldRefs.size(); i++) {
+        StructField fieldRef = fieldRefs.get(i);
+        ObjectInspector fieldOI = fieldRef.getFieldObjectInspector();
+        deserializedFields[i] = convertToJava(soi.getStructFieldData(rowObj, fieldRef), fieldOI);
+      }
+      rowSet.addRow(resultSchema, deserializedFields);
+    }
+    return rowSet;
+  }
+
+  private RowSet prepareFromString(long maxRows) throws Exception {
+    driver.setMaxRows((int) maxRows);
+    ArrayList<String> rows = new ArrayList<String>();
+    driver.getResults(rows);
+
+    getSerDe();
+    StructObjectInspector soi = (StructObjectInspector) serde.getObjectInspector();
+    List<? extends StructField> fieldRefs = soi.getAllStructFieldRefs();
+    RowSet rowSet = new RowSet();
+
+    Object[] deserializedFields = new Object[fieldRefs.size()];
+    Object rowObj;
+    ObjectInspector fieldOI;
+
+    for (String rowString : rows) {
+      rowObj = serde.deserialize(new BytesWritable(rowString.getBytes()));
+      for (int i = 0; i < fieldRefs.size(); i++) {
+        StructField fieldRef = fieldRefs.get(i);
+        fieldOI = fieldRef.getFieldObjectInspector();
+        deserializedFields[i] = convertToJava(soi.getStructFieldData(rowObj, fieldRef), fieldOI);
+      }
+      rowSet.addRow(resultSchema, deserializedFields);
+    }
+    return rowSet;
+  }
+
   /**
-   * Convert a LazyObject to a standard Java object in compliance with JDBC 3.0 (see JDBC 3.0
+   * Convert an Object to a standard Java object in compliance with JDBC 3.0 (see JDBC 3.0
    * Specification, Table B-3: Mapping from JDBC Types to Java Object Types).
    *
    * This method is kept consistent with {@link HiveResultSetMetaData#hiveTypeToSqlType}.
    */
-  private static Object convertLazyToJava(Object o, ObjectInspector oi) {
-    Object obj = ObjectInspectorUtils.copyToStandardObject(o, oi, ObjectInspectorCopyOption.JAVA);
-
-    if (obj == null) {
+  private static Object convertToJava(Object o, ObjectInspector oi) {
+    if (o == null) {
       return null;
     }
-    if(oi.getTypeName().equals(serdeConstants.BINARY_TYPE_NAME)) {
-      return new String((byte[])obj);
+    if (oi.getCategory() == ObjectInspector.Category.PRIMITIVE) {
+      Object obj = ObjectInspectorUtils.copyToStandardObject(o, oi, ObjectInspectorCopyOption.JAVA);
+      if (((PrimitiveObjectInspector)oi).getPrimitiveCategory() ==
+          PrimitiveObjectInspector.PrimitiveCategory.BINARY) {
+        return new String((byte[])obj);
+      }
+      return obj;
     }
     // for now, expose non-primitive as a string
     // TODO: expose non-primitive as a structured object while maintaining JDBC compliance
-    if (oi.getCategory() != ObjectInspector.Category.PRIMITIVE) {
-      return SerDeUtils.getJSONString(o, oi);
-    }
-    return obj;
+    return SerDeUtils.getJSONString(o, oi);
   }
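Not part of the patch: the new ListSinkOperator.OUTPUT_FORMATTER hook means a caller can plug in its own FetchFormatter the same way SQLOperation selects FetchFormatter.AsIs above. Below is a minimal sketch of such a client-side formatter under that assumption; the class name UpperCaseFormatter and the idea of delegating to DefaultFetchFormatter are illustrative only, not something this patch adds.

package org.apache.hadoop.hive.ql.exec;

import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

// Hypothetical example formatter: delegates row serialization to DefaultFetchFormatter
// and upper-cases the resulting string. It needs a public no-arg constructor because
// ListSinkOperator instantiates it via ReflectionUtils.newInstance().
public class UpperCaseFormatter implements FetchFormatter<String> {

  private final DefaultFetchFormatter delegate = new DefaultFetchFormatter();

  @Override
  public void initialize(Configuration hconf, Properties props) throws Exception {
    // reuse the default SerDe-based initialization
    delegate.initialize(hconf, props);
  }

  @Override
  public String convert(Object row, ObjectInspector rowOI) throws Exception {
    // post-process the default string form of the row
    return delegate.convert(row, rowOI).toUpperCase();
  }

  @Override
  public void close() throws IOException {
    delegate.close();
  }
}

Such a formatter would be selected per session the same way SQLOperation wires in the pass-through formatter, e.g. conf.set(ListSinkOperator.OUTPUT_FORMATTER, UpperCaseFormatter.class.getName()).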