diff --git a/beeline/src/java/org/apache/hive/beeline/Commands.java b/beeline/src/java/org/apache/hive/beeline/Commands.java index 22e2066..0178333 100644 --- a/beeline/src/java/org/apache/hive/beeline/Commands.java +++ b/beeline/src/java/org/apache/hive/beeline/Commands.java @@ -1179,7 +1179,7 @@ public void run() { private void showRemainingLogsIfAny(Statement statement) { if (statement instanceof HiveStatement) { HiveStatement hiveStatement = (HiveStatement) statement; - List logs; + List logs = null; do { try { logs = hiveStatement.getQueryLog(); diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 5cf1609..c52b9d9 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2037,6 +2037,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal new TimeValidator(TimeUnit.SECONDS), "Number of seconds a request will wait to acquire the compile lock before giving up. " + "Setting it to 0s disables the timeout."), + // HiveServer2 WebUI HIVE_SERVER2_WEBUI_BIND_HOST("hive.server2.webui.host", "0.0.0.0", "The host address the HiveServer2 WebUI will listen on"), HIVE_SERVER2_WEBUI_PORT("hive.server2.webui.port", 10002, "The port the HiveServer2 WebUI will listen on. This can be" @@ -2167,6 +2168,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal new TimeValidator(TimeUnit.SECONDS), "Keepalive time (in seconds) for an idle worker thread. When the number of workers exceeds min workers, " + "excessive threads are killed after this time interval."), + // Configuration for async thread pool in SessionManager HIVE_SERVER2_ASYNC_EXEC_THREADS("hive.server2.async.exec.threads", 100, "Number of threads in the async thread pool for HiveServer2"), @@ -2330,6 +2332,14 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal HIVE_SERVER2_THRIFT_CLIENT_PASSWORD("hive.server2.thrift.client.password", "anonymous","Password to use against " + "thrift client"), + // ResultSet serialization settings + HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS("hive.server2.thrift.resultset.serialize.in.tasks", false, + "Whether we should serialize the Thrift structures used in JDBC ResultSet RPC in task nodes.\n " + + "We use SequenceFile and ThriftJDBCBinarySerDe to read and write the final results if this is true."), + // TODO: Make use of this config to configure fetch size + HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE("hive.server2.thrift.resultset.max.fetch.size", 1000, + "Max number of rows sent in one Fetch RPC call by the server to the client."), + HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,reload,compile", "Comma separated list of non-SQL Hive commands users are authorized to execute"), @@ -3646,6 +3656,7 @@ private static String getSQLStdAuthDefaultWhiteListPattern() { ConfVars.HIVE_RESULTSET_USE_UNIQUE_COLUMN_NAMES.varname, ConfVars.HIVE_STATS_COLLECT_PART_LEVEL_STATS.varname, ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL.varname, + ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS.varname, ConfVars.HIVE_SUPPORT_SQL11_RESERVED_KEYWORDS.varname, ConfVars.JOB_DEBUG_CAPTURE_STACKTRACES.varname, ConfVars.JOB_DEBUG_TIMEOUT.varname, diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java index 10c8ff2..815ccfa 100644 
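The two HIVE_SERVER2_THRIFT_RESULTSET_* properties added to HiveConf above drive the rest of this patch. A minimal sketch of flipping them programmatically before starting HiveServer2 (the class name and standalone main are illustrative only; the ConfVars constants are the ones introduced in this diff):

import org.apache.hadoop.hive.conf.HiveConf;

public class EnableTaskSideResultSetSerialization {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Serialize the JDBC ResultSet Thrift structures in task nodes
    // (final results written with ThriftJDBCBinarySerDe into a SequenceFile).
    conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS, true);
    // Upper bound on the number of rows batched per Fetch RPC / serialized row block.
    conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE, 1000);
    System.out.println(conf.getBoolVar(
        HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS));
  }
}

The tests added to TestJdbcWithMiniHS2 below do the equivalent through the string keys (setSerializeInTasksInConf) before constructing MiniHS2.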
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java @@ -431,6 +431,112 @@ private void verifyConfProperty(Statement stmt, String property, } } + private void setSerializeInTasksInConf(HiveConf conf) { + conf.setBoolean("hive.server2.thrift.resultset.serialize.in.tasks", true); + conf.setInt("hive.server2.thrift.resultset.max.fetch.size", 1000); + } + + @Test + public void testMetadataQueriesWithSerializeThriftInTasks() throws Exception { + //stop HiveServer2 + if (miniHS2.isStarted()) { + miniHS2.stop(); + } + + HiveConf conf = new HiveConf(); + String userName; + setSerializeInTasksInConf(conf); + miniHS2 = new MiniHS2(conf); + Map confOverlay = new HashMap(); + miniHS2.start(confOverlay); + + userName = System.getProperty("user.name"); + hs2Conn = getConnection(miniHS2.getJdbcURL(), userName, "password"); + Statement stmt = hs2Conn.createStatement(); + stmt.execute("drop table if exists testThriftSerializeShow"); + stmt.execute("create table testThriftSerializeShow (a int)"); + ResultSet rs = stmt.executeQuery("show tables"); + assertTrue(rs.next()); + stmt.execute("describe testThriftSerializeShow"); + stmt.execute("explain select a from testThriftSerializeShow"); + stmt.execute("drop table testThriftSerializeShow"); + stmt.close(); + } + + @Test + public void testSelectThriftSerializeInTasks() throws Exception { + if (miniHS2.isStarted()) { + miniHS2.stop(); + } + + HiveConf conf = new HiveConf(); + String userName; + setSerializeInTasksInConf(conf); + miniHS2 = new MiniHS2(conf); + Map confOverlay = new HashMap(); + miniHS2.start(confOverlay); + + userName = System.getProperty("user.name"); + hs2Conn = getConnection(miniHS2.getJdbcURL(), userName, "password"); + Statement stmt = hs2Conn.createStatement(); + + stmt.execute("drop table if exists testSelectThriftOrders"); + stmt.execute("drop table if exists testSelectThriftCustomers"); + stmt.execute("create table testSelectThriftOrders (orderid int, orderdate string, customerid int)"); + stmt.execute("create table testSelectThriftCustomers (customerid int, customername string, customercountry string)"); + stmt.execute("insert into testSelectThriftOrders values (1, '2015-09-09', 123), (2, '2015-10-10', 246), (3, '2015-11-11', 356)"); + stmt.execute("insert into testSelectThriftCustomers values (123, 'David', 'America'), (246, 'John', 'Canada'), (356, 'Mary', 'CostaRica')"); + ResultSet countOrders = stmt.executeQuery("select count(*) from testSelectThriftOrders"); + while (countOrders.next()) { + assertEquals(3, countOrders.getInt(1)); + } + ResultSet maxOrders = stmt.executeQuery("select max(customerid) from testSelectThriftCustomers"); + while (maxOrders.next()) { + assertEquals(356, maxOrders.getInt(1)); + } + stmt.execute("drop table testSelectThriftOrders"); + stmt.execute("drop table testSelectThriftCustomers"); + stmt.close(); + } + + @Test + public void testJoinThriftSerializeInTasks() throws Exception { + //stop HiveServer2 + if (miniHS2.isStarted()) { + miniHS2.stop(); + } + HiveConf conf = new HiveConf(); + String userName; + + setSerializeInTasksInConf(conf); + + miniHS2 = new MiniHS2(conf); + Map confOverlay = new HashMap(); + miniHS2.start(confOverlay); + + userName = System.getProperty("user.name"); + hs2Conn = getConnection(miniHS2.getJdbcURL(), userName, "password"); + Statement stmt = hs2Conn.createStatement(); + stmt.execute("drop table if exists testThriftJoinOrders"); + stmt.execute("drop table if exists 
testThriftJoinCustomers"); + stmt.execute("create table testThriftJoinOrders (orderid int, orderdate string, customerid int)"); + stmt.execute("create table testThriftJoinCustomers (customerid int, customername string, customercountry string)"); + stmt.execute("insert into testThriftJoinOrders values (1, '2015-09-09', 123), (2, '2015-10-10', 246), (3, '2015-11-11', 356)"); + stmt.execute("insert into testThriftJoinCustomers values (123, 'David', 'America'), (246, 'John', 'Canada'), (356, 'Mary', 'CostaRica')"); + ResultSet joinResultSet = stmt.executeQuery("select testThriftJoinOrders.orderid, testThriftJoinCustomers.customername from testThriftJoinOrders inner join testThriftJoinCustomers where testThriftJoinOrders.customerid=testThriftJoinCustomers.customerid"); + Map expectedResult = new HashMap(); + expectedResult.put(1, "David"); + expectedResult.put(2, "John"); + expectedResult.put(3, "Mary"); + for (int i = 1; i < 4; i++) { + assertTrue(joinResultSet.next()); + assertEquals(joinResultSet.getString(2), expectedResult.get(i)); + } + stmt.execute("drop table testThriftJoinOrders"); + stmt.execute("drop table testThriftJoinCustomers"); + stmt.close(); + } + /** * Tests the creation of the 3 scratch dirs: hdfs, local, downloaded resources (which is also local). * 1. Test with doAs=false: open a new JDBC session and verify the presence of directories/permissions @@ -810,4 +916,4 @@ private int getNucleusClassLoaderResolverMapSize() { } return -1; } -} \ No newline at end of file +} diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveBaseResultSet.java b/jdbc/src/java/org/apache/hive/jdbc/HiveBaseResultSet.java index 88ba853..93f093f 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveBaseResultSet.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveBaseResultSet.java @@ -46,8 +46,8 @@ import org.apache.hadoop.hive.common.type.HiveIntervalDayTime; import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth; +import org.apache.hadoop.hive.serde2.thrift.Type; import org.apache.hive.service.cli.TableSchema; -import org.apache.hive.service.cli.Type; /** * Data independent base class which implements the common part of diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveResultSetMetaData.java b/jdbc/src/java/org/apache/hive/jdbc/HiveResultSetMetaData.java index 16a0894..f6c38d8 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveResultSetMetaData.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveResultSetMetaData.java @@ -22,7 +22,7 @@ import java.sql.SQLException; import java.util.List; -import org.apache.hive.service.cli.Type; +import org.apache.hadoop.hive.serde2.thrift.Type; /** * HiveResultSetMetaData. 
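From the JDBC client's point of view, both the Type relocation and task-side serialization are transparent: rows are still consumed through the standard ResultSet API, exactly as the new tests do. A minimal standalone sketch (URL, credentials, and the queried table are placeholders):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveFetchSketch {
  public static void main(String[] args) throws Exception {
    try (Connection conn = DriverManager.getConnection(
             "jdbc:hive2://localhost:10000/default", "user", "password");
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery(
             "select orderid, customerid from testSelectThriftOrders")) {
      while (rs.next()) {
        // Row decoding in HiveBaseResultSet now resolves column types via serde2.thrift.Type.
        System.out.println(rs.getInt(1) + "\t" + rs.getInt(2));
      }
    }
  }
}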
diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index f5b9672..3cc6b74 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -35,6 +35,7 @@ import org.apache.hive.service.rpc.thrift.TGetOperationStatusResp; import org.apache.hive.service.rpc.thrift.TOperationHandle; import org.apache.hive.service.rpc.thrift.TSessionHandle; +import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -881,15 +882,22 @@ public boolean hasMoreLogs() { } } catch (SQLException e) { throw e; + } catch (TException e) { + throw new SQLException("Error when getting query log: " + e, e); } catch (Exception e) { throw new SQLException("Error when getting query log: " + e, e); } - RowSet rowSet = RowSetFactory.create(tFetchResultsResp.getResults(), - connection.getProtocol()); - for (Object[] row : rowSet) { - logs.add(String.valueOf(row[0])); + try { + RowSet rowSet; + rowSet = RowSetFactory.create(tFetchResultsResp.getResults(), connection.getProtocol()); + for (Object[] row : rowSet) { + logs.add(String.valueOf(row[0])); + } + } catch (TException e) { + throw new SQLException("Error building result set for query log: " + e, e); } + return logs; } diff --git a/jdbc/src/java/org/apache/hive/jdbc/JdbcColumn.java b/jdbc/src/java/org/apache/hive/jdbc/JdbcColumn.java index 691fd0e..5aed679 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/JdbcColumn.java +++ b/jdbc/src/java/org/apache/hive/jdbc/JdbcColumn.java @@ -27,7 +27,7 @@ import org.apache.hadoop.hive.common.type.HiveIntervalDayTime; import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth; import org.apache.hadoop.hive.serde.serdeConstants; -import org.apache.hive.service.cli.Type; +import org.apache.hadoop.hive.serde2.thrift.Type; /** diff --git a/ql/pom.xml b/ql/pom.xml index ebb9599..aaa3271 100644 --- a/ql/pom.xml +++ b/ql/pom.xml @@ -70,6 +70,11 @@ org.apache.hive + hive-service-rpc + ${project.version} + + + org.apache.hive hive-llap-client ${project.version} @@ -803,6 +808,7 @@ org.apache.hive:hive-serde org.apache.hive:hive-llap-client org.apache.hive:hive-metastore + org.apache.hive:hive-service-rpc com.esotericsoftware:kryo-shaded com.esotericsoftware:minlog org.objenesis:objenesis diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 65744ac..48fb060 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -108,6 +108,7 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.mapred.ClusterStatus; import org.apache.hadoop.mapred.JobClient; @@ -932,6 +933,13 @@ public QueryPlan getPlan() { return plan; } + /** + * @return The current FetchTask associated with the Driver's plan, if any. + */ + public FetchTask getFetchTask() { + return fetchTask; + } + // Write the current set of valid transactions into the conf file so that it can be read by // the input format. 
private void recordValidTxns() throws LockException { @@ -1880,6 +1888,17 @@ public boolean getResults(List res) throws IOException, CommandNeedRetryExceptio throw new IOException("FAILED: Operation cancelled"); } if (isFetchingTable()) { + /** + * If resultset serialization to thrift object is enabled, and if the destination table is + * indeed written using ThriftJDBCBinarySerDe, read one row from the output sequence file, + * since it is a blob of row batches. + */ + if (fetchTask.getWork().isHiveServerQuery() && HiveConf.getBoolVar(conf, + HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS) + && (fetchTask.getTblDesc().getSerdeClassName() + .equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName()))) { + maxRows = 1; + } fetchTask.setMaxRows(maxRows); return fetchTask.fetch(res); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultFetchFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultFetchFormatter.java deleted file mode 100644 index b8be3a5..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DefaultFetchFormatter.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.ql.exec; - -import java.io.IOException; -import java.util.Properties; - -import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT; -import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_NULL_FORMAT; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.common.JavaUtils; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.serde2.DelimitedJSONSerDe; -import org.apache.hadoop.hive.serde2.SerDe; -import org.apache.hadoop.hive.serde2.SerDeUtils; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hive.common.util.ReflectionUtil; - -/** - * serialize row by user specified serde and call toString() to make string type result - */ -public class DefaultFetchFormatter implements FetchFormatter { - - private SerDe mSerde; - - @Override - public void initialize(Configuration hconf, Properties props) throws HiveException { - try { - mSerde = initializeSerde(hconf, props); - } catch (Exception e) { - throw new HiveException(e); - } - } - - private SerDe initializeSerde(Configuration conf, Properties props) throws Exception { - String serdeName = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE); - Class serdeClass = Class.forName(serdeName, true, - Utilities.getSessionSpecifiedClassLoader()).asSubclass(SerDe.class); - // cast only needed for Hadoop 0.17 compatibility - SerDe serde = ReflectionUtil.newInstance(serdeClass, null); - - Properties serdeProps = new Properties(); - if (serde instanceof DelimitedJSONSerDe) { - serdeProps.put(SERIALIZATION_FORMAT, props.getProperty(SERIALIZATION_FORMAT)); - serdeProps.put(SERIALIZATION_NULL_FORMAT, props.getProperty(SERIALIZATION_NULL_FORMAT)); - } - SerDeUtils.initializeSerDe(serde, conf, serdeProps, null); - return serde; - } - - @Override - public String convert(Object row, ObjectInspector rowOI) throws Exception { - return mSerde.serialize(row, rowOI).toString(); - } - - @Override - public void close() throws IOException { - } -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchFormatter.java deleted file mode 100644 index c2ed0d6..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchFormatter.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.ql.exec; - -import java.io.Closeable; -import java.io.IOException; -import java.util.List; -import java.util.Properties; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.serde2.SerDeUtils; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; - -/** - * internal-use only - * - * Used in ListSinkOperator for formatting final output - */ -public interface FetchFormatter extends Closeable { - - void initialize(Configuration hconf, Properties props) throws Exception; - - T convert(Object row, ObjectInspector rowOI) throws Exception; - - public static class ThriftFormatter implements FetchFormatter { - - int protocol; - - @Override - public void initialize(Configuration hconf, Properties props) throws Exception { - protocol = hconf.getInt(ListSinkOperator.OUTPUT_PROTOCOL, 0); - } - - @Override - public Object convert(Object row, ObjectInspector rowOI) throws Exception { - StructObjectInspector structOI = (StructObjectInspector) rowOI; - List fields = structOI.getAllStructFieldRefs(); - - Object[] converted = new Object[fields.size()]; - for (int i = 0 ; i < converted.length; i++) { - StructField fieldRef = fields.get(i); - Object field = structOI.getStructFieldData(row, fieldRef); - converted[i] = field == null ? null : - SerDeUtils.toThriftPayload(field, fieldRef.getFieldObjectInspector(), protocol); - } - return converted; - } - - @Override - public void close() throws IOException { - } - } -} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java index 0b0c336..b96ea04 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java @@ -48,12 +48,10 @@ **/ public class FetchTask extends Task implements Serializable { private static final long serialVersionUID = 1L; - private int maxRows = 100; private FetchOperator fetch; private ListSinkOperator sink; private int totalRows; - private static transient final Logger LOG = LoggerFactory.getLogger(FetchTask.class); public FetchTask() { @@ -186,4 +184,5 @@ public void clearFetch() throws HiveException { fetch.clearFetchContext(); } } + } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index ec6381b..3ec63ee 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -71,6 +71,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; +import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe; import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyShim; import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue; import org.apache.hadoop.hive.shims.ShimLoader; @@ -122,7 +123,7 @@ protected transient long numRows = 0; protected transient long cntr = 1; protected transient long logEveryNRows = 0; - + protected transient int rowIndex = 0; /** * Counters. 
*/ @@ -374,7 +375,6 @@ protected void initializeOp(Configuration hconf) throws HiveException { // half of the script.timeout but less than script.timeout, we will still // be able to report progress. timeOut = hconf.getInt("mapred.healthChecker.script.timeout", 600000) / 2; - if (hconf instanceof JobConf) { jc = (JobConf) hconf; } else { @@ -656,6 +656,7 @@ protected boolean updateProgress() { protected Writable recordValue; + @Override public void process(Object row, int tag) throws HiveException { /* Create list bucketing sub-directory only if stored-as-directories is on. */ @@ -717,8 +718,12 @@ public void process(Object row, int tag) throws HiveException { } else { fpaths = fsp; } - // use SerDe to serialize r, and write it out recordValue = serializer.serialize(row, inputObjInspectors[0]); + // if serializer is ThriftJDBCBinarySerDe, then recordValue is null if the buffer is not full (the size of buffer + // is kept track of in the SerDe) + if (recordValue == null) { + return; + } } rowOutWriters = fpaths.outWriters; @@ -745,6 +750,7 @@ public void process(Object row, int tag) throws HiveException { LOG.info(toString() + ": records written - " + numRows); } + // This should always be 0 for the final result file int writerOffset = findWriterOffset(row); // This if/else chain looks ugly in the inner loop, but given that it will be 100% the same // for a given operator branch prediction should work quite nicely on it. @@ -1012,9 +1018,22 @@ public void closeOp(boolean abort) throws HiveException { lastProgressReport = System.currentTimeMillis(); if (!abort) { + // If serializer is ThriftJDBCBinarySerDe, then it buffers rows to a certain limit (hive.server2.thrift.resultset.max.fetch.size) + // and serializes the whole batch when the buffer is full. The serialize returns null if the buffer is not full + // (the size of buffer is kept track of in the ThriftJDBCBinarySerDe). + if (conf.isHiveServerQuery() && HiveConf.getBoolVar(hconf, + HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS) && + serializer.getClass().getName().equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName())) { + try { + recordValue = serializer.serialize(null, inputObjInspectors[0]); + rowOutWriters = fpaths.outWriters; + rowOutWriters[0].write(recordValue); + } catch (SerDeException | IOException e) { + throw new HiveException(e); + } + } for (FSPaths fsp : valToPaths.values()) { fsp.closeWriters(abort); - // before closing the operator check if statistics gathering is requested // and is provided by record writer. this is different from the statistics // gathering done in processOp(). In processOp(), for each row added diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java index b081cd0..9bf363c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java @@ -27,6 +27,9 @@ import org.apache.hadoop.hive.ql.plan.ListSinkDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.DefaultFetchFormatter; +import org.apache.hadoop.hive.serde2.FetchFormatter; +import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.util.ReflectionUtils; /** @@ -34,10 +37,6 @@ * and finally arrives to this operator. 
*/ public class ListSinkOperator extends Operator { - - public static final String OUTPUT_FORMATTER = "output.formatter"; - public static final String OUTPUT_PROTOCOL = "output.protocol"; - private transient List res; private transient FetchFormatter fetcher; private transient int numRows; @@ -62,7 +61,7 @@ protected void initializeOp(Configuration hconf) throws HiveException { } private FetchFormatter initializeFetcher(Configuration conf) throws Exception { - String formatterName = conf.get(OUTPUT_FORMATTER); + String formatterName = conf.get(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER); FetchFormatter fetcher; if (formatterName != null && !formatterName.isEmpty()) { Class fetcherClass = Class.forName(formatterName, true, @@ -71,12 +70,10 @@ private FetchFormatter initializeFetcher(Configuration conf) throws Exception { } else { fetcher = new DefaultFetchFormatter(); } - // selectively used by fetch formatter Properties props = new Properties(); props.put(serdeConstants.SERIALIZATION_FORMAT, "" + Utilities.tabCode); props.put(serdeConstants.SERIALIZATION_NULL_FORMAT, getConf().getSerializationNullFormat()); - fetcher.initialize(conf, props); return fetcher; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java index cf3bbf0..de7b151 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java @@ -263,6 +263,11 @@ public void setIsQuery(boolean isQuery) { this.isQuery = isQuery; } + /** + * Set to true in SemanticAnalyzer.getMetadataForDestFile, + * if destination is a file and query is not CTAS + * @return + */ public boolean getIsQuery() { return isQuery; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 96df189..005b53f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -203,6 +203,7 @@ import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe; +import org.apache.hadoop.hive.serde2.NoOpFetchFormatter; import org.apache.hadoop.hive.serde2.NullStructSerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeUtils; @@ -214,6 +215,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; @@ -6834,8 +6836,23 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) if (tblDesc == null) { if (qb.getIsQuery()) { - String fileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT); - table_desc = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat); + String fileFormat; + if (SessionState.get().isHiveServerQuery() && + conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS)) { + fileFormat = "SequenceFile"; + HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, fileFormat); + table_desc= + 
PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat, + ThriftJDBCBinarySerDe.class); + // Set the fetch formatter to be a no-op for the ListSinkOperator, since we'll + // write out formatted thrift objects to SequenceFile + conf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, NoOpFetchFormatter.class.getName()); + } else { + fileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT); + table_desc = + PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat, + LazySimpleSerDe.class); + } } else { table_desc = PlanUtils.getDefaultTableDesc(qb.getDirectoryDesc(), cols, colTypes); } @@ -6907,6 +6924,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) dpCtx, dest_path); + fileSinkDesc.setHiveServerQuery(SessionState.get().isHiveServerQuery()); // If this is an insert, update, or delete on an ACID table then mark that so the // FileSinkOperator knows how to properly write to it. if (destTableIsAcid) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java index f7d7a40..75ca5f9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java @@ -58,7 +58,15 @@ import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.DefaultFetchFormatter; +import org.apache.hadoop.hive.serde2.NoOpFetchFormatter; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.thrift.ThriftFormatter; +import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe; import com.google.common.collect.Interner; import com.google.common.collect.Interners; @@ -97,6 +105,20 @@ public void compile(final ParseContext pCtx, final List serdeClass) { + TableDesc tblDesc = + getTableDesc(serdeClass, "" + Utilities.ctrlaCode, cols, colTypes, false, false, fileFormat); + // enable escaping tblDesc.getProperties().setProperty(serdeConstants.ESCAPE_CHAR, "\\"); tblDesc.getProperties().setProperty(serdeConstants.SERIALIZATION_ESCAPE_CRLF, "true"); - //enable extended nesting levels + // enable extended nesting levels tblDesc.getProperties().setProperty( LazySerDeParameters.SERIALIZATION_EXTEND_ADDITIONAL_NESTING_LEVELS, "true"); return tblDesc; diff --git a/serde/pom.xml b/serde/pom.xml index cea7fce..9f50764 100644 --- a/serde/pom.xml +++ b/serde/pom.xml @@ -41,6 +41,11 @@ org.apache.hive + hive-service-rpc + ${project.version} + + + org.apache.hive hive-shims ${project.version} diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/DefaultFetchFormatter.java b/serde/src/java/org/apache/hadoop/hive/serde2/DefaultFetchFormatter.java new file mode 100644 index 0000000..3038037 --- /dev/null +++ b/serde/src/java/org/apache/hadoop/hive/serde2/DefaultFetchFormatter.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.serde2; + +import java.io.IOException; +import java.util.Properties; + +import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_FORMAT; +import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_NULL_FORMAT; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hive.common.util.ReflectionUtil; + +/** + * serialize row by user specified serde and call toString() to make string type result + */ +public class DefaultFetchFormatter implements FetchFormatter { + + private SerDe mSerde; + + @Override + public void initialize(Configuration hconf, Properties props) throws SerDeException { + mSerde = initializeSerde(hconf, props); + } + + private SerDe initializeSerde(Configuration conf, Properties props) throws SerDeException { + String serdeName = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEFETCHOUTPUTSERDE); + Class serdeClass; + try { + serdeClass = + Class.forName(serdeName, true, JavaUtils.getClassLoader()).asSubclass(SerDe.class); + } catch (ClassNotFoundException e) { + throw new SerDeException(e); + } + // cast only needed for Hadoop 0.17 compatibility + SerDe serde = ReflectionUtil.newInstance(serdeClass, null); + Properties serdeProps = new Properties(); + if (serde instanceof DelimitedJSONSerDe) { + serdeProps.put(SERIALIZATION_FORMAT, props.getProperty(SERIALIZATION_FORMAT)); + serdeProps.put(SERIALIZATION_NULL_FORMAT, props.getProperty(SERIALIZATION_NULL_FORMAT)); + } + SerDeUtils.initializeSerDe(serde, conf, serdeProps, null); + return serde; + } + + @Override + public String convert(Object row, ObjectInspector rowOI) throws Exception { + return mSerde.serialize(row, rowOI).toString(); + } + + @Override + public void close() throws IOException { + } +} diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/FetchFormatter.java b/serde/src/java/org/apache/hadoop/hive/serde2/FetchFormatter.java new file mode 100644 index 0000000..5cc93aa --- /dev/null +++ b/serde/src/java/org/apache/hadoop/hive/serde2/FetchFormatter.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.serde2; + +import java.io.Closeable; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; + +/** + * internal-use only + * + * Used in ListSinkOperator for formatting final output + */ +public interface FetchFormatter extends Closeable { + + void initialize(Configuration hconf, Properties props) throws Exception; + + T convert(Object row, ObjectInspector rowOI) throws Exception; +} diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/NoOpFetchFormatter.java b/serde/src/java/org/apache/hadoop/hive/serde2/NoOpFetchFormatter.java new file mode 100644 index 0000000..91929f1 --- /dev/null +++ b/serde/src/java/org/apache/hadoop/hive/serde2/NoOpFetchFormatter.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.serde2; + +import java.io.IOException; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; + +/** + * A No-op fetch formatter. + * ListSinkOperator uses this when reading from the destination table which has data serialized by + * ThriftJDBCBinarySerDe to a SequenceFile. + */ +public class NoOpFetchFormatter implements FetchFormatter { + + @Override + public void initialize(Configuration hconf, Properties props) throws SerDeException { + } + + // this returns the row as is because this formatter is only called when + // the ThriftJDBCBinarySerDe was used to serialize the rows to thrift-able objects. 
+ @Override + public Object convert(Object row, ObjectInspector rowOI) throws Exception { + return new Object[] { row }; + } + + @Override + public void close() throws IOException { + } +} diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java index 90439a2..6e08dfd 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java @@ -71,7 +71,8 @@ // lower case null is used within json objects private static final String JSON_NULL = "null"; - + public static final String LIST_SINK_OUTPUT_FORMATTER = "list.sink.output.formatter"; + public static final String LIST_SINK_OUTPUT_PROTOCOL = "list.sink.output.protocol"; public static final Logger LOG = LoggerFactory.getLogger(SerDeUtils.class.getName()); /** diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ColumnBuffer.java b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ColumnBuffer.java new file mode 100644 index 0000000..929c405 --- /dev/null +++ b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ColumnBuffer.java @@ -0,0 +1,439 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.serde2.thrift; + +import java.nio.ByteBuffer; +import java.util.AbstractList; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.BitSet; +import java.util.List; + +import org.apache.hive.service.rpc.thrift.TBinaryColumn; +import org.apache.hive.service.rpc.thrift.TBoolColumn; +import org.apache.hive.service.rpc.thrift.TByteColumn; +import org.apache.hive.service.rpc.thrift.TColumn; +import org.apache.hive.service.rpc.thrift.TDoubleColumn; +import org.apache.hive.service.rpc.thrift.TI16Column; +import org.apache.hive.service.rpc.thrift.TI32Column; +import org.apache.hive.service.rpc.thrift.TI64Column; +import org.apache.hive.service.rpc.thrift.TStringColumn; + +import com.google.common.primitives.Booleans; +import com.google.common.primitives.Bytes; +import com.google.common.primitives.Doubles; +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import com.google.common.primitives.Shorts; + +/** + * ColumnBuffer + */ +public class ColumnBuffer extends AbstractList { + + private static final int DEFAULT_SIZE = 100; + + private final Type type; + + private BitSet nulls; + + private int size; + private boolean[] boolVars; + private byte[] byteVars; + private short[] shortVars; + private int[] intVars; + private long[] longVars; + private double[] doubleVars; + private List stringVars; + private List binaryVars; + + public ColumnBuffer(Type type, BitSet nulls, Object values) { + this.type = type; + this.nulls = nulls; + if (type == Type.BOOLEAN_TYPE) { + boolVars = (boolean[]) values; + size = boolVars.length; + } else if (type == Type.TINYINT_TYPE) { + byteVars = (byte[]) values; + size = byteVars.length; + } else if (type == Type.SMALLINT_TYPE) { + shortVars = (short[]) values; + size = shortVars.length; + } else if (type == Type.INT_TYPE) { + intVars = (int[]) values; + size = intVars.length; + } else if (type == Type.BIGINT_TYPE) { + longVars = (long[]) values; + size = longVars.length; + } else if (type == Type.DOUBLE_TYPE) { + doubleVars = (double[]) values; + size = doubleVars.length; + } else if (type == Type.BINARY_TYPE) { + binaryVars = (List) values; + size = binaryVars.size(); + } else if (type == Type.STRING_TYPE) { + stringVars = (List) values; + size = stringVars.size(); + } else { + throw new IllegalStateException("invalid union object"); + } + } + + public ColumnBuffer(Type type) { + nulls = new BitSet(); + switch (type) { + case BOOLEAN_TYPE: + boolVars = new boolean[DEFAULT_SIZE]; + break; + case TINYINT_TYPE: + byteVars = new byte[DEFAULT_SIZE]; + break; + case SMALLINT_TYPE: + shortVars = new short[DEFAULT_SIZE]; + break; + case INT_TYPE: + intVars = new int[DEFAULT_SIZE]; + break; + case BIGINT_TYPE: + longVars = new long[DEFAULT_SIZE]; + break; + case FLOAT_TYPE: + case DOUBLE_TYPE: + type = Type.DOUBLE_TYPE; + doubleVars = new double[DEFAULT_SIZE]; + break; + case BINARY_TYPE: + binaryVars = new ArrayList(); + break; + default: + type = Type.STRING_TYPE; + stringVars = new ArrayList(); + } + this.type = type; + } + + public ColumnBuffer(TColumn colValues) { + if (colValues.isSetBoolVal()) { + type = Type.BOOLEAN_TYPE; + nulls = toBitset(colValues.getBoolVal().getNulls()); + boolVars = Booleans.toArray(colValues.getBoolVal().getValues()); + size = boolVars.length; + } else if (colValues.isSetByteVal()) { + type = Type.TINYINT_TYPE; + nulls = toBitset(colValues.getByteVal().getNulls()); + byteVars = Bytes.toArray(colValues.getByteVal().getValues()); + size = byteVars.length; + 
} else if (colValues.isSetI16Val()) { + type = Type.SMALLINT_TYPE; + nulls = toBitset(colValues.getI16Val().getNulls()); + shortVars = Shorts.toArray(colValues.getI16Val().getValues()); + size = shortVars.length; + } else if (colValues.isSetI32Val()) { + type = Type.INT_TYPE; + nulls = toBitset(colValues.getI32Val().getNulls()); + intVars = Ints.toArray(colValues.getI32Val().getValues()); + size = intVars.length; + } else if (colValues.isSetI64Val()) { + type = Type.BIGINT_TYPE; + nulls = toBitset(colValues.getI64Val().getNulls()); + longVars = Longs.toArray(colValues.getI64Val().getValues()); + size = longVars.length; + } else if (colValues.isSetDoubleVal()) { + type = Type.DOUBLE_TYPE; + nulls = toBitset(colValues.getDoubleVal().getNulls()); + doubleVars = Doubles.toArray(colValues.getDoubleVal().getValues()); + size = doubleVars.length; + } else if (colValues.isSetBinaryVal()) { + type = Type.BINARY_TYPE; + nulls = toBitset(colValues.getBinaryVal().getNulls()); + binaryVars = colValues.getBinaryVal().getValues(); + size = binaryVars.size(); + } else if (colValues.isSetStringVal()) { + type = Type.STRING_TYPE; + nulls = toBitset(colValues.getStringVal().getNulls()); + stringVars = colValues.getStringVal().getValues(); + size = stringVars.size(); + } else { + throw new IllegalStateException("invalid union object"); + } + } + + public ColumnBuffer extractSubset(int start, int end) { + BitSet subNulls = nulls.get(start, end); + if (type == Type.BOOLEAN_TYPE) { + ColumnBuffer subset = + new ColumnBuffer(type, subNulls, Arrays.copyOfRange(boolVars, start, end)); + boolVars = Arrays.copyOfRange(boolVars, end, size); + nulls = nulls.get(start, size); + size = boolVars.length; + return subset; + } + if (type == Type.TINYINT_TYPE) { + ColumnBuffer subset = + new ColumnBuffer(type, subNulls, Arrays.copyOfRange(byteVars, start, end)); + byteVars = Arrays.copyOfRange(byteVars, end, size); + nulls = nulls.get(start, size); + size = byteVars.length; + return subset; + } + if (type == Type.SMALLINT_TYPE) { + ColumnBuffer subset = + new ColumnBuffer(type, subNulls, Arrays.copyOfRange(shortVars, start, end)); + shortVars = Arrays.copyOfRange(shortVars, end, size); + nulls = nulls.get(start, size); + size = shortVars.length; + return subset; + } + if (type == Type.INT_TYPE) { + ColumnBuffer subset = + new ColumnBuffer(type, subNulls, Arrays.copyOfRange(intVars, start, end)); + intVars = Arrays.copyOfRange(intVars, end, size); + nulls = nulls.get(start, size); + size = intVars.length; + return subset; + } + if (type == Type.BIGINT_TYPE) { + ColumnBuffer subset = + new ColumnBuffer(type, subNulls, Arrays.copyOfRange(longVars, start, end)); + longVars = Arrays.copyOfRange(longVars, end, size); + nulls = nulls.get(start, size); + size = longVars.length; + return subset; + } + if (type == Type.DOUBLE_TYPE) { + ColumnBuffer subset = + new ColumnBuffer(type, subNulls, Arrays.copyOfRange(doubleVars, start, end)); + doubleVars = Arrays.copyOfRange(doubleVars, end, size); + nulls = nulls.get(start, size); + size = doubleVars.length; + return subset; + } + if (type == Type.BINARY_TYPE) { + ColumnBuffer subset = new ColumnBuffer(type, subNulls, binaryVars.subList(start, end)); + binaryVars = binaryVars.subList(end, binaryVars.size()); + nulls = nulls.get(start, size); + size = binaryVars.size(); + return subset; + } + if (type == Type.STRING_TYPE) { + ColumnBuffer subset = new ColumnBuffer(type, subNulls, stringVars.subList(start, end)); + stringVars = stringVars.subList(end, stringVars.size()); + nulls = 
nulls.get(start, size); + size = stringVars.size(); + return subset; + } + throw new IllegalStateException("invalid union object"); + } + + private static final byte[] MASKS = new byte[] { + 0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40, (byte)0x80 + }; + + private static BitSet toBitset(byte[] nulls) { + BitSet bitset = new BitSet(); + int bits = nulls.length * 8; + for (int i = 0; i < bits; i++) { + bitset.set(i, (nulls[i / 8] & MASKS[i % 8]) != 0); + } + return bitset; + } + + private static byte[] toBinary(BitSet bitset) { + byte[] nulls = new byte[1 + (bitset.length() / 8)]; + for (int i = 0; i < bitset.length(); i++) { + nulls[i / 8] |= bitset.get(i) ? MASKS[i % 8] : 0; + } + return nulls; + } + + public Type getType() { + return type; + } + + @Override + public Object get(int index) { + if (nulls.get(index)) { + return null; + } + switch (type) { + case BOOLEAN_TYPE: + return boolVars[index]; + case TINYINT_TYPE: + return byteVars[index]; + case SMALLINT_TYPE: + return shortVars[index]; + case INT_TYPE: + return intVars[index]; + case BIGINT_TYPE: + return longVars[index]; + case DOUBLE_TYPE: + return doubleVars[index]; + case STRING_TYPE: + return stringVars.get(index); + case BINARY_TYPE: + return binaryVars.get(index).array(); + } + return null; + } + + @Override + public int size() { + return size; + } + + public TColumn toTColumn() { + TColumn value = new TColumn(); + ByteBuffer nullMasks = ByteBuffer.wrap(toBinary(nulls)); + switch (type) { + case BOOLEAN_TYPE: + value.setBoolVal(new TBoolColumn(Booleans.asList(Arrays.copyOfRange(boolVars, 0, size)), + nullMasks)); + break; + case TINYINT_TYPE: + value.setByteVal(new TByteColumn(Bytes.asList(Arrays.copyOfRange(byteVars, 0, size)), + nullMasks)); + break; + case SMALLINT_TYPE: + value.setI16Val(new TI16Column(Shorts.asList(Arrays.copyOfRange(shortVars, 0, size)), + nullMasks)); + break; + case INT_TYPE: + value.setI32Val(new TI32Column(Ints.asList(Arrays.copyOfRange(intVars, 0, size)), nullMasks)); + break; + case BIGINT_TYPE: + value + .setI64Val(new TI64Column(Longs.asList(Arrays.copyOfRange(longVars, 0, size)), nullMasks)); + break; + case DOUBLE_TYPE: + value.setDoubleVal(new TDoubleColumn(Doubles.asList(Arrays.copyOfRange(doubleVars, 0, size)), + nullMasks)); + break; + case STRING_TYPE: + value.setStringVal(new TStringColumn(stringVars, nullMasks)); + break; + case BINARY_TYPE: + value.setBinaryVal(new TBinaryColumn(binaryVars, nullMasks)); + break; + } + return value; + } + + private static final ByteBuffer EMPTY_BINARY = ByteBuffer.allocate(0); + private static final String EMPTY_STRING = ""; + + public void addValue(Object field) { + addValue(this.type, field); + } + + public void addValue(Type type, Object field) { + switch (type) { + case BOOLEAN_TYPE: + nulls.set(size, field == null); + boolVars()[size] = field == null ? true : (Boolean) field; + break; + case TINYINT_TYPE: + nulls.set(size, field == null); + byteVars()[size] = field == null ? 0 : (Byte) field; + break; + case SMALLINT_TYPE: + nulls.set(size, field == null); + shortVars()[size] = field == null ? 0 : (Short) field; + break; + case INT_TYPE: + nulls.set(size, field == null); + intVars()[size] = field == null ? 0 : (Integer) field; + break; + case BIGINT_TYPE: + nulls.set(size, field == null); + longVars()[size] = field == null ? 0 : (Long) field; + break; + case FLOAT_TYPE: + nulls.set(size, field == null); + doubleVars()[size] = field == null ? 
0 : new Double(field.toString()); + break; + case DOUBLE_TYPE: + nulls.set(size, field == null); + doubleVars()[size] = field == null ? 0 : (Double) field; + break; + case BINARY_TYPE: + nulls.set(binaryVars.size(), field == null); + binaryVars.add(field == null ? EMPTY_BINARY : ByteBuffer.wrap((byte[]) field)); + break; + default: + nulls.set(stringVars.size(), field == null); + stringVars.add(field == null ? EMPTY_STRING : String.valueOf(field)); + break; + } + size++; + } + + private boolean[] boolVars() { + if (boolVars.length == size) { + boolean[] newVars = new boolean[size << 1]; + System.arraycopy(boolVars, 0, newVars, 0, size); + return boolVars = newVars; + } + return boolVars; + } + + private byte[] byteVars() { + if (byteVars.length == size) { + byte[] newVars = new byte[size << 1]; + System.arraycopy(byteVars, 0, newVars, 0, size); + return byteVars = newVars; + } + return byteVars; + } + + private short[] shortVars() { + if (shortVars.length == size) { + short[] newVars = new short[size << 1]; + System.arraycopy(shortVars, 0, newVars, 0, size); + return shortVars = newVars; + } + return shortVars; + } + + private int[] intVars() { + if (intVars.length == size) { + int[] newVars = new int[size << 1]; + System.arraycopy(intVars, 0, newVars, 0, size); + return intVars = newVars; + } + return intVars; + } + + private long[] longVars() { + if (longVars.length == size) { + long[] newVars = new long[size << 1]; + System.arraycopy(longVars, 0, newVars, 0, size); + return longVars = newVars; + } + return longVars; + } + + private double[] doubleVars() { + if (doubleVars.length == size) { + double[] newVars = new double[size << 1]; + System.arraycopy(doubleVars, 0, newVars, 0, size); + return doubleVars = newVars; + } + return doubleVars; + } +} diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftFormatter.java b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftFormatter.java new file mode 100644 index 0000000..a4c120e --- /dev/null +++ b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftFormatter.java @@ -0,0 +1,40 @@ +package org.apache.hadoop.hive.serde2.thrift; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde2.FetchFormatter; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; + +public class ThriftFormatter implements FetchFormatter { + + int protocol; + + @Override + public void initialize(Configuration hconf, Properties props) throws Exception { + protocol = hconf.getInt(SerDeUtils.LIST_SINK_OUTPUT_PROTOCOL, 0); + } + + @Override + public Object convert(Object row, ObjectInspector rowOI) throws Exception { + StructObjectInspector structOI = (StructObjectInspector) rowOI; + List fields = structOI.getAllStructFieldRefs(); + Object[] converted = new Object[fields.size()]; + for (int i = 0 ; i < converted.length; i++) { + StructField fieldRef = fields.get(i); + Object field = structOI.getStructFieldData(row, fieldRef); + converted[i] = field == null ? 
null : + SerDeUtils.toThriftPayload(field, fieldRef.getFieldObjectInspector(), protocol); + } + return converted; + } + + @Override + public void close() throws IOException { + } +} \ No newline at end of file diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java new file mode 100644 index 0000000..5c31974 --- /dev/null +++ b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.serde2.thrift; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hive.service.rpc.thrift.TColumn; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TIOStreamTransport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This SerDe is used to serialize the final output to thrift-able objects directly in the SerDe. Use this SerDe only for final output resultSets. + * It is used if HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS is set to true. It buffers rows that come in from FileSink till it reaches max_buffer_size (also configurable) + * or all rows are finished and FileSink.closeOp() is called. 
+ */
+public class ThriftJDBCBinarySerDe extends AbstractSerDe {
+  public static final Logger LOG = LoggerFactory.getLogger(ThriftJDBCBinarySerDe.class.getName());
+  private List columnNames;
+  private List columnTypes;
+  private ColumnBuffer[] columnBuffers;
+  private TypeInfo rowTypeInfo;
+  private ArrayList row;
+  private BytesWritable serializedBytesWritable = new BytesWritable();
+  private ByteStream.Output output = new ByteStream.Output();
+  private TProtocol protocol = new TCompactProtocol(new TIOStreamTransport(output));
+  private ThriftFormatter thriftFormatter = new ThriftFormatter();
+  private int MAX_BUFFERED_ROWS;
+  private int count;
+  private StructObjectInspector rowObjectInspector;
+
+
+  @Override
+  public void initialize(Configuration conf, Properties tbl) throws SerDeException {
+    // Get column names
+    MAX_BUFFERED_ROWS = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE);
+    String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
+    String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+    if (columnNameProperty.length() == 0) {
+      columnNames = new ArrayList();
+    } else {
+      columnNames = Arrays.asList(columnNameProperty.split(","));
+    }
+    if (columnTypeProperty.length() == 0) {
+      columnTypes = new ArrayList();
+    } else {
+      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+    }
+    rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
+    rowObjectInspector =
+        (StructObjectInspector) TypeInfoUtils
+            .getStandardWritableObjectInspectorFromTypeInfo(rowTypeInfo);
+
+    initializeRowAndColumns();
+    try {
+      thriftFormatter.initialize(conf, tbl);
+    } catch (Exception e) {
+      throw new SerDeException(e);
+    }
+  }
+
+  @Override
+  public Class getSerializedClass() {
+    return BytesWritable.class;
+  }
+
+  private Writable serializeBatch() throws SerDeException {
+    output.reset();
+    for (int i = 0; i < columnBuffers.length; i++) {
+      TColumn tColumn = columnBuffers[i].toTColumn();
+      try {
+        tColumn.write(protocol);
+      } catch (TException e) {
+        throw new SerDeException(e);
+      }
+    }
+    initializeRowAndColumns();
+    serializedBytesWritable.set(output.getData(), 0, output.getLength());
+    return serializedBytesWritable;
+  }
+
+  // Use the columnNames to initialize the reusable row object and the columnBuffers. The buffers are
+  // reinitialized after every flush; otherwise the same values would be emitted again when closeOp() is called.
+  private void initializeRowAndColumns() {
+    row = new ArrayList(columnNames.size());
+    for (int i = 0; i < columnNames.size(); i++) {
+      row.add(null);
+    }
+    // Initialize column buffers
+    columnBuffers = new ColumnBuffer[columnNames.size()];
+    for (int i = 0; i < columnBuffers.length; i++) {
+      columnBuffers[i] = new ColumnBuffer(Type.getType(columnTypes.get(i)));
+    }
+  }
+
+  /**
+   * Write TColumn objects to the underlying stream of TProtocol
+   */
+  @Override
+  public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
+    // If obj is null there are no more rows (closeOp() was reached); the other flush case is a full buffer.
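+    // Returning null below means the row was only buffered; a non-null Writable comes back only when a
+    // full batch (MAX_BUFFERED_ROWS rows) or the final partial batch is flushed via serializeBatch().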
+ if (obj == null) + return serializeBatch(); + count += 1; + StructObjectInspector soi = (StructObjectInspector) objInspector; + List fields = soi.getAllStructFieldRefs(); + try { + Object[] formattedRow = (Object[]) thriftFormatter.convert(obj, objInspector); + for (int i = 0; i < columnNames.size(); i++) { + columnBuffers[i].addValue(formattedRow[i]); + } + } catch (Exception e) { + throw new SerDeException(e); + } + if (count == MAX_BUFFERED_ROWS) { + count = 0; + return serializeBatch(); + } + return null; + } + + @Override + public SerDeStats getSerDeStats() { + return null; + } + + /** + * Return the bytes from this writable blob. + * Eventually the client of this method will interpret the byte using the Thrift Protocol + */ + @Override + public Object deserialize(Writable blob) throws SerDeException { + return ((BytesWritable) blob).getBytes(); + } + + @Override + public ObjectInspector getObjectInspector() throws SerDeException { + return rowObjectInspector; + } + +} diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/thrift/Type.java b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/Type.java new file mode 100644 index 0000000..0ad8c02 --- /dev/null +++ b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/Type.java @@ -0,0 +1,438 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.serde2.thrift; + +import java.sql.DatabaseMetaData; + +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hive.service.rpc.thrift.TTypeId; +/** + * Type. 
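+ * Maps each Hive type to its Thrift TTypeId and its java.sql.Types constant, together with the JDBC
+ * metadata (precision, radix, searchability, ...) reported for it. Replaces the enum that previously
+ * lived in org.apache.hive.service.cli.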
+ * + */ +public enum Type { + NULL_TYPE("VOID", + java.sql.Types.NULL, + TTypeId.NULL_TYPE), + BOOLEAN_TYPE("BOOLEAN", + java.sql.Types.BOOLEAN, + TTypeId.BOOLEAN_TYPE), + TINYINT_TYPE("TINYINT", + java.sql.Types.TINYINT, + TTypeId.TINYINT_TYPE), + SMALLINT_TYPE("SMALLINT", + java.sql.Types.SMALLINT, + TTypeId.SMALLINT_TYPE), + INT_TYPE("INT", + java.sql.Types.INTEGER, + TTypeId.INT_TYPE), + BIGINT_TYPE("BIGINT", + java.sql.Types.BIGINT, + TTypeId.BIGINT_TYPE), + FLOAT_TYPE("FLOAT", + java.sql.Types.FLOAT, + TTypeId.FLOAT_TYPE), + DOUBLE_TYPE("DOUBLE", + java.sql.Types.DOUBLE, + TTypeId.DOUBLE_TYPE), + STRING_TYPE("STRING", + java.sql.Types.VARCHAR, + TTypeId.STRING_TYPE), + CHAR_TYPE("CHAR", + java.sql.Types.CHAR, + TTypeId.CHAR_TYPE, + true, false, false), + VARCHAR_TYPE("VARCHAR", + java.sql.Types.VARCHAR, + TTypeId.VARCHAR_TYPE, + true, false, false), + DATE_TYPE("DATE", + java.sql.Types.DATE, + TTypeId.DATE_TYPE), + TIMESTAMP_TYPE("TIMESTAMP", + java.sql.Types.TIMESTAMP, + TTypeId.TIMESTAMP_TYPE), + INTERVAL_YEAR_MONTH_TYPE("INTERVAL_YEAR_MONTH", + java.sql.Types.OTHER, + TTypeId.INTERVAL_YEAR_MONTH_TYPE), + INTERVAL_DAY_TIME_TYPE("INTERVAL_DAY_TIME", + java.sql.Types.OTHER, + TTypeId.INTERVAL_DAY_TIME_TYPE), + BINARY_TYPE("BINARY", + java.sql.Types.BINARY, + TTypeId.BINARY_TYPE), + DECIMAL_TYPE("DECIMAL", + java.sql.Types.DECIMAL, + TTypeId.DECIMAL_TYPE, + true, false, false), + ARRAY_TYPE("ARRAY", + java.sql.Types.ARRAY, + TTypeId.ARRAY_TYPE, + true, true), + MAP_TYPE("MAP", + java.sql.Types.JAVA_OBJECT, + TTypeId.MAP_TYPE, + true, true), + STRUCT_TYPE("STRUCT", + java.sql.Types.STRUCT, + TTypeId.STRUCT_TYPE, + true, false), + UNION_TYPE("UNIONTYPE", + java.sql.Types.OTHER, + TTypeId.UNION_TYPE, + true, false), + USER_DEFINED_TYPE("USER_DEFINED", + java.sql.Types.OTHER, + TTypeId.USER_DEFINED_TYPE, + true, false); + + private final String name; + private final TTypeId tType; + private final int javaSQLType; + private final boolean isQualified; + private final boolean isComplex; + private final boolean isCollection; + + Type(String name, int javaSQLType, TTypeId tType, boolean isQualified, boolean isComplex, + boolean isCollection) { + this.name = name; + this.javaSQLType = javaSQLType; + this.tType = tType; + this.isQualified = isQualified; + this.isComplex = isComplex; + this.isCollection = isCollection; + } + + Type(String name, int javaSQLType, TTypeId tType, boolean isComplex, boolean isCollection) { + this(name, javaSQLType, tType, false, isComplex, isCollection); + } + + Type(String name, int javaSqlType, TTypeId tType) { + this(name, javaSqlType, tType, false, false, false); + } + + public boolean isPrimitiveType() { + return !isComplex; + } + + public boolean isQualifiedType() { + return isQualified; + } + + public boolean isComplexType() { + return isComplex; + } + + public boolean isCollectionType() { + return isCollection; + } + + public static Type getType(TTypeId tType) { + for (Type type : values()) { + if (tType.equals(type.tType)) { + return type; + } + } + throw new IllegalArgumentException("Unrecognized Thrift TTypeId value: " + tType); + } + + public static Type getType(String name) { + if (name == null) { + throw new IllegalArgumentException("Invalid type name: null"); + } + for (Type type : values()) { + if (name.equalsIgnoreCase(type.name)) { + return type; + } else if (type.isQualifiedType() || type.isComplexType()) { + if (name.toUpperCase().startsWith(type.name)) { + return type; + } + } + } + throw new IllegalArgumentException("Unrecognized type name: " + 
name); + } + + /** + * Convert TypeInfo to appropriate Type + * @param typeInfo + * @return + */ + public static Type getType(TypeInfo typeInfo) { + switch (typeInfo.getCategory()) { + case PRIMITIVE: { + PrimitiveTypeInfo pTypeInfo = (PrimitiveTypeInfo) typeInfo; + switch (pTypeInfo.getPrimitiveCategory()) { + case VOID: { + return Type.NULL_TYPE; + } + case BOOLEAN: { + return Type.BOOLEAN_TYPE; + } + // Double check if this is the right mapping + case BYTE: { + return Type.BINARY_TYPE; + } + // Double check if this is the right mapping + case SHORT: { + return Type.SMALLINT_TYPE; + } + case INT: { + return Type.INT_TYPE; + } + // Double check if this is the right mapping + case LONG: { + return Type.BIGINT_TYPE; + } + case FLOAT: { + return Type.FLOAT_TYPE; + } + case DOUBLE: { + return Type.DOUBLE_TYPE; + } + case STRING: { + return Type.STRING_TYPE; + } + case CHAR: { + return Type.CHAR_TYPE; + } + case VARCHAR: { + return Type.VARCHAR_TYPE; + } + case BINARY: { + return Type.BINARY_TYPE; + } + case DATE: { + return Type.DATE_TYPE; + } + case TIMESTAMP: { + return Type.TIMESTAMP_TYPE; + } + case INTERVAL_YEAR_MONTH: { + return Type.INTERVAL_YEAR_MONTH_TYPE; + } + case INTERVAL_DAY_TIME: { + return Type.INTERVAL_DAY_TIME_TYPE; + } + case DECIMAL: { + return Type.DECIMAL_TYPE; + } + default: { + throw new RuntimeException("Unrecognized type: " + pTypeInfo.getPrimitiveCategory()); + } + } + } + // Double check if this is the right mapping + case LIST: { + return Type.STRING_TYPE; + } + case MAP: { + return Type.MAP_TYPE; + } + case STRUCT: { + return Type.STRUCT_TYPE; + } + case UNION: { + return Type.UNION_TYPE; + } + default: { + throw new RuntimeException("Unrecognized type: " + typeInfo.getCategory()); + } + } + } + + /** + * Radix for this type (typically either 2 or 10) + * Null is returned for data types where this is not applicable. + */ + public Integer getNumPrecRadix() { + if (this.isNumericType()) { + return 10; + } + return null; + } + + /** + * Maximum precision for numeric types. + * Returns null for non-numeric types. + * @return + */ + public Integer getMaxPrecision() { + switch (this) { + case TINYINT_TYPE: + return 3; + case SMALLINT_TYPE: + return 5; + case INT_TYPE: + return 10; + case BIGINT_TYPE: + return 19; + case FLOAT_TYPE: + return 7; + case DOUBLE_TYPE: + return 15; + case DECIMAL_TYPE: + return HiveDecimal.MAX_PRECISION; + default: + return null; + } + } + + public boolean isNumericType() { + switch (this) { + case TINYINT_TYPE: + case SMALLINT_TYPE: + case INT_TYPE: + case BIGINT_TYPE: + case FLOAT_TYPE: + case DOUBLE_TYPE: + case DECIMAL_TYPE: + return true; + default: + return false; + } + } + + /** + * Prefix used to quote a literal of this type (may be null) + */ + public String getLiteralPrefix() { + return null; + } + + /** + * Suffix used to quote a literal of this type (may be null) + * @return + */ + public String getLiteralSuffix() { + return null; + } + + /** + * Can you use NULL for this type? + * @return + * DatabaseMetaData.typeNoNulls - does not allow NULL values + * DatabaseMetaData.typeNullable - allows NULL values + * DatabaseMetaData.typeNullableUnknown - nullability unknown + */ + public Short getNullable() { + // All Hive types are nullable + return DatabaseMetaData.typeNullable; + } + + /** + * Is the type case sensitive? 
+ * @return + */ + public Boolean isCaseSensitive() { + switch (this) { + case STRING_TYPE: + return true; + default: + return false; + } + } + + /** + * Parameters used in creating the type (may be null) + * @return + */ + public String getCreateParams() { + return null; + } + + /** + * Can you use WHERE based on this type? + * @return + * DatabaseMetaData.typePredNone - No support + * DatabaseMetaData.typePredChar - Only support with WHERE .. LIKE + * DatabaseMetaData.typePredBasic - Supported except for WHERE .. LIKE + * DatabaseMetaData.typeSearchable - Supported for all WHERE .. + */ + public Short getSearchable() { + if (isPrimitiveType()) { + return DatabaseMetaData.typeSearchable; + } + return DatabaseMetaData.typePredNone; + } + + /** + * Is this type unsigned? + * @return + */ + public Boolean isUnsignedAttribute() { + if (isNumericType()) { + return false; + } + return true; + } + + /** + * Can this type represent money? + * @return + */ + public Boolean isFixedPrecScale() { + return false; + } + + /** + * Can this type be used for an auto-increment value? + * @return + */ + public Boolean isAutoIncrement() { + return false; + } + + /** + * Localized version of type name (may be null). + * @return + */ + public String getLocalizedName() { + return null; + } + + /** + * Minimum scale supported for this type + * @return + */ + public Short getMinimumScale() { + return 0; + } + + /** + * Maximum scale supported for this type + * @return + */ + public Short getMaximumScale() { + return 0; + } + + public TTypeId toTType() { + return tType; + } + + public int toJavaSQLType() { + return javaSQLType; + } + + public String getName() { + return name; + } +} diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/typeinfo/TypeInfo.java b/serde/src/java/org/apache/hadoop/hive/serde2/typeinfo/TypeInfo.java index 70dc181..e015e06 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/typeinfo/TypeInfo.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/typeinfo/TypeInfo.java @@ -26,11 +26,15 @@ * Stores information about a type. Always use the TypeInfoFactory to create new * TypeInfo objects. * - * We support 8 categories of types: 1. Primitive objects (String, Number, etc) - * 2. List objects (a list of objects of a single type) 3. Map objects (a map - * from objects of one type to objects of another type) 4. Struct objects (a - * list of fields with names and their own types) 5. Union objects - * 6. Decimal objects 7. Char objects 8. Varchar objects + * We support 8 categories of types: + * 1. Primitive objects (String, Number, etc) + * 2. List objects (a list of objects of a single type) + * 3. Map objects (a map from objects of one type to objects of another type) + * 4. Struct objects (a list of fields with names and their own types) + * 5. Union objects + * 6. Decimal objects + * 7. Char objects + * 8. Varchar objects */ public abstract class TypeInfo implements Serializable { diff --git a/service-rpc/if/TCLIService.thrift b/service-rpc/if/TCLIService.thrift index aa28b6e..674530d 100644 --- a/service-rpc/if/TCLIService.thrift +++ b/service-rpc/if/TCLIService.thrift @@ -402,6 +402,8 @@ struct TRowSet { 1: required i64 startRowOffset 2: required list rows 3: optional list columns + 4: optional binary binaryColumns + 5: optional i32 columnCount } // The return status code contained in each response. 
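A minimal sketch of how a client is expected to consume the two new optional TRowSet fields, assuming binaryColumns carries back-to-back TCompactProtocol-serialized TColumn structs and columnCount says how many to read. The class and method names below are illustrative only and are not part of this patch; the same decoding logic appears in the ColumnBasedSet(TRowSet) constructor further below.

import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.List;

import org.apache.hive.service.rpc.thrift.TColumn;
import org.apache.hive.service.rpc.thrift.TRowSet;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TIOStreamTransport;

// Illustrative helper, not part of this patch.
public class BinaryRowSetReader {
  public static List<TColumn> readColumns(TRowSet rowSet) throws TException {
    // binaryColumns is a sequence of TColumn structs written back-to-back with TCompactProtocol.
    TProtocol protocol = new TCompactProtocol(
        new TIOStreamTransport(new ByteArrayInputStream(rowSet.getBinaryColumns())));
    List<TColumn> columns = new ArrayList<TColumn>(rowSet.getColumnCount());
    for (int i = 0; i < rowSet.getColumnCount(); i++) {
      TColumn column = new TColumn();
      column.read(protocol);
      columns.add(column);
    }
    return columns;
  }
}

The server side does the inverse: ThriftJDBCBinarySerDe writes each ColumnBuffer's TColumn to the same protocol, and ColumnBasedSet.toTRowSet() places the resulting bytes in binaryColumns and the number of columns in columnCount.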
diff --git a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp index 3a27a60..395af2c 100644 --- a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp +++ b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp @@ -3858,6 +3858,16 @@ void TRowSet::__set_columns(const std::vector & val) { __isset.columns = true; } +void TRowSet::__set_binaryColumns(const std::string& val) { + this->binaryColumns = val; +__isset.binaryColumns = true; +} + +void TRowSet::__set_columnCount(const int32_t val) { + this->columnCount = val; +__isset.columnCount = true; +} + uint32_t TRowSet::read(::apache::thrift::protocol::TProtocol* iprot) { apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); @@ -3929,6 +3939,22 @@ uint32_t TRowSet::read(::apache::thrift::protocol::TProtocol* iprot) { xfer += iprot->skip(ftype); } break; + case 4: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readBinary(this->binaryColumns); + this->__isset.binaryColumns = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 5: + if (ftype == ::apache::thrift::protocol::T_I32) { + xfer += iprot->readI32(this->columnCount); + this->__isset.columnCount = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -3979,6 +4005,16 @@ uint32_t TRowSet::write(::apache::thrift::protocol::TProtocol* oprot) const { } xfer += oprot->writeFieldEnd(); } + if (this->__isset.binaryColumns) { + xfer += oprot->writeFieldBegin("binaryColumns", ::apache::thrift::protocol::T_STRING, 4); + xfer += oprot->writeBinary(this->binaryColumns); + xfer += oprot->writeFieldEnd(); + } + if (this->__isset.columnCount) { + xfer += oprot->writeFieldBegin("columnCount", ::apache::thrift::protocol::T_I32, 5); + xfer += oprot->writeI32(this->columnCount); + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -3989,6 +4025,8 @@ void swap(TRowSet &a, TRowSet &b) { swap(a.startRowOffset, b.startRowOffset); swap(a.rows, b.rows); swap(a.columns, b.columns); + swap(a.binaryColumns, b.binaryColumns); + swap(a.columnCount, b.columnCount); swap(a.__isset, b.__isset); } @@ -3996,12 +4034,16 @@ TRowSet::TRowSet(const TRowSet& other163) { startRowOffset = other163.startRowOffset; rows = other163.rows; columns = other163.columns; + binaryColumns = other163.binaryColumns; + columnCount = other163.columnCount; __isset = other163.__isset; } TRowSet& TRowSet::operator=(const TRowSet& other164) { startRowOffset = other164.startRowOffset; rows = other164.rows; columns = other164.columns; + binaryColumns = other164.binaryColumns; + columnCount = other164.columnCount; __isset = other164.__isset; return *this; } @@ -4011,6 +4053,8 @@ void TRowSet::printTo(std::ostream& out) const { out << "startRowOffset=" << to_string(startRowOffset); out << ", " << "rows=" << to_string(rows); out << ", " << "columns="; (__isset.columns ? (out << to_string(columns)) : (out << "")); + out << ", " << "binaryColumns="; (__isset.binaryColumns ? (out << to_string(binaryColumns)) : (out << "")); + out << ", " << "columnCount="; (__isset.columnCount ? 
(out << to_string(columnCount)) : (out << "")); out << ")"; } diff --git a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h index 7f1d9dd..d4b401c 100644 --- a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h +++ b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h @@ -1809,8 +1809,10 @@ inline std::ostream& operator<<(std::ostream& out, const TColumn& obj) } typedef struct _TRowSet__isset { - _TRowSet__isset() : columns(false) {} + _TRowSet__isset() : columns(false), binaryColumns(false), columnCount(false) {} bool columns :1; + bool binaryColumns :1; + bool columnCount :1; } _TRowSet__isset; class TRowSet { @@ -1818,13 +1820,15 @@ class TRowSet { TRowSet(const TRowSet&); TRowSet& operator=(const TRowSet&); - TRowSet() : startRowOffset(0) { + TRowSet() : startRowOffset(0), binaryColumns(), columnCount(0) { } virtual ~TRowSet() throw(); int64_t startRowOffset; std::vector rows; std::vector columns; + std::string binaryColumns; + int32_t columnCount; _TRowSet__isset __isset; @@ -1834,6 +1838,10 @@ class TRowSet { void __set_columns(const std::vector & val); + void __set_binaryColumns(const std::string& val); + + void __set_columnCount(const int32_t val); + bool operator == (const TRowSet & rhs) const { if (!(startRowOffset == rhs.startRowOffset)) @@ -1844,6 +1852,14 @@ class TRowSet { return false; else if (__isset.columns && !(columns == rhs.columns)) return false; + if (__isset.binaryColumns != rhs.__isset.binaryColumns) + return false; + else if (__isset.binaryColumns && !(binaryColumns == rhs.binaryColumns)) + return false; + if (__isset.columnCount != rhs.__isset.columnCount) + return false; + else if (__isset.columnCount && !(columnCount == rhs.columnCount)) + return false; return true; } bool operator != (const TRowSet &rhs) const { diff --git a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TRowSet.java b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TRowSet.java index 2f6e31c..da3d9d3 100644 --- a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TRowSet.java +++ b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TRowSet.java @@ -41,6 +41,8 @@ private static final org.apache.thrift.protocol.TField START_ROW_OFFSET_FIELD_DESC = new org.apache.thrift.protocol.TField("startRowOffset", org.apache.thrift.protocol.TType.I64, (short)1); private static final org.apache.thrift.protocol.TField ROWS_FIELD_DESC = new org.apache.thrift.protocol.TField("rows", org.apache.thrift.protocol.TType.LIST, (short)2); private static final org.apache.thrift.protocol.TField COLUMNS_FIELD_DESC = new org.apache.thrift.protocol.TField("columns", org.apache.thrift.protocol.TType.LIST, (short)3); + private static final org.apache.thrift.protocol.TField BINARY_COLUMNS_FIELD_DESC = new org.apache.thrift.protocol.TField("binaryColumns", org.apache.thrift.protocol.TType.STRING, (short)4); + private static final org.apache.thrift.protocol.TField COLUMN_COUNT_FIELD_DESC = new org.apache.thrift.protocol.TField("columnCount", org.apache.thrift.protocol.TType.I32, (short)5); private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -51,12 +53,16 @@ private long startRowOffset; // required private List rows; // required private List columns; // optional + private ByteBuffer binaryColumns; // optional + private int columnCount; // optional /** The set of fields this struct contains, along with convenience 
methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { START_ROW_OFFSET((short)1, "startRowOffset"), ROWS((short)2, "rows"), - COLUMNS((short)3, "columns"); + COLUMNS((short)3, "columns"), + BINARY_COLUMNS((short)4, "binaryColumns"), + COLUMN_COUNT((short)5, "columnCount"); private static final Map byName = new HashMap(); @@ -77,6 +83,10 @@ public static _Fields findByThriftId(int fieldId) { return ROWS; case 3: // COLUMNS return COLUMNS; + case 4: // BINARY_COLUMNS + return BINARY_COLUMNS; + case 5: // COLUMN_COUNT + return COLUMN_COUNT; default: return null; } @@ -118,8 +128,9 @@ public String getFieldName() { // isset id assignments private static final int __STARTROWOFFSET_ISSET_ID = 0; + private static final int __COLUMNCOUNT_ISSET_ID = 1; private byte __isset_bitfield = 0; - private static final _Fields optionals[] = {_Fields.COLUMNS}; + private static final _Fields optionals[] = {_Fields.COLUMNS,_Fields.BINARY_COLUMNS,_Fields.COLUMN_COUNT}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -131,6 +142,10 @@ public String getFieldName() { tmpMap.put(_Fields.COLUMNS, new org.apache.thrift.meta_data.FieldMetaData("columns", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, TColumn.class)))); + tmpMap.put(_Fields.BINARY_COLUMNS, new org.apache.thrift.meta_data.FieldMetaData("binaryColumns", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING , true))); + tmpMap.put(_Fields.COLUMN_COUNT, new org.apache.thrift.meta_data.FieldMetaData("columnCount", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TRowSet.class, metaDataMap); } @@ -168,6 +183,10 @@ public TRowSet(TRowSet other) { } this.columns = __this__columns; } + if (other.isSetBinaryColumns()) { + this.binaryColumns = org.apache.thrift.TBaseHelper.copyBinary(other.binaryColumns); + } + this.columnCount = other.columnCount; } public TRowSet deepCopy() { @@ -180,6 +199,9 @@ public void clear() { this.startRowOffset = 0; this.rows = null; this.columns = null; + this.binaryColumns = null; + setColumnCountIsSet(false); + this.columnCount = 0; } public long getStartRowOffset() { @@ -280,6 +302,60 @@ public void setColumnsIsSet(boolean value) { } } + public byte[] getBinaryColumns() { + setBinaryColumns(org.apache.thrift.TBaseHelper.rightSize(binaryColumns)); + return binaryColumns == null ? null : binaryColumns.array(); + } + + public ByteBuffer bufferForBinaryColumns() { + return org.apache.thrift.TBaseHelper.copyBinary(binaryColumns); + } + + public void setBinaryColumns(byte[] binaryColumns) { + this.binaryColumns = binaryColumns == null ? 
(ByteBuffer)null : ByteBuffer.wrap(Arrays.copyOf(binaryColumns, binaryColumns.length)); + } + + public void setBinaryColumns(ByteBuffer binaryColumns) { + this.binaryColumns = org.apache.thrift.TBaseHelper.copyBinary(binaryColumns); + } + + public void unsetBinaryColumns() { + this.binaryColumns = null; + } + + /** Returns true if field binaryColumns is set (has been assigned a value) and false otherwise */ + public boolean isSetBinaryColumns() { + return this.binaryColumns != null; + } + + public void setBinaryColumnsIsSet(boolean value) { + if (!value) { + this.binaryColumns = null; + } + } + + public int getColumnCount() { + return this.columnCount; + } + + public void setColumnCount(int columnCount) { + this.columnCount = columnCount; + setColumnCountIsSet(true); + } + + public void unsetColumnCount() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __COLUMNCOUNT_ISSET_ID); + } + + /** Returns true if field columnCount is set (has been assigned a value) and false otherwise */ + public boolean isSetColumnCount() { + return EncodingUtils.testBit(__isset_bitfield, __COLUMNCOUNT_ISSET_ID); + } + + public void setColumnCountIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __COLUMNCOUNT_ISSET_ID, value); + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case START_ROW_OFFSET: @@ -306,6 +382,22 @@ public void setFieldValue(_Fields field, Object value) { } break; + case BINARY_COLUMNS: + if (value == null) { + unsetBinaryColumns(); + } else { + setBinaryColumns((ByteBuffer)value); + } + break; + + case COLUMN_COUNT: + if (value == null) { + unsetColumnCount(); + } else { + setColumnCount((Integer)value); + } + break; + } } @@ -320,6 +412,12 @@ public Object getFieldValue(_Fields field) { case COLUMNS: return getColumns(); + case BINARY_COLUMNS: + return getBinaryColumns(); + + case COLUMN_COUNT: + return getColumnCount(); + } throw new IllegalStateException(); } @@ -337,6 +435,10 @@ public boolean isSet(_Fields field) { return isSetRows(); case COLUMNS: return isSetColumns(); + case BINARY_COLUMNS: + return isSetBinaryColumns(); + case COLUMN_COUNT: + return isSetColumnCount(); } throw new IllegalStateException(); } @@ -381,6 +483,24 @@ public boolean equals(TRowSet that) { return false; } + boolean this_present_binaryColumns = true && this.isSetBinaryColumns(); + boolean that_present_binaryColumns = true && that.isSetBinaryColumns(); + if (this_present_binaryColumns || that_present_binaryColumns) { + if (!(this_present_binaryColumns && that_present_binaryColumns)) + return false; + if (!this.binaryColumns.equals(that.binaryColumns)) + return false; + } + + boolean this_present_columnCount = true && this.isSetColumnCount(); + boolean that_present_columnCount = true && that.isSetColumnCount(); + if (this_present_columnCount || that_present_columnCount) { + if (!(this_present_columnCount && that_present_columnCount)) + return false; + if (this.columnCount != that.columnCount) + return false; + } + return true; } @@ -403,6 +523,16 @@ public int hashCode() { if (present_columns) list.add(columns); + boolean present_binaryColumns = true && (isSetBinaryColumns()); + list.add(present_binaryColumns); + if (present_binaryColumns) + list.add(binaryColumns); + + boolean present_columnCount = true && (isSetColumnCount()); + list.add(present_columnCount); + if (present_columnCount) + list.add(columnCount); + return list.hashCode(); } @@ -444,6 +574,26 @@ public int compareTo(TRowSet other) { return lastComparison; } } + 
lastComparison = Boolean.valueOf(isSetBinaryColumns()).compareTo(other.isSetBinaryColumns()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetBinaryColumns()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.binaryColumns, other.binaryColumns); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetColumnCount()).compareTo(other.isSetColumnCount()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetColumnCount()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.columnCount, other.columnCount); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -485,6 +635,22 @@ public String toString() { } first = false; } + if (isSetBinaryColumns()) { + if (!first) sb.append(", "); + sb.append("binaryColumns:"); + if (this.binaryColumns == null) { + sb.append("null"); + } else { + org.apache.thrift.TBaseHelper.toString(this.binaryColumns, sb); + } + first = false; + } + if (isSetColumnCount()) { + if (!first) sb.append(", "); + sb.append("columnCount:"); + sb.append(this.columnCount); + first = false; + } sb.append(")"); return sb.toString(); } @@ -584,6 +750,22 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, TRowSet struct) thr org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 4: // BINARY_COLUMNS + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.binaryColumns = iprot.readBinary(); + struct.setBinaryColumnsIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 5: // COLUMN_COUNT + if (schemeField.type == org.apache.thrift.protocol.TType.I32) { + struct.columnCount = iprot.readI32(); + struct.setColumnCountIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -626,6 +808,18 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, TRowSet struct) th oprot.writeFieldEnd(); } } + if (struct.binaryColumns != null) { + if (struct.isSetBinaryColumns()) { + oprot.writeFieldBegin(BINARY_COLUMNS_FIELD_DESC); + oprot.writeBinary(struct.binaryColumns); + oprot.writeFieldEnd(); + } + } + if (struct.isSetColumnCount()) { + oprot.writeFieldBegin(COLUMN_COUNT_FIELD_DESC); + oprot.writeI32(struct.columnCount); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -655,7 +849,13 @@ public void write(org.apache.thrift.protocol.TProtocol prot, TRowSet struct) thr if (struct.isSetColumns()) { optionals.set(0); } - oprot.writeBitSet(optionals, 1); + if (struct.isSetBinaryColumns()) { + optionals.set(1); + } + if (struct.isSetColumnCount()) { + optionals.set(2); + } + oprot.writeBitSet(optionals, 3); if (struct.isSetColumns()) { { oprot.writeI32(struct.columns.size()); @@ -665,6 +865,12 @@ public void write(org.apache.thrift.protocol.TProtocol prot, TRowSet struct) thr } } } + if (struct.isSetBinaryColumns()) { + oprot.writeBinary(struct.binaryColumns); + } + if (struct.isSetColumnCount()) { + oprot.writeI32(struct.columnCount); + } } @Override @@ -684,7 +890,7 @@ public void read(org.apache.thrift.protocol.TProtocol prot, TRowSet struct) thro } } struct.setRowsIsSet(true); - BitSet incoming = iprot.readBitSet(1); + BitSet incoming = iprot.readBitSet(3); if (incoming.get(0)) { { org.apache.thrift.protocol.TList _list131 = new 
org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); @@ -699,6 +905,14 @@ public void read(org.apache.thrift.protocol.TProtocol prot, TRowSet struct) thro } struct.setColumnsIsSet(true); } + if (incoming.get(1)) { + struct.binaryColumns = iprot.readBinary(); + struct.setBinaryColumnsIsSet(true); + } + if (incoming.get(2)) { + struct.columnCount = iprot.readI32(); + struct.setColumnCountIsSet(true); + } } } diff --git a/service-rpc/src/gen/thrift/gen-php/Types.php b/service-rpc/src/gen/thrift/gen-php/Types.php index b7df50a..fc12770 100644 --- a/service-rpc/src/gen/thrift/gen-php/Types.php +++ b/service-rpc/src/gen/thrift/gen-php/Types.php @@ -3772,6 +3772,14 @@ class TRowSet { * @var \TColumn[] */ public $columns = null; + /** + * @var string + */ + public $binaryColumns = null; + /** + * @var int + */ + public $columnCount = null; public function __construct($vals=null) { if (!isset(self::$_TSPEC)) { @@ -3798,6 +3806,14 @@ class TRowSet { 'class' => '\TColumn', ), ), + 4 => array( + 'var' => 'binaryColumns', + 'type' => TType::STRING, + ), + 5 => array( + 'var' => 'columnCount', + 'type' => TType::I32, + ), ); } if (is_array($vals)) { @@ -3810,6 +3826,12 @@ class TRowSet { if (isset($vals['columns'])) { $this->columns = $vals['columns']; } + if (isset($vals['binaryColumns'])) { + $this->binaryColumns = $vals['binaryColumns']; + } + if (isset($vals['columnCount'])) { + $this->columnCount = $vals['columnCount']; + } } } @@ -3875,6 +3897,20 @@ class TRowSet { $xfer += $input->skip($ftype); } break; + case 4: + if ($ftype == TType::STRING) { + $xfer += $input->readString($this->binaryColumns); + } else { + $xfer += $input->skip($ftype); + } + break; + case 5: + if ($ftype == TType::I32) { + $xfer += $input->readI32($this->columnCount); + } else { + $xfer += $input->skip($ftype); + } + break; default: $xfer += $input->skip($ftype); break; @@ -3927,6 +3963,16 @@ class TRowSet { } $xfer += $output->writeFieldEnd(); } + if ($this->binaryColumns !== null) { + $xfer += $output->writeFieldBegin('binaryColumns', TType::STRING, 4); + $xfer += $output->writeString($this->binaryColumns); + $xfer += $output->writeFieldEnd(); + } + if ($this->columnCount !== null) { + $xfer += $output->writeFieldBegin('columnCount', TType::I32, 5); + $xfer += $output->writeI32($this->columnCount); + $xfer += $output->writeFieldEnd(); + } $xfer += $output->writeFieldStop(); $xfer += $output->writeStructEnd(); return $xfer; diff --git a/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py b/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py index c691781..231d001 100644 --- a/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py +++ b/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py @@ -2965,6 +2965,8 @@ class TRowSet: - startRowOffset - rows - columns + - binaryColumns + - columnCount """ thrift_spec = ( @@ -2972,12 +2974,16 @@ class TRowSet: (1, TType.I64, 'startRowOffset', None, None, ), # 1 (2, TType.LIST, 'rows', (TType.STRUCT,(TRow, TRow.thrift_spec)), None, ), # 2 (3, TType.LIST, 'columns', (TType.STRUCT,(TColumn, TColumn.thrift_spec)), None, ), # 3 + (4, TType.STRING, 'binaryColumns', None, None, ), # 4 + (5, TType.I32, 'columnCount', None, None, ), # 5 ) - def __init__(self, startRowOffset=None, rows=None, columns=None,): + def __init__(self, startRowOffset=None, rows=None, columns=None, binaryColumns=None, columnCount=None,): self.startRowOffset = startRowOffset self.rows = rows self.columns = columns + self.binaryColumns = binaryColumns + self.columnCount = 
columnCount def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -3015,6 +3021,16 @@ def read(self, iprot): iprot.readListEnd() else: iprot.skip(ftype) + elif fid == 4: + if ftype == TType.STRING: + self.binaryColumns = iprot.readString() + else: + iprot.skip(ftype) + elif fid == 5: + if ftype == TType.I32: + self.columnCount = iprot.readI32() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -3043,6 +3059,14 @@ def write(self, oprot): iter117.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() + if self.binaryColumns is not None: + oprot.writeFieldBegin('binaryColumns', TType.STRING, 4) + oprot.writeString(self.binaryColumns) + oprot.writeFieldEnd() + if self.columnCount is not None: + oprot.writeFieldBegin('columnCount', TType.I32, 5) + oprot.writeI32(self.columnCount) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -3059,6 +3083,8 @@ def __hash__(self): value = (value * 31) ^ hash(self.startRowOffset) value = (value * 31) ^ hash(self.rows) value = (value * 31) ^ hash(self.columns) + value = (value * 31) ^ hash(self.binaryColumns) + value = (value * 31) ^ hash(self.columnCount) return value def __repr__(self): diff --git a/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb b/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb index 07ed97c..28cae72 100644 --- a/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb +++ b/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb @@ -832,11 +832,15 @@ class TRowSet STARTROWOFFSET = 1 ROWS = 2 COLUMNS = 3 + BINARYCOLUMNS = 4 + COLUMNCOUNT = 5 FIELDS = { STARTROWOFFSET => {:type => ::Thrift::Types::I64, :name => 'startRowOffset'}, ROWS => {:type => ::Thrift::Types::LIST, :name => 'rows', :element => {:type => ::Thrift::Types::STRUCT, :class => ::TRow}}, - COLUMNS => {:type => ::Thrift::Types::LIST, :name => 'columns', :element => {:type => ::Thrift::Types::STRUCT, :class => ::TColumn}, :optional => true} + COLUMNS => {:type => ::Thrift::Types::LIST, :name => 'columns', :element => {:type => ::Thrift::Types::STRUCT, :class => ::TColumn}, :optional => true}, + BINARYCOLUMNS => {:type => ::Thrift::Types::STRING, :name => 'binaryColumns', :binary => true, :optional => true}, + COLUMNCOUNT => {:type => ::Thrift::Types::I32, :name => 'columnCount', :optional => true} } def struct_fields; FIELDS; end diff --git a/service/src/java/org/apache/hive/service/cli/Column.java b/service/src/java/org/apache/hive/service/cli/Column.java deleted file mode 100644 index 102d920..0000000 --- a/service/src/java/org/apache/hive/service/cli/Column.java +++ /dev/null @@ -1,434 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hive.service.cli; - -import java.nio.ByteBuffer; -import java.util.AbstractList; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.BitSet; -import java.util.List; - -import org.apache.hive.service.rpc.thrift.TBinaryColumn; -import org.apache.hive.service.rpc.thrift.TBoolColumn; -import org.apache.hive.service.rpc.thrift.TByteColumn; -import org.apache.hive.service.rpc.thrift.TColumn; -import org.apache.hive.service.rpc.thrift.TDoubleColumn; -import org.apache.hive.service.rpc.thrift.TI16Column; -import org.apache.hive.service.rpc.thrift.TI32Column; -import org.apache.hive.service.rpc.thrift.TI64Column; -import org.apache.hive.service.rpc.thrift.TStringColumn; - -import com.google.common.primitives.Booleans; -import com.google.common.primitives.Bytes; -import com.google.common.primitives.Doubles; -import com.google.common.primitives.Ints; -import com.google.common.primitives.Longs; -import com.google.common.primitives.Shorts; - -import org.apache.hadoop.hive.common.type.HiveDecimal; - -/** - * Column. - */ -public class Column extends AbstractList { - - private static final int DEFAULT_SIZE = 100; - - private final Type type; - - private BitSet nulls; - - private int size; - private boolean[] boolVars; - private byte[] byteVars; - private short[] shortVars; - private int[] intVars; - private long[] longVars; - private double[] doubleVars; - private List stringVars; - private List binaryVars; - - public Column(Type type, BitSet nulls, Object values) { - this.type = type; - this.nulls = nulls; - if (type == Type.BOOLEAN_TYPE) { - boolVars = (boolean[]) values; - size = boolVars.length; - } else if (type == Type.TINYINT_TYPE) { - byteVars = (byte[]) values; - size = byteVars.length; - } else if (type == Type.SMALLINT_TYPE) { - shortVars = (short[]) values; - size = shortVars.length; - } else if (type == Type.INT_TYPE) { - intVars = (int[]) values; - size = intVars.length; - } else if (type == Type.BIGINT_TYPE) { - longVars = (long[]) values; - size = longVars.length; - } else if (type == Type.DOUBLE_TYPE) { - doubleVars = (double[]) values; - size = doubleVars.length; - } else if (type == Type.BINARY_TYPE) { - binaryVars = (List) values; - size = binaryVars.size(); - } else if (type == Type.STRING_TYPE) { - stringVars = (List) values; - size = stringVars.size(); - } else { - throw new IllegalStateException("invalid union object"); - } - } - - public Column(Type type) { - nulls = new BitSet(); - switch (type) { - case BOOLEAN_TYPE: - boolVars = new boolean[DEFAULT_SIZE]; - break; - case TINYINT_TYPE: - byteVars = new byte[DEFAULT_SIZE]; - break; - case SMALLINT_TYPE: - shortVars = new short[DEFAULT_SIZE]; - break; - case INT_TYPE: - intVars = new int[DEFAULT_SIZE]; - break; - case BIGINT_TYPE: - longVars = new long[DEFAULT_SIZE]; - break; - case FLOAT_TYPE: - case DOUBLE_TYPE: - type = Type.DOUBLE_TYPE; - doubleVars = new double[DEFAULT_SIZE]; - break; - case BINARY_TYPE: - binaryVars = new ArrayList(); - break; - default: - type = Type.STRING_TYPE; - stringVars = new ArrayList(); - } - this.type = type; - } - - public Column(TColumn colValues) { - if (colValues.isSetBoolVal()) { - type = Type.BOOLEAN_TYPE; - nulls = toBitset(colValues.getBoolVal().getNulls()); - boolVars = Booleans.toArray(colValues.getBoolVal().getValues()); - size = boolVars.length; - } else if (colValues.isSetByteVal()) { - type = Type.TINYINT_TYPE; - nulls = 
toBitset(colValues.getByteVal().getNulls()); - byteVars = Bytes.toArray(colValues.getByteVal().getValues()); - size = byteVars.length; - } else if (colValues.isSetI16Val()) { - type = Type.SMALLINT_TYPE; - nulls = toBitset(colValues.getI16Val().getNulls()); - shortVars = Shorts.toArray(colValues.getI16Val().getValues()); - size = shortVars.length; - } else if (colValues.isSetI32Val()) { - type = Type.INT_TYPE; - nulls = toBitset(colValues.getI32Val().getNulls()); - intVars = Ints.toArray(colValues.getI32Val().getValues()); - size = intVars.length; - } else if (colValues.isSetI64Val()) { - type = Type.BIGINT_TYPE; - nulls = toBitset(colValues.getI64Val().getNulls()); - longVars = Longs.toArray(colValues.getI64Val().getValues()); - size = longVars.length; - } else if (colValues.isSetDoubleVal()) { - type = Type.DOUBLE_TYPE; - nulls = toBitset(colValues.getDoubleVal().getNulls()); - doubleVars = Doubles.toArray(colValues.getDoubleVal().getValues()); - size = doubleVars.length; - } else if (colValues.isSetBinaryVal()) { - type = Type.BINARY_TYPE; - nulls = toBitset(colValues.getBinaryVal().getNulls()); - binaryVars = colValues.getBinaryVal().getValues(); - size = binaryVars.size(); - } else if (colValues.isSetStringVal()) { - type = Type.STRING_TYPE; - nulls = toBitset(colValues.getStringVal().getNulls()); - stringVars = colValues.getStringVal().getValues(); - size = stringVars.size(); - } else { - throw new IllegalStateException("invalid union object"); - } - } - - public Column extractSubset(int start, int end) { - BitSet subNulls = nulls.get(start, end); - if (type == Type.BOOLEAN_TYPE) { - Column subset = new Column(type, subNulls, Arrays.copyOfRange(boolVars, start, end)); - boolVars = Arrays.copyOfRange(boolVars, end, size); - nulls = nulls.get(start, size); - size = boolVars.length; - return subset; - } - if (type == Type.TINYINT_TYPE) { - Column subset = new Column(type, subNulls, Arrays.copyOfRange(byteVars, start, end)); - byteVars = Arrays.copyOfRange(byteVars, end, size); - nulls = nulls.get(start, size); - size = byteVars.length; - return subset; - } - if (type == Type.SMALLINT_TYPE) { - Column subset = new Column(type, subNulls, Arrays.copyOfRange(shortVars, start, end)); - shortVars = Arrays.copyOfRange(shortVars, end, size); - nulls = nulls.get(start, size); - size = shortVars.length; - return subset; - } - if (type == Type.INT_TYPE) { - Column subset = new Column(type, subNulls, Arrays.copyOfRange(intVars, start, end)); - intVars = Arrays.copyOfRange(intVars, end, size); - nulls = nulls.get(start, size); - size = intVars.length; - return subset; - } - if (type == Type.BIGINT_TYPE) { - Column subset = new Column(type, subNulls, Arrays.copyOfRange(longVars, start, end)); - longVars = Arrays.copyOfRange(longVars, end, size); - nulls = nulls.get(start, size); - size = longVars.length; - return subset; - } - if (type == Type.DOUBLE_TYPE) { - Column subset = new Column(type, subNulls, Arrays.copyOfRange(doubleVars, start, end)); - doubleVars = Arrays.copyOfRange(doubleVars, end, size); - nulls = nulls.get(start, size); - size = doubleVars.length; - return subset; - } - if (type == Type.BINARY_TYPE) { - Column subset = new Column(type, subNulls, binaryVars.subList(start, end)); - binaryVars = binaryVars.subList(end, binaryVars.size()); - nulls = nulls.get(start, size); - size = binaryVars.size(); - return subset; - } - if (type == Type.STRING_TYPE) { - Column subset = new Column(type, subNulls, stringVars.subList(start, end)); - stringVars = stringVars.subList(end, 
stringVars.size()); - nulls = nulls.get(start, size); - size = stringVars.size(); - return subset; - } - throw new IllegalStateException("invalid union object"); - } - - private static final byte[] MASKS = new byte[] { - 0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40, (byte)0x80 - }; - - private static BitSet toBitset(byte[] nulls) { - BitSet bitset = new BitSet(); - int bits = nulls.length * 8; - for (int i = 0; i < bits; i++) { - bitset.set(i, (nulls[i / 8] & MASKS[i % 8]) != 0); - } - return bitset; - } - - private static byte[] toBinary(BitSet bitset) { - byte[] nulls = new byte[1 + (bitset.length() / 8)]; - for (int i = 0; i < bitset.length(); i++) { - nulls[i / 8] |= bitset.get(i) ? MASKS[i % 8] : 0; - } - return nulls; - } - - public Type getType() { - return type; - } - - @Override - public Object get(int index) { - if (nulls.get(index)) { - return null; - } - switch (type) { - case BOOLEAN_TYPE: - return boolVars[index]; - case TINYINT_TYPE: - return byteVars[index]; - case SMALLINT_TYPE: - return shortVars[index]; - case INT_TYPE: - return intVars[index]; - case BIGINT_TYPE: - return longVars[index]; - case DOUBLE_TYPE: - return doubleVars[index]; - case STRING_TYPE: - return stringVars.get(index); - case BINARY_TYPE: - return binaryVars.get(index).array(); - } - return null; - } - - @Override - public int size() { - return size; - } - - public TColumn toTColumn() { - TColumn value = new TColumn(); - ByteBuffer nullMasks = ByteBuffer.wrap(toBinary(nulls)); - switch (type) { - case BOOLEAN_TYPE: - value.setBoolVal(new TBoolColumn(Booleans.asList(Arrays.copyOfRange(boolVars, 0, size)), nullMasks)); - break; - case TINYINT_TYPE: - value.setByteVal(new TByteColumn(Bytes.asList(Arrays.copyOfRange(byteVars, 0, size)), nullMasks)); - break; - case SMALLINT_TYPE: - value.setI16Val(new TI16Column(Shorts.asList(Arrays.copyOfRange(shortVars, 0, size)), nullMasks)); - break; - case INT_TYPE: - value.setI32Val(new TI32Column(Ints.asList(Arrays.copyOfRange(intVars, 0, size)), nullMasks)); - break; - case BIGINT_TYPE: - value.setI64Val(new TI64Column(Longs.asList(Arrays.copyOfRange(longVars, 0, size)), nullMasks)); - break; - case DOUBLE_TYPE: - value.setDoubleVal(new TDoubleColumn(Doubles.asList(Arrays.copyOfRange(doubleVars, 0, size)), nullMasks)); - break; - case STRING_TYPE: - value.setStringVal(new TStringColumn(stringVars, nullMasks)); - break; - case BINARY_TYPE: - value.setBinaryVal(new TBinaryColumn(binaryVars, nullMasks)); - break; - } - return value; - } - - private static final ByteBuffer EMPTY_BINARY = ByteBuffer.allocate(0); - private static final String EMPTY_STRING = ""; - - public void addValue(TypeDescriptor typeDescriptor, Object field) { - if (field != null && typeDescriptor.getType() == Type.DECIMAL_TYPE) { - int scale = typeDescriptor.getDecimalDigits(); - field = ((HiveDecimal) field).toFormatString(scale); - } - addValue(typeDescriptor.getType(), field); - } - - public void addValue(Type type, Object field) { - switch (type) { - case BOOLEAN_TYPE: - nulls.set(size, field == null); - boolVars()[size] = field == null ? true : (Boolean)field; - break; - case TINYINT_TYPE: - nulls.set(size, field == null); - byteVars()[size] = field == null ? 0 : (Byte) field; - break; - case SMALLINT_TYPE: - nulls.set(size, field == null); - shortVars()[size] = field == null ? 0 : (Short)field; - break; - case INT_TYPE: - nulls.set(size, field == null); - intVars()[size] = field == null ? 
0 : (Integer)field; - break; - case BIGINT_TYPE: - nulls.set(size, field == null); - longVars()[size] = field == null ? 0 : (Long)field; - break; - case FLOAT_TYPE: - nulls.set(size, field == null); - doubleVars()[size] = field == null ? 0 : new Double(field.toString()); - break; - case DOUBLE_TYPE: - nulls.set(size, field == null); - doubleVars()[size] = field == null ? 0 : (Double)field; - break; - case BINARY_TYPE: - nulls.set(binaryVars.size(), field == null); - binaryVars.add(field == null ? EMPTY_BINARY : ByteBuffer.wrap((byte[])field)); - break; - default: - nulls.set(stringVars.size(), field == null); - stringVars.add(field == null ? EMPTY_STRING : String.valueOf(field)); - break; - } - size++; - } - - private boolean[] boolVars() { - if (boolVars.length == size) { - boolean[] newVars = new boolean[size << 1]; - System.arraycopy(boolVars, 0, newVars, 0, size); - return boolVars = newVars; - } - return boolVars; - } - - private byte[] byteVars() { - if (byteVars.length == size) { - byte[] newVars = new byte[size << 1]; - System.arraycopy(byteVars, 0, newVars, 0, size); - return byteVars = newVars; - } - return byteVars; - } - - private short[] shortVars() { - if (shortVars.length == size) { - short[] newVars = new short[size << 1]; - System.arraycopy(shortVars, 0, newVars, 0, size); - return shortVars = newVars; - } - return shortVars; - } - - private int[] intVars() { - if (intVars.length == size) { - int[] newVars = new int[size << 1]; - System.arraycopy(intVars, 0, newVars, 0, size); - return intVars = newVars; - } - return intVars; - } - - private long[] longVars() { - if (longVars.length == size) { - long[] newVars = new long[size << 1]; - System.arraycopy(longVars, 0, newVars, 0, size); - return longVars = newVars; - } - return longVars; - } - - private double[] doubleVars() { - if (doubleVars.length == size) { - double[] newVars = new double[size << 1]; - System.arraycopy(doubleVars, 0, newVars, 0, size); - return doubleVars = newVars; - } - return doubleVars; - } -} diff --git a/service/src/java/org/apache/hive/service/cli/ColumnBasedSet.java b/service/src/java/org/apache/hive/service/cli/ColumnBasedSet.java index b7fe663..9cbe89c 100644 --- a/service/src/java/org/apache/hive/service/cli/ColumnBasedSet.java +++ b/service/src/java/org/apache/hive/service/cli/ColumnBasedSet.java @@ -18,13 +18,24 @@ package org.apache.hive.service.cli; +import java.io.ByteArrayInputStream; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.serde2.thrift.ColumnBuffer; +import org.apache.hadoop.hive.serde2.thrift.Type; import org.apache.hive.service.rpc.thrift.TColumn; import org.apache.hive.service.rpc.thrift.TRow; import org.apache.hive.service.rpc.thrift.TRowSet; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TIOStreamTransport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * ColumnBasedSet. 
@@ -34,40 +45,79 @@ private long startOffset; private final TypeDescriptor[] descriptors; // non-null only for writing (server-side) - private final List columns; + private final List columns; + private byte[] blob; + private boolean isBlobBased = false; + public static final Logger LOG = LoggerFactory.getLogger(ColumnBasedSet.class); public ColumnBasedSet(TableSchema schema) { descriptors = schema.toTypeDescriptors(); - columns = new ArrayList(); + columns = new ArrayList(); for (ColumnDescriptor colDesc : schema.getColumnDescriptors()) { - columns.add(new Column(colDesc.getType())); + columns.add(new ColumnBuffer(colDesc.getType())); } } - public ColumnBasedSet(TRowSet tRowSet) { + public ColumnBasedSet(TRowSet tRowSet) throws TException { descriptors = null; - columns = new ArrayList(); - for (TColumn tvalue : tRowSet.getColumns()) { - columns.add(new Column(tvalue)); + columns = new ArrayList(); + // Use TCompactProtocol to read serialized TColumns + if (tRowSet.isSetBinaryColumns()) { + TProtocol protocol = + new TCompactProtocol(new TIOStreamTransport(new ByteArrayInputStream( + tRowSet.getBinaryColumns()))); + // Read from the stream using the protocol for each column in final schema + for (int i = 0; i < tRowSet.getColumnCount(); i++) { + TColumn tvalue = new TColumn(); + try { + tvalue.read(protocol); + } catch (TException e) { + LOG.error(e.getMessage(), e); + throw new TException("Error reading column value from the row set blob", e); + } + columns.add(new ColumnBuffer(tvalue)); + } + } + else { + if (tRowSet.getColumns() != null) { + for (TColumn tvalue : tRowSet.getColumns()) { + columns.add(new ColumnBuffer(tvalue)); + } + } } startOffset = tRowSet.getStartRowOffset(); } - private ColumnBasedSet(TypeDescriptor[] descriptors, List columns, long startOffset) { + private ColumnBasedSet(TypeDescriptor[] descriptors, List columns, long startOffset) { this.descriptors = descriptors; this.columns = columns; this.startOffset = startOffset; } + public ColumnBasedSet(TableSchema schema, boolean isBlobBased) { + this(schema); + this.isBlobBased = isBlobBased; + } + @Override public ColumnBasedSet addRow(Object[] fields) { - for (int i = 0; i < fields.length; i++) { - columns.get(i).addValue(descriptors[i], fields[i]); + if (isBlobBased) { + this.blob = (byte[]) fields[0]; + } else { + for (int i = 0; i < fields.length; i++) { + TypeDescriptor descriptor = descriptors[i]; + Object field = fields[i]; + if (field != null && descriptor.getType() == Type.DECIMAL_TYPE) { + int scale = descriptor.getDecimalDigits(); + field = ((HiveDecimal) field).toFormatString(scale); + } + columns.get(i).addValue(descriptor.getType(), field); + } } return this; } - public List getColumns() { + public List getColumns() { return columns; } @@ -85,7 +135,7 @@ public int numRows() { public ColumnBasedSet extractSubset(int maxRows) { int numRows = Math.min(numRows(), maxRows); - List subset = new ArrayList(); + List subset = new ArrayList(); for (int i = 0; i < columns.size(); i++) { subset.add(columns.get(i).extractSubset(0, numRows)); } @@ -106,8 +156,14 @@ public void setStartOffset(long startOffset) { public TRowSet toTRowSet() { TRowSet tRowSet = new TRowSet(startOffset, new ArrayList()); - for (int i = 0; i < columns.size(); i++) { - tRowSet.addToColumns(columns.get(i).toTColumn()); + if (isBlobBased) { + tRowSet.setColumns(null); + tRowSet.setBinaryColumns(blob); + tRowSet.setColumnCount(numColumns()); + } else { + for (int i = 0; i < columns.size(); i++) { + 
tRowSet.addToColumns(columns.get(i).toTColumn()); + } } return tRowSet; } diff --git a/service/src/java/org/apache/hive/service/cli/ColumnDescriptor.java b/service/src/java/org/apache/hive/service/cli/ColumnDescriptor.java index 7bd9f06..bfd7135 100644 --- a/service/src/java/org/apache/hive/service/cli/ColumnDescriptor.java +++ b/service/src/java/org/apache/hive/service/cli/ColumnDescriptor.java @@ -18,7 +18,7 @@ package org.apache.hive.service.cli; -import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.serde2.thrift.Type; import org.apache.hive.service.rpc.thrift.TColumnDesc; @@ -47,14 +47,8 @@ public ColumnDescriptor(TColumnDesc tColumnDesc) { position = tColumnDesc.getPosition(); } - public ColumnDescriptor(FieldSchema column, int position) { - name = column.getName(); - comment = column.getComment(); - type = new TypeDescriptor(column.getType()); - this.position = position; - } - - public static ColumnDescriptor newPrimitiveColumnDescriptor(String name, String comment, Type type, int position) { + public static ColumnDescriptor newPrimitiveColumnDescriptor(String name, String comment, + Type type, int position) { // Current usage looks like it's only for metadata columns, but if that changes then // this method may need to require a type qualifiers aruments. return new ColumnDescriptor(name, comment, new TypeDescriptor(type), position); diff --git a/service/src/java/org/apache/hive/service/cli/ColumnValue.java b/service/src/java/org/apache/hive/service/cli/ColumnValue.java index 37460e6..28149e1 100644 --- a/service/src/java/org/apache/hive/service/cli/ColumnValue.java +++ b/service/src/java/org/apache/hive/service/cli/ColumnValue.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.common.type.HiveIntervalDayTime; import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth; import org.apache.hadoop.hive.common.type.HiveVarchar; +import org.apache.hadoop.hive.serde2.thrift.Type; import org.apache.hive.service.rpc.thrift.TBoolValue; import org.apache.hive.service.rpc.thrift.TByteValue; import org.apache.hive.service.rpc.thrift.TColumnValue; diff --git a/service/src/java/org/apache/hive/service/cli/RowSetFactory.java b/service/src/java/org/apache/hive/service/cli/RowSetFactory.java index 1c2ff7f..d9be6a0 100644 --- a/service/src/java/org/apache/hive/service/cli/RowSetFactory.java +++ b/service/src/java/org/apache/hive/service/cli/RowSetFactory.java @@ -20,22 +20,25 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion; import org.apache.hive.service.rpc.thrift.TRowSet; +import org.apache.thrift.TException; import static org.apache.hive.service.rpc.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6; public class RowSetFactory { - public static RowSet create(TableSchema schema, TProtocolVersion version) { + // This call is accessed from server side + public static RowSet create(TableSchema schema, TProtocolVersion version, boolean isBlobBased) { if (version.getValue() >= HIVE_CLI_SERVICE_PROTOCOL_V6.getValue()) { - return new ColumnBasedSet(schema); + return new ColumnBasedSet(schema, isBlobBased); } return new RowBasedSet(schema); } - public static RowSet create(TRowSet results, TProtocolVersion version) { - if (version.getValue() >= HIVE_CLI_SERVICE_PROTOCOL_V6.getValue()) { - return new ColumnBasedSet(results); - } - return new RowBasedSet(results); + // This call is accessed from client (jdbc) side + public static RowSet create(TRowSet results, TProtocolVersion version) throws TException { + if (version.getValue() >= 
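The binary path added to ColumnBasedSet above packs the result into a single byte[] (setBinaryColumns plus a column count), and the new ColumnBasedSet(TRowSet) constructor reads the TColumn structs back with TCompactProtocol over a ByteArrayInputStream. A standalone sketch of that round trip, under the assumption that the task-side writer lays the columns out back-to-back in the same way (the helper class and method names below are hypothetical):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.hive.service.rpc.thrift.TColumn;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TIOStreamTransport;

public final class ColumnBlobRoundTrip {

  // Writer side: serialize each column in schema order into one compact-protocol blob.
  public static byte[] toBlob(List<TColumn> columns) throws TException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    TProtocol out = new TCompactProtocol(new TIOStreamTransport(bytes));
    for (TColumn column : columns) {
      column.write(out);
    }
    return bytes.toByteArray();
  }

  // Reader side: mirrors the new ColumnBasedSet(TRowSet) constructor, which reads
  // tRowSet.getColumnCount() columns out of tRowSet.getBinaryColumns().
  public static List<TColumn> fromBlob(byte[] blob, int columnCount) throws TException {
    TProtocol in = new TCompactProtocol(new TIOStreamTransport(new ByteArrayInputStream(blob)));
    List<TColumn> columns = new ArrayList<TColumn>(columnCount);
    for (int i = 0; i < columnCount; i++) {
      TColumn column = new TColumn();
      column.read(in);
      columns.add(column);
    }
    return columns;
  }
}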
HIVE_CLI_SERVICE_PROTOCOL_V6.getValue()) { + return new ColumnBasedSet(results); + } + return new RowBasedSet(results); } } diff --git a/service/src/java/org/apache/hive/service/cli/TableSchema.java b/service/src/java/org/apache/hive/service/cli/TableSchema.java index 2206e2c..f5eda8a 100644 --- a/service/src/java/org/apache/hive/service/cli/TableSchema.java +++ b/service/src/java/org/apache/hive/service/cli/TableSchema.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Schema; +import org.apache.hadoop.hive.serde2.thrift.Type; import org.apache.hive.service.rpc.thrift.TColumnDesc; import org.apache.hive.service.rpc.thrift.TTableSchema; @@ -49,7 +50,8 @@ public TableSchema(TTableSchema tTableSchema) { public TableSchema(List fieldSchemas) { int pos = 1; for (FieldSchema field : fieldSchemas) { - columns.add(new ColumnDescriptor(field, pos++)); + columns.add(new ColumnDescriptor(field.getName(), field.getComment(), new TypeDescriptor( + field.getType()), pos++)); } } diff --git a/service/src/java/org/apache/hive/service/cli/Type.java b/service/src/java/org/apache/hive/service/cli/Type.java deleted file mode 100644 index f067b3d..0000000 --- a/service/src/java/org/apache/hive/service/cli/Type.java +++ /dev/null @@ -1,348 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hive.service.cli; - -import java.sql.DatabaseMetaData; - -import org.apache.hadoop.hive.common.type.HiveDecimal; -import org.apache.hive.service.rpc.thrift.TTypeId; - -/** - * Type. 
- * - */ -public enum Type { - NULL_TYPE("VOID", - java.sql.Types.NULL, - TTypeId.NULL_TYPE), - BOOLEAN_TYPE("BOOLEAN", - java.sql.Types.BOOLEAN, - TTypeId.BOOLEAN_TYPE), - TINYINT_TYPE("TINYINT", - java.sql.Types.TINYINT, - TTypeId.TINYINT_TYPE), - SMALLINT_TYPE("SMALLINT", - java.sql.Types.SMALLINT, - TTypeId.SMALLINT_TYPE), - INT_TYPE("INT", - java.sql.Types.INTEGER, - TTypeId.INT_TYPE), - BIGINT_TYPE("BIGINT", - java.sql.Types.BIGINT, - TTypeId.BIGINT_TYPE), - FLOAT_TYPE("FLOAT", - java.sql.Types.FLOAT, - TTypeId.FLOAT_TYPE), - DOUBLE_TYPE("DOUBLE", - java.sql.Types.DOUBLE, - TTypeId.DOUBLE_TYPE), - STRING_TYPE("STRING", - java.sql.Types.VARCHAR, - TTypeId.STRING_TYPE), - CHAR_TYPE("CHAR", - java.sql.Types.CHAR, - TTypeId.CHAR_TYPE, - true, false, false), - VARCHAR_TYPE("VARCHAR", - java.sql.Types.VARCHAR, - TTypeId.VARCHAR_TYPE, - true, false, false), - DATE_TYPE("DATE", - java.sql.Types.DATE, - TTypeId.DATE_TYPE), - TIMESTAMP_TYPE("TIMESTAMP", - java.sql.Types.TIMESTAMP, - TTypeId.TIMESTAMP_TYPE), - INTERVAL_YEAR_MONTH_TYPE("INTERVAL_YEAR_MONTH", - java.sql.Types.OTHER, - TTypeId.INTERVAL_YEAR_MONTH_TYPE), - INTERVAL_DAY_TIME_TYPE("INTERVAL_DAY_TIME", - java.sql.Types.OTHER, - TTypeId.INTERVAL_DAY_TIME_TYPE), - BINARY_TYPE("BINARY", - java.sql.Types.BINARY, - TTypeId.BINARY_TYPE), - DECIMAL_TYPE("DECIMAL", - java.sql.Types.DECIMAL, - TTypeId.DECIMAL_TYPE, - true, false, false), - ARRAY_TYPE("ARRAY", - java.sql.Types.ARRAY, - TTypeId.ARRAY_TYPE, - true, true), - MAP_TYPE("MAP", - java.sql.Types.JAVA_OBJECT, - TTypeId.MAP_TYPE, - true, true), - STRUCT_TYPE("STRUCT", - java.sql.Types.STRUCT, - TTypeId.STRUCT_TYPE, - true, false), - UNION_TYPE("UNIONTYPE", - java.sql.Types.OTHER, - TTypeId.UNION_TYPE, - true, false), - USER_DEFINED_TYPE("USER_DEFINED", - java.sql.Types.OTHER, - TTypeId.USER_DEFINED_TYPE, - true, false); - - private final String name; - private final TTypeId tType; - private final int javaSQLType; - private final boolean isQualified; - private final boolean isComplex; - private final boolean isCollection; - - Type(String name, int javaSQLType, TTypeId tType, boolean isQualified, boolean isComplex, boolean isCollection) { - this.name = name; - this.javaSQLType = javaSQLType; - this.tType = tType; - this.isQualified = isQualified; - this.isComplex = isComplex; - this.isCollection = isCollection; - } - - Type(String name, int javaSQLType, TTypeId tType, boolean isComplex, boolean isCollection) { - this(name, javaSQLType, tType, false, isComplex, isCollection); - } - - Type(String name, int javaSqlType, TTypeId tType) { - this(name, javaSqlType, tType, false, false, false); - } - - public boolean isPrimitiveType() { - return !isComplex; - } - - public boolean isQualifiedType() { - return isQualified; - } - - public boolean isComplexType() { - return isComplex; - } - - public boolean isCollectionType() { - return isCollection; - } - - public static Type getType(TTypeId tType) { - for (Type type : values()) { - if (tType.equals(type.tType)) { - return type; - } - } - throw new IllegalArgumentException("Unrecognized Thrift TTypeId value: " + tType); - } - - public static Type getType(String name) { - if (name == null) { - throw new IllegalArgumentException("Invalid type name: null"); - } - for (Type type : values()) { - if (name.equalsIgnoreCase(type.name)) { - return type; - } else if (type.isQualifiedType() || type.isComplexType()) { - if (name.toUpperCase().startsWith(type.name)) { - return type; - } - } - } - throw new IllegalArgumentException("Unrecognized type name: " + 
name); - } - - /** - * Radix for this type (typically either 2 or 10) - * Null is returned for data types where this is not applicable. - */ - public Integer getNumPrecRadix() { - if (this.isNumericType()) { - return 10; - } - return null; - } - - /** - * Maximum precision for numeric types. - * Returns null for non-numeric types. - * @return - */ - public Integer getMaxPrecision() { - switch (this) { - case TINYINT_TYPE: - return 3; - case SMALLINT_TYPE: - return 5; - case INT_TYPE: - return 10; - case BIGINT_TYPE: - return 19; - case FLOAT_TYPE: - return 7; - case DOUBLE_TYPE: - return 15; - case DECIMAL_TYPE: - return HiveDecimal.MAX_PRECISION; - default: - return null; - } - } - - public boolean isNumericType() { - switch (this) { - case TINYINT_TYPE: - case SMALLINT_TYPE: - case INT_TYPE: - case BIGINT_TYPE: - case FLOAT_TYPE: - case DOUBLE_TYPE: - case DECIMAL_TYPE: - return true; - default: - return false; - } - } - - /** - * Prefix used to quote a literal of this type (may be null) - */ - public String getLiteralPrefix() { - return null; - } - - /** - * Suffix used to quote a literal of this type (may be null) - * @return - */ - public String getLiteralSuffix() { - return null; - } - - /** - * Can you use NULL for this type? - * @return - * DatabaseMetaData.typeNoNulls - does not allow NULL values - * DatabaseMetaData.typeNullable - allows NULL values - * DatabaseMetaData.typeNullableUnknown - nullability unknown - */ - public Short getNullable() { - // All Hive types are nullable - return DatabaseMetaData.typeNullable; - } - - /** - * Is the type case sensitive? - * @return - */ - public Boolean isCaseSensitive() { - switch (this) { - case STRING_TYPE: - return true; - default: - return false; - } - } - - /** - * Parameters used in creating the type (may be null) - * @return - */ - public String getCreateParams() { - return null; - } - - /** - * Can you use WHERE based on this type? - * @return - * DatabaseMetaData.typePredNone - No support - * DatabaseMetaData.typePredChar - Only support with WHERE .. LIKE - * DatabaseMetaData.typePredBasic - Supported except for WHERE .. LIKE - * DatabaseMetaData.typeSearchable - Supported for all WHERE .. - */ - public Short getSearchable() { - if (isPrimitiveType()) { - return DatabaseMetaData.typeSearchable; - } - return DatabaseMetaData.typePredNone; - } - - /** - * Is this type unsigned? - * @return - */ - public Boolean isUnsignedAttribute() { - if (isNumericType()) { - return false; - } - return true; - } - - /** - * Can this type represent money? - * @return - */ - public Boolean isFixedPrecScale() { - return false; - } - - /** - * Can this type be used for an auto-increment value? - * @return - */ - public Boolean isAutoIncrement() { - return false; - } - - /** - * Localized version of type name (may be null). 
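The enum being deleted here is relocated rather than dropped: the rest of the patch switches imports to org.apache.hadoop.hive.serde2.thrift.Type. Assuming the relocated enum keeps the API shown above (a reading of the import changes, not something verified here), typical lookups continue to work the same way:

import org.apache.hadoop.hive.serde2.thrift.Type;

Type decimal = Type.getType("DECIMAL(10,2)");  // qualified type names resolve by prefix match
int jdbcType = decimal.toJavaSQLType();        // java.sql.Types.DECIMAL
Integer precision = decimal.getMaxPrecision(); // HiveDecimal.MAX_PRECISION for DECIMAL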
- * @return - */ - public String getLocalizedName() { - return null; - } - - /** - * Minimum scale supported for this type - * @return - */ - public Short getMinimumScale() { - return 0; - } - - /** - * Maximum scale supported for this type - * @return - */ - public Short getMaximumScale() { - return 0; - } - - public TTypeId toTType() { - return tType; - } - - public int toJavaSQLType() { - return javaSQLType; - } - - public String getName() { - return name; - } -} diff --git a/service/src/java/org/apache/hive/service/cli/TypeDescriptor.java b/service/src/java/org/apache/hive/service/cli/TypeDescriptor.java index b4a5b77..d634bef 100644 --- a/service/src/java/org/apache/hive/service/cli/TypeDescriptor.java +++ b/service/src/java/org/apache/hive/service/cli/TypeDescriptor.java @@ -20,6 +20,7 @@ import java.util.List; +import org.apache.hadoop.hive.serde2.thrift.Type; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hive.service.rpc.thrift.TPrimitiveTypeEntry; diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java index 2eeee47..3dd33f2 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/GetCatalogsOperation.java @@ -43,7 +43,7 @@ protected GetCatalogsOperation(HiveSession parentSession) { super(parentSession, OperationType.GET_CATALOGS); - rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion()); + rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion(), false); } @Override diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java index 574a757..c075179 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/GetColumnsOperation.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject; import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.serde2.thrift.Type; import org.apache.hive.service.cli.ColumnDescriptor; import org.apache.hive.service.cli.FetchOrientation; import org.apache.hive.service.cli.HiveSQLException; @@ -44,7 +45,6 @@ import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.RowSetFactory; import org.apache.hive.service.cli.TableSchema; -import org.apache.hive.service.cli.Type; import org.apache.hive.service.cli.session.HiveSession; /** @@ -122,7 +122,7 @@ protected GetColumnsOperation(HiveSession parentSession, String catalogName, Str this.schemaName = schemaName; this.tableName = tableName; this.columnName = columnName; - this.rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion()); + this.rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion(), false); } @Override diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java index d774f4f95..35b2e63 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java +++ 
b/service/src/java/org/apache/hive/service/cli/operation/GetFunctionsOperation.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType; import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject; import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObjectUtils; +import org.apache.hadoop.hive.serde2.thrift.Type; import org.apache.hive.service.cli.CLIServiceUtils; import org.apache.hive.service.cli.FetchOrientation; import org.apache.hive.service.cli.HiveSQLException; @@ -38,7 +39,6 @@ import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.RowSetFactory; import org.apache.hive.service.cli.TableSchema; -import org.apache.hive.service.cli.Type; import org.apache.hive.service.cli.session.HiveSession; import org.apache.thrift.TException; @@ -67,13 +67,13 @@ private final RowSet rowSet; - public GetFunctionsOperation(HiveSession parentSession, - String catalogName, String schemaName, String functionName) { + public GetFunctionsOperation(HiveSession parentSession, String catalogName, String schemaName, + String functionName) { super(parentSession, OperationType.GET_FUNCTIONS); this.catalogName = catalogName; this.schemaName = schemaName; this.functionName = functionName; - this.rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion()); + this.rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion(), false); } @Override diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java index dc0a3dd..6013a3f 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/GetSchemasOperation.java @@ -46,12 +46,11 @@ private RowSet rowSet; - protected GetSchemasOperation(HiveSession parentSession, - String catalogName, String schemaName) { + protected GetSchemasOperation(HiveSession parentSession, String catalogName, String schemaName) { super(parentSession, OperationType.GET_SCHEMAS); this.catalogName = catalogName; this.schemaName = schemaName; - this.rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion()); + this.rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion(), false); } @Override diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java index 13d5b37..874435b 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/GetTableTypesOperation.java @@ -47,11 +47,10 @@ protected GetTableTypesOperation(HiveSession parentSession) { super(parentSession, OperationType.GET_TABLE_TYPES); - String tableMappingStr = getParentSession().getHiveConf(). 
- getVar(HiveConf.ConfVars.HIVE_SERVER2_TABLE_TYPE_MAPPING); - tableTypeMapping = - TableTypeMappingFactory.getTableTypeMapping(tableMappingStr); - rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion()); + String tableMappingStr = + getParentSession().getHiveConf().getVar(HiveConf.ConfVars.HIVE_SERVER2_TABLE_TYPE_MAPPING); + tableTypeMapping = TableTypeMappingFactory.getTableTypeMapping(tableMappingStr); + rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion(), false); } @Override diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java index aac3692..68d093a 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/GetTablesOperation.java @@ -84,7 +84,7 @@ protected GetTablesOperation(HiveSession parentSession, } else { tableTypeList = null; } - this.rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion()); + this.rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion(), false); } @Override diff --git a/service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java b/service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java index 53660af..db19024 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java @@ -22,6 +22,7 @@ import java.util.Arrays; import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType; +import org.apache.hadoop.hive.serde2.thrift.Type; import org.apache.hive.service.cli.FetchOrientation; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.OperationState; @@ -29,7 +30,6 @@ import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.RowSetFactory; import org.apache.hive.service.cli.TableSchema; -import org.apache.hive.service.cli.Type; import org.apache.hive.service.cli.session.HiveSession; /** @@ -80,7 +80,7 @@ protected GetTypeInfoOperation(HiveSession parentSession) { super(parentSession, OperationType.GET_TYPE_INFO); - rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion()); + rowSet = RowSetFactory.create(RESULT_SET_SCHEMA, getProtocolVersion(), false); } @Override diff --git a/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java b/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java index f5a9771..f18dc67 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java @@ -173,10 +173,10 @@ public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws H resetResultReader(); } List rows = readResults((int) maxRows); - RowSet rowSet = RowSetFactory.create(resultSchema, getProtocolVersion()); + RowSet rowSet = RowSetFactory.create(resultSchema, getProtocolVersion(), false); for (String row : rows) { - rowSet.addRow(new String[] {row}); + rowSet.addRow(new String[] { row }); } return rowSet; } diff --git a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java index 56a9c18..eb3ab21 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java 
+++ b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java @@ -280,11 +280,11 @@ public RowSet getOperationNextRowSet(OperationHandle opHandle, return getOperation(opHandle).getNextRowSet(orientation, maxRows); } - public RowSet getOperationLogRowSet(OperationHandle opHandle, - FetchOrientation orientation, long maxRows, HiveConf hConf) - throws HiveSQLException { + public RowSet getOperationLogRowSet(OperationHandle opHandle, FetchOrientation orientation, + long maxRows, HiveConf hConf) throws HiveSQLException { TableSchema tableSchema = new TableSchema(getLogSchema()); - RowSet rowSet = RowSetFactory.create(tableSchema, getOperation(opHandle).getProtocolVersion()); + RowSet rowSet = + RowSetFactory.create(tableSchema, getOperation(opHandle).getProtocolVersion(), false); if (hConf.getBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED) == false) { LOG.warn("Try to get operation log when hive.server2.logging.operation.enabled is false, no log will be returned. "); @@ -296,7 +296,6 @@ public RowSet getOperationLogRowSet(OperationHandle opHandle, throw new HiveSQLException("Couldn't find log associated with operation handle: " + opHandle); } - // read logs List logs; try { @@ -305,10 +304,9 @@ public RowSet getOperationLogRowSet(OperationHandle opHandle, throw new HiveSQLException(e.getMessage(), e.getCause()); } - // convert logs to RowSet for (String log : logs) { - rowSet.addRow(new String[] {log}); + rowSet.addRow(new String[] { log }); } return rowSet; diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java index ce06c1c..b921e6e 100644 --- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java +++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.QueryDisplay; import org.apache.hadoop.hive.ql.exec.ExplainTask; +import org.apache.hadoop.hive.ql.exec.FetchTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; @@ -52,6 +53,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.security.UserGroupInformation; @@ -356,12 +358,26 @@ public TableSchema getResultSetSchema() throws HiveSQLException { private transient final List convey = new ArrayList(); @Override - public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException { + public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) + throws HiveSQLException { + + HiveConf hiveConf = getConfigForOperation(); validateDefaultFetchOrientation(orientation); assertState(new ArrayList(Arrays.asList(OperationState.FINISHED))); - RowSet rowSet = RowSetFactory.create(resultSchema, getProtocolVersion()); + FetchTask fetchTask = driver.getFetchTask(); + boolean isBlobBased = false; + if (fetchTask != null && fetchTask.getWork().isHiveServerQuery() && HiveConf.getBoolVar(hiveConf, + HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS) + && 
(fetchTask.getTblDesc().getSerdeClassName().equalsIgnoreCase(ThriftJDBCBinarySerDe.class + .getName()))) { + // Just fetch one blob if we've serialized thrift objects in final tasks + maxRows = 1; + isBlobBased = true; + } + driver.setMaxRows((int) maxRows); + RowSet rowSet = RowSetFactory.create(resultSchema, getProtocolVersion(), isBlobBased); try { /* if client is requesting fetch-from-start and its not the first time reading from this operation * then reset the fetch position to beginning diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java index 0f36cd6..ce50967 100644 --- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java +++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java @@ -39,7 +39,6 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.ql.QueryPlan; -import org.apache.hadoop.hive.ql.exec.FetchFormatter; import org.apache.hadoop.hive.ql.exec.ListSinkOperator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.history.HiveHistory; @@ -47,6 +46,8 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.processors.SetProcessor; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.thrift.ThriftFormatter; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hive.common.util.HiveVersionInfo; import org.apache.hive.service.auth.HiveAuthFactory; @@ -134,9 +135,8 @@ public HiveSessionImpl(SessionHandle sessionHandle, TProtocolVersion protocol, S hiveConf.set(ConfVars.HIVESESSIONID.varname, this.sessionHandle.getHandleIdentifier().toString()); // Use thrift transportable formatter - hiveConf.set(ListSinkOperator.OUTPUT_FORMATTER, - FetchFormatter.ThriftFormatter.class.getName()); - hiveConf.setInt(ListSinkOperator.OUTPUT_PROTOCOL, protocol.getValue()); + hiveConf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, ThriftFormatter.class.getName()); + hiveConf.setInt(SerDeUtils.LIST_SINK_OUTPUT_PROTOCOL, protocol.getValue()); } public HiveSessionImpl(TProtocolVersion protocol, String username, String password, diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index e789a38..74263e3 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -424,7 +424,7 @@ private String getDelegationToken(String userName) return cliService.getDelegationTokenFromMetaStore(userName); } catch (UnsupportedOperationException e) { // The delegation token is not applicable in the given deployment mode - // such as HMS is not kerberos secured + // such as HMS is not kerberos secured } return null; } diff --git a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java index 698b13d..ff7e9a4 100644 --- a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java +++ b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java @@ -47,7 +47,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.hive.ql.session.SessionState; - 
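To make the new branch in SQLOperation.getNextRowSet() easier to follow, here is the decision it makes, restated as a hypothetical helper (not part of the patch; Driver, FetchTask, HiveConf and ThriftJDBCBinarySerDe are the same classes referenced in the change above):

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe;

final class BlobFetchCheck {
  // True only when the final fetch task wrote ThriftJDBCBinarySerDe output for a
  // HiveServer2 query and the serialize-in-tasks flag is on; the operation then
  // fetches a single pre-serialized blob (maxRows forced to 1) instead of rows.
  static boolean isBlobBasedFetch(Driver driver, HiveConf conf) {
    FetchTask fetchTask = driver.getFetchTask();
    return fetchTask != null
        && fetchTask.getWork().isHiveServerQuery()
        && HiveConf.getBoolVar(conf,
            HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS)
        && ThriftJDBCBinarySerDe.class.getName().equalsIgnoreCase(
            fetchTask.getTblDesc().getSerdeClassName());
  }
}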
+import org.apache.hadoop.hive.serde2.thrift.Type; import org.junit.After; import org.junit.Before; import org.junit.Test; diff --git a/service/src/test/org/apache/hive/service/cli/TestColumn.java b/service/src/test/org/apache/hive/service/cli/TestColumn.java index 87bf848..9980aba 100644 --- a/service/src/test/org/apache/hive/service/cli/TestColumn.java +++ b/service/src/test/org/apache/hive/service/cli/TestColumn.java @@ -17,6 +17,8 @@ */ package org.apache.hive.service.cli; +import org.apache.hadoop.hive.serde2.thrift.ColumnBuffer; +import org.apache.hadoop.hive.serde2.thrift.Type; import org.junit.Test; import java.util.Arrays; @@ -57,7 +59,7 @@ public void testAllIntegerTypeValues() { Type type = (Type)entry.getKey(); List values = (List)entry.getValue(); - Column c = new Column(type); + ColumnBuffer c = new ColumnBuffer(type); for (Object v : values) { c.addValue(type, v); } @@ -73,7 +75,7 @@ public void testAllIntegerTypeValues() { @Test public void testFloatAndDoubleValues() { - Column floatColumn = new Column(Type.FLOAT_TYPE); + ColumnBuffer floatColumn = new ColumnBuffer(Type.FLOAT_TYPE); floatColumn.addValue(Type.FLOAT_TYPE, 1.1f); floatColumn.addValue(Type.FLOAT_TYPE, 2.033f); @@ -83,7 +85,7 @@ public void testFloatAndDoubleValues() { assertEquals(1.1, floatColumn.get(0)); assertEquals(2.033, floatColumn.get(1)); - Column doubleColumn = new Column(Type.DOUBLE_TYPE); + ColumnBuffer doubleColumn = new ColumnBuffer(Type.DOUBLE_TYPE); doubleColumn.addValue(Type.DOUBLE_TYPE, 1.1); doubleColumn.addValue(Type.DOUBLE_TYPE, 2.033); @@ -95,7 +97,7 @@ public void testFloatAndDoubleValues() { @Test public void testBooleanValues() { - Column boolColumn = new Column(Type.BOOLEAN_TYPE); + ColumnBuffer boolColumn = new ColumnBuffer(Type.BOOLEAN_TYPE); boolColumn.addValue(Type.BOOLEAN_TYPE, true); boolColumn.addValue(Type.BOOLEAN_TYPE, false); @@ -107,7 +109,7 @@ public void testBooleanValues() { @Test public void testStringValues() { - Column stringColumn = new Column(Type.STRING_TYPE); + ColumnBuffer stringColumn = new ColumnBuffer(Type.STRING_TYPE); stringColumn.addValue(Type.STRING_TYPE, "12abc456"); stringColumn.addValue(Type.STRING_TYPE, "~special$&string"); @@ -119,7 +121,7 @@ public void testStringValues() { @Test public void testBinaryValues() { - Column binaryColumn = new Column(Type.BINARY_TYPE); + ColumnBuffer binaryColumn = new ColumnBuffer(Type.BINARY_TYPE); binaryColumn.addValue(Type.BINARY_TYPE, new byte[]{-1, 0, 3, 4}); assertEquals(Type.BINARY_TYPE, binaryColumn.getType());
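Rounding out the test changes: a short sketch, in the same style as the updated assertions above, of how the relocated ColumnBuffer is used end to end. This is a hypothetical extra test, not part of the patch; it assumes TColumn and the JUnit asserts are imported as in the surrounding file, and that toTColumn() maps INT_TYPE to the i32Val branch of the union the way the old Column code did.

  @Test
  public void testIntColumnBufferRoundTripSketch() {
    ColumnBuffer column = new ColumnBuffer(Type.INT_TYPE);
    column.addValue(Type.INT_TYPE, 7);
    column.addValue(Type.INT_TYPE, null);   // nulls are tracked alongside a default value
    assertEquals(Type.INT_TYPE, column.getType());
    assertEquals(7, column.get(0));
    TColumn wire = column.toTColumn();      // the same conversion ColumnBasedSet.toTRowSet() relies on
    assertTrue(wire.isSetI32Val());
  }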