diff --git ql/src/java/org/apache/hadoop/hive/ql/Driver.java ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index c228189..62fc150 100644
--- ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1413,6 +1413,7 @@ public int execute() throws CommandNeedRetryException {
       try {
         SessionState.get().getHiveHistory().logPlanProgress(plan);
       } catch (Exception e) {
+        // ignore
       }
     }
     console.printInfo("OK");
@@ -1480,7 +1481,6 @@ public void launchTask(Task tsk, String queryId, boolean
       tskRun.runSequential();
     }
     running.put(tskRes, tskRun);
-    return;
   }
 
   /**
@@ -1524,14 +1524,17 @@ public TaskResult pollTasks(Set results) {
         Thread.sleep(SLEEP_TIME);
       } catch (InterruptedException ie) {
         // Do Nothing
-        ;
       }
       resultIterator = results.iterator();
     }
   }
 
-  public boolean getResults(ArrayList<String> res) throws IOException, CommandNeedRetryException {
-    if (plan != null && plan.getFetchTask() != null) {
+  public boolean isFetchingTable() {
+    return plan != null && plan.getFetchTask() != null;
+  }
+
+  public boolean getResults(List res) throws IOException, CommandNeedRetryException {
+    if (isFetchingTable()) {
       FetchTask ft = plan.getFetchTask();
       ft.setMaxRows(maxRows);
       return ft.fetch(res);
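The getResults() contract is unchanged by widening the parameter to List: each call fills the caller's buffer with one batch and returns false once the fetch is exhausted. A minimal sketch of the client-side loop — only Driver, getResults() and isFetchingTable() come from this patch; the drain() helper is illustrative:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hive.ql.Driver;

    public class FetchLoop {
      // Drain all rows of a finished query, batch by batch. Each call to
      // getResults() appends up to maxRows entries into the supplied list.
      public static List<Object> drain(Driver driver) throws Exception {
        List<Object> all = new ArrayList<Object>();
        List<Object> batch = new ArrayList<Object>();
        while (driver.getResults(batch)) {
          all.addAll(batch);
          batch.clear();
        }
        return all;
      }
    }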
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultFetchFormatter.java ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultFetchFormatter.java
new file mode 100644
index 0000000..25385ba
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultFetchFormatter.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT;
+import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_NULL_FORMAT;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.DelimitedJSONSerDe;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Serializes each row with the user-specified SerDe, calling toString() to produce a string-typed result.
+ */
+public class DefaultFetchFormatter implements FetchFormatter<String> {
+
+  private SerDe mSerde;
+
+  @Override
+  public void initialize(Configuration hconf, Properties props) throws HiveException {
+    try {
+      mSerde = initializeSerde(hconf, props);
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  private SerDe initializeSerde(Configuration conf, Properties props) throws Exception {
+    String serdeName = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE);
+    Class<? extends SerDe> serdeClass = Class.forName(serdeName, true,
+        JavaUtils.getClassLoader()).asSubclass(SerDe.class);
+    // cast only needed for Hadoop 0.17 compatibility
+    SerDe serde = ReflectionUtils.newInstance(serdeClass, null);
+
+    Properties serdeProps = new Properties();
+    if (serde instanceof DelimitedJSONSerDe) {
+      serdeProps.put(SERIALIZATION_FORMAT, props.getProperty(SERIALIZATION_FORMAT));
+      serdeProps.put(SERIALIZATION_NULL_FORMAT, props.getProperty(SERIALIZATION_NULL_FORMAT));
+    }
+    serde.initialize(conf, serdeProps);
+    return serde;
+  }
+
+  @Override
+  public String convert(Object row, ObjectInspector rowOI) throws Exception {
+    return mSerde.serialize(row, rowOI).toString();
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+}
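With hive.fetch.output.serde left at its shipped default (DelimitedJSONSerDe), convert() renders each row as a delimited string using the separator and null format passed in the properties. A minimal setup sketch — only DefaultFetchFormatter and HIVEFETCHOUTPUTSERDE come from this patch, the property values are illustrative:

    import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT;
    import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_NULL_FORMAT;

    import java.util.Properties;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.DefaultFetchFormatter;

    public class DefaultFormatterDemo {
      public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        // hive.fetch.output.serde decides which SerDe convert() delegates to
        conf.setVar(HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE,
            "org.apache.hadoop.hive.serde2.DelimitedJSONSerDe");

        Properties props = new Properties();
        props.put(SERIALIZATION_FORMAT, "9");       // '\t' as its numeric code
        props.put(SERIALIZATION_NULL_FORMAT, "NULL");

        DefaultFetchFormatter formatter = new DefaultFetchFormatter();
        formatter.initialize(conf, props);
        // formatter.convert(row, rowOI) now yields strings like "alice\t27",
        // with nested columns rendered as JSON
      }
    }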
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchFormatter.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchFormatter.java
new file mode 100644
index 0000000..d369e0e
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchFormatter.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+/**
+ * Internal-use only.
+ *
+ * Used in ListSinkOperator for formatting the final output.
+ */
+public interface FetchFormatter<T> extends Closeable {
+
+  void initialize(Configuration hconf, Properties props) throws Exception;
+
+  T convert(Object row, ObjectInspector rowOI) throws Exception;
+
+  public static class ThriftFormatter implements FetchFormatter<Object> {
+
+    @Override
+    public void initialize(Configuration hconf, Properties props) throws Exception {
+    }
+
+    @Override
+    public Object convert(Object row, ObjectInspector rowOI) throws Exception {
+      StructObjectInspector structOI = (StructObjectInspector) rowOI;
+      List<? extends StructField> fields = structOI.getAllStructFieldRefs();
+
+      Object[] converted = new Object[fields.size()];
+      for (int i = 0; i < converted.length; i++) {
+        StructField fieldRef = fields.get(i);
+        Object field = structOI.getStructFieldData(row, fieldRef);
+        converted[i] = field == null ? null :
+            SerDeUtils.toThriftPayload(field, fieldRef.getFieldObjectInspector());
+      }
+      return converted;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+  }
+}
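ThriftFormatter leaves each row as an Object[] of JDBC-style Java values, so the HiveServer2 side can hand it straight to a RowSet without re-parsing strings. A sketch of the resulting shape — the row and its inspector are assumed to arrive from the operator tree, as they would in ListSinkOperator:

    import org.apache.hadoop.hive.ql.exec.FetchFormatter;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

    public class ThriftShapeDemo {
      // `row` must be a struct matching `rowOI`
      static Object[] cells(Object row, ObjectInspector rowOI) throws Exception {
        FetchFormatter.ThriftFormatter formatter = new FetchFormatter.ThriftFormatter();
        // primitives come back typed (String, Integer, Double, ...);
        // nested structs/lists/maps come back as JSON strings
        return (Object[]) formatter.convert(row, rowOI);
      }
    }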
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
index 3021328..770d904 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
@@ -20,7 +20,6 @@
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -122,14 +121,13 @@ public void setMaxRows(int maxRows) {
     this.maxRows = maxRows;
   }
 
-  @Override
-  public boolean fetch(ArrayList<String> res) throws IOException, CommandNeedRetryException {
+  public boolean fetch(List res) throws IOException, CommandNeedRetryException {
     sink.reset(res);
+    int rowsRet = work.getLeastNumRows();
+    if (rowsRet <= 0) {
+      rowsRet = work.getLimit() >= 0 ? Math.min(work.getLimit() - totalRows, maxRows) : maxRows;
+    }
     try {
-      int rowsRet = work.getLeastNumRows();
-      if (rowsRet <= 0) {
-        rowsRet = work.getLimit() >= 0 ? Math.min(work.getLimit() - totalRows, maxRows) : maxRows;
-      }
       if (rowsRet <= 0) {
         fetch.clearFetchContext();
         return false;
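The hoisted batch-size computation is easier to follow with numbers. A worked example, assuming a query with LIMIT 100 that has already returned 90 rows to a client asking for batches of 50 (the variable names mirror the patch; the values are illustrative):

    public class BatchSizeDemo {
      public static void main(String[] args) {
        int leastNumRows = 0;                  // work.getLeastNumRows(): unset here
        int limit = 100, totalRows = 90, maxRows = 50;
        int rowsRet = leastNumRows > 0 ? leastNumRows
            : (limit >= 0 ? Math.min(limit - totalRows, maxRows) : maxRows);
        System.out.println(rowsRet);           // 10: the LIMIT caps the batch,
        // and once rowsRet reaches 0, fetch() clears the context and returns false
      }
    }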
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
index 0174bcf..8e59fff 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java
@@ -18,19 +18,15 @@
 package org.apache.hadoop.hive.ql.exec;
 
-import java.util.ArrayList;
+import java.util.List;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ListSinkDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.DelimitedJSONSerDe;
-import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /**
@@ -39,45 +35,43 @@
  */
 public class ListSinkOperator extends Operator<ListSinkDesc> {
 
-  private transient SerDe mSerde;
+  public static final String OUTPUT_FORMATTER = "output.formatter";
 
-  private transient ArrayList res;
+  private transient List res;
+  private transient FetchFormatter fetcher;
   private transient int numRows;
 
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     try {
-      mSerde = initializeSerde(hconf);
-      initializeChildren(hconf);
+      fetcher = initializeFetcher(hconf);
     } catch (Exception e) {
       throw new HiveException(e);
     }
+    super.initializeOp(hconf);
   }
 
-  private SerDe initializeSerde(Configuration conf) throws Exception {
-    String serdeName = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE);
-    Class<? extends SerDe> serdeClass = Class.forName(serdeName, true,
-        JavaUtils.getClassLoader()).asSubclass(SerDe.class);
-    // cast only needed for Hadoop 0.17 compatibility
-    SerDe serde = ReflectionUtils.newInstance(serdeClass, null);
-
-    Properties serdeProp = new Properties();
-
-    // this is the default serialization format
-    if (serde instanceof DelimitedJSONSerDe) {
-      serdeProp.put(serdeConstants.SERIALIZATION_FORMAT, "" + Utilities.tabCode);
-      serdeProp.put(serdeConstants.SERIALIZATION_NULL_FORMAT, getConf().getSerializationNullFormat());
+  private FetchFormatter initializeFetcher(Configuration conf) throws Exception {
+    String converterName = conf.get(OUTPUT_FORMATTER);
+    FetchFormatter fetcher;
+    if (converterName != null) {
+      Class<? extends FetchFormatter> fetcherClass = Class.forName(converterName, true,
+          JavaUtils.getClassLoader()).asSubclass(FetchFormatter.class);
+      fetcher = ReflectionUtils.newInstance(fetcherClass, null);
+    } else {
+      fetcher = new DefaultFetchFormatter();
     }
-    serde.initialize(conf, serdeProp);
-    return serde;
-  }
 
-  public ListSinkOperator initialize(SerDe mSerde) {
-    this.mSerde = mSerde;
-    return this;
+    // selectively used by fetch formatter
+    Properties props = new Properties();
+    props.put(serdeConstants.SERIALIZATION_FORMAT, "" + Utilities.tabCode);
+    props.put(serdeConstants.SERIALIZATION_NULL_FORMAT, getConf().getSerializationNullFormat());
+
+    fetcher.initialize(conf, props);
+    return fetcher;
   }
 
-  public void reset(ArrayList res) {
+  public void reset(List res) {
     this.res = res;
     this.numRows = 0;
   }
@@ -88,9 +82,9 @@ public int getNumRows() {
 
   public void processOp(Object row, int tag) throws HiveException {
     try {
-      res.add(mSerde.serialize(row, outputObjInspector).toString());
+      res.add(fetcher.convert(row, inputObjInspectors[0]));
       numRows++;
-    } catch (SerDeException e) {
+    } catch (Exception e) {
      throw new HiveException(e);
    }
  }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index c5fc529..85d19b9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -450,6 +450,10 @@ public void setInputObjInspectors(ObjectInspector[] inputObjInspectors) {
     this.inputObjInspectors = inputObjInspectors;
   }
 
+  public ObjectInspector getOutputObjInspector() {
+    return outputObjInspector;
+  }
+
   /**
    * Process the row.
    *
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index 5aa7f06..655395c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -30,7 +30,6 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.lib.Node;
@@ -89,7 +88,7 @@ public static enum FeedType {
     DYNAMIC_PARTITIONS, // list of dynamic partitions
-  };
+  }
 
   // Bean methods
 
@@ -168,12 +167,6 @@ public int executeTask() {
    */
   protected abstract int execute(DriverContext driverContext);
 
-  // dummy method - FetchTask overwrites this
-  public boolean fetch(ArrayList<String> res) throws IOException, CommandNeedRetryException {
-    assert false;
-    return false;
-  }
-
   public boolean isRootTask() {
     return rootTask;
   }
@@ -251,8 +244,6 @@ public void removeFromChildrenTasks() {
       childTsk.removeFromChildrenTasks();
     }
   }
-
-    return;
 }
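With OUTPUT_FORMATTER in place, swapping the fetch-path output format becomes a one-line configuration change. A sketch of wiring in a custom formatter — com.example.MyFormatter is hypothetical; it must implement FetchFormatter and have a no-arg constructor, since ReflectionUtils instantiates it:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.ListSinkOperator;

    public class FormatterConfigDemo {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // any class implementing FetchFormatter; with the key unset,
        // ListSinkOperator falls back to DefaultFetchFormatter
        conf.set(ListSinkOperator.OUTPUT_FORMATTER, "com.example.MyFormatter");
      }
    }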
diff --git serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java
index ea007ee..4938dab 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
@@ -191,6 +192,28 @@ public static String lightEscapeString(String str) {
     return (escape.toString());
   }
 
+  /**
+   * Convert an Object to a standard Java object in compliance with JDBC 3.0 (see JDBC 3.0
+   * Specification, Table B-3: Mapping from JDBC Types to Java Object Types).
+   *
+   * This method is kept consistent with {@link HiveResultSetMetaData#hiveTypeToSqlType}.
+   */
+  public static Object toThriftPayload(Object row, ObjectInspector rowOI) {
+    if (rowOI.getCategory() == ObjectInspector.Category.PRIMITIVE) {
+      Object obj = ObjectInspectorUtils.copyToStandardObject(row, rowOI,
+          ObjectInspectorUtils.ObjectInspectorCopyOption.JAVA);
+      if (((PrimitiveObjectInspector) rowOI).getPrimitiveCategory() ==
+          PrimitiveObjectInspector.PrimitiveCategory.BINARY) {
+        // todo HIVE-5269
+        return new String((byte[]) obj);
+      }
+      return obj;
+    }
+    // for now, expose non-primitive as a string
+    // TODO: expose non-primitive as a structured object while maintaining JDBC compliance
+    return SerDeUtils.getJSONString(row, rowOI);
+  }
+
 public static String getJSONString(Object o, ObjectInspector oi) {
     return getJSONString(o, oi, JSON_NULL);
   }
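toThriftPayload() is now the single point deciding how a cell crosses the wire, shared by ThriftFormatter and the string-decoding path. Its expected behavior, sketched with standard inspectors (values illustrative):

    import org.apache.hadoop.hive.serde2.SerDeUtils;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    public class ToThriftPayloadDemo {
      public static void main(String[] args) {
        // primitives pass through as standard Java objects
        Object i = SerDeUtils.toThriftPayload(3,
            PrimitiveObjectInspectorFactory.javaIntObjectInspector);       // Integer 3
        // binary is stringified for now (see the HIVE-5269 todo above)
        Object b = SerDeUtils.toThriftPayload("hi".getBytes(),
            PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector); // "hi"
        System.out.println(i + " " + b);
        // non-primitives (structs, lists, maps) come back as JSON strings
      }
    }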
diff --git service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index 156f609..296f8b3 100644
--- service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -22,7 +22,6 @@
 import java.io.Serializable;
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -41,11 +40,10 @@
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
@@ -226,75 +224,75 @@ public TableSchema getResultSetSchema() throws HiveSQLException {
     return resultSchema;
   }
 
+  private transient final List<Object> convey = new ArrayList<Object>();
 
   @Override
   public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
-    assertState(OperationState.FINISHED);
     validateDefaultFetchOrientation(orientation);
-    ArrayList<String> rows = new ArrayList<String>();
-    driver.setMaxRows((int)maxRows);
+    assertState(OperationState.FINISHED);
 
     try {
       /* if client is requesting fetch-from-start and its not the first time reading from this operation
-       * then reset the fetch position to beginging
+       * then reset the fetch position to beginning
        */
       if (orientation.equals(FetchOrientation.FETCH_FIRST) && fetchStarted) {
         driver.resetFetch();
       }
       fetchStarted = true;
-      driver.getResults(rows);
-
-      getSerDe();
-      StructObjectInspector soi = (StructObjectInspector) serde.getObjectInspector();
-      List<? extends StructField> fieldRefs = soi.getAllStructFieldRefs();
-      RowSet rowSet = new RowSet();
-
-      Object[] deserializedFields = new Object[fieldRefs.size()];
-      Object rowObj;
-      ObjectInspector fieldOI;
-
-      for (String rowString : rows) {
-        rowObj = serde.deserialize(new BytesWritable(rowString.getBytes()));
-        for (int i = 0; i < fieldRefs.size(); i++) {
-          StructField fieldRef = fieldRefs.get(i);
-          fieldOI = fieldRef.getFieldObjectInspector();
-          deserializedFields[i] = convertLazyToJava(soi.getStructFieldData(rowObj, fieldRef), fieldOI);
-        }
-        rowSet.addRow(resultSchema, deserializedFields);
+      driver.setMaxRows((int) maxRows);
+      if (driver.getResults(convey)) {
+        return decode(convey);
       }
-      return rowSet;
+      return new RowSet();
     } catch (IOException e) {
       throw new HiveSQLException(e);
     } catch (CommandNeedRetryException e) {
       throw new HiveSQLException(e);
     } catch (Exception e) {
       throw new HiveSQLException(e);
+    } finally {
+      convey.clear();
     }
   }
 
-  /**
-   * Convert a LazyObject to a standard Java object in compliance with JDBC 3.0 (see JDBC 3.0
-   * Specification, Table B-3: Mapping from JDBC Types to Java Object Types).
-   *
-   * This method is kept consistent with {@link HiveResultSetMetaData#hiveTypeToSqlType}.
-   */
-  private static Object convertLazyToJava(Object o, ObjectInspector oi) {
-    Object obj = ObjectInspectorUtils.copyToStandardObject(o, oi, ObjectInspectorCopyOption.JAVA);
-
-    if (obj == null) {
-      return null;
+  private RowSet decode(List<Object> rows) throws Exception {
+    if (driver.isFetchingTable()) {
+      return prepareFromRow(rows);
     }
-    if(oi.getTypeName().equals(serdeConstants.BINARY_TYPE_NAME)) {
-      return new String((byte[])obj);
-    }
-    // for now, expose non-primitive as a string
-    // TODO: expose non-primitive as a structured object while maintaining JDBC compliance
-    if (oi.getCategory() != ObjectInspector.Category.PRIMITIVE) {
-      return SerDeUtils.getJSONString(o, oi);
+    return decodeFromString(rows);
+  }
+
+  // already encoded to thrift-able object in ThriftFormatter
+  private RowSet prepareFromRow(List<Object> rows) throws Exception {
+    RowSet rowSet = new RowSet();
+    for (Object row : rows) {
+      rowSet.addRow(resultSchema, (Object[]) row);
     }
-    return obj;
+    return rowSet;
   }
 
+  private RowSet decodeFromString(List<Object> rows) throws SQLException, SerDeException {
+    getSerDe();
+    StructObjectInspector soi = (StructObjectInspector) serde.getObjectInspector();
+    List<? extends StructField> fieldRefs = soi.getAllStructFieldRefs();
+    RowSet rowSet = new RowSet();
+
+    Object[] deserializedFields = new Object[fieldRefs.size()];
+    Object rowObj;
+    ObjectInspector fieldOI;
+
+    for (Object rowString : rows) {
+      rowObj = serde.deserialize(new BytesWritable(((String) rowString).getBytes()));
+      for (int i = 0; i < fieldRefs.size(); i++) {
+        StructField fieldRef = fieldRefs.get(i);
+        fieldOI = fieldRef.getFieldObjectInspector();
+        Object fieldData = soi.getStructFieldData(rowObj, fieldRef);
+        deserializedFields[i] = SerDeUtils.toThriftPayload(fieldData, fieldOI);
+      }
+      rowSet.addRow(resultSchema, deserializedFields);
+    }
+    return rowSet;
+  }
 
   private SerDe getSerDe() throws SQLException {
     if (serde != null) {
@@ -302,8 +300,6 @@ private SerDe getSerDe() throws SQLException {
     }
     try {
       List<FieldSchema> fieldSchemas = mResultSchema.getFieldSchemas();
-      List<String> columnNames = new ArrayList<String>();
-      List<String> columnTypes = new ArrayList<String>();
       StringBuilder namesSb = new StringBuilder();
       StringBuilder typesSb = new StringBuilder();
 
@@ -313,8 +309,6 @@ private SerDe getSerDe() throws SQLException {
           namesSb.append(",");
           typesSb.append(",");
         }
-        columnNames.add(fieldSchemas.get(pos).getName());
-        columnTypes.add(fieldSchemas.get(pos).getType());
         namesSb.append(fieldSchemas.get(pos).getName());
         typesSb.append(fieldSchemas.get(pos).getType());
       }
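Two producers now feed convey, and decode() picks the matching consumer. Restated compactly, using the fields and methods of this class (a sketch, not additional code):

    // fetch-task query: ListSinkOperator ran ThriftFormatter, so each element of
    // convey is already an Object[] of JDBC-style values -> prepareFromRow()
    // any other task: each element is a '\t'-delimited String written out by the
    // driver, so it must be re-parsed through the SerDe -> decodeFromString()
    RowSet rowSet = driver.isFetchingTable()
        ? prepareFromRow(convey)      // no serialize/deserialize round-trip
        : decodeFromString(convey);   // LazySimpleSerDe + toThriftPayload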
diff --git service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index cfda752..c0e6151 100644
--- service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -32,6 +32,8 @@
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.exec.FetchFormatter;
+import org.apache.hadoop.hive.ql.exec.ListSinkOperator;
 import org.apache.hadoop.hive.ql.history.HiveHistory;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.common.util.HiveVersionInfo;
@@ -88,6 +90,9 @@ public HiveSessionImpl(String username, String password, Map ses
     // set an explicit session name to control the download directory name
     hiveConf.set(ConfVars.HIVESESSIONID.varname,
         sessionHandle.getHandleIdentifier().toString());
+    // use thrift transportable formatter
+    hiveConf.set(ListSinkOperator.OUTPUT_FORMATTER,
+        FetchFormatter.ThriftFormatter.class.getName());
     sessionState = new SessionState(hiveConf);
     SessionState.start(sessionState);
   }
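Because the formatter is set on the session's HiveConf before SessionState starts, every query in a HiveServer2 session ships typed rows, while client code stays untouched. A hypothetical JDBC check (the URL and table are illustrative):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    public class Hs2FetchDemo {
      public static void main(String[] args) throws Exception {
        Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
        ResultSet rs = conn.createStatement().executeQuery("SELECT name, age FROM t");
        while (rs.next()) {
          // values were transported as Java objects, not re-parsed strings
          System.out.println(rs.getString(1) + "\t" + rs.getInt(2));
        }
        conn.close();
      }
    }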