From 1185163ba1bf4690e11757237a30799a3fabbade Mon Sep 17 00:00:00 2001 From: kliewkliew Date: Sun, 8 Jan 2017 07:17:12 -0800 Subject: [PATCH] HIVE-13680: HiveServer2: Provide a way to compress ResultSets --- .../java/org/apache/hadoop/hive/conf/HiveConf.java | 12 + .../java/org/apache/hive/jdbc/HiveConnection.java | 24 + .../org/apache/hive/jdbc/HiveQueryResultSet.java | 5 +- .../java/org/apache/hive/jdbc/HiveStatement.java | 4 +- pom.xml | 2 +- .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 12 + .../hadoop/hive/ql/processors/SetProcessor.java | 4 +- .../hadoop/hive/serde2/compression/CompDe.java | 105 ++++ .../serde2/compression/CompDeServiceLoader.java | 87 ++++ .../hive/serde2/thrift/ThriftJDBCBinarySerDe.java | 90 +++- .../hive/serde2/compression/SnappyCompDe.java | 562 +++++++++++++++++++++ .../hive/serde2/compression/TestSnappyCompDe.java | 249 +++++++++ service-rpc/if/TCLIService.thrift | 10 +- .../src/gen/thrift/gen-cpp/TCLIService_types.cpp | 72 ++- .../src/gen/thrift/gen-cpp/TCLIService_types.h | 30 +- .../hive/service/rpc/thrift/TOpenSessionResp.java | 350 ++++++++++--- service-rpc/src/gen/thrift/gen-php/Types.php | 68 ++- .../src/gen/thrift/gen-py/TCLIService/ttypes.py | 48 +- .../src/gen/thrift/gen-rb/t_c_l_i_service_types.rb | 8 +- service/pom.xml | 5 + .../apache/hive/service/cli/ColumnBasedSet.java | 44 +- .../org/apache/hive/service/cli/RowSetFactory.java | 13 +- .../hive/service/cli/thrift/ThriftCLIService.java | 157 +++++- .../service/cli/thrift/ThriftCLIServiceClient.java | 2 +- .../cli/compression/TestCompDeNegotiation.java | 281 +++++++++++ 25 files changed, 2072 insertions(+), 172 deletions(-) create mode 100644 serde/src/java/org/apache/hadoop/hive/serde2/compression/CompDe.java create mode 100644 serde/src/java/org/apache/hadoop/hive/serde2/compression/CompDeServiceLoader.java create mode 100644 serde/src/test/org/apache/hadoop/hive/serde2/compression/SnappyCompDe.java create mode 100644 serde/src/test/org/apache/hadoop/hive/serde2/compression/TestSnappyCompDe.java create mode 100644 service/src/test/org/apache/hive/service/cli/compression/TestCompDeNegotiation.java diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 96928db..87fa141 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2623,6 +2623,18 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal HIVE_SERVER2_RESULTSET_DEFAULT_FETCH_SIZE("hive.server2.resultset.default.fetch.size", 10000, "The number of rows sent in one Fetch RPC call by the server to the client, if not\n" + "specified by the client."), + + // ResultSet compression settings + HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR_LIST("hive.server2.thrift.resultset.compressor.list", "", + "A list of compressors ordered by the server's preference.\n " + + "This list will be used to negotiate a CompDe for each session."), + HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR("hive.server2.thrift.resultset.compressor", "", + "A CompDe that will be used for the session.\n " + + "This is determined by negotiation during OpenSession."), + HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR_VERSION(HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR + ".version", "", + "The version of the CompDe that will be used for the session.\n " + + "This is determined by negotiation during OpenSession."), + HIVE_SERVER2_XSRF_FILTER_ENABLED("hive.server2.xsrf.filter.enabled",false, "If enabled, HiveServer2 will 
block any requests made to it over http " + "if an X-XSRF-HEADER header is not present"), diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java index d6cf744..9af206d 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java @@ -20,6 +20,9 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hive.common.auth.HiveAuthUtils; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.serde2.compression.CompDe; +import org.apache.hadoop.hive.serde2.compression.CompDeServiceLoader; import org.apache.hive.jdbc.Utils.JdbcConnectionParams; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.auth.KerberosSaslHelper; @@ -130,6 +133,7 @@ private int loginTimeout = 0; private TProtocolVersion protocol; private int fetchSize = HiveStatement.DEFAULT_FETCH_SIZE; + private CompDe sessCompde; private String initFile = null; public HiveConnection(String uri, Properties info) throws SQLException { @@ -655,6 +659,22 @@ private void openSession() throws SQLException { try { TOpenSessionResp openResp = client.OpenSession(openReq); + // Server initialized CompDe + if (openResp.isSetCompressorName()) + { + try { + CompDe testCompde = CompDeServiceLoader.getInstance() + .getCompde(openResp.getCompressorName(), openResp.getCompressorVersion()); + testCompde.init(openResp.getCompressorParameters()); + sessCompde = testCompde; + } + catch (Exception e) { + openReq.getConfiguration().remove( + "set:hiveconf:" + ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR_LIST.varname); + openResp = client.OpenSession(openReq); + } + } + // validate connection Utils.verifySuccess(openResp.getStatus()); if (!supportedProtocols.contains(openResp.getServerProtocolVersion())) { @@ -997,6 +1017,10 @@ public String getClientInfo(String name) throws SQLException { throw new SQLException("Method not supported"); } + public CompDe getCompde() { + return sessCompde; + } + /* * (non-Javadoc) * diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java b/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java index 6a91381..b2af14e 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveQueryResultSet.java @@ -31,6 +31,7 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.serde2.compression.CompDe; import org.apache.hive.service.cli.RowSet; import org.apache.hive.service.cli.RowSetFactory; import org.apache.hive.service.cli.TableSchema; @@ -78,6 +79,7 @@ private boolean fetchFirst = false; private final TProtocolVersion protocol; + private final CompDe compde; public static class Builder { @@ -193,6 +195,7 @@ protected HiveQueryResultSet(Builder builder) throws SQLException { normalizedColumnNames = new ArrayList(); columnTypes = new ArrayList(); columnAttributes = new ArrayList(); + compde = ((HiveConnection) builder.connection).getCompde(); if (builder.retrieveSchema) { retrieveSchema(); } else { @@ -373,7 +376,7 @@ public boolean next() throws SQLException { Utils.verifySuccessWithInfo(fetchResp.getStatus()); TRowSet results = fetchResp.getResults(); - fetchedRows = RowSetFactory.create(results, protocol); + fetchedRows = RowSetFactory.create(results, protocol, compde); fetchedRowsItr = fetchedRows.iterator(); } diff --git 
a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index a242501..b011ed5 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -916,11 +916,11 @@ public boolean hasMoreLogs() { try { RowSet rowSet; - rowSet = RowSetFactory.create(tFetchResultsResp.getResults(), connection.getProtocol()); + rowSet = RowSetFactory.create(tFetchResultsResp.getResults(), connection.getProtocol(), connection.getCompde()); for (Object[] row : rowSet) { logs.add(String.valueOf(row[0])); } - } catch (TException e) { + } catch (Exception e) { throw new SQLException("Error building result set for query log: " + e, e); } diff --git a/pom.xml b/pom.xml index b499cd2..a58d25f 100644 --- a/pom.xml +++ b/pom.xml @@ -130,7 +130,7 @@ 3.0.1 2.4 2.6 - 3.1 + 3.2 1.5.4 1.4 10.10.2.0 diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index f275f6a..4ac08c1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -215,6 +215,7 @@ import org.apache.hadoop.hive.serde2.NullStructSerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.compression.CompDe; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -7002,6 +7003,17 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input) table_desc= PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat, ThriftJDBCBinarySerDe.class); + String compdeName = SessionState.get().getConf() + .getVar(ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR); + String compdeVersion = SessionState.get().getConf() + .getVar(ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR_VERSION); + if (!compdeName.isEmpty() && !compdeVersion.isEmpty()) { + table_desc.getProperties().put(CompDe.confName, compdeName); + table_desc.getProperties().put(CompDe.confVersion, compdeVersion); + table_desc.getProperties().put(CompDe.confParams, + SessionState.get().getConf() + .getValByRegex(ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR + "\\." 
+ compdeName + "\\.[\\w|\\d]+")); + } // Set the fetch formatter to be a no-op for the ListSinkOperator, since we'll // write out formatted thrift objects to SequenceFile conf.set(SerDeUtils.LIST_SINK_OUTPUT_FORMATTER, NoOpFetchFormatter.class.getName()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/processors/SetProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/processors/SetProcessor.java index eab1886..e950309 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/processors/SetProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/processors/SetProcessor.java @@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.conf.HiveVariableSource; import org.apache.hadoop.hive.conf.VariableSubstitution; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -226,7 +227,8 @@ static String setConf(String varname, String key, String varvalue, boolean regis message.append("' FAILED in validation : ").append(fail).append('.'); throw new IllegalArgumentException(message.toString()); } - } else if (!removedConfigs.contains(key) && key.startsWith("hive.")) { + } else if (!removedConfigs.contains(key) && key.startsWith("hive.") + && !key.startsWith(ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR.varname) ) { throw new IllegalArgumentException("hive configuration " + key + " does not exists."); } } diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/compression/CompDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/compression/CompDe.java new file mode 100644 index 0000000..11ae250 --- /dev/null +++ b/serde/src/java/org/apache/hadoop/hive/serde2/compression/CompDe.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.hive.serde2.compression; + +import java.nio.ByteBuffer; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hive.serde2.thrift.ColumnBuffer; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +/** + * CompDe is the interface used to compress or decompress a set of ColumnBuffer. + * It is used to compress results in ThriftJDBCBinarySerDe to send to remote + * clients. + * + */ +public interface CompDe { + + /** + * Table descriptor keys. + */ + public static String confName = "compde.name"; + public static String confVersion = "compde.version"; + public static String confParams = "compde.params"; + + /** + * Negotiate the server and client plug-in parameters. + * parameters. + * @param serverParams The server's default parameters for this plug-in. 
+ * @param clientParams The client's requested parameters for this plug-in. + * @throws Exception if the plug-in failed to initialize. + */ + public Map getParams( + Map serverParams, + Map clientParams) + throws Exception; + + /** + * Initialize the plug-in with parameters. + * @param params + * @throws Exception if the plug-in failed to initialize. + */ + public void init(Map params) + throws Exception; + + /** + * Compress a set of columns. + * @param colSet The set of columns to be compressed. + * @return ByteBuffer representing the compressed set. + * @throws Exception + */ + public ByteBuffer compress(ColumnBuffer[] colSet) + throws Exception; + + /** + * Decompress a set of columns from a ByteBuffer and update the position of + * the buffer. + * @param input A ByteBuffer with `position` indicating the starting point + * of the compressed chunk. + * @param chunkSize The length of the compressed chunk to be decompressed from + * the input buffer. + * @return The set of columns. + * @throws Exception + */ + public ColumnBuffer[] decompress(ByteBuffer input, int chunkSize) + throws Exception; + + /** + * Provide a namespace for the plug-in. + * @return The vendor name. + */ + public String getVendor(); + + /** + * Provide a name for the plug-in. + * @return The plug-in name. + */ + public String getName(); + + /** + * Provide the version of the plug-in. + * @return The plug-in version. + */ + public String getVersion(); +} diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/compression/CompDeServiceLoader.java b/serde/src/java/org/apache/hadoop/hive/serde2/compression/CompDeServiceLoader.java new file mode 100644 index 0000000..bc57e65 --- /dev/null +++ b/serde/src/java/org/apache/hadoop/hive/serde2/compression/CompDeServiceLoader.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.serde2.compression; + +import java.util.Iterator; +import java.util.ServiceLoader; + +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.ImmutableMap; + +/** + * Load classes that implement CompDe when starting up, and serve them at run + * time. + * + */ +public class CompDeServiceLoader { + private static CompDeServiceLoader instance; + + // Map CompDe names to classes so we don't have to read the META-INF file for every session. + private ImmutableMap, Class> compdeTable; + + private static final Logger LOG = LoggerFactory.getLogger(CompDeServiceLoader.class); + + /** + * Get the singleton instance or initialize the CompDeServiceLoader. + * @return A singleton instance of CompDeServiceLoader. 
+ */ + public static synchronized CompDeServiceLoader getInstance() { + if (instance == null) { + Iterator compdes = ServiceLoader.load(CompDe.class).iterator(); + instance = new CompDeServiceLoader(); + ImmutableMap.Builder, Class> compdeMapBuilder = + new ImmutableMap.Builder<>(); + while (compdes.hasNext()) { + CompDe compde = compdes.next(); + compdeMapBuilder.put( + ImmutablePair.of( + compde.getVendor() + "." + compde.getName(), + compde.getVersion()), + compde.getClass()); + } + instance.compdeTable = compdeMapBuilder.build(); + } + return instance; + } + + /** + * Get a new instance of the CompDe. + * @param compdeName The compressor name qualified by the vendor namespace. + * ie. hive.snappy + * @param version The plug-in version. + * @return A CompDe implementation object. + * @throws Exception if the plug-in cannot be instantiated. + */ + public CompDe getCompde(String compdeName, String version) throws Exception { + try { + ImmutablePair requestedCompde = + ImmutablePair.of(compdeName, version); + CompDe compde = compdeTable.get(requestedCompde).newInstance(); + LOG.debug("Instantiated CompDe plugin for " + compdeName); + return compde; + } catch (Exception e) { + LOG.debug("CompDe plug-in cannot be instantiated for " + compdeName); + throw e; + } + } + +} diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java index 5c31974..3085bac 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ThriftJDBCBinarySerDe.java @@ -18,18 +18,24 @@ package org.apache.hadoop.hive.serde2.thrift; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.AbstractSerDe; import org.apache.hadoop.hive.serde2.ByteStream; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.compression.CompDe; +import org.apache.hadoop.hive.serde2.compression.CompDeServiceLoader; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; @@ -65,12 +71,33 @@ private int MAX_BUFFERED_ROWS; private int count; private StructObjectInspector rowObjectInspector; + private CompDe compde; @Override public void initialize(Configuration conf, Properties tbl) throws SerDeException { + if (tbl.containsKey(CompDe.confName) + && tbl.containsKey(CompDe.confVersion)) { + String compdeName = tbl.getProperty(CompDe.confName, null); + String compdeVersion = tbl.getProperty(CompDe.confVersion, null); + try { + compde = CompDeServiceLoader.getInstance() + .getCompde(compdeName, compdeVersion); + if (tbl.containsKey(CompDe.confParams)) { + @SuppressWarnings("unchecked") + Map compdeParams = (Map) tbl.get(CompDe.confParams); + compde.init(compdeParams); + } + else { + compde.init(new HashMap()); + } + } catch (Exception e) { + throw new SerDeException(e); + } + } + // Get column names - MAX_BUFFERED_ROWS = HiveConf.getIntVar(conf, 
HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE); + MAX_BUFFERED_ROWS = HiveConf.getIntVar(conf, ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_MAX_FETCH_SIZE); String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS); String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES); if (columnNameProperty.length() == 0) { @@ -102,32 +129,43 @@ public void initialize(Configuration conf, Properties tbl) throws SerDeException } private Writable serializeBatch() throws SerDeException { - output.reset(); - for (int i = 0; i < columnBuffers.length; i++) { - TColumn tColumn = columnBuffers[i].toTColumn(); - try { - tColumn.write(protocol); - } catch(TException e) { - throw new SerDeException(e); - } - } - initializeRowAndColumns(); - serializedBytesWritable.set(output.getData(), 0, output.getLength()); - return serializedBytesWritable; + output.reset(); + + if (compde != null) { + try { + protocol.writeBinary(compde.compress(columnBuffers)); + } catch (Exception e) { + throw new SerDeException(e); + } + } + else { + for (int i = 0; i < columnBuffers.length; i++) { + TColumn tColumn = columnBuffers[i].toTColumn(); + try { + tColumn.write(protocol); + } catch(TException e) { + throw new SerDeException(e); + } + } + } + + initializeRowAndColumns(); + serializedBytesWritable.set(output.getData(), 0, output.getLength()); + return serializedBytesWritable; } // use the columnNames to initialize the reusable row object and the columnBuffers. reason this is being done is if buffer is full, we should reinitialize the // column buffers, otherwise at the end when closeOp() is called, things get printed multiple times. private void initializeRowAndColumns() { - row = new ArrayList(columnNames.size()); - for (int i = 0; i < columnNames.size(); i++) { - row.add(null); - } - // Initialize column buffers - columnBuffers = new ColumnBuffer[columnNames.size()]; - for (int i = 0; i < columnBuffers.length; i++) { - columnBuffers[i] = new ColumnBuffer(Type.getType(columnTypes.get(i))); - } + row = new ArrayList(columnNames.size()); + for (int i = 0; i < columnNames.size(); i++) { + row.add(null); + } + // Initialize column buffers + columnBuffers = new ColumnBuffer[columnNames.size()]; + for (int i = 0; i < columnBuffers.length; i++) { + columnBuffers[i] = new ColumnBuffer(Type.getType(columnTypes.get(i))); + } } /** @@ -142,10 +180,10 @@ public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDe StructObjectInspector soi = (StructObjectInspector) objInspector; List fields = soi.getAllStructFieldRefs(); try { - Object[] formattedRow = (Object[]) thriftFormatter.convert(obj, objInspector); - for (int i = 0; i < columnNames.size(); i++) { - columnBuffers[i].addValue(formattedRow[i]); - } + Object[] formattedRow = (Object[]) thriftFormatter.convert(obj, objInspector); + for (int i = 0; i < columnNames.size(); i++) { + columnBuffers[i].addValue(formattedRow[i]); + } } catch (Exception e) { throw new SerDeException(e); } diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/compression/SnappyCompDe.java b/serde/src/test/org/apache/hadoop/hive/serde2/compression/SnappyCompDe.java new file mode 100644 index 0000000..d4fd762 --- /dev/null +++ b/serde/src/test/org/apache/hadoop/hive/serde2/compression/SnappyCompDe.java @@ -0,0 +1,562 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.serde2.compression; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.DoubleBuffer; +import java.nio.IntBuffer; +import java.nio.LongBuffer; +import java.nio.ShortBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.BitSet; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.ArrayUtils; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.thrift.ColumnBuffer; +import org.apache.hive.service.rpc.thrift.*; +import org.iq80.snappy.Snappy; + +public class SnappyCompDe implements CompDe { + + /** + * Negotiate the server and client plug-in parameters. + * parameters. + * @param serverParams The server's default parameters for this plug-in. + * @param clientParams The client's requested parameters for this plug-in. + * @throws Exception if the plug-in failed to initialize. + */ + public Map getParams( + Map serverParams, + Map clientParams) + throws Exception { + return serverParams; + } + + /** + * Get the configuration settings of the CompDe after it has been initialized. + * @return The CompDe configuration. + */ + @Override + public void init(Map params) { + return; + } + + /** + * Compress a set of columns. + * + * The header contains a compressed array of data types. + * The body contains compressed columns and their metadata. + * The footer contains a compressed array of chunk sizes. + * The final four bytes of the footer encode the byte size of that + * compressed array. + * + * @param colSet The set of columns to be compressed. + * + * @return ByteBuffer representing the compressed set. + * @throws IOException on failure to compress. + * @throws SerDeException on invalid ColumnBuffer metadata. + */ + @Override + public ByteBuffer compress(ColumnBuffer[] colSet) + throws IOException, SerDeException { + + // Many compression libraries let you avoid allocation of intermediate arrays. + // To use these API, we pre-compute the size of the output container. + + // Reserve space for the header. + int[] dataType = new int[colSet.length]; + int maxCompressedSize = Snappy.maxCompressedLength(4*dataType.length); + + // Reserve space for the compressed nulls BitSet for each column. + maxCompressedSize += colSet.length * Snappy.maxCompressedLength((colSet.length/8) + 1); + + // Track the length of `List compressedSize` which will be declared later. + int uncompressedFooterLength = 1 + 2*colSet.length; + + for (int colNum = 0; colNum < colSet.length; ++colNum) { + // Reserve space for the compressed columns. + dataType[colNum] = colSet[colNum].getType().toTType().getValue(); + switch (TTypeId.findByValue(dataType[colNum])) { + case BOOLEAN_TYPE: + maxCompressedSize += Integer.SIZE / Byte.SIZE; // This is for the encoded length. 
+ maxCompressedSize += Snappy.maxCompressedLength((colSet.length/8) + 1); + break; + case TINYINT_TYPE: + maxCompressedSize += Snappy.maxCompressedLength(colSet.length); + break; + case SMALLINT_TYPE: + maxCompressedSize += Snappy.maxCompressedLength(colSet.length * Short.SIZE / Byte.SIZE); + break; + case INT_TYPE: + maxCompressedSize += Snappy.maxCompressedLength(colSet.length * Integer.SIZE / Byte.SIZE); + break; + case BIGINT_TYPE: + maxCompressedSize += Snappy.maxCompressedLength(colSet.length * Long.SIZE / Byte.SIZE); + break; + case DOUBLE_TYPE: + maxCompressedSize += Snappy.maxCompressedLength(colSet.length * Double.SIZE / Byte.SIZE); + break; + case BINARY_TYPE: + // Reserve space for the size of the compressed array of row sizes. + maxCompressedSize += Snappy.maxCompressedLength(colSet.length * Integer.SIZE / Byte.SIZE); + + // Reserve space for the size of the compressed flattened bytes. + int rawBinaryLength = 0; + for (ByteBuffer nextBuffer : colSet[colNum].toTColumn().getBinaryVal().getValues()) { + rawBinaryLength += nextBuffer.limit(); + } + maxCompressedSize += Snappy.maxCompressedLength(rawBinaryLength); + + // Add an additional value to the list of compressed chunk sizes (length of `rowSize` array). + uncompressedFooterLength++; + + break; + case STRING_TYPE: + // Reserve space for the size of the compressed array of row sizes. + maxCompressedSize += Snappy.maxCompressedLength(colSet.length * Integer.SIZE / Byte.SIZE); + + // Reserve space for the size of the compressed flattened bytes. + int rawStringLength = 0; + for (String nextString: colSet[colNum].toTColumn().getStringVal().getValues()) { + rawStringLength += nextString.getBytes(StandardCharsets.UTF_8).length; + } + maxCompressedSize += Snappy.maxCompressedLength(rawStringLength); + + // Add an additional value to the list of compressed chunk sizes (length of `rowSize` array). + uncompressedFooterLength++; + + break; + default: + throw new SerDeException("Unrecognized column type: " + TTypeId.findByValue(dataType[colNum])); + } + } + // Reserve space for the footer. + maxCompressedSize += Snappy.maxCompressedLength(uncompressedFooterLength * Integer.SIZE / Byte.SIZE); + + // Allocate the output container. + ByteBuffer output = ByteBuffer.allocate(maxCompressedSize); + + // Allocate the footer. This goes in the footer because we don't know the chunk sizes until after + // the columns have been compressed and written. + ArrayList compressedSize = new ArrayList(uncompressedFooterLength); + + // Write to the output buffer. + + // Write the header. + compressedSize.add(writePrimitives(dataType, output)); + + // Write the compressed columns and metadata. 
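[Editor's note: the following is an informal summary of the block layout described by the compress() javadoc and the code above; it is illustrative only and not part of the patch.]

    [ Snappy(dataType[] as int32s)                          ]  <- header
    [ per column: Snappy(nulls bitmap), value chunk(s)      ]  <- body
    [ Snappy(compressedSize[] as int32s)                    ]  <- footer
    [ plain int32: byte length of the compressed footer     ]  <- last 4 bytes

Each entry of compressedSize records, in write order, the byte length of one compressed chunk, so a reader can walk the chunks without decompressing ahead. Some column types add extra pieces to the body: booleans write a plain int32 row count before the bit-packed values, and string/binary columns write a compressed array of per-row sizes before the flattened data.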
+ for (int colNum = 0; colNum < colSet.length; colNum++) { + switch (TTypeId.findByValue(dataType[colNum])) { + case BOOLEAN_TYPE: { + TBoolColumn column = colSet[colNum].toTColumn().getBoolVal(); + + List bools = column.getValues(); + BitSet bsBools = new BitSet(bools.size()); + for (int rowNum = 0; rowNum < bools.size(); rowNum++) { + bsBools.set(rowNum, bools.get(rowNum)); + } + + compressedSize.add(writePrimitives(column.getNulls(), output)); + + // BitSet won't write trailing zeroes so we encode the length + output.putInt(column.getValuesSize()); + + compressedSize.add(writePrimitives(bsBools.toByteArray(), output)); + + break; + } + case TINYINT_TYPE: { + TByteColumn column = colSet[colNum].toTColumn().getByteVal(); + compressedSize.add(writePrimitives(column.getNulls(), output)); + compressedSize.add(writeBoxedBytes(column.getValues(), output)); + break; + } + case SMALLINT_TYPE: { + TI16Column column = colSet[colNum].toTColumn().getI16Val(); + compressedSize.add(writePrimitives(column.getNulls(), output)); + compressedSize.add(writeBoxedShorts(column.getValues(), output)); + break; + } + case INT_TYPE: { + TI32Column column = colSet[colNum].toTColumn().getI32Val(); + compressedSize.add(writePrimitives(column.getNulls(), output)); + compressedSize.add(writeBoxedIntegers(column.getValues(), output)); + break; + } + case BIGINT_TYPE: { + TI64Column column = colSet[colNum].toTColumn().getI64Val(); + compressedSize.add(writePrimitives(column.getNulls(), output)); + compressedSize.add(writeBoxedLongs(column.getValues(), output)); + break; + } + case DOUBLE_TYPE: { + TDoubleColumn column = colSet[colNum].toTColumn().getDoubleVal(); + compressedSize.add(writePrimitives(column.getNulls(), output)); + compressedSize.add(writeBoxedDoubles(column.getValues(), output)); + break; + } + case BINARY_TYPE: { + TBinaryColumn column = colSet[colNum].toTColumn().getBinaryVal(); + + // Initialize the array of row sizes. + int[] rowSizes = new int[column.getValuesSize()]; + int totalSize = 0; + for (int rowNum = 0; rowNum < column.getValuesSize(); rowNum++) { + rowSizes[rowNum] = column.getValues().get(rowNum).limit(); + totalSize += column.getValues().get(rowNum).limit(); + } + + // Flatten the data for Snappy for a better compression ratio. + ByteBuffer flattenedData = ByteBuffer.allocate(totalSize); + for (int rowNum = 0; rowNum < column.getValuesSize(); rowNum++) { + flattenedData.put(column.getValues().get(rowNum)); + } + + // Write nulls bitmap. + compressedSize.add(writePrimitives(column.getNulls(), output)); + + // Write the list of row sizes. + compressedSize.add(writePrimitives(rowSizes, output)); + + // Write the compressed, flattened data. + compressedSize.add(writePrimitives(flattenedData.array(), output)); + + break; + } + case STRING_TYPE: { + TStringColumn column = colSet[colNum].toTColumn().getStringVal(); + + // Initialize the array of row sizes. + int[] rowSizes = new int[column.getValuesSize()]; + int totalSize = 0; + for (int rowNum = 0; rowNum < column.getValuesSize(); rowNum++) { + rowSizes[rowNum] = column.getValues().get(rowNum).length(); + totalSize += column.getValues().get(rowNum).length(); + } + + // Flatten the data for Snappy for a better compression ratio. + StringBuilder flattenedData = new StringBuilder(totalSize); + for (int rowNum = 0; rowNum < column.getValuesSize(); rowNum++) { + flattenedData.append(column.getValues().get(rowNum)); + } + + // Write nulls bitmap. + compressedSize.add(writePrimitives(column.getNulls(), output)); + + // Write the list of row sizes. 
+ compressedSize.add(writePrimitives(rowSizes, output)); + + // Write the flattened data. + compressedSize.add(writePrimitives( + flattenedData.toString().getBytes(StandardCharsets.UTF_8), output)); + + break; + } + default: + throw new SerDeException("Unrecognized column type: " + TTypeId.findByValue(dataType[colNum])); + } + } + + // Write the footer. + output.putInt(writeBoxedIntegers(compressedSize, output)); + + output.flip(); + return output; + } + + /** + * Write compressed data to the output ByteBuffer and update the position of + * the buffer. + * + * @param boxedVals A list of boxed Java primitives. + * @param output The buffer to append with the compressed bytes. + * + * @return The number of bytes written. + * @throws IOException on failure to write compressed data. + */ + private int writeBoxedBytes(List boxedVals, ByteBuffer output)throws IOException { + return writePrimitives(ArrayUtils.toPrimitive(boxedVals.toArray(new Byte[0])), output); + } + private int writeBoxedShorts(List boxedVals, ByteBuffer output) throws IOException { + return writePrimitives(ArrayUtils.toPrimitive(boxedVals.toArray(new Short[0])), output); + } + private int writeBoxedIntegers(List boxedVals, ByteBuffer output) throws IOException { + return writePrimitives(ArrayUtils.toPrimitive(boxedVals.toArray(new Integer[0])), output); + } + private int writeBoxedLongs(List boxedVals, ByteBuffer output) throws IOException { + return writePrimitives(ArrayUtils.toPrimitive(boxedVals.toArray(new Long[0])), output); + } + private int writeBoxedDoubles(List boxedVals, ByteBuffer output) throws IOException { + return writePrimitives(ArrayUtils.toPrimitive(boxedVals.toArray(new Double[0])), output); + } + + /** + * Write compressed data to the output ByteBuffer and update the position of + * the buffer. + * + * @param primitives An array of primitive data types. + * @param output The buffer to append with the compressed bytes. + * + * @return The number of bytes written. + * @throws IOException on failure to write compressed data. 
+ */ + private int writePrimitives(byte[] primitives, ByteBuffer output) throws IOException { + int bytesWritten = Snappy.compress(primitives, 0, primitives.length, output.array(), output.arrayOffset() + output.position()); + output.position(output.position() + bytesWritten); + return bytesWritten; + } + private int writePrimitives(short[] primitives, ByteBuffer output) throws IOException { + ByteBuffer buffer = ByteBuffer.allocate(primitives.length * Short.SIZE / Byte.SIZE); + ShortBuffer view = buffer.asShortBuffer(); + view.put(primitives); + int bytesWritten = Snappy.compress(buffer.array(), 0, buffer.capacity(), output.array(), output.arrayOffset() + output.position()); + output.position(output.position() + bytesWritten); + return bytesWritten; + } + private int writePrimitives(int[] primitives, ByteBuffer output) throws IOException { + ByteBuffer buffer = ByteBuffer.allocate(primitives.length * Integer.SIZE / Byte.SIZE); + IntBuffer view = buffer.asIntBuffer(); + view.put(primitives); + int bytesWritten = Snappy.compress(buffer.array(), 0, buffer.capacity(), output.array(), output.arrayOffset() + output.position()); + output.position(output.position() + bytesWritten); + return bytesWritten; + } + private int writePrimitives(long[] primitives, ByteBuffer output) throws IOException { + ByteBuffer buffer = ByteBuffer.allocate(primitives.length * Long.SIZE / Byte.SIZE); + LongBuffer view = buffer.asLongBuffer(); + view.put(primitives); + int bytesWritten = Snappy.compress(buffer.array(), 0, buffer.capacity(), output.array(), output.arrayOffset() + output.position()); + output.position(output.position() + bytesWritten); + return bytesWritten; + } + private int writePrimitives(double[] primitives, ByteBuffer output) throws IOException { + ByteBuffer buffer = ByteBuffer.allocate(primitives.length * Double.SIZE / Byte.SIZE); + DoubleBuffer view = buffer.asDoubleBuffer(); + view.put(primitives); + int bytesWritten = Snappy.compress(buffer.array(), 0, buffer.capacity(), output.array(), output.arrayOffset() + output.position()); + output.position(output.position() + bytesWritten); + return bytesWritten; + } + + /** + * Decompress a set of columns from a ByteBuffer and update the position of + * the buffer. + * + * @param input A ByteBuffer with `position` indicating the starting point + * of the compressed chunk. + * @param chunkSize The length of the compressed chunk to be decompressed from + * the input buffer. + * + * @return The set of columns. + * @throws IOException on failure to decompress. + * @throws SerDeException on invalid ColumnBuffer metadata. + */ + @Override + public ColumnBuffer[] decompress(ByteBuffer input, int chunkSize) throws IOException, SerDeException { + int startPos = input.position(); + + // Read the footer. + int footerSize = input.getInt(startPos + chunkSize - 4); + ByteBuffer footerView = input.slice(); + footerView.position(startPos + chunkSize - Integer.SIZE / Byte.SIZE - footerSize); + int[] compressedSizePrimitives = readIntegers(footerSize, footerView); + Iterator compressedSize = + Arrays.asList(ArrayUtils.toObject(compressedSizePrimitives)).iterator(); + + // Read the header. + int[] dataType = readIntegers(compressedSize.next(), input); + int numOfCols = dataType.length; + + // Read the columns. 
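[Editor's note: the writePrimitives and read* helpers around this point all follow the same iq80 Snappy idiom: pre-size the destination with maxCompressedLength, compress in place, and record the number of bytes written; on the read side, recover the original length with getUncompressedLength before uncompressing. A minimal, self-contained sketch of that round trip follows; the class name is made up and the snippet is illustrative only, not part of the patch.]

    import java.nio.charset.StandardCharsets;
    import org.iq80.snappy.Snappy;

    public class SnappyRoundTripSketch {
      public static void main(String[] args) {
        byte[] raw = "hello hello hello hello".getBytes(StandardCharsets.UTF_8);

        // Write side: size the destination with maxCompressedLength, then compress in place.
        byte[] compressed = new byte[Snappy.maxCompressedLength(raw.length)];
        // Bytes actually written; this is what SnappyCompDe records per chunk in its footer.
        int chunkSize = Snappy.compress(raw, 0, raw.length, compressed, 0);

        // Read side: recover the original length from the chunk, then uncompress.
        byte[] restored = new byte[Snappy.getUncompressedLength(compressed, 0)];
        Snappy.uncompress(compressed, 0, chunkSize, restored, 0);

        System.out.println(new String(restored, StandardCharsets.UTF_8));
      }
    }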
+ ColumnBuffer[] outputCols = new ColumnBuffer[numOfCols]; + for (int colNum = 0; colNum < numOfCols; colNum++) { + byte[] nulls = readBytes(compressedSize.next(), input); + + switch (TTypeId.findByValue(dataType[colNum])) { + case BOOLEAN_TYPE: { + int numRows = input.getInt(); + byte[] vals = readBytes(compressedSize.next(), input); + BitSet bsBools = BitSet.valueOf(vals); + + boolean[] bools = new boolean[numRows]; + for (int rowNum = 0; rowNum < numRows; rowNum++) { + bools[rowNum] = bsBools.get(rowNum); + } + + TBoolColumn column = new TBoolColumn(Arrays.asList(ArrayUtils.toObject(bools)), ByteBuffer.wrap(nulls)); + outputCols[colNum] = new ColumnBuffer(TColumn.boolVal(column)); + break; + } + case TINYINT_TYPE: { + byte[] vals = readBytes(compressedSize.next(), input); + TByteColumn column = new TByteColumn(Arrays.asList(ArrayUtils.toObject(vals)), ByteBuffer.wrap(nulls)); + outputCols[colNum] = new ColumnBuffer(TColumn.byteVal(column)); + break; + } + case SMALLINT_TYPE: { + short[] vals = readShorts(compressedSize.next(), input); + TI16Column column = new TI16Column(Arrays.asList(ArrayUtils.toObject(vals)), ByteBuffer.wrap(nulls)); + outputCols[colNum] = new ColumnBuffer(TColumn.i16Val(column)); + break; + } + case INT_TYPE: { + int[] vals = readIntegers(compressedSize.next(), input); + TI32Column column = new TI32Column(Arrays.asList(ArrayUtils.toObject(vals)), ByteBuffer.wrap(nulls)); + outputCols[colNum] = new ColumnBuffer(TColumn.i32Val(column)); + break; + } + case BIGINT_TYPE: { + long[] vals = readLongs(compressedSize.next(), input); + TI64Column column = new TI64Column(Arrays.asList(ArrayUtils.toObject(vals)), ByteBuffer.wrap(nulls)); + outputCols[colNum] = new ColumnBuffer(TColumn.i64Val(column)); + break; + } + case DOUBLE_TYPE: { + double[] vals = readDoubles(compressedSize.next(), input); + TDoubleColumn column = new TDoubleColumn(Arrays.asList(ArrayUtils.toObject(vals)), ByteBuffer.wrap(nulls)); + outputCols[colNum] = new ColumnBuffer(TColumn.doubleVal(column)); + break; + } + case BINARY_TYPE: { + int[] rowSize = readIntegers(compressedSize.next(), input); + + ByteBuffer flattenedData = ByteBuffer.wrap(readBytes(compressedSize.next(), input)); + ByteBuffer[] vals = new ByteBuffer[rowSize.length]; + + for (int rowNum = 0; rowNum < rowSize.length; rowNum++) { + vals[rowNum] = ByteBuffer.wrap(flattenedData.array(), flattenedData.position(), rowSize[rowNum]); + flattenedData.position(flattenedData.position() + rowSize[rowNum]); + } + + TBinaryColumn column = new TBinaryColumn(Arrays.asList(vals), ByteBuffer.wrap(nulls)); + outputCols[colNum] = new ColumnBuffer(TColumn.binaryVal(column)); + break; + } + case STRING_TYPE: { + int[] rowSize = readIntegers(compressedSize.next(), input); + + ByteBuffer flattenedData = ByteBuffer.wrap(readBytes(compressedSize.next(), input)); + + String[] vals = new String[rowSize.length]; + + for (int rowNum = 0; rowNum < rowSize.length; rowNum++) { + vals[rowNum] = new String(flattenedData.array(), flattenedData.position(), rowSize[rowNum], StandardCharsets.UTF_8); + flattenedData.position(flattenedData.position() + rowSize[rowNum]); + } + + TStringColumn column = new TStringColumn(Arrays.asList(vals), ByteBuffer.wrap(nulls)); + outputCols[colNum] = new ColumnBuffer(TColumn.stringVal(column)); + break; + } + default: + throw new SerDeException("Unrecognized column type: " + TTypeId.findByValue(dataType[colNum])); + } + } + input.position(startPos + chunkSize); + return outputCols; + } + + /** + * Read a chunk from a ByteBuffer and advance the 
buffer position. + * + * @param chunkSize The number of bytes to decompress starting at the current + * position. + * @param input The buffer to read from. + * + * @return An array of primitives. + * @throws IOException on failure to decompress. + */ + private byte[] readBytes(int chunkSize, ByteBuffer input) throws IOException { + byte[] uncompressedBytes = new byte[Snappy.getUncompressedLength(input.array(), input.arrayOffset() + input.position())]; + Snappy.uncompress(input.array(), input.arrayOffset() + input.position(), chunkSize, uncompressedBytes, 0); + input.position(input.position() + chunkSize); + return uncompressedBytes; + } + private short[] readShorts(int chunkSize, ByteBuffer input) throws IOException { + byte[] uncompressedBytes = new byte[Snappy.getUncompressedLength(input.array(), input.arrayOffset() + input.position())]; + Snappy.uncompress(input.array(), input.arrayOffset() + input.position(), chunkSize, uncompressedBytes, 0); + ShortBuffer view = ByteBuffer.wrap(uncompressedBytes).asShortBuffer(); + short[] vals = new short[view.capacity()]; + view.get(vals); + input.position(input.position() + chunkSize); + return vals; + } + private int[] readIntegers(int chunkSize, ByteBuffer input) throws IOException { + byte[] uncompressedBytes = new byte[Snappy.getUncompressedLength(input.array(), input.arrayOffset() + input.position())]; + Snappy.uncompress(input.array(), input.arrayOffset() + input.position(), chunkSize, uncompressedBytes, 0); + IntBuffer view = ByteBuffer.wrap(uncompressedBytes).asIntBuffer(); + int[] vals = new int[view.capacity()]; + view.get(vals); + input.position(input.position() + chunkSize); + return vals; + } + private long[] readLongs(int chunkSize, ByteBuffer input) throws IOException { + byte[] uncompressedBytes = new byte[Snappy.getUncompressedLength(input.array(), input.arrayOffset() + input.position())]; + Snappy.uncompress(input.array(), input.arrayOffset() + input.position(), chunkSize, uncompressedBytes, 0); + LongBuffer view = ByteBuffer.wrap(uncompressedBytes).asLongBuffer(); + long[] vals = new long[view.capacity()]; + view.get(vals); + input.position(input.position() + chunkSize); + return vals; + } + private double[] readDoubles(int chunkSize, ByteBuffer input) throws IOException { + byte[] uncompressedBytes = new byte[Snappy.getUncompressedLength(input.array(), input.arrayOffset() + input.position())]; + Snappy.uncompress(input.array(), input.arrayOffset() + input.position(), chunkSize, uncompressedBytes, 0); + DoubleBuffer view = ByteBuffer.wrap(uncompressedBytes).asDoubleBuffer(); + double[] vals = new double[view.capacity()]; + view.get(vals); + input.position(input.position() + chunkSize); + return vals; + } + + /** + * Provide a namespace for the plug-in. + * @return The vendor name. + */ + @Override + public String getVendor() { + return "hive"; + } + + /** + * Provide a name for the plug-in. + * @return The plug-in name. + */ + @Override + public String getName(){ + return "snappy"; + } + + /** + * Provide the version of the plug-in. + * @return The plug-in version. 
+ */ + public String getVersion() { + return "1.0.0"; + } + +} diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/compression/TestSnappyCompDe.java b/serde/src/test/org/apache/hadoop/hive/serde2/compression/TestSnappyCompDe.java new file mode 100644 index 0000000..dd1f3c0 --- /dev/null +++ b/serde/src/test/org/apache/hadoop/hive/serde2/compression/TestSnappyCompDe.java @@ -0,0 +1,249 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.serde2.compression; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hive.serde2.compression.SnappyCompDe; +import org.apache.hadoop.hive.serde2.thrift.ColumnBuffer; +import org.apache.hive.service.rpc.thrift.*; + +import static org.junit.Assert.assertArrayEquals; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; + +import org.junit.Before; +import org.junit.Test; + +public class TestSnappyCompDe { + private SnappyCompDe compDe = new SnappyCompDe(); + byte[] noNullMask = {0}; + byte[] firstNullMask = {1}; + byte[] secondNullMask = {2}; + byte[] thirdNullMask = {3}; + ColumnBuffer columnBinary; + ColumnBuffer columnBool; + ColumnBuffer columnByte; + ColumnBuffer columnShort; + ColumnBuffer columnInt; + ColumnBuffer columnLong; + ColumnBuffer columnDouble; + ColumnBuffer columnStr; + + @Before + public void init() throws Exception { + ByteBuffer firstRow = ByteBuffer.wrap(new byte[]{2, 33, 7, 75, 5}); + ByteBuffer secondRow = ByteBuffer.wrap(new byte[]{3, 21, 6}); + ByteBuffer thirdRow = ByteBuffer.wrap(new byte[]{52, 25, 74, 74, 64}); + firstRow.flip(); + secondRow.flip(); + thirdRow.flip(); + ArrayList someBinaries = new ArrayList(); + someBinaries.add(firstRow); + someBinaries.add(secondRow); + someBinaries.add(thirdRow); + columnBinary = new ColumnBuffer(TColumn.binaryVal( + new TBinaryColumn(someBinaries, ByteBuffer.wrap(new byte[]{})))); + + // Test leading and trailing `false` in column + ArrayList bools = new ArrayList(); + bools.add(false); + bools.add(true); + bools.add(false); + bools.add(true); + bools.add(false); + columnBool = new ColumnBuffer(TColumn.boolVal( + new TBoolColumn(bools, ByteBuffer.wrap(noNullMask)))); + + ArrayList bytes = new ArrayList(); + bytes.add((byte) 0); + bytes.add((byte) 1); + bytes.add((byte) 2); + bytes.add((byte) 3); + columnByte= new ColumnBuffer(TColumn.byteVal( + new TByteColumn(bytes, ByteBuffer.wrap(noNullMask)))); + + ArrayList shorts = new ArrayList(); + shorts.add((short) 0); + shorts.add((short) 1); + shorts.add((short) -127); + shorts.add((short) 127); + columnShort= new ColumnBuffer(TColumn.i16Val( + new TI16Column(shorts, ByteBuffer.wrap(noNullMask)))); + + ArrayList ints = new ArrayList(); + ints.add(0); + ints.add(1); + ints.add(-32767); + 
ints.add(32767); + columnInt= new ColumnBuffer(TColumn.i32Val( + new TI32Column(ints, ByteBuffer.wrap(noNullMask)))); + + ArrayList longs = new ArrayList(); + longs.add((long) 0); + longs.add((long) 1); + longs.add((long) -2147483647 ); + longs.add((long) 2147483647 ); + columnLong = new ColumnBuffer(TColumn.i64Val( + new TI64Column(longs, ByteBuffer.wrap(noNullMask)))); + + ArrayList doubles = new ArrayList(); + doubles.add((double) 0); + doubles.add((double) 1.0); + doubles.add((double) -2147483647.5 ); + doubles.add((double) 2147483647.5 ); + columnDouble= new ColumnBuffer(TColumn.doubleVal( + new TDoubleColumn(doubles, ByteBuffer.wrap(noNullMask)))); + + ArrayList strings = new ArrayList(); + strings.add("ABC"); + strings.add("DEFG"); + strings.add("HI"); + strings.add(StringUtils.rightPad("", 65535, 'j')); + strings.add(""); + columnStr = new ColumnBuffer(TColumn.stringVal( + new TStringColumn(strings, ByteBuffer.wrap(noNullMask)))); + + compDe.init(new HashMap()); + } + + @Test + public void testBinaryCol() throws Exception { + ColumnBuffer[] inputCols = new ColumnBuffer[]{columnBinary}; + + ByteBuffer compressed = compDe.compress(inputCols); + ColumnBuffer[] outputCols = compDe.decompress(compressed, compressed.limit()); + + assertArrayEquals( + inputCols[0].toTColumn().getBinaryVal().getValues().toArray(), + outputCols[0].toTColumn().getBinaryVal().getValues().toArray()); + } + + @Test + public void testBoolCol() throws Exception { + ColumnBuffer[] inputCols = new ColumnBuffer[]{columnBool}; + + ByteBuffer compressed = compDe.compress(inputCols); + ColumnBuffer[] outputCols = compDe.decompress(compressed, compressed.limit()); + + assertArrayEquals( + inputCols[0].toTColumn().getBoolVal().getValues().toArray(), + outputCols[0].toTColumn().getBoolVal().getValues().toArray()); + } + + @Test + public void testByteCol() throws Exception { + ColumnBuffer[] inputCols = new ColumnBuffer[]{columnByte}; + + ByteBuffer compressed = compDe.compress(inputCols); + ColumnBuffer[] outputCols = compDe.decompress(compressed, compressed.limit()); + + assertArrayEquals( + inputCols[0].toTColumn().getByteVal().getValues().toArray(), + outputCols[0].toTColumn().getByteVal().getValues().toArray()); + } + + @Test + public void testIntCol() throws Exception { + ColumnBuffer[] inputCols = new ColumnBuffer[]{columnInt}; + + ByteBuffer compressed = compDe.compress(inputCols); + ColumnBuffer[] outputCols = compDe.decompress(compressed, compressed.limit()); + + assertArrayEquals( + inputCols[0].toTColumn().getI32Val().getValues().toArray(), + outputCols[0].toTColumn().getI32Val().getValues().toArray()); + } + + @Test + public void testLongCol() throws Exception { + ColumnBuffer[] inputCols = new ColumnBuffer[]{columnLong}; + + ByteBuffer compressed = compDe.compress(inputCols); + ColumnBuffer[] outputCols = compDe.decompress(compressed, compressed.limit()); + + assertArrayEquals( + inputCols[0].toTColumn().getI64Val().getValues().toArray(), + outputCols[0].toTColumn().getI64Val().getValues().toArray()); + } + + @Test + public void testDoubleCol() throws Exception { + ColumnBuffer[] inputCols = new ColumnBuffer[]{columnDouble}; + + ByteBuffer compressed = compDe.compress(inputCols); + ColumnBuffer[] outputCols = compDe.decompress(compressed, compressed.limit()); + + assertArrayEquals( + inputCols[0].toTColumn().getDoubleVal().getValues().toArray(), + outputCols[0].toTColumn().getDoubleVal().getValues().toArray()); + } + + @Test + public void testStringCol() throws Exception { + ColumnBuffer[] inputCols = new 
ColumnBuffer[]{columnStr}; + + ByteBuffer compressed = compDe.compress(inputCols); + ColumnBuffer[] outputCols = compDe.decompress(compressed, compressed.limit()); + + assertArrayEquals( + inputCols[0].toTColumn().getStringVal().getValues().toArray(), + outputCols[0].toTColumn().getStringVal().getValues().toArray()); + } + + @Test + public void testNulls() throws Exception { + ColumnBuffer[] inputCols; + ArrayList someStrings = new ArrayList(); + someStrings.add("test1"); + someStrings.add("test2"); + ColumnBuffer columnStr1 = new ColumnBuffer(TColumn.stringVal( + new TStringColumn(someStrings, ByteBuffer.wrap(firstNullMask)))); + ColumnBuffer columnStr2 = new ColumnBuffer(TColumn.stringVal( + new TStringColumn(someStrings, ByteBuffer.wrap(secondNullMask)))); + ColumnBuffer columnStr3 = new ColumnBuffer(TColumn.stringVal( + new TStringColumn(someStrings, ByteBuffer.wrap(thirdNullMask)))); + + inputCols = new ColumnBuffer[]{ + columnStr1, + columnStr2, + columnStr3}; + + ByteBuffer compressed = compDe.compress(inputCols); + ColumnBuffer[] outputCols = compDe.decompress(compressed, compressed.limit()); + + assertArrayEquals(inputCols, outputCols); + } + + @Test + public void testMulti() throws Exception { + ColumnBuffer[] inputCols = new ColumnBuffer[]{columnInt, columnStr}; + + ByteBuffer compressed = compDe.compress(inputCols); + ColumnBuffer[] outputCols = compDe.decompress(compressed, compressed.limit()); + + assertArrayEquals( + inputCols[0].toTColumn().getI32Val().getValues().toArray(), + outputCols[0].toTColumn().getI32Val().getValues().toArray()); + assertArrayEquals( + inputCols[1].toTColumn().getStringVal().getValues().toArray(), + outputCols[1].toTColumn().getStringVal().getValues().toArray()); + } +} diff --git a/service-rpc/if/TCLIService.thrift b/service-rpc/if/TCLIService.thrift index a4fa7b0..ad9201b 100644 --- a/service-rpc/if/TCLIService.thrift +++ b/service-rpc/if/TCLIService.thrift @@ -583,8 +583,14 @@ struct TOpenSessionResp { // Session Handle 3: optional TSessionHandle sessionHandle - // The configuration settings for this session. - 4: optional map configuration + // The CompDe configuration settings for this session. + 4: optional map compressorParameters + + // The name of the CompDe for this session. + 5: optional string compressorName + + // The version of the CompDe for this session. 
+ 6: optional string compressorVersion } diff --git a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp index 2f460e8..607dd79 100644 --- a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp +++ b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.cpp @@ -4814,9 +4814,19 @@ void TOpenSessionResp::__set_sessionHandle(const TSessionHandle& val) { __isset.sessionHandle = true; } -void TOpenSessionResp::__set_configuration(const std::map & val) { - this->configuration = val; -__isset.configuration = true; +void TOpenSessionResp::__set_compressorParameters(const std::map & val) { + this->compressorParameters = val; +__isset.compressorParameters = true; +} + +void TOpenSessionResp::__set_compressorName(const std::string& val) { + this->compressorName = val; +__isset.compressorName = true; +} + +void TOpenSessionResp::__set_compressorVersion(const std::string& val) { + this->compressorVersion = val; +__isset.compressorVersion = true; } uint32_t TOpenSessionResp::read(::apache::thrift::protocol::TProtocol* iprot) { @@ -4871,7 +4881,7 @@ uint32_t TOpenSessionResp::read(::apache::thrift::protocol::TProtocol* iprot) { case 4: if (ftype == ::apache::thrift::protocol::T_MAP) { { - this->configuration.clear(); + this->compressorParameters.clear(); uint32_t _size193; ::apache::thrift::protocol::TType _ktype194; ::apache::thrift::protocol::TType _vtype195; @@ -4881,12 +4891,28 @@ uint32_t TOpenSessionResp::read(::apache::thrift::protocol::TProtocol* iprot) { { std::string _key198; xfer += iprot->readString(_key198); - std::string& _val199 = this->configuration[_key198]; + std::string& _val199 = this->compressorParameters[_key198]; xfer += iprot->readString(_val199); } xfer += iprot->readMapEnd(); } - this->__isset.configuration = true; + this->__isset.compressorParameters = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 5: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->compressorName); + this->__isset.compressorName = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 6: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->compressorVersion); + this->__isset.compressorVersion = true; } else { xfer += iprot->skip(ftype); } @@ -4925,12 +4951,12 @@ uint32_t TOpenSessionResp::write(::apache::thrift::protocol::TProtocol* oprot) c xfer += this->sessionHandle.write(oprot); xfer += oprot->writeFieldEnd(); } - if (this->__isset.configuration) { - xfer += oprot->writeFieldBegin("configuration", ::apache::thrift::protocol::T_MAP, 4); + if (this->__isset.compressorParameters) { + xfer += oprot->writeFieldBegin("compressorParameters", ::apache::thrift::protocol::T_MAP, 4); { - xfer += oprot->writeMapBegin(::apache::thrift::protocol::T_STRING, ::apache::thrift::protocol::T_STRING, static_cast(this->configuration.size())); + xfer += oprot->writeMapBegin(::apache::thrift::protocol::T_STRING, ::apache::thrift::protocol::T_STRING, static_cast(this->compressorParameters.size())); std::map ::const_iterator _iter200; - for (_iter200 = this->configuration.begin(); _iter200 != this->configuration.end(); ++_iter200) + for (_iter200 = this->compressorParameters.begin(); _iter200 != this->compressorParameters.end(); ++_iter200) { xfer += oprot->writeString(_iter200->first); xfer += oprot->writeString(_iter200->second); @@ -4939,6 +4965,16 @@ uint32_t TOpenSessionResp::write(::apache::thrift::protocol::TProtocol* oprot) c } xfer += 
oprot->writeFieldEnd(); } + if (this->__isset.compressorName) { + xfer += oprot->writeFieldBegin("compressorName", ::apache::thrift::protocol::T_STRING, 5); + xfer += oprot->writeString(this->compressorName); + xfer += oprot->writeFieldEnd(); + } + if (this->__isset.compressorVersion) { + xfer += oprot->writeFieldBegin("compressorVersion", ::apache::thrift::protocol::T_STRING, 6); + xfer += oprot->writeString(this->compressorVersion); + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -4949,7 +4985,9 @@ void swap(TOpenSessionResp &a, TOpenSessionResp &b) { swap(a.status, b.status); swap(a.serverProtocolVersion, b.serverProtocolVersion); swap(a.sessionHandle, b.sessionHandle); - swap(a.configuration, b.configuration); + swap(a.compressorParameters, b.compressorParameters); + swap(a.compressorName, b.compressorName); + swap(a.compressorVersion, b.compressorVersion); swap(a.__isset, b.__isset); } @@ -4957,14 +4995,18 @@ TOpenSessionResp::TOpenSessionResp(const TOpenSessionResp& other201) { status = other201.status; serverProtocolVersion = other201.serverProtocolVersion; sessionHandle = other201.sessionHandle; - configuration = other201.configuration; + compressorParameters = other201.compressorParameters; + compressorName = other201.compressorName; + compressorVersion = other201.compressorVersion; __isset = other201.__isset; } TOpenSessionResp& TOpenSessionResp::operator=(const TOpenSessionResp& other202) { status = other202.status; serverProtocolVersion = other202.serverProtocolVersion; sessionHandle = other202.sessionHandle; - configuration = other202.configuration; + compressorParameters = other202.compressorParameters; + compressorName = other202.compressorName; + compressorVersion = other202.compressorVersion; __isset = other202.__isset; return *this; } @@ -4974,7 +5016,9 @@ void TOpenSessionResp::printTo(std::ostream& out) const { out << "status=" << to_string(status); out << ", " << "serverProtocolVersion=" << to_string(serverProtocolVersion); out << ", " << "sessionHandle="; (__isset.sessionHandle ? (out << to_string(sessionHandle)) : (out << "")); - out << ", " << "configuration="; (__isset.configuration ? (out << to_string(configuration)) : (out << "")); + out << ", " << "compressorParameters="; (__isset.compressorParameters ? (out << to_string(compressorParameters)) : (out << "")); + out << ", " << "compressorName="; (__isset.compressorName ? (out << to_string(compressorName)) : (out << "")); + out << ", " << "compressorVersion="; (__isset.compressorVersion ? 
(out << to_string(compressorVersion)) : (out << "")); out << ")"; } diff --git a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h index b249544..b945adc 100644 --- a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h +++ b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_types.h @@ -2189,9 +2189,11 @@ inline std::ostream& operator<<(std::ostream& out, const TOpenSessionReq& obj) } typedef struct _TOpenSessionResp__isset { - _TOpenSessionResp__isset() : sessionHandle(false), configuration(false) {} + _TOpenSessionResp__isset() : sessionHandle(false), compressorParameters(false), compressorName(false), compressorVersion(false) {} bool sessionHandle :1; - bool configuration :1; + bool compressorParameters :1; + bool compressorName :1; + bool compressorVersion :1; } _TOpenSessionResp__isset; class TOpenSessionResp { @@ -2199,7 +2201,7 @@ class TOpenSessionResp { TOpenSessionResp(const TOpenSessionResp&); TOpenSessionResp& operator=(const TOpenSessionResp&); - TOpenSessionResp() : serverProtocolVersion((TProtocolVersion::type)8) { + TOpenSessionResp() : serverProtocolVersion((TProtocolVersion::type)8), compressorName(), compressorVersion() { serverProtocolVersion = (TProtocolVersion::type)8; } @@ -2208,7 +2210,9 @@ class TOpenSessionResp { TStatus status; TProtocolVersion::type serverProtocolVersion; TSessionHandle sessionHandle; - std::map configuration; + std::map compressorParameters; + std::string compressorName; + std::string compressorVersion; _TOpenSessionResp__isset __isset; @@ -2218,7 +2222,11 @@ class TOpenSessionResp { void __set_sessionHandle(const TSessionHandle& val); - void __set_configuration(const std::map & val); + void __set_compressorParameters(const std::map & val); + + void __set_compressorName(const std::string& val); + + void __set_compressorVersion(const std::string& val); bool operator == (const TOpenSessionResp & rhs) const { @@ -2230,9 +2238,17 @@ class TOpenSessionResp { return false; else if (__isset.sessionHandle && !(sessionHandle == rhs.sessionHandle)) return false; - if (__isset.configuration != rhs.__isset.configuration) + if (__isset.compressorParameters != rhs.__isset.compressorParameters) return false; - else if (__isset.configuration && !(configuration == rhs.configuration)) + else if (__isset.compressorParameters && !(compressorParameters == rhs.compressorParameters)) + return false; + if (__isset.compressorName != rhs.__isset.compressorName) + return false; + else if (__isset.compressorName && !(compressorName == rhs.compressorName)) + return false; + if (__isset.compressorVersion != rhs.__isset.compressorVersion) + return false; + else if (__isset.compressorVersion && !(compressorVersion == rhs.compressorVersion)) return false; return true; } diff --git a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TOpenSessionResp.java b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TOpenSessionResp.java index 0a3c121..06a09d1 100644 --- a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TOpenSessionResp.java +++ b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TOpenSessionResp.java @@ -41,7 +41,9 @@ private static final org.apache.thrift.protocol.TField STATUS_FIELD_DESC = new org.apache.thrift.protocol.TField("status", org.apache.thrift.protocol.TType.STRUCT, (short)1); private static final org.apache.thrift.protocol.TField SERVER_PROTOCOL_VERSION_FIELD_DESC = new 
org.apache.thrift.protocol.TField("serverProtocolVersion", org.apache.thrift.protocol.TType.I32, (short)2); private static final org.apache.thrift.protocol.TField SESSION_HANDLE_FIELD_DESC = new org.apache.thrift.protocol.TField("sessionHandle", org.apache.thrift.protocol.TType.STRUCT, (short)3); - private static final org.apache.thrift.protocol.TField CONFIGURATION_FIELD_DESC = new org.apache.thrift.protocol.TField("configuration", org.apache.thrift.protocol.TType.MAP, (short)4); + private static final org.apache.thrift.protocol.TField COMPRESSOR_PARAMETERS_FIELD_DESC = new org.apache.thrift.protocol.TField("compressorParameters", org.apache.thrift.protocol.TType.MAP, (short)4); + private static final org.apache.thrift.protocol.TField COMPRESSOR_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("compressorName", org.apache.thrift.protocol.TType.STRING, (short)5); + private static final org.apache.thrift.protocol.TField COMPRESSOR_VERSION_FIELD_DESC = new org.apache.thrift.protocol.TField("compressorVersion", org.apache.thrift.protocol.TType.STRING, (short)6); private static final Map, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -52,7 +54,9 @@ private TStatus status; // required private TProtocolVersion serverProtocolVersion; // required private TSessionHandle sessionHandle; // optional - private Map configuration; // optional + private Map compressorParameters; // optional + private String compressorName; // optional + private String compressorVersion; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -63,7 +67,9 @@ */ SERVER_PROTOCOL_VERSION((short)2, "serverProtocolVersion"), SESSION_HANDLE((short)3, "sessionHandle"), - CONFIGURATION((short)4, "configuration"); + COMPRESSOR_PARAMETERS((short)4, "compressorParameters"), + COMPRESSOR_NAME((short)5, "compressorName"), + COMPRESSOR_VERSION((short)6, "compressorVersion"); private static final Map byName = new HashMap(); @@ -84,8 +90,12 @@ public static _Fields findByThriftId(int fieldId) { return SERVER_PROTOCOL_VERSION; case 3: // SESSION_HANDLE return SESSION_HANDLE; - case 4: // CONFIGURATION - return CONFIGURATION; + case 4: // COMPRESSOR_PARAMETERS + return COMPRESSOR_PARAMETERS; + case 5: // COMPRESSOR_NAME + return COMPRESSOR_NAME; + case 6: // COMPRESSOR_VERSION + return COMPRESSOR_VERSION; default: return null; } @@ -126,7 +136,7 @@ public String getFieldName() { } // isset id assignments - private static final _Fields optionals[] = {_Fields.SESSION_HANDLE,_Fields.CONFIGURATION}; + private static final _Fields optionals[] = {_Fields.SESSION_HANDLE,_Fields.COMPRESSOR_PARAMETERS,_Fields.COMPRESSOR_NAME,_Fields.COMPRESSOR_VERSION}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -136,10 +146,14 @@ public String getFieldName() { new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, TProtocolVersion.class))); tmpMap.put(_Fields.SESSION_HANDLE, new org.apache.thrift.meta_data.FieldMetaData("sessionHandle", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, TSessionHandle.class))); - tmpMap.put(_Fields.CONFIGURATION, new 
org.apache.thrift.meta_data.FieldMetaData("configuration", org.apache.thrift.TFieldRequirementType.OPTIONAL, + tmpMap.put(_Fields.COMPRESSOR_PARAMETERS, new org.apache.thrift.meta_data.FieldMetaData("compressorParameters", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING), new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)))); + tmpMap.put(_Fields.COMPRESSOR_NAME, new org.apache.thrift.meta_data.FieldMetaData("compressorName", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.COMPRESSOR_VERSION, new org.apache.thrift.meta_data.FieldMetaData("compressorVersion", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TOpenSessionResp.class, metaDataMap); } @@ -171,9 +185,15 @@ public TOpenSessionResp(TOpenSessionResp other) { if (other.isSetSessionHandle()) { this.sessionHandle = new TSessionHandle(other.sessionHandle); } - if (other.isSetConfiguration()) { - Map __this__configuration = new HashMap(other.configuration); - this.configuration = __this__configuration; + if (other.isSetCompressorParameters()) { + Map __this__compressorParameters = new HashMap(other.compressorParameters); + this.compressorParameters = __this__compressorParameters; + } + if (other.isSetCompressorName()) { + this.compressorName = other.compressorName; + } + if (other.isSetCompressorVersion()) { + this.compressorVersion = other.compressorVersion; } } @@ -187,7 +207,9 @@ public void clear() { this.serverProtocolVersion = org.apache.hive.service.rpc.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V9; this.sessionHandle = null; - this.configuration = null; + this.compressorParameters = null; + this.compressorName = null; + this.compressorVersion = null; } public TStatus getStatus() { @@ -267,37 +289,83 @@ public void setSessionHandleIsSet(boolean value) { } } - public int getConfigurationSize() { - return (this.configuration == null) ? 0 : this.configuration.size(); + public int getCompressorParametersSize() { + return (this.compressorParameters == null) ? 
0 : this.compressorParameters.size(); + } + + public void putToCompressorParameters(String key, String val) { + if (this.compressorParameters == null) { + this.compressorParameters = new HashMap(); + } + this.compressorParameters.put(key, val); + } + + public Map getCompressorParameters() { + return this.compressorParameters; + } + + public void setCompressorParameters(Map compressorParameters) { + this.compressorParameters = compressorParameters; + } + + public void unsetCompressorParameters() { + this.compressorParameters = null; + } + + /** Returns true if field compressorParameters is set (has been assigned a value) and false otherwise */ + public boolean isSetCompressorParameters() { + return this.compressorParameters != null; } - public void putToConfiguration(String key, String val) { - if (this.configuration == null) { - this.configuration = new HashMap(); + public void setCompressorParametersIsSet(boolean value) { + if (!value) { + this.compressorParameters = null; } - this.configuration.put(key, val); } - public Map getConfiguration() { - return this.configuration; + public String getCompressorName() { + return this.compressorName; } - public void setConfiguration(Map configuration) { - this.configuration = configuration; + public void setCompressorName(String compressorName) { + this.compressorName = compressorName; } - public void unsetConfiguration() { - this.configuration = null; + public void unsetCompressorName() { + this.compressorName = null; } - /** Returns true if field configuration is set (has been assigned a value) and false otherwise */ - public boolean isSetConfiguration() { - return this.configuration != null; + /** Returns true if field compressorName is set (has been assigned a value) and false otherwise */ + public boolean isSetCompressorName() { + return this.compressorName != null; } - public void setConfigurationIsSet(boolean value) { + public void setCompressorNameIsSet(boolean value) { if (!value) { - this.configuration = null; + this.compressorName = null; + } + } + + public String getCompressorVersion() { + return this.compressorVersion; + } + + public void setCompressorVersion(String compressorVersion) { + this.compressorVersion = compressorVersion; + } + + public void unsetCompressorVersion() { + this.compressorVersion = null; + } + + /** Returns true if field compressorVersion is set (has been assigned a value) and false otherwise */ + public boolean isSetCompressorVersion() { + return this.compressorVersion != null; + } + + public void setCompressorVersionIsSet(boolean value) { + if (!value) { + this.compressorVersion = null; } } @@ -327,11 +395,27 @@ public void setFieldValue(_Fields field, Object value) { } break; - case CONFIGURATION: + case COMPRESSOR_PARAMETERS: + if (value == null) { + unsetCompressorParameters(); + } else { + setCompressorParameters((Map)value); + } + break; + + case COMPRESSOR_NAME: + if (value == null) { + unsetCompressorName(); + } else { + setCompressorName((String)value); + } + break; + + case COMPRESSOR_VERSION: if (value == null) { - unsetConfiguration(); + unsetCompressorVersion(); } else { - setConfiguration((Map)value); + setCompressorVersion((String)value); } break; @@ -349,8 +433,14 @@ public Object getFieldValue(_Fields field) { case SESSION_HANDLE: return getSessionHandle(); - case CONFIGURATION: - return getConfiguration(); + case COMPRESSOR_PARAMETERS: + return getCompressorParameters(); + + case COMPRESSOR_NAME: + return getCompressorName(); + + case COMPRESSOR_VERSION: + return getCompressorVersion(); } throw 
new IllegalStateException(); @@ -369,8 +459,12 @@ public boolean isSet(_Fields field) { return isSetServerProtocolVersion(); case SESSION_HANDLE: return isSetSessionHandle(); - case CONFIGURATION: - return isSetConfiguration(); + case COMPRESSOR_PARAMETERS: + return isSetCompressorParameters(); + case COMPRESSOR_NAME: + return isSetCompressorName(); + case COMPRESSOR_VERSION: + return isSetCompressorVersion(); } throw new IllegalStateException(); } @@ -415,12 +509,30 @@ public boolean equals(TOpenSessionResp that) { return false; } - boolean this_present_configuration = true && this.isSetConfiguration(); - boolean that_present_configuration = true && that.isSetConfiguration(); - if (this_present_configuration || that_present_configuration) { - if (!(this_present_configuration && that_present_configuration)) + boolean this_present_compressorParameters = true && this.isSetCompressorParameters(); + boolean that_present_compressorParameters = true && that.isSetCompressorParameters(); + if (this_present_compressorParameters || that_present_compressorParameters) { + if (!(this_present_compressorParameters && that_present_compressorParameters)) + return false; + if (!this.compressorParameters.equals(that.compressorParameters)) + return false; + } + + boolean this_present_compressorName = true && this.isSetCompressorName(); + boolean that_present_compressorName = true && that.isSetCompressorName(); + if (this_present_compressorName || that_present_compressorName) { + if (!(this_present_compressorName && that_present_compressorName)) return false; - if (!this.configuration.equals(that.configuration)) + if (!this.compressorName.equals(that.compressorName)) + return false; + } + + boolean this_present_compressorVersion = true && this.isSetCompressorVersion(); + boolean that_present_compressorVersion = true && that.isSetCompressorVersion(); + if (this_present_compressorVersion || that_present_compressorVersion) { + if (!(this_present_compressorVersion && that_present_compressorVersion)) + return false; + if (!this.compressorVersion.equals(that.compressorVersion)) return false; } @@ -446,10 +558,20 @@ public int hashCode() { if (present_sessionHandle) list.add(sessionHandle); - boolean present_configuration = true && (isSetConfiguration()); - list.add(present_configuration); - if (present_configuration) - list.add(configuration); + boolean present_compressorParameters = true && (isSetCompressorParameters()); + list.add(present_compressorParameters); + if (present_compressorParameters) + list.add(compressorParameters); + + boolean present_compressorName = true && (isSetCompressorName()); + list.add(present_compressorName); + if (present_compressorName) + list.add(compressorName); + + boolean present_compressorVersion = true && (isSetCompressorVersion()); + list.add(present_compressorVersion); + if (present_compressorVersion) + list.add(compressorVersion); return list.hashCode(); } @@ -492,12 +614,32 @@ public int compareTo(TOpenSessionResp other) { return lastComparison; } } - lastComparison = Boolean.valueOf(isSetConfiguration()).compareTo(other.isSetConfiguration()); + lastComparison = Boolean.valueOf(isSetCompressorParameters()).compareTo(other.isSetCompressorParameters()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetCompressorParameters()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.compressorParameters, other.compressorParameters); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = 
Boolean.valueOf(isSetCompressorName()).compareTo(other.isSetCompressorName()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetCompressorName()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.compressorName, other.compressorName); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetCompressorVersion()).compareTo(other.isSetCompressorVersion()); if (lastComparison != 0) { return lastComparison; } - if (isSetConfiguration()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.configuration, other.configuration); + if (isSetCompressorVersion()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.compressorVersion, other.compressorVersion); if (lastComparison != 0) { return lastComparison; } @@ -547,13 +689,33 @@ public String toString() { } first = false; } - if (isSetConfiguration()) { + if (isSetCompressorParameters()) { + if (!first) sb.append(", "); + sb.append("compressorParameters:"); + if (this.compressorParameters == null) { + sb.append("null"); + } else { + sb.append(this.compressorParameters); + } + first = false; + } + if (isSetCompressorName()) { if (!first) sb.append(", "); - sb.append("configuration:"); - if (this.configuration == null) { + sb.append("compressorName:"); + if (this.compressorName == null) { sb.append("null"); } else { - sb.append(this.configuration); + sb.append(this.compressorName); + } + first = false; + } + if (isSetCompressorVersion()) { + if (!first) sb.append(", "); + sb.append("compressorVersion:"); + if (this.compressorVersion == null) { + sb.append("null"); + } else { + sb.append(this.compressorVersion); } first = false; } @@ -640,22 +802,38 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, TOpenSessionResp st org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; - case 4: // CONFIGURATION + case 4: // COMPRESSOR_PARAMETERS if (schemeField.type == org.apache.thrift.protocol.TType.MAP) { { org.apache.thrift.protocol.TMap _map152 = iprot.readMapBegin(); - struct.configuration = new HashMap(2*_map152.size); + struct.compressorParameters = new HashMap(2*_map152.size); String _key153; String _val154; for (int _i155 = 0; _i155 < _map152.size; ++_i155) { _key153 = iprot.readString(); _val154 = iprot.readString(); - struct.configuration.put(_key153, _val154); + struct.compressorParameters.put(_key153, _val154); } iprot.readMapEnd(); } - struct.setConfigurationIsSet(true); + struct.setCompressorParametersIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 5: // COMPRESSOR_NAME + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.compressorName = iprot.readString(); + struct.setCompressorNameIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 6: // COMPRESSOR_VERSION + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.compressorVersion = iprot.readString(); + struct.setCompressorVersionIsSet(true); } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -690,12 +868,12 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, TOpenSessionResp s oprot.writeFieldEnd(); } } - if (struct.configuration != null) { - if (struct.isSetConfiguration()) { - oprot.writeFieldBegin(CONFIGURATION_FIELD_DESC); + if (struct.compressorParameters != null) { + if (struct.isSetCompressorParameters()) { + 
oprot.writeFieldBegin(COMPRESSOR_PARAMETERS_FIELD_DESC); { - oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, struct.configuration.size())); - for (Map.Entry _iter156 : struct.configuration.entrySet()) + oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, struct.compressorParameters.size())); + for (Map.Entry _iter156 : struct.compressorParameters.entrySet()) { oprot.writeString(_iter156.getKey()); oprot.writeString(_iter156.getValue()); @@ -705,6 +883,20 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, TOpenSessionResp s oprot.writeFieldEnd(); } } + if (struct.compressorName != null) { + if (struct.isSetCompressorName()) { + oprot.writeFieldBegin(COMPRESSOR_NAME_FIELD_DESC); + oprot.writeString(struct.compressorName); + oprot.writeFieldEnd(); + } + } + if (struct.compressorVersion != null) { + if (struct.isSetCompressorVersion()) { + oprot.writeFieldBegin(COMPRESSOR_VERSION_FIELD_DESC); + oprot.writeString(struct.compressorVersion); + oprot.writeFieldEnd(); + } + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -728,23 +920,35 @@ public void write(org.apache.thrift.protocol.TProtocol prot, TOpenSessionResp st if (struct.isSetSessionHandle()) { optionals.set(0); } - if (struct.isSetConfiguration()) { + if (struct.isSetCompressorParameters()) { optionals.set(1); } - oprot.writeBitSet(optionals, 2); + if (struct.isSetCompressorName()) { + optionals.set(2); + } + if (struct.isSetCompressorVersion()) { + optionals.set(3); + } + oprot.writeBitSet(optionals, 4); if (struct.isSetSessionHandle()) { struct.sessionHandle.write(oprot); } - if (struct.isSetConfiguration()) { + if (struct.isSetCompressorParameters()) { { - oprot.writeI32(struct.configuration.size()); - for (Map.Entry _iter157 : struct.configuration.entrySet()) + oprot.writeI32(struct.compressorParameters.size()); + for (Map.Entry _iter157 : struct.compressorParameters.entrySet()) { oprot.writeString(_iter157.getKey()); oprot.writeString(_iter157.getValue()); } } } + if (struct.isSetCompressorName()) { + oprot.writeString(struct.compressorName); + } + if (struct.isSetCompressorVersion()) { + oprot.writeString(struct.compressorVersion); + } } @Override @@ -755,7 +959,7 @@ public void read(org.apache.thrift.protocol.TProtocol prot, TOpenSessionResp str struct.setStatusIsSet(true); struct.serverProtocolVersion = org.apache.hive.service.rpc.thrift.TProtocolVersion.findByValue(iprot.readI32()); struct.setServerProtocolVersionIsSet(true); - BitSet incoming = iprot.readBitSet(2); + BitSet incoming = iprot.readBitSet(4); if (incoming.get(0)) { struct.sessionHandle = new TSessionHandle(); struct.sessionHandle.read(iprot); @@ -764,17 +968,25 @@ public void read(org.apache.thrift.protocol.TProtocol prot, TOpenSessionResp str if (incoming.get(1)) { { org.apache.thrift.protocol.TMap _map158 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32()); - struct.configuration = new HashMap(2*_map158.size); + struct.compressorParameters = new HashMap(2*_map158.size); String _key159; String _val160; for (int _i161 = 0; _i161 < _map158.size; ++_i161) { _key159 = iprot.readString(); _val160 = iprot.readString(); - struct.configuration.put(_key159, _val160); + struct.compressorParameters.put(_key159, _val160); } } - struct.setConfigurationIsSet(true); + 
struct.setCompressorParametersIsSet(true); + } + if (incoming.get(2)) { + struct.compressorName = iprot.readString(); + struct.setCompressorNameIsSet(true); + } + if (incoming.get(3)) { + struct.compressorVersion = iprot.readString(); + struct.setCompressorVersionIsSet(true); } } } diff --git a/service-rpc/src/gen/thrift/gen-php/Types.php b/service-rpc/src/gen/thrift/gen-php/Types.php index 786c773..31c7415 100644 --- a/service-rpc/src/gen/thrift/gen-php/Types.php +++ b/service-rpc/src/gen/thrift/gen-php/Types.php @@ -4700,7 +4700,15 @@ class TOpenSessionResp { /** * @var array */ - public $configuration = null; + public $compressorParameters = null; + /** + * @var string + */ + public $compressorName = null; + /** + * @var string + */ + public $compressorVersion = null; public function __construct($vals=null) { if (!isset(self::$_TSPEC)) { @@ -4720,7 +4728,7 @@ class TOpenSessionResp { 'class' => '\TSessionHandle', ), 4 => array( - 'var' => 'configuration', + 'var' => 'compressorParameters', 'type' => TType::MAP, 'ktype' => TType::STRING, 'vtype' => TType::STRING, @@ -4731,6 +4739,14 @@ class TOpenSessionResp { 'type' => TType::STRING, ), ), + 5 => array( + 'var' => 'compressorName', + 'type' => TType::STRING, + ), + 6 => array( + 'var' => 'compressorVersion', + 'type' => TType::STRING, + ), ); } if (is_array($vals)) { @@ -4743,8 +4759,14 @@ class TOpenSessionResp { if (isset($vals['sessionHandle'])) { $this->sessionHandle = $vals['sessionHandle']; } - if (isset($vals['configuration'])) { - $this->configuration = $vals['configuration']; + if (isset($vals['compressorParameters'])) { + $this->compressorParameters = $vals['compressorParameters']; + } + if (isset($vals['compressorName'])) { + $this->compressorName = $vals['compressorName']; + } + if (isset($vals['compressorVersion'])) { + $this->compressorVersion = $vals['compressorVersion']; } } } @@ -4793,7 +4815,7 @@ class TOpenSessionResp { break; case 4: if ($ftype == TType::MAP) { - $this->configuration = array(); + $this->compressorParameters = array(); $_size134 = 0; $_ktype135 = 0; $_vtype136 = 0; @@ -4804,13 +4826,27 @@ class TOpenSessionResp { $val140 = ''; $xfer += $input->readString($key139); $xfer += $input->readString($val140); - $this->configuration[$key139] = $val140; + $this->compressorParameters[$key139] = $val140; } $xfer += $input->readMapEnd(); } else { $xfer += $input->skip($ftype); } break; + case 5: + if ($ftype == TType::STRING) { + $xfer += $input->readString($this->compressorName); + } else { + $xfer += $input->skip($ftype); + } + break; + case 6: + if ($ftype == TType::STRING) { + $xfer += $input->readString($this->compressorVersion); + } else { + $xfer += $input->skip($ftype); + } + break; default: $xfer += $input->skip($ftype); break; @@ -4845,15 +4881,15 @@ class TOpenSessionResp { $xfer += $this->sessionHandle->write($output); $xfer += $output->writeFieldEnd(); } - if ($this->configuration !== null) { - if (!is_array($this->configuration)) { + if ($this->compressorParameters !== null) { + if (!is_array($this->compressorParameters)) { throw new TProtocolException('Bad type in structure.', TProtocolException::INVALID_DATA); } - $xfer += $output->writeFieldBegin('configuration', TType::MAP, 4); + $xfer += $output->writeFieldBegin('compressorParameters', TType::MAP, 4); { - $output->writeMapBegin(TType::STRING, TType::STRING, count($this->configuration)); + $output->writeMapBegin(TType::STRING, TType::STRING, count($this->compressorParameters)); { - foreach ($this->configuration as $kiter141 => $viter142) + 
foreach ($this->compressorParameters as $kiter141 => $viter142) { $xfer += $output->writeString($kiter141); $xfer += $output->writeString($viter142); @@ -4863,6 +4899,16 @@ class TOpenSessionResp { } $xfer += $output->writeFieldEnd(); } + if ($this->compressorName !== null) { + $xfer += $output->writeFieldBegin('compressorName', TType::STRING, 5); + $xfer += $output->writeString($this->compressorName); + $xfer += $output->writeFieldEnd(); + } + if ($this->compressorVersion !== null) { + $xfer += $output->writeFieldBegin('compressorVersion', TType::STRING, 6); + $xfer += $output->writeString($this->compressorVersion); + $xfer += $output->writeFieldEnd(); + } $xfer += $output->writeFieldStop(); $xfer += $output->writeStructEnd(); return $xfer; diff --git a/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py b/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py index fdf6b1f..2d039cc 100644 --- a/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py +++ b/service-rpc/src/gen/thrift/gen-py/TCLIService/ttypes.py @@ -3614,7 +3614,9 @@ class TOpenSessionResp: - status - serverProtocolVersion - sessionHandle - - configuration + - compressorParameters + - compressorName + - compressorVersion """ thrift_spec = ( @@ -3622,14 +3624,18 @@ class TOpenSessionResp: (1, TType.STRUCT, 'status', (TStatus, TStatus.thrift_spec), None, ), # 1 (2, TType.I32, 'serverProtocolVersion', None, 8, ), # 2 (3, TType.STRUCT, 'sessionHandle', (TSessionHandle, TSessionHandle.thrift_spec), None, ), # 3 - (4, TType.MAP, 'configuration', (TType.STRING,None,TType.STRING,None), None, ), # 4 + (4, TType.MAP, 'compressorParameters', (TType.STRING,None,TType.STRING,None), None, ), # 4 + (5, TType.STRING, 'compressorName', None, None, ), # 5 + (6, TType.STRING, 'compressorVersion', None, None, ), # 6 ) - def __init__(self, status=None, serverProtocolVersion=thrift_spec[2][4], sessionHandle=None, configuration=None,): + def __init__(self, status=None, serverProtocolVersion=thrift_spec[2][4], sessionHandle=None, compressorParameters=None, compressorName=None, compressorVersion=None,): self.status = status self.serverProtocolVersion = serverProtocolVersion self.sessionHandle = sessionHandle - self.configuration = configuration + self.compressorParameters = compressorParameters + self.compressorName = compressorName + self.compressorVersion = compressorVersion def read(self, iprot): if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: @@ -3659,15 +3665,25 @@ def read(self, iprot): iprot.skip(ftype) elif fid == 4: if ftype == TType.MAP: - self.configuration = {} + self.compressorParameters = {} (_ktype135, _vtype136, _size134 ) = iprot.readMapBegin() for _i138 in xrange(_size134): _key139 = iprot.readString() _val140 = iprot.readString() - self.configuration[_key139] = _val140 + self.compressorParameters[_key139] = _val140 iprot.readMapEnd() else: iprot.skip(ftype) + elif fid == 5: + if ftype == TType.STRING: + self.compressorName = iprot.readString() + else: + iprot.skip(ftype) + elif fid == 6: + if ftype == TType.STRING: + self.compressorVersion = iprot.readString() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -3690,14 +3706,22 @@ def write(self, oprot): oprot.writeFieldBegin('sessionHandle', TType.STRUCT, 3) self.sessionHandle.write(oprot) oprot.writeFieldEnd() - if self.configuration is not None: - oprot.writeFieldBegin('configuration', TType.MAP, 4) - 
oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.configuration)) - for kiter141,viter142 in self.configuration.items(): + if self.compressorParameters is not None: + oprot.writeFieldBegin('compressorParameters', TType.MAP, 4) + oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.compressorParameters)) + for kiter141,viter142 in self.compressorParameters.items(): oprot.writeString(kiter141) oprot.writeString(viter142) oprot.writeMapEnd() oprot.writeFieldEnd() + if self.compressorName is not None: + oprot.writeFieldBegin('compressorName', TType.STRING, 5) + oprot.writeString(self.compressorName) + oprot.writeFieldEnd() + if self.compressorVersion is not None: + oprot.writeFieldBegin('compressorVersion', TType.STRING, 6) + oprot.writeString(self.compressorVersion) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -3714,7 +3738,9 @@ def __hash__(self): value = (value * 31) ^ hash(self.status) value = (value * 31) ^ hash(self.serverProtocolVersion) value = (value * 31) ^ hash(self.sessionHandle) - value = (value * 31) ^ hash(self.configuration) + value = (value * 31) ^ hash(self.compressorParameters) + value = (value * 31) ^ hash(self.compressorName) + value = (value * 31) ^ hash(self.compressorVersion) return value def __repr__(self): diff --git a/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb b/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb index 4b1854c..9f035c8 100644 --- a/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb +++ b/service-rpc/src/gen/thrift/gen-rb/t_c_l_i_service_types.rb @@ -979,13 +979,17 @@ class TOpenSessionResp STATUS = 1 SERVERPROTOCOLVERSION = 2 SESSIONHANDLE = 3 - CONFIGURATION = 4 + COMPRESSORPARAMETERS = 4 + COMPRESSORNAME = 5 + COMPRESSORVERSION = 6 FIELDS = { STATUS => {:type => ::Thrift::Types::STRUCT, :name => 'status', :class => ::TStatus}, SERVERPROTOCOLVERSION => {:type => ::Thrift::Types::I32, :name => 'serverProtocolVersion', :default => 8, :enum_class => ::TProtocolVersion}, SESSIONHANDLE => {:type => ::Thrift::Types::STRUCT, :name => 'sessionHandle', :class => ::TSessionHandle, :optional => true}, - CONFIGURATION => {:type => ::Thrift::Types::MAP, :name => 'configuration', :key => {:type => ::Thrift::Types::STRING}, :value => {:type => ::Thrift::Types::STRING}, :optional => true} + COMPRESSORPARAMETERS => {:type => ::Thrift::Types::MAP, :name => 'compressorParameters', :key => {:type => ::Thrift::Types::STRING}, :value => {:type => ::Thrift::Types::STRING}, :optional => true}, + COMPRESSORNAME => {:type => ::Thrift::Types::STRING, :name => 'compressorName', :optional => true}, + COMPRESSORVERSION => {:type => ::Thrift::Types::STRING, :name => 'compressorVersion', :optional => true} } def struct_fields; FIELDS; end diff --git a/service/pom.xml b/service/pom.xml index 9306739..73697c3 100644 --- a/service/pom.xml +++ b/service/pom.xml @@ -67,6 +67,11 @@ ${commons-cli.version} + org.apache.commons + commons-lang3 + ${commons-lang3.version} + + net.sf.jpam jpam ${jpam.version} diff --git a/service/src/java/org/apache/hive/service/cli/ColumnBasedSet.java b/service/src/java/org/apache/hive/service/cli/ColumnBasedSet.java index 9cbe89c..8a1b888 100644 --- a/service/src/java/org/apache/hive/service/cli/ColumnBasedSet.java +++ b/service/src/java/org/apache/hive/service/cli/ColumnBasedSet.java @@ -19,12 +19,15 @@ package org.apache.hive.service.cli; import java.io.ByteArrayInputStream; +import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Iterator; 
import java.util.List; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.serde2.thrift.ColumnBuffer; +import org.apache.hadoop.hive.serde2.compression.CompDe; import org.apache.hadoop.hive.serde2.thrift.Type; import org.apache.hive.service.rpc.thrift.TColumn; import org.apache.hive.service.rpc.thrift.TRow; @@ -58,27 +61,38 @@ public ColumnBasedSet(TableSchema schema) { } } - public ColumnBasedSet(TRowSet tRowSet) throws TException { + public ColumnBasedSet(TRowSet tRowSet, CompDe compde) throws Exception { descriptors = null; - columns = new ArrayList(); - // Use TCompactProtocol to read serialized TColumns if (tRowSet.isSetBinaryColumns()) { - TProtocol protocol = - new TCompactProtocol(new TIOStreamTransport(new ByteArrayInputStream( - tRowSet.getBinaryColumns()))); - // Read from the stream using the protocol for each column in final schema - for (int i = 0; i < tRowSet.getColumnCount(); i++) { - TColumn tvalue = new TColumn(); - try { - tvalue.read(protocol); - } catch (TException e) { - LOG.error(e.getMessage(), e); - throw new TException("Error reading column value from the row set blob", e); + // Use TCompactProtocol to read serialized TColumns + + if (compde != null) { + TProtocol protocol = + new TCompactProtocol(new TIOStreamTransport(new ByteArrayInputStream( + tRowSet.getBinaryColumns()))); + ByteBuffer compressedBytes = protocol.readBinary(); + columns = Arrays.asList(compde.decompress(compressedBytes, compressedBytes.limit())); + } + else { + columns = new ArrayList(); + TProtocol protocol = + new TCompactProtocol(new TIOStreamTransport(new ByteArrayInputStream( + tRowSet.getBinaryColumns()))); + // Read from the stream using the protocol for each column in final schema + for (int i = 0; i < tRowSet.getColumnCount(); i++) { + TColumn tvalue = new TColumn(); + try { + tvalue.read(protocol); + } catch (TException e) { + LOG.error(e.getMessage(), e); + throw new TException("Error reading column value from the row set blob", e); + } + columns.add(new ColumnBuffer(tvalue)); } - columns.add(new ColumnBuffer(tvalue)); } } else { + columns = new ArrayList(); if (tRowSet.getColumns() != null) { for (TColumn tvalue : tRowSet.getColumns()) { columns.add(new ColumnBuffer(tvalue)); diff --git a/service/src/java/org/apache/hive/service/cli/RowSetFactory.java b/service/src/java/org/apache/hive/service/cli/RowSetFactory.java index d9be6a0..ebc4951 100644 --- a/service/src/java/org/apache/hive/service/cli/RowSetFactory.java +++ b/service/src/java/org/apache/hive/service/cli/RowSetFactory.java @@ -20,10 +20,11 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion; import org.apache.hive.service.rpc.thrift.TRowSet; -import org.apache.thrift.TException; import static org.apache.hive.service.rpc.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6; +import org.apache.hadoop.hive.serde2.compression.CompDe; + public class RowSetFactory { // This call is accessed from server side @@ -35,10 +36,10 @@ public static RowSet create(TableSchema schema, TProtocolVersion version, boolea } // This call is accessed from client (jdbc) side - public static RowSet create(TRowSet results, TProtocolVersion version) throws TException { - if (version.getValue() >= HIVE_CLI_SERVICE_PROTOCOL_V6.getValue()) { - return new ColumnBasedSet(results); - } - return new RowBasedSet(results); + public static RowSet create(TRowSet results, TProtocolVersion version, CompDe compde) throws Exception { + if (version.getValue() >= HIVE_CLI_SERVICE_PROTOCOL_V6.getValue()) { + return new 
ColumnBasedSet(results, compde); + } + return new RowBasedSet(results); } } diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 2938338..355b3ae 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -21,8 +21,11 @@ import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; -import java.util.HashMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -30,6 +33,10 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.serde2.compression.CompDe; +import org.apache.hadoop.hive.serde2.compression.CompDeServiceLoader; +import org.apache.commons.lang3.tuple.ImmutableTriple; import org.apache.hadoop.hive.common.ServerUtils; import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim; import org.apache.hadoop.hive.shims.ShimLoader; @@ -309,10 +316,41 @@ public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException { LOG.info("Client protocol version: " + req.getClient_protocol()); TOpenSessionResp resp = new TOpenSessionResp(); try { + if (req.isSetConfiguration()) { + Map reqConfMap = req.getConfiguration(); + + reqConfMap.remove( + ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR.varname); + if (reqConfMap.containsKey("set:hiveconf:" + + ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR_LIST.varname)) + { + HiveConf clientConf = new HiveConf(); + for (Entry entry : reqConfMap.entrySet()) { + clientConf.set( + entry.getKey().replace("set:hiveconf:", ""), + entry.getValue()); + } + ImmutableTriple> compdeNameVersionParams = + negotiateCompde(hiveConf, clientConf); + if (null != compdeNameVersionParams) { + // Set the response for the client. + resp.setCompressorName(compdeNameVersionParams.left); + resp.setCompressorVersion(compdeNameVersionParams.middle); + resp.setCompressorParameters(compdeNameVersionParams.right); + // SessionState is initialized based on TOpenSessionRequest. + reqConfMap.put( + ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR.varname, + compdeNameVersionParams.left); + reqConfMap.putAll(compdeNameVersionParams.right); + reqConfMap.put( + ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR_VERSION.varname, + compdeNameVersionParams.middle); + } + } + } + SessionHandle sessionHandle = getSessionHandle(req, resp); resp.setSessionHandle(sessionHandle.toTSessionHandle()); - // TODO: set real configuration map - resp.setConfiguration(new HashMap()); resp.setStatus(OK_STATUS); ThriftCLIServerContext context = (ThriftCLIServerContext)currentServerContext.get(); @@ -326,6 +364,119 @@ public TOpenSessionResp OpenSession(TOpenSessionReq req) throws TException { return resp; } + /** + * Find the first CompDe in in the server's list that the client supports and + * can be initialized on the server. + * @param serverConf + * @param clientConf + * @return the name and parameter map of the negotiated plug-in. 
+ */ + private ImmutableTriple> negotiateCompde( + HiveConf serverConf, + HiveConf clientConf) { + List serverCompdes = Arrays.asList( + HiveConf.getTrimmedStringsVar( + serverConf, + ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR_LIST)); + List clientCompdes = Arrays.asList( + HiveConf.getTrimmedStringsVar( + clientConf, + ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR_LIST)); + + // CompDe negotiation + for (String compdeName : serverCompdes) { + if (clientCompdes.contains(compdeName)) { + + try { + String clientCompdeVersion = + getVersionForCompde(clientConf, compdeName); + + Map compdeResponse = + initCompde( + compdeName, clientCompdeVersion, + serverConf, clientConf); + + return ImmutableTriple.of( + compdeName, clientCompdeVersion, compdeResponse); + } catch (Exception e) { + continue; + } + } + } + return null; + } + + /** + * Get the parameters for the specified CompDe plug-in from the HiveConf. + * @param hiveConf + * @param compdeName + * @return a map of plug-in parameters. + */ + private static Map getParamsForCompde( + HiveConf hiveConf, + String compdeName) { + String pattern = String.format( + "%s\\.%s\\.[\\w|\\d]+", + ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR, + compdeName); + return hiveConf.getValByRegex(pattern); + } + + /** + * Get the client-requested version of the specified CompDe plug-in. + * @param hiveConf + * @param compdeName + * @return the version string. + */ + private static String getVersionForCompde( + HiveConf hiveConf, + String compdeName) { + String versionKey = String.format( + "%s.%s.%s", + ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR, + compdeName, "version"); + return hiveConf.get(versionKey); + } + + /** + * Initialize a CompDe plug-in with requested parameters from the server and + * the client. + * @param compdeName + * @param serverParams + * @param clientParams + * @return The configuration map as finalized by the plug-in or null if the + * plug-in cannot be initialized with the given parameters. + */ + protected Map initCompde( + String compdeName, + String version, + HiveConf serverConf, + HiveConf clientConf) + throws Exception { + Map serverCompdeParams = + getParamsForCompde(serverConf, compdeName); + Map clientCompdeParams = + getParamsForCompde(clientConf, compdeName); + try { + CompDe compDe = CompDeServiceLoader.getInstance() + .getCompde(compdeName, version); + Map pluginParams = + compDe.getParams(serverCompdeParams, clientCompdeParams); + compDe.init(pluginParams); + LOG.info(String.format( + "Initialized CompDe plugin for %s %s", + compdeName, version)); + LOG.debug(compdeName + " params: " + pluginParams.toString()); + return pluginParams; + } + catch (Exception e) { + LOG.debug(String.format( + "Failed to initialize CompDe plugin for %s %s", + compdeName, version)); + throw e; + } + } + private String getIpAddress() { String clientIpAddress; // Http transport mode. 
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java index 9805641..5c548f9 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java @@ -449,7 +449,7 @@ public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientatio req.setFetchType(fetchType.toTFetchType()); TFetchResultsResp resp = cliService.FetchResults(req); checkStatus(resp.getStatus()); - return RowSetFactory.create(resp.getResults(), opHandle.getProtocolVersion()); + return RowSetFactory.create(resp.getResults(), opHandle.getProtocolVersion(), null); } catch (HiveSQLException e) { throw e; } catch (Exception e) { diff --git a/service/src/test/org/apache/hive/service/cli/compression/TestCompDeNegotiation.java b/service/src/test/org/apache/hive/service/cli/compression/TestCompDeNegotiation.java new file mode 100644 index 0000000..c3a211f --- /dev/null +++ b/service/src/test/org/apache/hive/service/cli/compression/TestCompDeNegotiation.java @@ -0,0 +1,281 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.service.cli.compression; + +import static org.junit.Assert.*; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hive.service.auth.HiveAuthFactory; +import org.apache.hive.service.cli.HiveSQLException; +import org.apache.hive.service.cli.thrift.EmbeddedThriftBinaryCLIService; +import org.apache.hive.service.cli.thrift.ThriftCLIService; +import org.apache.hive.service.rpc.thrift.TOpenSessionReq; +import org.apache.hive.service.rpc.thrift.TOpenSessionResp; +import org.apache.thrift.TException; +import org.junit.Before; +import org.junit.Test; + +public class TestCompDeNegotiation { + private HiveConf noCompDes; + private HiveConf serverSingleCompDe; + private HiveConf serverMultiCompDes1; + private HiveConf serverMultiCompDes2; + private HiveConf clientSingleCompDe; + private HiveConf clientMultiCompDes1; + private HiveConf clientMultiCompDes2; + private HiveConf serverCompDeConf; + private HiveConf clientCompDeConf; + + @Before + public void init() throws Exception { + HiveConf baseConf = new HiveConf(); + baseConf.setVar(ConfVars.HIVE_SERVER2_AUTHENTICATION, HiveAuthFactory.AuthTypes.NONE.toString()); + baseConf.setVar(ConfVars.HIVE_AUTHORIZATION_MANAGER, + "org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider"); + baseConf.setBoolean("datanucleus.schema.autoCreateTables", true); + + // Set the compressor lists and compressor configs used for negotiation + + noCompDes = new HiveConf(baseConf); + + clientSingleCompDe = new HiveConf(baseConf); + clientSingleCompDe.set(clientCompressorListVarName(), "compde3"); + clientSingleCompDe.set(clientCompdeParamPrefix("compde3") + ".version", "1.0"); + serverSingleCompDe = new HiveConf(baseConf); + serverSingleCompDe.setVar(ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR_LIST, "compde3"); + + clientMultiCompDes1 = new HiveConf(baseConf); + clientMultiCompDes1.set(clientCompressorListVarName(), "compde1,compde2,compde3,compde4"); + clientMultiCompDes1.set(clientCompdeParamPrefix("compde1") + ".version", "1.0"); + clientMultiCompDes1.set(clientCompdeParamPrefix("compde2") + ".version", "1.0"); + clientMultiCompDes1.set(clientCompdeParamPrefix("compde3") + ".version", "1.0"); + clientMultiCompDes1.set(clientCompdeParamPrefix("compde4") + ".version", "1.0"); + serverMultiCompDes1 = new HiveConf(baseConf); + serverMultiCompDes1.setVar(ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR_LIST, "compde1,compde2,compde3,compde4"); + + clientMultiCompDes2 = new HiveConf(baseConf); + clientMultiCompDes2.set(clientCompressorListVarName(), "compde2, compde4"); + clientMultiCompDes2.set(clientCompdeParamPrefix("compde2") + ".version", "1.0"); + clientMultiCompDes2.set(clientCompdeParamPrefix("compde4") + ".version", "1.0"); + serverMultiCompDes2 = new HiveConf(baseConf); + serverMultiCompDes2.setVar(ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR_LIST, "compde2, compde4"); + + serverCompDeConf = new HiveConf(baseConf); + serverCompDeConf.setVar(ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR_LIST, "compde3"); + serverCompDeConf.set(noCompDeConfigPrefix("compde3") + ".test1", "serverVal1"); + serverCompDeConf.set(noCompDeConfigPrefix("compde3") + ".test2", "serverVal2");//overriden by client + serverCompDeConf.set(noCompDeConfigPrefix("compde3") + ".test4", "serverVal4");//overriden by plug-in + + clientCompDeConf = new HiveConf(baseConf); + 
clientCompDeConf.set(clientCompressorListVarName(), "compde3"); + clientCompDeConf.set(clientCompdeParamPrefix("compde3") + ".version", "1.0"); + clientCompDeConf.set(clientCompdeParamPrefix("compde3") + ".test2", "clientVal2");//overrides server + clientCompDeConf.set(clientCompdeParamPrefix("compde3") + ".test3", "clientVal3"); + clientCompDeConf.set(clientCompdeParamPrefix("compde3") + ".test5", "clientVal5");//overriden by plug-in + } + + private static String noCompDeConfigPrefix(String compDeName) { + return ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR.varname + "." + compDeName; + } + // The JDBC driver prefixes all configuration names before sending the request and the server expects these prefixes + private static String clientCompressorListVarName() { + return "set:hiveconf:" + ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR_LIST.varname; + } + private static String clientCompdeParamPrefix(String compDeName) { + return "set:hiveconf:" + ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_COMPRESSOR.varname + "." + compDeName; + } + + public class MockServiceWithoutCompDes extends EmbeddedThriftBinaryCLIService { + @Override + // Pretend that we have no CompDe plug-ins + protected Map initCompde( + String compdeName, + String version, + HiveConf serverConf, + HiveConf clientConf) throws Exception { + throw new Exception("No supported compdes"); + } + } + + @Test + // The server has no CompDe plug-ins + public void testServerWithoutCompDePlugins() throws HiveSQLException, InterruptedException, TException { + ThriftCLIService service = new MockServiceWithoutCompDes(); + service.init(noCompDes); + + TOpenSessionReq req = new TOpenSessionReq(); + req.setConfiguration(new HashMap()); + TOpenSessionResp resp; + + req.setConfiguration(noCompDes.getValByRegex(".*")); + resp = service.OpenSession(req); + assertNull(resp.getCompressorName()); + + req.setConfiguration(clientSingleCompDe.getValByRegex(".*")); + resp = service.OpenSession(req); + assertNull(resp.getCompressorName()); + + req.setConfiguration(clientMultiCompDes2.getValByRegex(".*")); + resp = service.OpenSession(req); + assertNull(resp.getCompressorName()); + + service.stop(); + } + + public class MockServiceWithCompDes extends EmbeddedThriftBinaryCLIService { + @Override + // Pretend that we have plug-ins for all CompDes except "compde1" + protected Map initCompde( + String compdeName, + String version, + HiveConf serverConf, + HiveConf clientConf) throws Exception { + if (compdeName.equals("compde1")) { + throw new Exception("compde1 not supported"); + } + else { + return serverConf.getValByRegex(".*"); + } + } + } + + @Test + // The server has plug-ins but the CompDe list is not configured + public void testServerWithoutCompDeInList() throws HiveSQLException, InterruptedException, TException { + ThriftCLIService service = new MockServiceWithCompDes(); + service.init(noCompDes); + + TOpenSessionReq req = new TOpenSessionReq(); + req.setConfiguration(new HashMap()); + TOpenSessionResp resp; + + req.setConfiguration(noCompDes.getValByRegex(".*")); + resp = service.OpenSession(req); + assertNull(resp.getCompressorName()); + + req.setConfiguration(clientSingleCompDe.getValByRegex(".*")); + resp = service.OpenSession(req); + assertNull(resp.getCompressorName()); + + req.setConfiguration(clientMultiCompDes2.getValByRegex(".*")); + resp = service.OpenSession(req); + assertNull(resp.getCompressorName()); + + service.stop(); + } + + @Test + public void testServerWithSingleCompDeInList() throws HiveSQLException, InterruptedException, 
TException { + ThriftCLIService service = new MockServiceWithCompDes(); + service.init(serverSingleCompDe); + + TOpenSessionReq req = new TOpenSessionReq(); + req.setConfiguration(new HashMap()); + TOpenSessionResp resp; + + req.setConfiguration(noCompDes.getValByRegex(".*")); + resp = service.OpenSession(req); + assertNull(resp.getCompressorName()); + + req.setConfiguration(clientSingleCompDe.getValByRegex(".*")); + resp = service.OpenSession(req); + assertEquals("compde3", resp.getCompressorName()); + + req.setConfiguration(clientMultiCompDes2.getValByRegex(".*")); + resp = service.OpenSession(req); + assertNull(resp.getCompressorName()); + + service.stop(); + } + + @Test + public void testServerWithMultiCompDesInList() throws HiveSQLException, InterruptedException, TException { + ThriftCLIService service = new MockServiceWithCompDes(); + service.init(serverMultiCompDes1); + + TOpenSessionReq req = new TOpenSessionReq(); + req.setConfiguration(new HashMap()); + TOpenSessionResp resp; + + req.setConfiguration(noCompDes.getValByRegex(".*")); + resp = service.OpenSession(req); + assertNull(resp.getCompressorName()); + + req.setConfiguration(clientSingleCompDe.getValByRegex(".*")); + resp = service.OpenSession(req); + assertEquals("compde3", resp.getCompressorName()); + + req.setConfiguration(clientMultiCompDes1.getValByRegex(".*")); + resp = service.OpenSession(req); + // "compde1" fails to initialize because our mock service does not have that plugin + assertEquals("compde2", resp.getCompressorName()); + + req.setConfiguration(clientMultiCompDes2.getValByRegex(".*")); + resp = service.OpenSession(req); + assertEquals("compde2", resp.getCompressorName()); + + service.stop(); + } + + public class MockWithCompDeConfig extends EmbeddedThriftBinaryCLIService { + @Override + // Mock a plug-in with an `init` function. + protected Map initCompde( + String compdeName, + String version, + HiveConf serverConf, + HiveConf clientConf) { + Map finalParams = serverConf.getValByRegex(noCompDeConfigPrefix(compdeName) + ".*"); + finalParams.putAll(clientConf.getValByRegex(noCompDeConfigPrefix(compdeName) + ".*")); + finalParams.put(noCompDeConfigPrefix("compde3") + ".test4", "compDeVal4");//overrides server + finalParams.put(noCompDeConfigPrefix("compde3") + ".test5", "compDeVal5");//overrides client + finalParams.put(noCompDeConfigPrefix("compde3") + ".test6", "compDeVal6"); + return finalParams; + } + } + + @Test + // Ensure that the server allows the plug-in to combine the server's default + // CompDe parameters with the client overrides and returns the final + // configuration. 
+ public void testConfig() throws TException { + Map expectedConf = new HashMap(); + expectedConf.put(noCompDeConfigPrefix("compde3") + ".version", "1.0"); + expectedConf.put(noCompDeConfigPrefix("compde3") + ".test1", "serverVal1"); + expectedConf.put(noCompDeConfigPrefix("compde3") + ".test2", "clientVal2"); + expectedConf.put(noCompDeConfigPrefix("compde3") + ".test3", "clientVal3"); + expectedConf.put(noCompDeConfigPrefix("compde3") + ".test4", "compDeVal4"); + expectedConf.put(noCompDeConfigPrefix("compde3") + ".test5", "compDeVal5"); + expectedConf.put(noCompDeConfigPrefix("compde3") + ".test6", "compDeVal6"); + + ThriftCLIService service = new MockWithCompDeConfig(); + service.init(serverCompDeConf); + + TOpenSessionReq req = new TOpenSessionReq(); + req.setConfiguration(clientCompDeConf.getValByRegex(".*compressor.*")); + + TOpenSessionResp resp = service.OpenSession(req); + assertEquals("compde3", resp.getCompressorName()); + assertEquals(expectedConf, resp.getCompressorParameters()); + } +} -- 1.8.3.4
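
Note on the plug-in contract: the CompDe interface itself is not reproduced in the hunks above, so the sketch below only shows roughly what a plug-in would implement, with method names and signatures inferred from the call sites visible in this patch (getParams, init, compress, decompress). Return types not visible at those call sites are assumptions, the package name is hypothetical, and the compress/decompress bodies are stubs rather than a working codec.

package org.example.compde;                       // hypothetical package, not part of the patch

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hive.serde2.thrift.ColumnBuffer;

// "implements org.apache.hadoop.hive.serde2.compression.CompDe" is omitted here
// because the exact interface signatures are not shown in this listing.
public class ExampleCompDe {

  private Map<String, String> params = new HashMap<String, String>();

  // Merge the server's defaults with the client's overrides; the plug-in has
  // the final word on the parameter map returned to the client.
  public Map<String, String> getParams(Map<String, String> serverParams,
                                       Map<String, String> clientParams) {
    Map<String, String> merged = new HashMap<String, String>(serverParams);
    merged.putAll(clientParams);
    return merged;
  }

  // Called once per session with the negotiated parameters (return type assumed).
  public void init(Map<String, String> params) {
    this.params = params;
  }

  // Pack one set of ColumnBuffers into the blob carried in TRowSet.binaryColumns.
  public ByteBuffer compress(ColumnBuffer[] colSet) {
    // A real plug-in (e.g. the SnappyCompDe added by this patch) serializes and
    // compresses the columns here; this stub only marks the extension point.
    throw new UnsupportedOperationException("illustrative stub");
  }

  // Inverse of compress(): rebuild the ColumnBuffers from the first `size` bytes.
  public ColumnBuffer[] decompress(ByteBuffer input, int size) {
    throw new UnsupportedOperationException("illustrative stub");
  }
}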
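
Note on negotiation order: ThriftCLIService#negotiateCompde walks the server's ordered compressor list and keeps the first entry that the client also offered and that initializes successfully on the server. A self-contained restatement of that loop follows; the tryInit callback is hypothetical and stands in for initCompde, and the CompDe names mirror the fixtures used in TestCompDeNegotiation.

import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class NegotiationSketch {
  static String negotiate(List<String> serverList, List<String> clientList,
                          Predicate<String> tryInit) {
    for (String name : serverList) {           // server preference order wins
      if (clientList.contains(name) && tryInit.test(name)) {
        return name;                            // first mutually supported CompDe
      }
    }
    return null;                                // no CompDe; fall back to uncompressed
  }

  public static void main(String[] args) {
    List<String> server = Arrays.asList("compde1", "compde2", "compde3", "compde4");
    List<String> client = Arrays.asList("compde1", "compde2", "compde3", "compde4");
    // Pretend "compde1" has no usable plug-in on the server, as in the mock tests.
    System.out.println(negotiate(server, client, n -> !n.equals("compde1"))); // compde2
  }
}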
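
Note on client usage: negotiation is driven entirely by hiveconf overrides carried in the OpenSession request, which the JDBC driver prefixes with "set:hiveconf:" as the test comments point out. The sketch below assumes the usual jdbc:hive2 URL form where entries after '?' become hiveconf overrides; the CompDe name "snappy", its version "1.0", and the connection details are placeholders for whatever plug-in both sides actually ship.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CompDeClientExample {
  public static void main(String[] args) throws Exception {
    // Server side (hive-site.xml), ordered by server preference:
    //   hive.server2.thrift.resultset.compressor.list=snappy
    //
    // Client side: the '?' section is sent as hiveconf overrides, which the
    // server-side negotiation code matches against its own compressor list.
    String url = "jdbc:hive2://localhost:10000/default"
        + "?hive.server2.thrift.resultset.compressor.list=snappy"
        + ";hive.server2.thrift.resultset.compressor.snappy.version=1.0";

    try (Connection conn = DriverManager.getConnection(url, "hiveuser", "");
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery("SELECT 1")) {
      // If negotiation succeeded, each fetched TRowSet is rebuilt through
      // RowSetFactory.create(results, protocol, compde), which hands the binary
      // column blob to the CompDe for decompression.
      while (rs.next()) {
        System.out.println(rs.getInt(1));
      }
    }
    // If no CompDe can be agreed on, the session proceeds with uncompressed
    // result sets and the response carries no compressorName.
  }
}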
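
Note on wire framing: the ColumnBasedSet change reads the compressed column set back as a single binary field (protocol.readBinary()) from TRowSet.binaryColumns; the corresponding write side is assumed here to frame the CompDe output the same way with TCompactProtocol.writeBinary. The round-trip framing, with a dummy payload standing in for a real compressed column set, might look like this:

import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;

import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.transport.TIOStreamTransport;
import org.apache.thrift.transport.TMemoryBuffer;

public class BinaryColumnFraming {
  public static void main(String[] args) throws Exception {
    ByteBuffer compressed = ByteBuffer.wrap(new byte[] {1, 2, 3});  // stand-in for compDe.compress(...)

    // Server side: serialize the blob into the binaryColumns byte[].
    TMemoryBuffer out = new TMemoryBuffer(64);
    new TCompactProtocol(out).writeBinary(compressed);
    byte[] binaryColumns = out.getArray();          // backing array; out.length() bytes are valid

    // Client side (ColumnBasedSet): read the blob back before decompressing it.
    TCompactProtocol in = new TCompactProtocol(
        new TIOStreamTransport(new ByteArrayInputStream(binaryColumns, 0, out.length())));
    ByteBuffer roundTripped = in.readBinary();
    System.out.println(roundTripped.remaining());   // 3; would be passed to compDe.decompress(...)
  }
}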