From 94a53f7230e8bb50d07bdb0e2cea1627683e7c17 Mon Sep 17 00:00:00 2001 From: kliewkliew Date: Mon, 22 Aug 2016 04:08:27 -0700 Subject: [PATCH] HIVE-13680: HiveServer2: Provide a way to compress ResultSets --- .../java/org/apache/hadoop/hive/conf/HiveConf.java | 9 +- .../java/org/apache/hive/jdbc/HiveConnection.java | 22 ++ .../org/apache/hive/jdbc/HiveQueryResultSet.java | 5 +- .../java/org/apache/hive/jdbc/HiveStatement.java | 2 +- .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 8 + .../hadoop/hive/ql/session/SessionState.java | 1 - .../apache/hadoop/hive/serde/serdeConstants.java | 4 + .../hadoop/hive/serde2/compression/CompDe.java | 81 +++++ .../serde2/compression/CompDeServiceLoader.java | 88 +++++ .../hive/serde2/thrift/ThriftJDBCBinarySerDe.java | 84 +++-- .../hive/serde2/compression/SnappyCompDe.java | 398 +++++++++++++++++++++ .../hive/serde2/compression/TestSnappyCompDe.java | 248 +++++++++++++ service-rpc/if/TCLIService.thrift | 7 +- .../src/gen/thrift/gen-cpp/TCLIService_types.cpp | 50 ++- .../hive/service/rpc/thrift/TOpenSessionResp.java | 244 +++++++++---- service-rpc/src/gen/thrift/gen-php/Types.php | 45 ++- .../src/gen/thrift/gen-py/TCLIService/ttypes.py | 35 +- .../src/gen/thrift/gen-rb/t_c_l_i_service_types.rb | 6 +- .../apache/hive/service/cli/ColumnBasedSet.java | 43 ++- .../org/apache/hive/service/cli/RowSetFactory.java | 12 +- .../hive/service/cli/thrift/ThriftCLIService.java | 66 +++- .../service/cli/thrift/ThriftCLIServiceClient.java | 2 +- .../cli/compression/TestCompDeNegotiation.java | 254 +++++++++++++ 23 files changed, 1551 insertions(+), 163 deletions(-) create mode 100644 serde/src/java/org/apache/hadoop/hive/serde2/compression/CompDe.java create mode 100644 serde/src/java/org/apache/hadoop/hive/serde2/compression/CompDeServiceLoader.java create mode 100644 serde/src/test/org/apache/hadoop/hive/serde2/compression/SnappyCompDe.java create mode 100644 serde/src/test/org/apache/hadoop/hive/serde2/compression/TestSnappyCompDe.java create mode 100644 service/src/test/org/apache/hive/service/cli/compression/TestCompDeNegotiation.java diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index da6e97a..db089c7 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2480,10 +2480,17 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS("hive.server2.thrift.resultset.serialize.in.tasks", false, "Whether we should serialize the Thrift structures used in JDBC ResultSet RPC in task nodes.\n " + "We use SequenceFile and ThriftJDBCBinarySerDe to read and write the final results if this is true."), - // TODO: Make use of this config to configure fetch size HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE("hive.server2.thrift.resultset.max.fetch.size", 1000, "Max number of rows sent in one Fetch RPC call by the server to the client."), + // ResultSet compression settings + HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR_LIST("hive.server2.thrift.resultset.compressor.list", "", + "A list of compressors ordered by the server's preference.\n " + + "This list will be used to negotiate a CompDe for each session."), + HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR("hive.server2.thrift.resultset.compressor", "", + "A CompDe that will be used for the session.\n " + + "This is determined by negotiation during OpenSession."), + 
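The two settings above describe a handshake rather than a static choice: a client sends its own hive.server2.thrift.resultset.compressor.list with OpenSession, the server intersects it with its ordered list, and the winner is pinned in hive.server2.thrift.resultset.compressor for the rest of the session. Below is a minimal sketch of such an intersection, assuming comma-separated vendor-qualified names; "snappy.snappy" comes from the SnappyCompDe plug-in later in this patch, "acme.lz4" is invented, and the patch's actual negotiation lives in ThriftCLIService.java (listed in the diffstat).

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class CompDeNegotiationSketch {
      /** Return the first server-preferred CompDe that the client also offered, or null. */
      static String negotiate(String serverList, String clientList) {
        List<String> clientNames = Arrays.stream(clientList.split(","))
            .map(String::trim).collect(Collectors.toList());
        for (String candidate : serverList.split(",")) {
          if (clientNames.contains(candidate.trim())) {
            return candidate.trim();
          }
        }
        return null; // no common CompDe; the session falls back to uncompressed results
      }

      public static void main(String[] args) {
        // The server prefers a hypothetical "acme.lz4", but the client only ships Snappy:
        System.out.println(negotiate("acme.lz4,snappy.snappy", "snappy.snappy")); // snappy.snappy
      }
    }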
HIVE_SERVER2_XSRF_FILTER_ENABLED("hive.server2.xsrf.filter.enabled",false, "If enabled, HiveServer2 will block any requests made to it over http " + "if an X-XSRF-HEADER header is not present"), diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java index ad96a64..4b51eb5 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java @@ -19,6 +19,9 @@ package org.apache.hive.jdbc; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.serde2.compression.CompDe; +import org.apache.hadoop.hive.serde2.compression.CompDeServiceLoader; import org.apache.hive.jdbc.Utils.JdbcConnectionParams; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.auth.KerberosSaslHelper; @@ -125,6 +128,7 @@ private int loginTimeout = 0; private TProtocolVersion protocol; private int fetchSize = HiveStatement.DEFAULT_FETCH_SIZE; + private CompDe sessCompDe; public HiveConnection(String uri, Properties info) throws SQLException { setupLoginTimeout(); @@ -575,6 +579,20 @@ private void openSession() throws SQLException { try { TOpenSessionResp openResp = client.OpenSession(openReq); + // Server initialized CompDe + if (openResp.isSetCompressorName() && CompDeServiceLoader.getInstance().hasCompDe(openResp.getCompressorName())) { + CompDe testCompDe = CompDeServiceLoader.getInstance().getCompDe(openResp.getCompressorName()); + // And the client initialized properly with the same config + if (testCompDe.init(openResp.getCompressorConfiguration()) + && testCompDe.getConfig().equals(openResp.getCompressorConfiguration())) { + sessCompDe = testCompDe; + } + else { + openReq.getConfiguration().remove("set:hiveconf:" + ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR_LIST.varname); + openResp = client.OpenSession(openReq); + } + } + // validate connection Utils.verifySuccess(openResp.getStatus()); if (!supportedProtocols.contains(openResp.getServerProtocolVersion())) { @@ -917,6 +935,10 @@ public String getClientInfo(String name) throws SQLException { throw new SQLException("Method not supported"); } + public CompDe getCompDe() { + return sessCompDe; + } + /* * (non-Javadoc) * diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java b/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java index 92fdbca..6c5183d 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java @@ -31,6 +31,7 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.serde2.compression.CompDe; import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.RowSetFactory; import org.apache.hive.service.cli.TableSchema; @@ -78,6 +79,7 @@ private boolean fetchFirst = false; private final TProtocolVersion protocol; + private final CompDe compDe; public static class Builder { @@ -193,6 +195,7 @@ protected HiveQueryResultSet(Builder builder) throws SQLException { normalizedColumnNames = new ArrayList(); columnTypes = new ArrayList(); columnAttributes = new ArrayList(); + compDe = ((HiveConnection) builder.connection).getCompDe(); if (builder.retrieveSchema) { retrieveSchema(); } else { @@ -373,7 +376,7 @@ public boolean next() throws SQLException { Utils.verifySuccessWithInfo(fetchResp.getStatus()); TRowSet results = 
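The negotiation above is invisible at the API level: if the client cannot mirror the server's choice (init() fails or the configurations disagree), openSession() strips the compressor-list key and retries, and the session degrades to ordinary uncompressed results. A usage sketch follows; it assumes the standard Hive JDBC URL form, where the segment after '?' travels as "set:hiveconf:" entries in TOpenSessionReq (the same key prefix the removal logic above manipulates), and host, port, and database are placeholders.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;

    public class CompressedSessionExample {
      public static void main(String[] args) throws SQLException {
        // The hive_conf_list after '?' proposes the client's CompDe list to the server.
        Connection conn = DriverManager.getConnection(
            "jdbc:hive2://localhost:10000/default"
                + "?hive.server2.thrift.resultset.compressor.list=snappy.snappy");
        // After openSession() returns, HiveConnection.getCompDe() is non-null only if
        // the server chose a CompDe and this client initialized it with the same config.
        conn.close();
      }
    }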
fetchResp.getResults(); - fetchedRows = RowSetFactory.create(results, protocol); + fetchedRows = RowSetFactory.create(results, protocol, compDe); fetchedRowsItr = fetchedRows.iterator(); } diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index a242501..22342a4 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -916,7 +916,7 @@ public boolean hasMoreLogs() { try { RowSet rowSet; - rowSet = RowSetFactory.create(tFetchResultsResp.getResults(), connection.getProtocol()); + rowSet = RowSetFactory.create(tFetchResultsResp.getResults(), connection.getProtocol(), connection.getCompDe()); for (Object[] row : rowSet) { logs.add(String.valueOf(row[0])); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 66589fe..302678a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -7117,6 +7117,14 @@ Operator genConversionSelectOperator(String dest, QB qb, Operator input, try { Deserializer deserializer = table_desc.getDeserializerClass() .newInstance(); + + String compDeName = SessionState.get().getConf().getVar(ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR); + if (!compDeName.isEmpty()) { + table_desc.getProperties().put(serdeConstants.COMPDE_NAME, compDeName); + table_desc.getProperties().put(serdeConstants.COMPDE_CONFIG, + SessionState.get().getConf().getValByRegex(ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR + "\\." + compDeName + "\\.[\\w|\\d]+")); + } + SerDeUtils.initializeSerDe(deserializer, conf, table_desc.getProperties(), null); oi = (StructObjectInspector) deserializer.getObjectInspector(); } catch (Exception e) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 408c92e..82053d3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -285,7 +285,6 @@ public HiveConf getConf() { return sessionConf; } - public File getTmpOutputFile() { return tmpOutputFile; } diff --git a/serde/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/serde/serdeConstants.java b/serde/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/serde/serdeConstants.java index 04ed8f5..f3714e2 100644 --- a/serde/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/serde/serdeConstants.java +++ b/serde/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/serde/serdeConstants.java @@ -58,6 +58,10 @@ public static final String SERIALIZATION_ENCODING = "serialization.encoding"; + public static final String COMPDE_NAME = "compde.name"; + + public static final String COMPDE_CONFIG = "compde.config"; + public static final String FIELD_DELIM = "field.delim"; public static final String COLLECTION_DELIM = "colelction.delim"; diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/compression/CompDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/compression/CompDe.java new file mode 100644 index 0000000..9e9992f --- /dev/null +++ b/serde/src/java/org/apache/hadoop/hive/serde2/compression/CompDe.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
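The SemanticAnalyzer hunk above is how CompDe settings reach the SerDe: the negotiated name goes into the table properties as COMPDE_NAME, and every session setting matching <compressor-var>.<compde-name>.<param> is harvested into COMPDE_CONFIG. A sketch of what that getValByRegex() call collects for the Snappy plug-in; the "level" parameter is hypothetical.

    // A session-level setting such as
    //   set hive.server2.thrift.resultset.compressor.snappy.snappy.level=1;
    // matches the regex built above and is later forwarded to CompDe.init():
    Map<String, String> compDeConfig = SessionState.get().getConf().getValByRegex(
        "hive.server2.thrift.resultset.compressor\\.snappy.snappy\\.[\\w|\\d]+");
    // => {hive.server2.thrift.resultset.compressor.snappy.snappy.level=1}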
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.hive.serde2.compression; + +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hive.serde2.thrift.ColumnBuffer; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface CompDe { + + /** + * Initialize the plug-in by overlaying the input configuration map + * onto the plug-in's default configuration. + * + * @param config Overlay configuration map + * + * @return True if initialization was successful + */ + public boolean init(Map config); + + /** + * Return the configuration settings of the CompDe after initialization. + * + * @return + */ + public Map getConfig(); + + /** + * Compress a set of columns + * + * @param colSet + * + * @return Bytes representing the compressed set. + */ + public byte[] compress(ColumnBuffer[] colSet); + + /** + * Decompress a set of columns + * + * @param input + * @param inputOffset + * @param inputLength + * + * @return The set of columns. + */ + public ColumnBuffer[] decompress(byte[] input, int inputOffset, int inputLength); + + /** + * + * @return The plug-in name + */ + public String getName(); + + /** + * Provide a namespace for the plug-in + * + * @return The vendor name + */ + public String getVendor(); +} diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/compression/CompDeServiceLoader.java b/serde/src/java/org/apache/hadoop/hive/serde2/compression/CompDeServiceLoader.java new file mode 100644 index 0000000..d2ce7ce --- /dev/null +++ b/serde/src/java/org/apache/hadoop/hive/serde2/compression/CompDeServiceLoader.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.serde2.compression; + +import java.util.Iterator; +import java.util.ServiceLoader; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Load classes that implement CompDe when starting up, and serve them at run + * time. 
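Because discovery goes through java.util.ServiceLoader (see CompDeServiceLoader, next), a third-party CompDe needs nothing beyond this interface plus a provider file META-INF/services/org.apache.hadoop.hive.serde2.compression.CompDe naming the implementation class. A minimal pass-through sketch to make the contract concrete: it frames each column as a length-prefixed Thrift blob without compressing anything. The class, package, and vendor names are invented, and the Map generics are assumed to be String-to-String as elsewhere in this patch.

    package example.compde; // hypothetical third-party package

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.serde2.compression.CompDe;
    import org.apache.hadoop.hive.serde2.thrift.ColumnBuffer;
    import org.apache.hive.service.rpc.thrift.TColumn;
    import org.apache.thrift.TDeserializer;
    import org.apache.thrift.TSerializer;
    import org.apache.thrift.protocol.TCompactProtocol;

    public class PassThroughCompDe implements CompDe {

      @Override
      public boolean init(Map<String, String> config) {
        return true; // nothing to configure
      }

      @Override
      public Map<String, String> getConfig() {
        return new HashMap<String, String>();
      }

      @Override
      public byte[] compress(ColumnBuffer[] colSet) {
        try {
          ByteArrayOutputStream bytes = new ByteArrayOutputStream();
          DataOutputStream out = new DataOutputStream(bytes);
          TSerializer serializer = new TSerializer(new TCompactProtocol.Factory());
          out.writeInt(colSet.length);
          for (ColumnBuffer col : colSet) {
            byte[] blob = serializer.serialize(col.toTColumn());
            out.writeInt(blob.length); // length-prefix each column blob
            out.write(blob);
          }
          out.flush();
          return bytes.toByteArray();
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }

      @Override
      public ColumnBuffer[] decompress(byte[] input, int inputOffset, int inputLength) {
        try {
          DataInputStream in = new DataInputStream(
              new ByteArrayInputStream(input, inputOffset, inputLength));
          TDeserializer deserializer = new TDeserializer(new TCompactProtocol.Factory());
          ColumnBuffer[] cols = new ColumnBuffer[in.readInt()];
          for (int i = 0; i < cols.length; i++) {
            byte[] blob = new byte[in.readInt()];
            in.readFully(blob);
            TColumn tColumn = new TColumn();
            deserializer.deserialize(tColumn, blob);
            cols[i] = new ColumnBuffer(tColumn);
          }
          return cols;
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }

      @Override
      public String getName() {
        return "passthrough";
      }

      @Override
      public String getVendor() {
        return "example";
      }
    }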
+ * + */ +public class CompDeServiceLoader { + private static CompDeServiceLoader instance; + // Map CompDe names to classes so we don't have to read the META-INF file for every session. + private ConcurrentHashMap> compressorTable + = new ConcurrentHashMap>(); + public static final Logger LOG = LoggerFactory.getLogger(CompDeServiceLoader.class); + + /** + * Get the singleton instance or initialize the CompDeServiceLoader. + * + * @return A singleton instance of CompDeServiceLoader. + */ + public static synchronized CompDeServiceLoader getInstance() { + if (instance == null) { + Iterator compressors = ServiceLoader.load(CompDe.class).iterator(); + instance = new CompDeServiceLoader(); + while (compressors.hasNext()) { + CompDe compressor = compressors.next(); + instance.compressorTable.put( + compressor.getVendor() + "." + compressor.getName(), + compressor.getClass()); + } + } + return instance; + } + + /** + * Get the CompDe if the compressor class was loaded from CLASSPATH. + * + * @param compDeName + * The compressor name qualified by the vendor namespace. + * + * @return A CompDe implementation object + */ + public CompDe getCompDe(String compDeName) { + try { + return compressorTable.get(compDeName).newInstance(); + } catch (Exception e) { + LOG.warn("Plug-in not found for " + compDeName + " CompDe"); + return null; + } + } + + /** + * Does the server have a plugin for the specified name? + * + * @param compDeName + * The compressor name qualified by the vendor namespace. + * + * @return + */ + public boolean hasCompDe(String compDeName) { + return compressorTable.containsKey(compDeName); + } + +} diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java index 5c31974..79bba76 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java @@ -18,18 +18,24 @@ package org.apache.hadoop.hive.serde2.thrift; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.AbstractSerDe; import org.apache.hadoop.hive.serde2.ByteStream; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.compression.CompDe; +import org.apache.hadoop.hive.serde2.compression.CompDeServiceLoader; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; @@ -65,12 +71,27 @@ private int MAX_BUFFERED_ROWS; private int count; private StructObjectInspector rowObjectInspector; + private CompDe compDe; @Override public void initialize(Configuration conf, Properties tbl) throws SerDeException { + if (tbl.containsKey(serdeConstants.COMPDE_NAME)) { + String compDeName = tbl.getProperty(serdeConstants.COMPDE_NAME, null); + if (CompDeServiceLoader.getInstance().hasCompDe(compDeName)) { + compDe = CompDeServiceLoader.getInstance().getCompDe(compDeName); + if (tbl.containsKey(serdeConstants.COMPDE_CONFIG)) { + Map 
compDeConfig = (Map<String, String>) tbl.get(serdeConstants.COMPDE_CONFIG); + compDe.init(compDeConfig); + } + else { + compDe.init(new HashMap<String, String>()); + } + } + } + + // Get column names - MAX_BUFFERED_ROWS = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE); + MAX_BUFFERED_ROWS = HiveConf.getIntVar(conf, ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE); String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS); String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES); if (columnNameProperty.length() == 0) { @@ -102,32 +123,43 @@ public void initialize(Configuration conf, Properties tbl) throws SerDeException } private Writable serializeBatch() throws SerDeException { - output.reset(); - for (int i = 0; i < columnBuffers.length; i++) { - TColumn tColumn = columnBuffers[i].toTColumn(); - try { - tColumn.write(protocol); - } catch(TException e) { - throw new SerDeException(e); - } - } - initializeRowAndColumns(); - serializedBytesWritable.set(output.getData(), 0, output.getLength()); - return serializedBytesWritable; + output.reset(); + + if (compDe != null) { + try { + protocol.writeBinary(ByteBuffer.wrap(compDe.compress(columnBuffers))); + } catch (TException e) { + throw new SerDeException(e); + } + } + else { + for (int i = 0; i < columnBuffers.length; i++) { + TColumn tColumn = columnBuffers[i].toTColumn(); + try { + tColumn.write(protocol); + } catch(TException e) { + throw new SerDeException(e); + } + } + } + + initializeRowAndColumns(); + serializedBytesWritable.set(output.getData(), 0, output.getLength()); + return serializedBytesWritable; } // Use the columnNames to initialize the reusable row object and the column buffers. This is done because, if a buffer fills up, we must reinitialize the // column buffers; otherwise the buffered rows get written out a second time when closeOp() is called at the end.
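With a CompDe active, serializeBatch() above frames the entire batch as a single binary blob (protocol.writeBinary) rather than one Thrift TColumn struct per column, so a reader must know the session's CompDe to decode the payload. On the client, that knowledge flows through the factory call already shown in the JDBC hunks; the null/non-null split sketched in the comments is inferred from the RowSetFactory and ColumnBasedSet changes in the diffstat, which are not shown in full here.

    // As in HiveQueryResultSet.next() earlier in this patch:
    RowSet fetchedRows = RowSetFactory.create(fetchResp.getResults(), protocol, compDe);
    // compDe == null -> decode per-column TColumn structs as before
    // compDe != null -> hand the binary payload to compDe.decompress() first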
private void initializeRowAndColumns() { - row = new ArrayList(columnNames.size()); - for (int i = 0; i < columnNames.size(); i++) { - row.add(null); - } - // Initialize column buffers - columnBuffers = new ColumnBuffer[columnNames.size()]; - for (int i = 0; i < columnBuffers.length; i++) { - columnBuffers[i] = new ColumnBuffer(Type.getType(columnTypes.get(i))); - } + row = new ArrayList(columnNames.size()); + for (int i = 0; i < columnNames.size(); i++) { + row.add(null); + } + // Initialize column buffers + columnBuffers = new ColumnBuffer[columnNames.size()]; + for (int i = 0; i < columnBuffers.length; i++) { + columnBuffers[i] = new ColumnBuffer(Type.getType(columnTypes.get(i))); + } } /** @@ -142,10 +174,10 @@ public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDe StructObjectInspector soi = (StructObjectInspector) objInspector; List fields = soi.getAllStructFieldRefs(); try { - Object[] formattedRow = (Object[]) thriftFormatter.convert(obj, objInspector); - for (int i = 0; i < columnNames.size(); i++) { - columnBuffers[i].addValue(formattedRow[i]); - } + Object[] formattedRow = (Object[]) thriftFormatter.convert(obj, objInspector); + for (int i = 0; i < columnNames.size(); i++) { + columnBuffers[i].addValue(formattedRow[i]); + } } catch (Exception e) { throw new SerDeException(e); } diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/compression/SnappyCompDe.java b/serde/src/test/org/apache/hadoop/hive/serde2/compression/SnappyCompDe.java new file mode 100644 index 0000000..8e6d8b1 --- /dev/null +++ b/serde/src/test/org/apache/hadoop/hive/serde2/compression/SnappyCompDe.java @@ -0,0 +1,398 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.serde2.compression; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.BitSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.ArrayUtils; +import org.apache.hadoop.hive.serde2.thrift.ColumnBuffer; +import org.apache.hive.service.rpc.thrift.*; +import org.xerial.snappy.Snappy; + +public class SnappyCompDe implements CompDe { + + /** + * Initialize the plug-in by overlaying the input configuration map + * onto the plug-in's default configuration. 
+ * + * @param config Overlay configuration map + * + * @return True if initialization was successful + */ + @Override + public boolean init(Map<String, String> config) { + return true; + } + + /** + * Return the configuration settings of the CompDe + * + * @return The configuration map + */ + @Override + public Map<String, String> getConfig() { + return new HashMap<String, String>(); + } + + /** + * Compress a set of columns. + * 1. write the number of columns + * 2. for each column, write: + * - the data type + * - the size of the nulls binary + * - the nulls data + * - for string and binary rows: write the number of rows in the column followed by the size of each row + * - the actual data (string and binary columns are flattened) + * + * @param colSet + * + * @return Bytes representing the compressed set. + */ + @Override + public byte[] compress(ColumnBuffer[] colSet) { + ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + DataOutputStream bufferedDataStream = new DataOutputStream(new BufferedOutputStream(bytesOut)); + + try { + bufferedDataStream.writeInt(colSet.length); + + for (int colNum = 0; colNum < colSet.length; colNum++) { + + bufferedDataStream.write(colSet[colNum].getType().toTType().getValue()); + + switch (TTypeId.findByValue(colSet[colNum].getType().toTType().getValue())) { + case BOOLEAN_TYPE: { + TBoolColumn column = colSet[colNum].toTColumn().getBoolVal(); + + List<Boolean> bools = column.getValues(); + BitSet bsBools = new BitSet(bools.size()); + for (int rowNum = 0; rowNum < bools.size(); rowNum++) { + bsBools.set(rowNum, bools.get(rowNum)); + } + + writePrimitives(column.getNulls(), bufferedDataStream); + + // BitSet won't write trailing zeroes so we encode the length + bufferedDataStream.writeInt(column.getValuesSize()); + + writePrimitives(bsBools.toByteArray(), bufferedDataStream); + + break; + } + case TINYINT_TYPE: { + TByteColumn column = colSet[colNum].toTColumn().getByteVal(); + writePrimitives(column.getNulls(), bufferedDataStream); + writeBoxedBytes(column.getValues(), bufferedDataStream); + break; + } + case SMALLINT_TYPE: { + TI16Column column = colSet[colNum].toTColumn().getI16Val(); + writePrimitives(column.getNulls(), bufferedDataStream); + writeBoxedShorts(column.getValues(), bufferedDataStream); + break; + } + case INT_TYPE: { + TI32Column column = colSet[colNum].toTColumn().getI32Val(); + writePrimitives(column.getNulls(), bufferedDataStream); + writeBoxedIntegers(column.getValues(), bufferedDataStream); + break; + } + case BIGINT_TYPE: { + TI64Column column = colSet[colNum].toTColumn().getI64Val(); + writePrimitives(column.getNulls(), bufferedDataStream); + writeBoxedLongs(column.getValues(), bufferedDataStream); + break; + } + case DOUBLE_TYPE: { + TDoubleColumn column = colSet[colNum].toTColumn().getDoubleVal(); + writePrimitives(column.getNulls(), bufferedDataStream); + writeBoxedDoubles(column.getValues(), bufferedDataStream); + break; + } + case BINARY_TYPE: { + TBinaryColumn column = colSet[colNum].toTColumn().getBinaryVal(); + + // Flatten the data for Snappy + int[] rowSizes = new int[column.getValuesSize()]; + ByteArrayOutputStream flattenedData = new ByteArrayOutputStream(); + + for (int rowNum = 0; rowNum < column.getValuesSize(); rowNum++) { + byte[] row = column.getValues().get(rowNum).array(); + rowSizes[rowNum] = row.length; + flattenedData.write(row); + } + + // Write nulls bitmap + writePrimitives(column.getNulls(), bufferedDataStream); + + // Write the list of row sizes + writePrimitives(rowSizes, bufferedDataStream); + + // Write the flattened data + writePrimitives(flattenedData.toByteArray(),
bufferedDataStream); + + break; + } + case STRING_TYPE: { + TStringColumn column = colSet[colNum].toTColumn().getStringVal(); + + // Flatten the data for Snappy + int[] rowSizes = new int[column.getValuesSize()]; + ByteArrayOutputStream flattenedData = new ByteArrayOutputStream(); + + for (int rowNum = 0; rowNum < column.getValuesSize(); rowNum++) { + byte[] row = column.getValues().get(rowNum).getBytes(StandardCharsets.UTF_8); + rowSizes[rowNum] = row.length; + flattenedData.write(row); + } + + // Write nulls bitmap + writePrimitives(column.getNulls(), bufferedDataStream); + + // Write the list of row sizes + writePrimitives(rowSizes, bufferedDataStream); + + // Write the flattened data + writePrimitives(flattenedData.toByteArray(), bufferedDataStream); + + break; + } + default: + throw new IllegalStateException("Unrecognized column type"); + } + } + bufferedDataStream.flush(); + } catch (IOException e) { + e.printStackTrace(); + } + return bytesOut.toByteArray(); + } + + /** + * Write the length, and data to the output stream. + * + * @param boxedVals A List of boxed Java-primitives. + * @param outputStream + * @throws IOException + */ + private void writeBoxedBytes(List boxedVals, DataOutputStream outputStream) throws IOException { + byte[] compressedVals = new byte[0]; + compressedVals = Snappy.compress(ArrayUtils.toPrimitive(boxedVals.toArray(new Byte[0]))); + writeBytes(compressedVals, outputStream); + } + private void writeBoxedShorts(List boxedVals, DataOutputStream outputStream) throws IOException { + byte[] compressedVals = new byte[0]; + compressedVals = Snappy.compress(ArrayUtils.toPrimitive(boxedVals.toArray(new Short[0]))); + writeBytes(compressedVals, outputStream); + } + private void writeBoxedIntegers(List boxedVals, DataOutputStream outputStream) throws IOException { + byte[] compressedVals = new byte[0]; + compressedVals = Snappy.compress(ArrayUtils.toPrimitive(boxedVals.toArray(new Integer[0]))); + writeBytes(compressedVals, outputStream); + } + private void writeBoxedLongs(List boxedVals, DataOutputStream outputStream) throws IOException { + byte[] compressedVals = new byte[0]; + compressedVals = Snappy.compress(ArrayUtils.toPrimitive(boxedVals.toArray(new Long[0]))); + writeBytes(compressedVals, outputStream); + } + private void writeBoxedDoubles(List boxedVals, DataOutputStream outputStream) throws IOException { + byte[] compressedVals = new byte[0]; + compressedVals = Snappy.compress(ArrayUtils.toPrimitive(boxedVals.toArray(new Double[0]))); + writeBytes(compressedVals, outputStream); + } + + /** + * Write the length, and data to the output stream. + * @param primitives + * @param outputStream + * @throws IOException + */ + private void writePrimitives(byte[] primitives, DataOutputStream outputStream) throws IOException { + writeBytes(Snappy.compress(primitives), outputStream); + } + private void writePrimitives(int[] primitives, DataOutputStream outputStream) throws IOException { + writeBytes(Snappy.compress(primitives), outputStream); + } + + private void writeBytes(byte[] bytes, DataOutputStream outputStream) throws IOException { + outputStream.writeInt(bytes.length); + outputStream.write(bytes); + } + + /** + * Decompress a set of columns + * + * @param input + * @param inputOffset + * @param inputLength + * + * @return The set of columns. 
+ */ + @Override + public ColumnBuffer[] decompress(byte[] input, int inputOffset, int inputLength) { + ByteArrayInputStream inputStream = new ByteArrayInputStream(input, inputOffset, inputLength); + DataInputStream bufferedInput = new DataInputStream(new BufferedInputStream(inputStream)); + + try { + int numOfCols = bufferedInput.readInt(); + + ColumnBuffer[] outputCols = new ColumnBuffer[numOfCols]; + for (int colNum = 0; colNum < numOfCols; colNum++) { + int columnType = bufferedInput.read(); + + byte[] nulls = Snappy.uncompress(readCompressedChunk(bufferedInput)); + + switch (TTypeId.findByValue(columnType)) { + case BOOLEAN_TYPE: { + int numRows = bufferedInput.readInt(); + byte[] vals = Snappy.uncompress(readCompressedChunk(bufferedInput)); + BitSet bsBools = BitSet.valueOf(vals); + + boolean[] bools = new boolean[numRows]; + for (int rowNum = 0; rowNum < numRows; rowNum++) { + bools[rowNum] = bsBools.get(rowNum); + } + + TBoolColumn column = new TBoolColumn(Arrays.asList(ArrayUtils.toObject(bools)), ByteBuffer.wrap(nulls)); + outputCols[colNum] = new ColumnBuffer(TColumn.boolVal(column)); + break; + } + case TINYINT_TYPE: { + byte[] vals = Snappy.uncompress(readCompressedChunk(bufferedInput)); + TByteColumn column = new TByteColumn(Arrays.asList(ArrayUtils.toObject(vals)), ByteBuffer.wrap(nulls)); + outputCols[colNum] = new ColumnBuffer(TColumn.byteVal(column)); + break; + } + case SMALLINT_TYPE: { + short[] vals = Snappy.uncompressShortArray(readCompressedChunk(bufferedInput)); + TI16Column column = new TI16Column(Arrays.asList(ArrayUtils.toObject(vals)), ByteBuffer.wrap(nulls)); + outputCols[colNum] = new ColumnBuffer(TColumn.i16Val(column)); + break; + } + case INT_TYPE: { + int[] vals = Snappy.uncompressIntArray(readCompressedChunk(bufferedInput)); + TI32Column column = new TI32Column(Arrays.asList(ArrayUtils.toObject(vals)), ByteBuffer.wrap(nulls)); + outputCols[colNum] = new ColumnBuffer(TColumn.i32Val(column)); + break; + } + case BIGINT_TYPE: { + long[] vals = Snappy.uncompressLongArray(readCompressedChunk(bufferedInput)); + TI64Column column = new TI64Column(Arrays.asList(ArrayUtils.toObject(vals)), ByteBuffer.wrap(nulls)); + outputCols[colNum] = new ColumnBuffer(TColumn.i64Val(column)); + break; + } + case DOUBLE_TYPE: { + double[] vals = Snappy.uncompressDoubleArray(readCompressedChunk(bufferedInput)); + TDoubleColumn column = new TDoubleColumn(Arrays.asList(ArrayUtils.toObject(vals)), ByteBuffer.wrap(nulls)); + outputCols[colNum] = new ColumnBuffer(TColumn.doubleVal(column)); + break; + } + case BINARY_TYPE: { + int[] rowSizes = Snappy.uncompressIntArray(readCompressedChunk(bufferedInput)); + + BufferedInputStream flattenedRows = new BufferedInputStream(new ByteArrayInputStream( + Snappy.uncompress(readCompressedChunk(bufferedInput)))); + + ByteBuffer[] vals = new ByteBuffer[rowSizes.length]; + + for (int rowNum = 0; rowNum < rowSizes.length; rowNum++) { + byte[] row = new byte[rowSizes[rowNum]]; + flattenedRows.read(row, 0, rowSizes[rowNum]); + vals[rowNum] = ByteBuffer.wrap(row); + } + + TBinaryColumn column = new TBinaryColumn(Arrays.asList(vals), ByteBuffer.wrap(nulls)); + outputCols[colNum] = new ColumnBuffer(TColumn.binaryVal(column)); + break; + } + case STRING_TYPE: { + int[] rowSizes = Snappy.uncompressIntArray(readCompressedChunk(bufferedInput)); + + byte[] flattenedBytes = Snappy.uncompress(readCompressedChunk(bufferedInput)); + BufferedInputStream flattenedRows = new BufferedInputStream(new ByteArrayInputStream(flattenedBytes)); + + String[] vals = new 
String[rowSizes.length]; + + for (int rowNum = 0; rowNum < rowSizes.length; rowNum++) { + byte[] row = new byte[rowSizes[rowNum]]; + flattenedRows.read(row, 0, rowSizes[rowNum]); + vals[rowNum] = new String(row, StandardCharsets.UTF_8); + } + + TStringColumn column = new TStringColumn(Arrays.asList(vals), ByteBuffer.wrap(nulls)); + outputCols[colNum] = new ColumnBuffer(TColumn.stringVal(column)); + break; + } + default: + throw new IllegalStateException("Unrecognized column type: " + TTypeId.findByValue(columnType)); + } + } + return outputCols; + } catch (IOException e) { + e.printStackTrace(); + return null; + } + } + + /** + * Read a length-prefixed compressed chunk from a stream. + * @param input + * @return The compressed chunk bytes + * @throws IOException + */ + public byte[] readCompressedChunk(DataInputStream input) throws IOException { + int compressedValsSize = input.readInt(); + byte[] compressedVals = new byte[compressedValsSize]; + // Use readFully: a plain read() may return fewer bytes than requested + input.readFully(compressedVals, 0, compressedValsSize); + return compressedVals; + } + + /** + * + * @return The plug-in name + */ + @Override + public String getName() { + return "snappy"; + } + + /** + * Provide a namespace for the plug-in + * + * @return The vendor name + */ + @Override + public String getVendor() { + return "snappy"; + } + +} diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/compression/TestSnappyCompDe.java b/serde/src/test/org/apache/hadoop/hive/serde2/compression/TestSnappyCompDe.java new file mode 100644 index 0000000..c5fcd34 --- /dev/null +++ b/serde/src/test/org/apache/hadoop/hive/serde2/compression/TestSnappyCompDe.java @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
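To make the compress() javadoc above concrete: every chunk emitted by writePrimitives()/writeBoxed*() is a 4-byte big-endian length followed by one Snappy frame. A single TINYINT column {1, 2, 3} with no nulls would therefore lay out roughly as below; this is a sketch, not a captured buffer, and the chunk lengths n1 and n2 depend on Snappy's encoding.

    // int32    1                       column count
    // byte     TTypeId.TINYINT_TYPE    column type tag
    // int32    n1                      length of Snappy(nulls bitmap {0x00})
    // byte[n1]                         compressed nulls
    // int32    n2                      length of Snappy(values {1, 2, 3})
    // byte[n2]                         compressed values
    //
    // BOOLEAN columns add an extra int32 row count between the nulls and the
    // values chunk (BitSet drops trailing zero bits); STRING and BINARY columns
    // add a compressed int32[] row-size chunk there instead, followed by the
    // flattened, compressed row data.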
+ */ + +package org.apache.hadoop.hive.serde2.compression; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.serde2.compression.SnappyCompDe; +import org.apache.hadoop.hive.serde2.thrift.ColumnBuffer; +import org.apache.hive.service.rpc.thrift.*; + +import static org.junit.Assert.assertArrayEquals; + +import java.nio.ByteBuffer; +import java.util.ArrayList; + +import org.junit.Before; +import org.junit.Test; + +public class TestSnappyCompDe { + private static HiveConf hiveConf = new HiveConf(); + private SnappyCompDe compDe = new SnappyCompDe(); + byte[] noNullMask = {0}; + byte[] firstNullMask = {1}; + byte[] secondNullMask = {2}; + byte[] thirdNullMask = {3}; + ColumnBuffer columnBinary; + ColumnBuffer columnBool; + ColumnBuffer columnByte; + ColumnBuffer columnShort; + ColumnBuffer columnInt; + ColumnBuffer columnLong; + ColumnBuffer columnDouble; + ColumnBuffer columnStr; + + @Before + public void init() { + hiveConf.setVar(ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR, compDe.getVendor() + "." + compDe.getName()); + + byte[] firstRow = {2, 33, 7, 75, 5}; + byte[] secondRow = {3, 21, 6}; + byte[] thirdRow = {52, 25, 74, 74, 64}; + ArrayList someBinaries = new ArrayList(); + someBinaries.add(ByteBuffer.wrap(firstRow)); + someBinaries.add(ByteBuffer.wrap(secondRow)); + someBinaries.add(ByteBuffer.wrap(thirdRow)); + columnBinary = new ColumnBuffer(TColumn.binaryVal( + new TBinaryColumn(someBinaries, ByteBuffer.wrap(new byte[]{})))); + + // Test leading and trailing `false` in column + ArrayList bools = new ArrayList(); + bools.add(false); + bools.add(true); + bools.add(false); + bools.add(true); + bools.add(false); + columnBool = new ColumnBuffer(TColumn.boolVal( + new TBoolColumn(bools, ByteBuffer.wrap(noNullMask)))); + + ArrayList bytes = new ArrayList(); + bytes.add((byte) 0); + bytes.add((byte) 1); + bytes.add((byte) 2); + bytes.add((byte) 3); + columnByte= new ColumnBuffer(TColumn.byteVal( + new TByteColumn(bytes, ByteBuffer.wrap(noNullMask)))); + + ArrayList shorts = new ArrayList(); + shorts.add((short) 0); + shorts.add((short) 1); + shorts.add((short) -127); + shorts.add((short) 127); + columnShort= new ColumnBuffer(TColumn.i16Val( + new TI16Column(shorts, ByteBuffer.wrap(noNullMask)))); + + ArrayList ints = new ArrayList(); + ints.add(0); + ints.add(1); + ints.add(-32767); + ints.add(32767); + columnInt= new ColumnBuffer(TColumn.i32Val( + new TI32Column(ints, ByteBuffer.wrap(noNullMask)))); + + ArrayList longs = new ArrayList(); + longs.add((long) 0); + longs.add((long) 1); + longs.add((long) -2147483647 ); + longs.add((long) 2147483647 ); + columnLong = new ColumnBuffer(TColumn.i64Val( + new TI64Column(longs, ByteBuffer.wrap(noNullMask)))); + + ArrayList doubles = new ArrayList(); + doubles.add((double) 0); + doubles.add((double) 1.0); + doubles.add((double) -2147483647.5 ); + doubles.add((double) 2147483647.5 ); + columnDouble= new ColumnBuffer(TColumn.doubleVal( + new TDoubleColumn(doubles, ByteBuffer.wrap(noNullMask)))); + + ArrayList strings = new ArrayList(); + strings.add("ABC"); + strings.add("DEFG"); + strings.add("HI"); + strings.add(StringUtils.rightPad("", 65535, 'j')); + strings.add(""); + columnStr = new ColumnBuffer(TColumn.stringVal( + new TStringColumn(strings, ByteBuffer.wrap(noNullMask)))); + } + + @Test + public void testBinaryCol() { + ColumnBuffer[] inputCols = new ColumnBuffer[]{columnBinary}; + + byte[] compressed= 
compDe.compress(inputCols); + ColumnBuffer[] outputCols = compDe.decompress(compressed, 0, compressed.length); + + assertArrayEquals( + inputCols[0].toTColumn().getBinaryVal().getValues().toArray(), + outputCols[0].toTColumn().getBinaryVal().getValues().toArray()); + } + + @Test + public void testBoolCol() { + ColumnBuffer[] inputCols = new ColumnBuffer[]{columnBool}; + + byte[] compressed = compDe.compress(inputCols); + ColumnBuffer[] outputCols = compDe.decompress(compressed, 0, compressed.length); + + assertArrayEquals( + inputCols[0].toTColumn().getBoolVal().getValues().toArray(), + outputCols[0].toTColumn().getBoolVal().getValues().toArray()); + } + + @Test + public void testByteCol() { + ColumnBuffer[] inputCols = new ColumnBuffer[]{columnByte}; + + byte[] compressed = compDe.compress(inputCols); + ColumnBuffer[] outputCols = compDe.decompress(compressed, 0, compressed.length); + + assertArrayEquals( + inputCols[0].toTColumn().getByteVal().getValues().toArray(), + outputCols[0].toTColumn().getByteVal().getValues().toArray()); + } + + @Test + public void testIntCol() { + ColumnBuffer[] inputCols = new ColumnBuffer[]{columnInt}; + + byte[] compressed = compDe.compress(inputCols); + ColumnBuffer[] outputCols = compDe.decompress(compressed, 0, compressed.length); + + assertArrayEquals( + inputCols[0].toTColumn().getI32Val().getValues().toArray(), + outputCols[0].toTColumn().getI32Val().getValues().toArray()); + } + + @Test + public void testLongCol() { + ColumnBuffer[] inputCols = new ColumnBuffer[]{columnLong}; + + byte[] compressed = compDe.compress(inputCols); + ColumnBuffer[] outputCols = compDe.decompress(compressed, 0, compressed.length); + + assertArrayEquals( + inputCols[0].toTColumn().getI64Val().getValues().toArray(), + outputCols[0].toTColumn().getI64Val().getValues().toArray()); + } + + @Test + public void testDoubleCol() { + ColumnBuffer[] inputCols = new ColumnBuffer[]{columnDouble}; + + byte[] compressed = compDe.compress(inputCols); + ColumnBuffer[] outputCols = compDe.decompress(compressed, 0, compressed.length); + + assertArrayEquals( + inputCols[0].toTColumn().getDoubleVal().getValues().toArray(), + outputCols[0].toTColumn().getDoubleVal().getValues().toArray()); + } + + @Test + public void testStringCol() { + ColumnBuffer[] inputCols = new ColumnBuffer[]{columnStr}; + + byte[] compressed = compDe.compress(inputCols); + ColumnBuffer[] outputCols = compDe.decompress(compressed, 0, compressed.length); + + assertArrayEquals( + inputCols[0].toTColumn().getStringVal().getValues().toArray(), + outputCols[0].toTColumn().getStringVal().getValues().toArray()); + } + + @Test + public void testNulls() { + ColumnBuffer[] inputCols; + ArrayList someStrings = new ArrayList(); + someStrings.add("test1"); + someStrings.add("test2"); + ColumnBuffer columnStr1 = new ColumnBuffer(TColumn.stringVal( + new TStringColumn(someStrings, ByteBuffer.wrap(firstNullMask)))); + ColumnBuffer columnStr2 = new ColumnBuffer(TColumn.stringVal( + new TStringColumn(someStrings, ByteBuffer.wrap(secondNullMask)))); + ColumnBuffer columnStr3 = new ColumnBuffer(TColumn.stringVal( + new TStringColumn(someStrings, ByteBuffer.wrap(thirdNullMask)))); + + inputCols = new ColumnBuffer[]{ + columnStr1, + columnStr2, + columnStr3}; + + byte[] compressedCols = compDe.compress(inputCols); + ColumnBuffer[] outputCols = compDe.decompress(compressedCols, 0, compressedCols.length); + + assertArrayEquals(inputCols, outputCols); + } + + @Test + public void testMulti() { + ColumnBuffer[] inputCols = new 
ColumnBuffer[]{columnInt, columnStr}; + + byte[] compressed = compDe.compress(inputCols); + ColumnBuffer[] outputCols = compDe.decompress(compressed, 0, compressed.length); + + assertArrayEquals( + inputCols[0].toTColumn().getI32Val().getValues().toArray(), + outputCols[0].toTColumn().getI32Val().getValues().toArray()); + assertArrayEquals( + inputCols[1].toTColumn().getStringVal().getValues().toArray(), + outputCols[1].toTColumn().getStringVal().getValues().toArray()); + } +} diff --git a/service-rpc/if/TCLIService.thrift b/service-rpc/if/TCLIService.thrift index a4fa7b0..768f907 100644 --- a/service-rpc/if/TCLIService.thrift +++ b/service-rpc/if/TCLIService.thrift @@ -583,8 +583,11 @@ struct TOpenSessionResp { // Session Handle 3: optional TSessionHandle sessionHandle - // The configuration settings for this session. - 4: optional map configuration + // The CompDe configuration settings for this session. + 4: optional map compressorConfiguration + + // The name of the CompDe for this session. + 5: optional string compressorName } diff --git a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp index 2f460e8..2a3bab0 100644 --- a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp +++ b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp @@ -4814,9 +4814,14 @@ void TOpenSessionResp::__set_sessionHandle(const TSessionHandle& val) { __isset.sessionHandle = true; } -void TOpenSessionResp::__set_configuration(const std::map & val) { - this->configuration = val; -__isset.configuration = true; +void TOpenSessionResp::__set_compressorConfiguration(const std::map & val) { + this->compressorConfiguration = val; +__isset.compressorConfiguration = true; +} + +void TOpenSessionResp::__set_compressorName(const std::string& val) { + this->compressorName = val; +__isset.compressorName = true; } uint32_t TOpenSessionResp::read(::apache::thrift::protocol::TProtocol* iprot) { @@ -4871,7 +4876,7 @@ uint32_t TOpenSessionResp::read(::apache::thrift::protocol::TProtocol* iprot) { case 4: if (ftype == ::apache::thrift::protocol::T_MAP) { { - this->configuration.clear(); + this->compressorConfiguration.clear(); uint32_t _size193; ::apache::thrift::protocol::TType _ktype194; ::apache::thrift::protocol::TType _vtype195; @@ -4881,12 +4886,20 @@ uint32_t TOpenSessionResp::read(::apache::thrift::protocol::TProtocol* iprot) { { std::string _key198; xfer += iprot->readString(_key198); - std::string& _val199 = this->configuration[_key198]; + std::string& _val199 = this->compressorConfiguration[_key198]; xfer += iprot->readString(_val199); } xfer += iprot->readMapEnd(); } - this->__isset.configuration = true; + this->__isset.compressorConfiguration = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 5: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->compressorName); + this->__isset.compressorName = true; } else { xfer += iprot->skip(ftype); } @@ -4925,12 +4938,12 @@ uint32_t TOpenSessionResp::write(::apache::thrift::protocol::TProtocol* oprot) c xfer += this->sessionHandle.write(oprot); xfer += oprot->writeFieldEnd(); } - if (this->__isset.configuration) { - xfer += oprot->writeFieldBegin("configuration", ::apache::thrift::protocol::T_MAP, 4); + if (this->__isset.compressorConfiguration) { + xfer += oprot->writeFieldBegin("compressorConfiguration", ::apache::thrift::protocol::T_MAP, 4); { - xfer += oprot->writeMapBegin(::apache::thrift::protocol::T_STRING, 
::apache::thrift::protocol::T_STRING, static_cast(this->configuration.size())); + xfer += oprot->writeMapBegin(::apache::thrift::protocol::T_STRING, ::apache::thrift::protocol::T_STRING, static_cast(this->compressorConfiguration.size())); std::map ::const_iterator _iter200; - for (_iter200 = this->configuration.begin(); _iter200 != this->configuration.end(); ++_iter200) + for (_iter200 = this->compressorConfiguration.begin(); _iter200 != this->compressorConfiguration.end(); ++_iter200) { xfer += oprot->writeString(_iter200->first); xfer += oprot->writeString(_iter200->second); @@ -4939,6 +4952,11 @@ uint32_t TOpenSessionResp::write(::apache::thrift::protocol::TProtocol* oprot) c } xfer += oprot->writeFieldEnd(); } + if (this->__isset.compressorName) { + xfer += oprot->writeFieldBegin("compressorName", ::apache::thrift::protocol::T_STRING, 5); + xfer += oprot->writeString(this->compressorName); + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -4949,7 +4967,8 @@ void swap(TOpenSessionResp &a, TOpenSessionResp &b) { swap(a.status, b.status); swap(a.serverProtocolVersion, b.serverProtocolVersion); swap(a.sessionHandle, b.sessionHandle); - swap(a.configuration, b.configuration); + swap(a.compressorConfiguration, b.compressorConfiguration); + swap(a.compressorName, b.compressorName); swap(a.__isset, b.__isset); } @@ -4957,14 +4976,16 @@ TOpenSessionResp::TOpenSessionResp(const TOpenSessionResp& other201) { status = other201.status; serverProtocolVersion = other201.serverProtocolVersion; sessionHandle = other201.sessionHandle; - configuration = other201.configuration; + compressorConfiguration = other201.compressorConfiguration; + compressorName = other201.compressorName; __isset = other201.__isset; } TOpenSessionResp& TOpenSessionResp::operator=(const TOpenSessionResp& other202) { status = other202.status; serverProtocolVersion = other202.serverProtocolVersion; sessionHandle = other202.sessionHandle; - configuration = other202.configuration; + compressorConfiguration = other202.compressorConfiguration; + compressorName = other202.compressorName; __isset = other202.__isset; return *this; } @@ -4974,7 +4995,8 @@ void TOpenSessionResp::printTo(std::ostream& out) const { out << "status=" << to_string(status); out << ", " << "serverProtocolVersion=" << to_string(serverProtocolVersion); out << ", " << "sessionHandle="; (__isset.sessionHandle ? (out << to_string(sessionHandle)) : (out << "")); - out << ", " << "configuration="; (__isset.configuration ? (out << to_string(configuration)) : (out << "")); + out << ", " << "compressorConfiguration="; (__isset.compressorConfiguration ? (out << to_string(compressorConfiguration)) : (out << "")); + out << ", " << "compressorName="; (__isset.compressorName ? 
(out << to_string(compressorName)) : (out << "")); out << ")"; } diff --git a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TOpenSessionResp.java b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TOpenSessionResp.java index 0a3c121..750c6d5 100644 --- a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TOpenSessionResp.java +++ b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TOpenSessionResp.java @@ -41,7 +41,8 @@ private static final org.apache.thrift.protocol.TField STATUS_FIELD_DESC = new org.apache.thrift.protocol.TField("status", org.apache.thrift.protocol.TType.STRUCT, (short)1); private static final org.apache.thrift.protocol.TField SERVER_PROTOCOL_VERSION_FIELD_DESC = new org.apache.thrift.protocol.TField("serverProtocolVersion", org.apache.thrift.protocol.TType.I32, (short)2); private static final org.apache.thrift.protocol.TField SESSION_HANDLE_FIELD_DESC = new org.apache.thrift.protocol.TField("sessionHandle", org.apache.thrift.protocol.TType.STRUCT, (short)3); - private static final org.apache.thrift.protocol.TField CONFIGURATION_FIELD_DESC = new org.apache.thrift.protocol.TField("configuration", org.apache.thrift.protocol.TType.MAP, (short)4); + private static final org.apache.thrift.protocol.TField COMPRESSOR_CONFIGURATION_FIELD_DESC = new org.apache.thrift.protocol.TField("compressorConfiguration", org.apache.thrift.protocol.TType.MAP, (short)4); + private static final org.apache.thrift.protocol.TField COMPRESSOR_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("compressorName", org.apache.thrift.protocol.TType.STRING, (short)5); private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -52,7 +53,8 @@ private TStatus status; // required private TProtocolVersion serverProtocolVersion; // required private TSessionHandle sessionHandle; // optional - private Map configuration; // optional + private Map compressorConfiguration; // optional + private String compressorName; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. 
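The two generated fields above carry the negotiation result back to the client. For orientation, this is roughly how a server fills them once a CompDe is agreed on; the patch's real logic lives in ThriftCLIService.java (see the diffstat), and OK_STATUS, protocolVersion, and compDe are stand-ins here.

    TOpenSessionResp resp = new TOpenSessionResp();
    resp.setStatus(OK_STATUS);
    resp.setServerProtocolVersion(protocolVersion);
    resp.setCompressorName("snappy.snappy");              // vendor-qualified plug-in name
    resp.setCompressorConfiguration(compDe.getConfig());  // echoed so the client can verify its own init()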
*/ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -63,7 +65,8 @@ */ SERVER_PROTOCOL_VERSION((short)2, "serverProtocolVersion"), SESSION_HANDLE((short)3, "sessionHandle"), - CONFIGURATION((short)4, "configuration"); + COMPRESSOR_CONFIGURATION((short)4, "compressorConfiguration"), + COMPRESSOR_NAME((short)5, "compressorName"); private static final Map byName = new HashMap(); @@ -84,8 +87,10 @@ public static _Fields findByThriftId(int fieldId) { return SERVER_PROTOCOL_VERSION; case 3: // SESSION_HANDLE return SESSION_HANDLE; - case 4: // CONFIGURATION - return CONFIGURATION; + case 4: // COMPRESSOR_CONFIGURATION + return COMPRESSOR_CONFIGURATION; + case 5: // COMPRESSOR_NAME + return COMPRESSOR_NAME; default: return null; } @@ -126,7 +131,7 @@ public String getFieldName() { } // isset id assignments - private static final _Fields optionals[] = {_Fields.SESSION_HANDLE,_Fields.CONFIGURATION}; + private static final _Fields optionals[] = {_Fields.SESSION_HANDLE,_Fields.COMPRESSOR_CONFIGURATION,_Fields.COMPRESSOR_NAME}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -136,10 +141,12 @@ public String getFieldName() { new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, TProtocolVersion.class))); tmpMap.put(_Fields.SESSION_HANDLE, new org.apache.thrift.meta_data.FieldMetaData("sessionHandle", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, TSessionHandle.class))); - tmpMap.put(_Fields.CONFIGURATION, new org.apache.thrift.meta_data.FieldMetaData("configuration", org.apache.thrift.TFieldRequirementType.OPTIONAL, + tmpMap.put(_Fields.COMPRESSOR_CONFIGURATION, new org.apache.thrift.meta_data.FieldMetaData("compressorConfiguration", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING), new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)))); + tmpMap.put(_Fields.COMPRESSOR_NAME, new org.apache.thrift.meta_data.FieldMetaData("compressorName", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TOpenSessionResp.class, metaDataMap); } @@ -171,9 +178,12 @@ public TOpenSessionResp(TOpenSessionResp other) { if (other.isSetSessionHandle()) { this.sessionHandle = new TSessionHandle(other.sessionHandle); } - if (other.isSetConfiguration()) { - Map __this__configuration = new HashMap(other.configuration); - this.configuration = __this__configuration; + if (other.isSetCompressorConfiguration()) { + Map __this__compressorConfiguration = new HashMap(other.compressorConfiguration); + this.compressorConfiguration = __this__compressorConfiguration; + } + if (other.isSetCompressorName()) { + this.compressorName = other.compressorName; } } @@ -187,7 +197,8 @@ public void clear() { this.serverProtocolVersion = org.apache.hive.service.rpc.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V9; this.sessionHandle = null; - this.configuration = null; + this.compressorConfiguration = 
null; + this.compressorName = null; } public TStatus getStatus() { @@ -267,37 +278,60 @@ public void setSessionHandleIsSet(boolean value) { } } - public int getConfigurationSize() { - return (this.configuration == null) ? 0 : this.configuration.size(); + public int getCompressorConfigurationSize() { + return (this.compressorConfiguration == null) ? 0 : this.compressorConfiguration.size(); + } + + public void putToCompressorConfiguration(String key, String val) { + if (this.compressorConfiguration == null) { + this.compressorConfiguration = new HashMap(); + } + this.compressorConfiguration.put(key, val); + } + + public Map getCompressorConfiguration() { + return this.compressorConfiguration; + } + + public void setCompressorConfiguration(Map compressorConfiguration) { + this.compressorConfiguration = compressorConfiguration; + } + + public void unsetCompressorConfiguration() { + this.compressorConfiguration = null; + } + + /** Returns true if field compressorConfiguration is set (has been assigned a value) and false otherwise */ + public boolean isSetCompressorConfiguration() { + return this.compressorConfiguration != null; } - public void putToConfiguration(String key, String val) { - if (this.configuration == null) { - this.configuration = new HashMap(); + public void setCompressorConfigurationIsSet(boolean value) { + if (!value) { + this.compressorConfiguration = null; } - this.configuration.put(key, val); } - public Map getConfiguration() { - return this.configuration; + public String getCompressorName() { + return this.compressorName; } - public void setConfiguration(Map configuration) { - this.configuration = configuration; + public void setCompressorName(String compressorName) { + this.compressorName = compressorName; } - public void unsetConfiguration() { - this.configuration = null; + public void unsetCompressorName() { + this.compressorName = null; } - /** Returns true if field configuration is set (has been assigned a value) and false otherwise */ - public boolean isSetConfiguration() { - return this.configuration != null; + /** Returns true if field compressorName is set (has been assigned a value) and false otherwise */ + public boolean isSetCompressorName() { + return this.compressorName != null; } - public void setConfigurationIsSet(boolean value) { + public void setCompressorNameIsSet(boolean value) { if (!value) { - this.configuration = null; + this.compressorName = null; } } @@ -327,11 +361,19 @@ public void setFieldValue(_Fields field, Object value) { } break; - case CONFIGURATION: + case COMPRESSOR_CONFIGURATION: + if (value == null) { + unsetCompressorConfiguration(); + } else { + setCompressorConfiguration((Map)value); + } + break; + + case COMPRESSOR_NAME: if (value == null) { - unsetConfiguration(); + unsetCompressorName(); } else { - setConfiguration((Map)value); + setCompressorName((String)value); } break; @@ -349,8 +391,11 @@ public Object getFieldValue(_Fields field) { case SESSION_HANDLE: return getSessionHandle(); - case CONFIGURATION: - return getConfiguration(); + case COMPRESSOR_CONFIGURATION: + return getCompressorConfiguration(); + + case COMPRESSOR_NAME: + return getCompressorName(); } throw new IllegalStateException(); @@ -369,8 +414,10 @@ public boolean isSet(_Fields field) { return isSetServerProtocolVersion(); case SESSION_HANDLE: return isSetSessionHandle(); - case CONFIGURATION: - return isSetConfiguration(); + case COMPRESSOR_CONFIGURATION: + return isSetCompressorConfiguration(); + case COMPRESSOR_NAME: + return isSetCompressorName(); } throw new 
IllegalStateException(); } @@ -415,12 +462,21 @@ public boolean equals(TOpenSessionResp that) { return false; } - boolean this_present_configuration = true && this.isSetConfiguration(); - boolean that_present_configuration = true && that.isSetConfiguration(); - if (this_present_configuration || that_present_configuration) { - if (!(this_present_configuration && that_present_configuration)) + boolean this_present_compressorConfiguration = true && this.isSetCompressorConfiguration(); + boolean that_present_compressorConfiguration = true && that.isSetCompressorConfiguration(); + if (this_present_compressorConfiguration || that_present_compressorConfiguration) { + if (!(this_present_compressorConfiguration && that_present_compressorConfiguration)) + return false; + if (!this.compressorConfiguration.equals(that.compressorConfiguration)) + return false; + } + + boolean this_present_compressorName = true && this.isSetCompressorName(); + boolean that_present_compressorName = true && that.isSetCompressorName(); + if (this_present_compressorName || that_present_compressorName) { + if (!(this_present_compressorName && that_present_compressorName)) return false; - if (!this.configuration.equals(that.configuration)) + if (!this.compressorName.equals(that.compressorName)) return false; } @@ -446,10 +502,15 @@ public int hashCode() { if (present_sessionHandle) list.add(sessionHandle); - boolean present_configuration = true && (isSetConfiguration()); - list.add(present_configuration); - if (present_configuration) - list.add(configuration); + boolean present_compressorConfiguration = true && (isSetCompressorConfiguration()); + list.add(present_compressorConfiguration); + if (present_compressorConfiguration) + list.add(compressorConfiguration); + + boolean present_compressorName = true && (isSetCompressorName()); + list.add(present_compressorName); + if (present_compressorName) + list.add(compressorName); return list.hashCode(); } @@ -492,12 +553,22 @@ public int compareTo(TOpenSessionResp other) { return lastComparison; } } - lastComparison = Boolean.valueOf(isSetConfiguration()).compareTo(other.isSetConfiguration()); + lastComparison = Boolean.valueOf(isSetCompressorConfiguration()).compareTo(other.isSetCompressorConfiguration()); if (lastComparison != 0) { return lastComparison; } - if (isSetConfiguration()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.configuration, other.configuration); + if (isSetCompressorConfiguration()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.compressorConfiguration, other.compressorConfiguration); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetCompressorName()).compareTo(other.isSetCompressorName()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetCompressorName()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.compressorName, other.compressorName); if (lastComparison != 0) { return lastComparison; } @@ -547,13 +618,23 @@ public String toString() { } first = false; } - if (isSetConfiguration()) { + if (isSetCompressorConfiguration()) { if (!first) sb.append(", "); - sb.append("configuration:"); - if (this.configuration == null) { + sb.append("compressorConfiguration:"); + if (this.compressorConfiguration == null) { sb.append("null"); } else { - sb.append(this.configuration); + sb.append(this.compressorConfiguration); + } + first = false; + } + if (isSetCompressorName()) { + if (!first) sb.append(", "); + sb.append("compressorName:"); + if 
(this.compressorName == null) { + sb.append("null"); + } else { + sb.append(this.compressorName); } first = false; } @@ -640,22 +721,30 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, TOpenSessionResp st org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; - case 4: // CONFIGURATION + case 4: // COMPRESSOR_CONFIGURATION if (schemeField.type == org.apache.thrift.protocol.TType.MAP) { { org.apache.thrift.protocol.TMap _map152 = iprot.readMapBegin(); - struct.configuration = new HashMap(2*_map152.size); + struct.compressorConfiguration = new HashMap(2*_map152.size); String _key153; String _val154; for (int _i155 = 0; _i155 < _map152.size; ++_i155) { _key153 = iprot.readString(); _val154 = iprot.readString(); - struct.configuration.put(_key153, _val154); + struct.compressorConfiguration.put(_key153, _val154); } iprot.readMapEnd(); } - struct.setConfigurationIsSet(true); + struct.setCompressorConfigurationIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 5: // COMPRESSOR_NAME + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.compressorName = iprot.readString(); + struct.setCompressorNameIsSet(true); } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -690,12 +779,12 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, TOpenSessionResp s oprot.writeFieldEnd(); } } - if (struct.configuration != null) { - if (struct.isSetConfiguration()) { - oprot.writeFieldBegin(CONFIGURATION_FIELD_DESC); + if (struct.compressorConfiguration != null) { + if (struct.isSetCompressorConfiguration()) { + oprot.writeFieldBegin(COMPRESSOR_CONFIGURATION_FIELD_DESC); { - oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, struct.configuration.size())); - for (Map.Entry _iter156 : struct.configuration.entrySet()) + oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, struct.compressorConfiguration.size())); + for (Map.Entry _iter156 : struct.compressorConfiguration.entrySet()) { oprot.writeString(_iter156.getKey()); oprot.writeString(_iter156.getValue()); @@ -705,6 +794,13 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, TOpenSessionResp s oprot.writeFieldEnd(); } } + if (struct.compressorName != null) { + if (struct.isSetCompressorName()) { + oprot.writeFieldBegin(COMPRESSOR_NAME_FIELD_DESC); + oprot.writeString(struct.compressorName); + oprot.writeFieldEnd(); + } + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -728,23 +824,29 @@ public void write(org.apache.thrift.protocol.TProtocol prot, TOpenSessionResp st if (struct.isSetSessionHandle()) { optionals.set(0); } - if (struct.isSetConfiguration()) { + if (struct.isSetCompressorConfiguration()) { optionals.set(1); } - oprot.writeBitSet(optionals, 2); + if (struct.isSetCompressorName()) { + optionals.set(2); + } + oprot.writeBitSet(optionals, 3); if (struct.isSetSessionHandle()) { struct.sessionHandle.write(oprot); } - if (struct.isSetConfiguration()) { + if (struct.isSetCompressorConfiguration()) { { - oprot.writeI32(struct.configuration.size()); - for (Map.Entry _iter157 : struct.configuration.entrySet()) + oprot.writeI32(struct.compressorConfiguration.size()); + for (Map.Entry _iter157 : struct.compressorConfiguration.entrySet()) { oprot.writeString(_iter157.getKey()); 
oprot.writeString(_iter157.getValue()); } } } + if (struct.isSetCompressorName()) { + oprot.writeString(struct.compressorName); + } } @Override @@ -755,7 +857,7 @@ public void read(org.apache.thrift.protocol.TProtocol prot, TOpenSessionResp str struct.setStatusIsSet(true); struct.serverProtocolVersion = org.apache.hive.service.rpc.thrift.TProtocolVersion.findByValue(iprot.readI32()); struct.setServerProtocolVersionIsSet(true); - BitSet incoming = iprot.readBitSet(2); + BitSet incoming = iprot.readBitSet(3); if (incoming.get(0)) { struct.sessionHandle = new TSessionHandle(); struct.sessionHandle.read(iprot); @@ -764,17 +866,21 @@ public void read(org.apache.thrift.protocol.TProtocol prot, TOpenSessionResp str if (incoming.get(1)) { { org.apache.thrift.protocol.TMap _map158 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32()); - struct.configuration = new HashMap(2*_map158.size); + struct.compressorConfiguration = new HashMap(2*_map158.size); String _key159; String _val160; for (int _i161 = 0; _i161 < _map158.size; ++_i161) { _key159 = iprot.readString(); _val160 = iprot.readString(); - struct.configuration.put(_key159, _val160); + struct.compressorConfiguration.put(_key159, _val160); } } - struct.setConfigurationIsSet(true); + struct.setCompressorConfigurationIsSet(true); + } + if (incoming.get(2)) { + struct.compressorName = iprot.readString(); + struct.setCompressorNameIsSet(true); } } } diff --git a/service-rpc/src/gen/thrift/gen-php/Types.php b/service-rpc/src/gen/thrift/gen-php/Types.php index 786c773..927bf67 100644 --- a/service-rpc/src/gen/thrift/gen-php/Types.php +++ b/service-rpc/src/gen/thrift/gen-php/Types.php @@ -4700,7 +4700,11 @@ class TOpenSessionResp { /** * @var array */ - public $configuration = null; + public $compressorConfiguration = null; + /** + * @var string + */ + public $compressorName = null; public function __construct($vals=null) { if (!isset(self::$_TSPEC)) { @@ -4720,7 +4724,7 @@ class TOpenSessionResp { 'class' => '\TSessionHandle', ), 4 => array( - 'var' => 'configuration', + 'var' => 'compressorConfiguration', 'type' => TType::MAP, 'ktype' => TType::STRING, 'vtype' => TType::STRING, @@ -4731,6 +4735,10 @@ class TOpenSessionResp { 'type' => TType::STRING, ), ), + 5 => array( + 'var' => 'compressorName', + 'type' => TType::STRING, + ), ); } if (is_array($vals)) { @@ -4743,8 +4751,11 @@ class TOpenSessionResp { if (isset($vals['sessionHandle'])) { $this->sessionHandle = $vals['sessionHandle']; } - if (isset($vals['configuration'])) { - $this->configuration = $vals['configuration']; + if (isset($vals['compressorConfiguration'])) { + $this->compressorConfiguration = $vals['compressorConfiguration']; + } + if (isset($vals['compressorName'])) { + $this->compressorName = $vals['compressorName']; } } } @@ -4793,7 +4804,7 @@ class TOpenSessionResp { break; case 4: if ($ftype == TType::MAP) { - $this->configuration = array(); + $this->compressorConfiguration = array(); $_size134 = 0; $_ktype135 = 0; $_vtype136 = 0; @@ -4804,13 +4815,20 @@ class TOpenSessionResp { $val140 = ''; $xfer += $input->readString($key139); $xfer += $input->readString($val140); - $this->configuration[$key139] = $val140; + $this->compressorConfiguration[$key139] = $val140; } $xfer += $input->readMapEnd(); } else { $xfer += $input->skip($ftype); } break; + case 5: + if ($ftype == TType::STRING) { + $xfer += $input->readString($this->compressorName); + } else { + $xfer += $input->skip($ftype); + } + break; 
default: $xfer += $input->skip($ftype); break; @@ -4845,15 +4863,15 @@ class TOpenSessionResp { $xfer += $this->sessionHandle->write($output); $xfer += $output->writeFieldEnd(); } - if ($this->configuration !== null) { - if (!is_array($this->configuration)) { + if ($this->compressorConfiguration !== null) { + if (!is_array($this->compressorConfiguration)) { throw new TProtocolException('Bad type in structure.', TProtocolException::INVALID_DATA); } - $xfer += $output->writeFieldBegin('configuration', TType::MAP, 4); + $xfer += $output->writeFieldBegin('compressorConfiguration', TType::MAP, 4); { - $output->writeMapBegin(TType::STRING, TType::STRING, count($this->configuration)); + $output->writeMapBegin(TType::STRING, TType::STRING, count($this->compressorConfiguration)); { - foreach ($this->configuration as $kiter141 => $viter142) + foreach ($this->compressorConfiguration as $kiter141 => $viter142) { $xfer += $output->writeString($kiter141); $xfer += $output->writeString($viter142); @@ -4863,6 +4881,11 @@ class TOpenSessionResp { } $xfer += $output->writeFieldEnd(); } + if ($this->compressorName !== null) { + $xfer += $output->writeFieldBegin('compressorName', TType::STRING, 5); + $xfer += $output->writeString($this->compressorName); + $xfer += $output->writeFieldEnd(); + } $xfer += $output->writeFieldStop(); $xfer += $output->writeStructEnd(); return $xfer; diff --git a/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py b/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py index fdf6b1f..1e53fcb 100644 --- a/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py +++ b/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py @@ -3614,7 +3614,8 @@ class TOpenSessionResp: - status - serverProtocolVersion - sessionHandle - - configuration + - compressorConfiguration + - compressorName """ thrift_spec = ( @@ -3622,14 +3623,16 @@ class TOpenSessionResp: (1, TType.STRUCT, 'status', (TStatus, TStatus.thrift_spec), None, ), # 1 (2, TType.I32, 'serverProtocolVersion', None, 8, ), # 2 (3, TType.STRUCT, 'sessionHandle', (TSessionHandle, TSessionHandle.thrift_spec), None, ), # 3 - (4, TType.MAP, 'configuration', (TType.STRING,None,TType.STRING,None), None, ), # 4 + (4, TType.MAP, 'compressorConfiguration', (TType.STRING,None,TType.STRING,None), None, ), # 4 + (5, TType.STRING, 'compressorName', None, None, ), # 5 ) - def __init__(self, status=None, serverProtocolVersion=thrift_spec[2][4], sessionHandle=None, configuration=None,): + def __init__(self, status=None, serverProtocolVersion=thrift_spec[2][4], sessionHandle=None, compressorConfiguration=None, compressorName=None,): self.status = status self.serverProtocolVersion = serverProtocolVersion self.sessionHandle = sessionHandle - self.configuration = configuration + self.compressorConfiguration = compressorConfiguration + self.compressorName = compressorName def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -3659,15 +3662,20 @@ def read(self, iprot): iprot.skip(ftype) elif fid == 4: if ftype == TType.MAP: - self.configuration = {} + self.compressorConfiguration = {} (_ktype135, _vtype136, _size134 ) = iprot.readMapBegin() for _i138 in xrange(_size134): _key139 = iprot.readString() _val140 = iprot.readString() - self.configuration[_key139] = _val140 + self.compressorConfiguration[_key139] = _val140 iprot.readMapEnd() else: iprot.skip(ftype) + elif fid == 5: + if ftype == 
TType.STRING: + self.compressorName = iprot.readString() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -3690,14 +3698,18 @@ def write(self, oprot): oprot.writeFieldBegin('sessionHandle', TType.STRUCT, 3) self.sessionHandle.write(oprot) oprot.writeFieldEnd() - if self.configuration is not None: - oprot.writeFieldBegin('configuration', TType.MAP, 4) - oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.configuration)) - for kiter141,viter142 in self.configuration.items(): + if self.compressorConfiguration is not None: + oprot.writeFieldBegin('compressorConfiguration', TType.MAP, 4) + oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.compressorConfiguration)) + for kiter141,viter142 in self.compressorConfiguration.items(): oprot.writeString(kiter141) oprot.writeString(viter142) oprot.writeMapEnd() oprot.writeFieldEnd() + if self.compressorName is not None: + oprot.writeFieldBegin('compressorName', TType.STRING, 5) + oprot.writeString(self.compressorName) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -3714,7 +3726,8 @@ def __hash__(self): value = (value * 31) ^ hash(self.status) value = (value * 31) ^ hash(self.serverProtocolVersion) value = (value * 31) ^ hash(self.sessionHandle) - value = (value * 31) ^ hash(self.configuration) + value = (value * 31) ^ hash(self.compressorConfiguration) + value = (value * 31) ^ hash(self.compressorName) return value def __repr__(self): diff --git a/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb b/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb index 4b1854c..35e60bd 100644 --- a/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb +++ b/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb @@ -979,13 +979,15 @@ class TOpenSessionResp STATUS = 1 SERVERPROTOCOLVERSION = 2 SESSIONHANDLE = 3 - CONFIGURATION = 4 + COMPRESSORCONFIGURATION = 4 + COMPRESSORNAME = 5 FIELDS = { STATUS => {:type => ::Thrift::Types::STRUCT, :name => 'status', :class => ::TStatus}, SERVERPROTOCOLVERSION => {:type => ::Thrift::Types::I32, :name => 'serverProtocolVersion', :default => 8, :enum_class => ::TProtocolVersion}, SESSIONHANDLE => {:type => ::Thrift::Types::STRUCT, :name => 'sessionHandle', :class => ::TSessionHandle, :optional => true}, - CONFIGURATION => {:type => ::Thrift::Types::MAP, :name => 'configuration', :key => {:type => ::Thrift::Types::STRING}, :value => {:type => ::Thrift::Types::STRING}, :optional => true} + COMPRESSORCONFIGURATION => {:type => ::Thrift::Types::MAP, :name => 'compressorConfiguration', :key => {:type => ::Thrift::Types::STRING}, :value => {:type => ::Thrift::Types::STRING}, :optional => true}, + COMPRESSORNAME => {:type => ::Thrift::Types::STRING, :name => 'compressorName', :optional => true} } def struct_fields; FIELDS; end diff --git a/service/src/java/org/apache/hive/service/cli/ColumnBasedSet.java b/service/src/java/org/apache/hive/service/cli/ColumnBasedSet.java index 9cbe89c..9f30916 100644 --- a/service/src/java/org/apache/hive/service/cli/ColumnBasedSet.java +++ b/service/src/java/org/apache/hive/service/cli/ColumnBasedSet.java @@ -20,11 +20,13 @@ import java.io.ByteArrayInputStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.Iterator; import java.util.List; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.serde2.thrift.ColumnBuffer; +import org.apache.hadoop.hive.serde2.compression.CompDe; import org.apache.hadoop.hive.serde2.thrift.Type; import 
org.apache.hive.service.rpc.thrift.TColumn; import org.apache.hive.service.rpc.thrift.TRow; @@ -58,27 +60,38 @@ public ColumnBasedSet(TableSchema schema) { } } - public ColumnBasedSet(TRowSet tRowSet) throws TException { + public ColumnBasedSet(TRowSet tRowSet, CompDe compDe) throws TException { descriptors = null; - columns = new ArrayList(); - // Use TCompactProtocol to read serialized TColumns if (tRowSet.isSetBinaryColumns()) { - TProtocol protocol = - new TCompactProtocol(new TIOStreamTransport(new ByteArrayInputStream( - tRowSet.getBinaryColumns()))); - // Read from the stream using the protocol for each column in final schema - for (int i = 0; i < tRowSet.getColumnCount(); i++) { - TColumn tvalue = new TColumn(); - try { - tvalue.read(protocol); - } catch (TException e) { - LOG.error(e.getMessage(), e); - throw new TException("Error reading column value from the row set blob", e); + // Use TCompactProtocol to read serialized TColumns + + if (compDe != null) { + TProtocol protocol = + new TCompactProtocol(new TIOStreamTransport(new ByteArrayInputStream( + tRowSet.getBinaryColumns()))); + byte[] compressedBytes = protocol.readBinary().array(); + columns = Arrays.asList(compDe.decompress(compressedBytes, 0, compressedBytes.length)); + } + else { + columns = new ArrayList(); + TProtocol protocol = + new TCompactProtocol(new TIOStreamTransport(new ByteArrayInputStream( + tRowSet.getBinaryColumns()))); + // Read from the stream using the protocol for each column in final schema + for (int i = 0; i < tRowSet.getColumnCount(); i++) { + TColumn tvalue = new TColumn(); + try { + tvalue.read(protocol); + } catch (TException e) { + LOG.error(e.getMessage(), e); + throw new TException("Error reading column value from the row set blob", e); + } + columns.add(new ColumnBuffer(tvalue)); } - columns.add(new ColumnBuffer(tvalue)); } } else { + columns = new ArrayList(); if (tRowSet.getColumns() != null) { for (TColumn tvalue : tRowSet.getColumns()) { columns.add(new ColumnBuffer(tvalue)); diff --git a/service/src/java/org/apache/hive/service/cli/RowSetFactory.java b/service/src/java/org/apache/hive/service/cli/RowSetFactory.java index d9be6a0..ef19c17 100644 --- a/service/src/java/org/apache/hive/service/cli/RowSetFactory.java +++ b/service/src/java/org/apache/hive/service/cli/RowSetFactory.java @@ -24,6 +24,8 @@ import static org.apache.hive.service.rpc.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6; +import org.apache.hadoop.hive.serde2.compression.CompDe; + public class RowSetFactory { // This call is accessed from server side @@ -35,10 +37,10 @@ public static RowSet create(TableSchema schema, TProtocolVersion version, boolea } // This call is accessed from client (jdbc) side - public static RowSet create(TRowSet results, TProtocolVersion version) throws TException { - if (version.getValue() >= HIVE_CLI_SERVICE_PROTOCOL_V6.getValue()) { - return new ColumnBasedSet(results); - } - return new RowBasedSet(results); + public static RowSet create(TRowSet results, TProtocolVersion version, CompDe compDe) throws TException { + if (version.getValue() >= HIVE_CLI_SERVICE_PROTOCOL_V6.getValue()) { + return new ColumnBasedSet(results, compDe); + } + return new RowBasedSet(results); } } diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 2938338..eeefa56 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ 
b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -21,8 +21,11 @@ import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; -import java.util.HashMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -30,6 +33,8 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.serde2.compression.CompDe; +import org.apache.hadoop.hive.serde2.compression.CompDeServiceLoader; import org.apache.hadoop.hive.common.ServerUtils; import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim; import org.apache.hadoop.hive.shims.ShimLoader; @@ -309,10 +314,50 @@ public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException { LOG.info("Client protocol version: " + req.getClient_protocol()); TOpenSessionResp resp = new TOpenSessionResp(); try { + if (req.isSetConfiguration()) { + String[] serverCompDes = + HiveConf.getTrimmedStringsVar(hiveConf, ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR_LIST); + + List clientCompDes = new ArrayList(); + if (req.getConfiguration().containsKey("set:hiveconf:" + ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR_LIST.varname) + && !req.getConfiguration().get("set:hiveconf:" + ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR_LIST.varname).isEmpty()) { + HiveConf tempConf = new HiveConf(); + tempConf.setVar(ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR_LIST, req.getConfiguration().get("set:hiveconf:" + ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR_LIST.varname)); + clientCompDes = + Arrays.asList(HiveConf.getTrimmedStringsVar(tempConf, ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR_LIST)); + } + + // CompDe negotiation + req.getConfiguration().remove("set:hiveconf:" + ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR.varname); + for (String compDeName : serverCompDes) { + if (clientCompDes.contains(compDeName)) { + // Client configuration overrides server defaults + Map compDeConfig = + hiveConf.getValByRegex(ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR + "\\." 
+ compDeName + "\\.[\\w|\\d]+"); + for (Entry entry : compDeConfig.entrySet()) { + if (req.getConfiguration().containsKey("set:hiveconf:" + entry.getKey())) { + compDeConfig.put(entry.getKey(), req.getConfiguration().get("set:hiveconf:" + entry.getKey())); + } + } + + Map compDeResponse = initCompDe(compDeName, compDeConfig); + + if (compDeResponse != null) { + LOG.info("Initialized CompDe plugin for " + compDeName); + resp.setCompressorConfiguration(compDeResponse); + resp.setCompressorName(compDeName); + // SessionState is initialized based on TOpenSessionRequest + req.getConfiguration().put("set:hiveconf:" + ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR.varname, compDeName); + req.getConfiguration().putAll(compDeResponse); + break; + } + } + } + } + SessionHandle sessionHandle = getSessionHandle(req, resp); resp.setSessionHandle(sessionHandle.toTSessionHandle()); - // TODO: set real configuration map - resp.setConfiguration(new HashMap()); + resp.setStatus(OK_STATUS); ThriftCLIServerContext context = (ThriftCLIServerContext)currentServerContext.get(); @@ -326,6 +371,21 @@ public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException { return resp; } + protected Map initCompDe(String compDeName, Map compDeConfig) { + if (CompDeServiceLoader.getInstance().hasCompDe(compDeName)) { + CompDe compDe = CompDeServiceLoader.getInstance().getCompDe(compDeName); + if (compDe.init(compDeConfig)) { + return compDe.getConfig(); + } + else { + return null; + } + } + else { + return null; + } + } + private String getIpAddress() { String clientIpAddress; // Http transport mode. diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java index 3c48dbb..99cf1fc 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java @@ -440,7 +440,7 @@ public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientatio req.setFetchType(fetchType.toTFetchType()); TFetchResultsResp resp = cliService.FetchResults(req); checkStatus(resp.getStatus()); - return RowSetFactory.create(resp.getResults(), opHandle.getProtocolVersion()); + return RowSetFactory.create(resp.getResults(), opHandle.getProtocolVersion(), null); } catch (HiveSQLException e) { throw e; } catch (Exception e) { diff --git a/service/src/test/org/apache/hive/service/cli/compression/TestCompDeNegotiation.java b/service/src/test/org/apache/hive/service/cli/compression/TestCompDeNegotiation.java new file mode 100644 index 0000000..889a1f8 --- /dev/null +++ b/service/src/test/org/apache/hive/service/cli/compression/TestCompDeNegotiation.java @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.service.cli.compression; + +import static org.junit.Assert.*; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hive.service.auth.HiveAuthFactory; +import org.apache.hive.service.cli.HiveSQLException; +import org.apache.hive.service.cli.thrift.EmbeddedThriftBinaryCLIService; +import org.apache.hive.service.cli.thrift.ThriftCLIService; +import org.apache.hive.service.rpc.thrift.TOpenSessionReq; +import org.apache.hive.service.rpc.thrift.TOpenSessionResp; +import org.apache.thrift.TException; +import org.junit.Before; +import org.junit.Test; + +public class TestCompDeNegotiation { + private HiveConf noCompDes; + private HiveConf serverSingleCompDe; + private HiveConf serverMultiCompDes1; + private HiveConf serverMultiCompDes2; + private HiveConf clientSingleCompDe; + private HiveConf clientMultiCompDes1; + private HiveConf clientMultiCompDes2; + private HiveConf serverCompDeConf; + private HiveConf clientCompDeConf; + + @Before + public void init() throws Exception { + HiveConf baseConf = new HiveConf(); + baseConf.setVar(ConfVars.HIVE_SERVER2_AUTHENTICATION, HiveAuthFactory.AuthTypes.NONE.toString()); + baseConf.setVar(ConfVars.HIVE_AUTHORIZATION_MANAGER, + "org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider"); + baseConf.setBoolean("datanucleus.schema.autoCreateTables", true); + + noCompDes = new HiveConf(baseConf); + + clientSingleCompDe = new HiveConf(baseConf); + clientSingleCompDe.set(clientCompressorListVarName(), "compde3"); + serverSingleCompDe = new HiveConf(baseConf); + serverSingleCompDe.setVar(ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR_LIST, "compde3"); + + clientMultiCompDes1 = new HiveConf(baseConf); + clientMultiCompDes1.set(clientCompressorListVarName(), "compde1,compde2,compde3,compde4"); + serverMultiCompDes1 = new HiveConf(baseConf); + serverMultiCompDes1.setVar(ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR_LIST, "compde1,compde2,compde3,compde4"); + + clientMultiCompDes2 = new HiveConf(baseConf); + clientMultiCompDes2.set(clientCompressorListVarName(), "compde2, compde4"); + serverMultiCompDes2 = new HiveConf(baseConf); + serverMultiCompDes2.setVar(ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR_LIST, "compde2, compde4"); + + serverCompDeConf = new HiveConf(baseConf); + serverCompDeConf.setVar(ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR_LIST, "compde3"); + serverCompDeConf.set(noCompDeConfigPrefix("compde3") + ".test1", "serverVal1"); + serverCompDeConf.set(noCompDeConfigPrefix("compde3") + ".test2", "serverVal2");//overridden by client + serverCompDeConf.set(noCompDeConfigPrefix("compde3") + ".test4", "serverVal4");//overridden by plug-in + + clientCompDeConf = new HiveConf(baseConf); + clientCompDeConf.set(clientCompressorListVarName(), "compde3"); + clientCompDeConf.set(clientCompDeConfigPrefix("compde3") + ".test2", "clientVal2");//overrides server + clientCompDeConf.set(clientCompDeConfigPrefix("compde3") + ".test3", "clientVal3"); + clientCompDeConf.set(clientCompDeConfigPrefix("compde3") + ".test5", "clientVal5");//overridden by plug-in + } + + private String noCompDeConfigPrefix(String compDeName) { + return ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR.varname + "." 
+ compDeName; + } + // The JDBC driver prefixes all configuration names before sending the request and the server expects these prefixes + private String clientCompressorListVarName() { + return "set:hiveconf:" + ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR_LIST.varname; + } + private String clientCompDeConfigPrefix(String compDeName) { + return "set:hiveconf:" + ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR.varname + "." + compDeName; + } + + public class MockServiceWithoutCompDes extends EmbeddedThriftBinaryCLIService { + @Override + // Pretend that we have no CompDe plug-ins + protected Map initCompDe(String compDeName, Map compDeConfig) { + return null; + } + } + + @Test + // The server has no CompDe plug-ins + public void testServerWithoutCompDePlugins() throws HiveSQLException, InterruptedException, TException { + ThriftCLIService service = new MockServiceWithoutCompDes(); + service.init(noCompDes); + + TOpenSessionReq req = new TOpenSessionReq(); + req.setConfiguration(new HashMap()); + TOpenSessionResp resp; + + req.setConfiguration(noCompDes.getValByRegex(".*")); + resp = service.OpenSession(req); + assertNull(resp.getCompressorName()); + + req.setConfiguration(clientSingleCompDe.getValByRegex(".*")); + resp = service.OpenSession(req); + assertNull(resp.getCompressorName()); + + req.setConfiguration(clientMultiCompDes2.getValByRegex(".*")); + resp = service.OpenSession(req); + assertNull(resp.getCompressorName()); + + service.stop(); + } + + public class MockServiceWithCompDes extends EmbeddedThriftBinaryCLIService { + @Override + // Pretend that we have plug-ins for all CompDes except "compde1" + protected Map initCompDe(String compDeName, Map compDeConfig) { + if (compDeName.equals("compde1")) { + return null; + } + else { + return compDeConfig; + } + } + } + + @Test + // The server has plug-ins but the CompDe list is not configured + public void testServerWithoutCompDeInList() throws HiveSQLException, InterruptedException, TException { + ThriftCLIService service = new MockServiceWithCompDes(); + service.init(noCompDes); + + TOpenSessionReq req = new TOpenSessionReq(); + req.setConfiguration(new HashMap()); + TOpenSessionResp resp; + + req.setConfiguration(noCompDes.getValByRegex(".*")); + resp = service.OpenSession(req); + assertNull(resp.getCompressorName()); + + req.setConfiguration(clientSingleCompDe.getValByRegex(".*")); + resp = service.OpenSession(req); + assertNull(resp.getCompressorName()); + + req.setConfiguration(clientMultiCompDes2.getValByRegex(".*")); + resp = service.OpenSession(req); + assertNull(resp.getCompressorName()); + + service.stop(); + } + + @Test + public void testServerWithSingleCompDeInList() throws HiveSQLException, InterruptedException, TException { + ThriftCLIService service = new MockServiceWithCompDes(); + service.init(serverSingleCompDe); + + TOpenSessionReq req = new TOpenSessionReq(); + req.setConfiguration(new HashMap()); + TOpenSessionResp resp; + + req.setConfiguration(noCompDes.getValByRegex(".*")); + resp = service.OpenSession(req); + assertNull(resp.getCompressorName()); + + req.setConfiguration(clientSingleCompDe.getValByRegex(".*")); + resp = service.OpenSession(req); + assertEquals("compde3", resp.getCompressorName()); + + req.setConfiguration(clientMultiCompDes2.getValByRegex(".*")); + resp = service.OpenSession(req); + assertNull(resp.getCompressorName()); + + service.stop(); + } + + @Test + public void testServerWithMultiCompDesInList() throws HiveSQLException, InterruptedException, TException { + ThriftCLIService service 
= new MockServiceWithCompDes(); + service.init(serverMultiCompDes1); + + TOpenSessionReq req = new TOpenSessionReq(); + req.setConfiguration(new HashMap()); + TOpenSessionResp resp; + + req.setConfiguration(noCompDes.getValByRegex(".*")); + resp = service.OpenSession(req); + assertNull(resp.getCompressorName()); + + req.setConfiguration(clientSingleCompDe.getValByRegex(".*")); + resp = service.OpenSession(req); + assertEquals("compde3", resp.getCompressorName()); + + req.setConfiguration(clientMultiCompDes1.getValByRegex(".*")); + resp = service.OpenSession(req); + // "compde1" fails to initialize because our mock service does not have that plugin + assertEquals("compde2", resp.getCompressorName()); + + req.setConfiguration(clientMultiCompDes2.getValByRegex(".*")); + resp = service.OpenSession(req); + assertEquals("compde2", resp.getCompressorName()); + + service.stop(); + } + + public class MockWithCompDeConfig extends EmbeddedThriftBinaryCLIService { + @Override + // Mock a plug-in with an `init` function. + protected Map initCompDe(String compDeName, Map compDeConfig) { + compDeConfig.put(noCompDeConfigPrefix("compde3") + ".test4", "compDeVal4");//overrides server + compDeConfig.put(noCompDeConfigPrefix("compde3") + ".test5", "compDeVal5");//overrides client + compDeConfig.put(noCompDeConfigPrefix("compde3") + ".test6", "compDeVal6"); + return compDeConfig; + } + } + + @Test + // Ensure that the server combines the server default CompDe configuration with the client overrides and lets the plug-in `init` function create the final configuration. + public void testConfig() throws TException { + Map expectedConf = new HashMap(); + expectedConf.put(noCompDeConfigPrefix("compde3") + ".test1", "serverVal1"); + expectedConf.put(noCompDeConfigPrefix("compde3") + ".test2", "clientVal2"); + //expectedConf.put(noCompDeConfigPrefix("compde3") + ".test3", "clientVal3"); //TODO: fix this bug after modifying Thrift message structure to allow for cleaner negotiation code on server + expectedConf.put(noCompDeConfigPrefix("compde3") + ".test4", "compDeVal4"); + expectedConf.put(noCompDeConfigPrefix("compde3") + ".test5", "compDeVal5"); + expectedConf.put(noCompDeConfigPrefix("compde3") + ".test6", "compDeVal6"); + + ThriftCLIService service = new MockWithCompDeConfig(); + service.init(serverCompDeConf); + + TOpenSessionReq req = new TOpenSessionReq(); + req.setConfiguration(clientCompDeConf.getValByRegex(".*")); + + TOpenSessionResp resp = service.OpenSession(req); + assertEquals("compde3", resp.getCompressorName()); + assertEquals(expectedConf, resp.getCompressorConfiguration()); + } +} -- 1.8.3.4
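
A minimal, hypothetical client-side sketch of the negotiation exercised by the tests above (not part of the patch). The compressor name "mycompde", host, and port are placeholders; the sketch assumes a CompDe plug-in with that name is discoverable via CompDeServiceLoader on both the client and server classpath, and that the server's hive.server2.thrift.resultset.compressor.list includes it. Key=value pairs placed after '?' in a HiveServer2 JDBC URL are sent to OpenSession with the "set:hiveconf:" prefix, which is where the negotiation code above looks for the client's compressor list.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CompDeUsageSketch {
  public static void main(String[] args) throws Exception {
    // Ensure the driver is registered (JDBC4 auto-loading also works).
    Class.forName("org.apache.hive.jdbc.HiveDriver");

    // "mycompde" is a hypothetical CompDe name used for illustration only.
    // The hive_conf_list after '?' reaches the server in TOpenSessionReq as
    // "set:hiveconf:hive.server2.thrift.resultset.compressor.list".
    String url = "jdbc:hive2://localhost:10000/default"
        + "?hive.server2.thrift.resultset.compressor.list=mycompde";

    try (Connection conn = DriverManager.getConnection(url);
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery("SELECT 1")) {
      while (rs.next()) {
        // Binary result columns are decompressed by the negotiated CompDe in
        // ColumnBasedSet, so the JDBC API surface is unchanged for callers.
        System.out.println(rs.getInt(1));
      }
    }
  }
}

If no CompDe from the client's list can be initialized, OpenSession leaves compressorName unset and the session proceeds with uncompressed result sets, as exercised by testServerWithoutCompDePlugins and testServerWithoutCompDeInList above.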