diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 0533ae8..5bc052f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -225,7 +225,7 @@ public static Schema getSchema(BaseSemanticAnalyzer sem, HiveConf conf) {
     String tableName = "result";
     List<FieldSchema> lst = null;
     try {
-      lst = MetaStoreUtils.getFieldsFromDeserializer(tableName, td.getDeserializer());
+      lst = MetaStoreUtils.getFieldsFromDeserializer(tableName, td.getDeserializer(conf));
     } catch (Exception e) {
       LOG.warn("Error getting schema: "
           + org.apache.hadoop.util.StringUtils.stringifyException(e));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index cd017d8..2569b1b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -176,6 +176,7 @@
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe;
 import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
+import org.apache.hadoop.hive.serde2.crypto.HiveSerdeKeyManagement;
 import org.apache.hadoop.hive.serde2.dynamic_type.DynamicSerDe;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -3795,6 +3796,12 @@ private int createTable(Hive db, CreateTableDesc crtTbl) throws HiveException {
     Table tbl = db.newTable(crtTbl.getTableName());
 
     if (crtTbl.getTblProps() != null) {
+      // set up the table properties for encryption if it is enabled
+      try {
+        HiveSerdeKeyManagement.setupTableForEncryption(conf, crtTbl.getTblProps());
+      } catch (Exception e) {
+        throw new HiveException(e);
+      }
       tbl.getTTable().getParameters().putAll(crtTbl.getTblProps());
     }
 
@@ -3971,6 +3978,12 @@ private int createTableLike(Hive db, CreateTableLikeDesc crtTbl) throws HiveExce
     tbl=db.newTable(targetTableName);
 
     if (crtTbl.getTblProps() != null) {
+      // set up the table properties for encryption if it is enabled
+      try {
+        HiveSerdeKeyManagement.setupTableForEncryption(conf, crtTbl.getTblProps());
+      } catch (Exception e) {
+        throw new HiveException(e);
+      }
       tbl.getTTable().getParameters().putAll(crtTbl.getTblProps());
     }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 1dde78e..84ff363 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -291,7 +291,7 @@ protected void initializeOp(Configuration hconf) throws HiveException {
       statsFromRecordWriter = new boolean[numFiles];
       serializer = (Serializer) conf.getTableInfo().getDeserializerClass()
           .newInstance();
-      serializer.initialize(null, conf.getTableInfo().getProperties());
+      serializer.initialize(hconf, conf.getTableInfo().getProperties());
       outputClass = serializer.getSerializedClass();
 
       // Timeout is chosen to make sure that even if one iteration takes more than
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index e6b4422..9a40380 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -5948,7 +5948,7 @@ private Operator genFileSinkPlan(String dest, QB qb, Operator input)
     try {
       StructObjectInspector rowObjectInspector = (StructObjectInspector) table_desc
-          .getDeserializer().getObjectInspector();
+          .getDeserializer(conf).getObjectInspector();
       List<? extends StructField> fields = rowObjectInspector
           .getAllStructFieldRefs();
       for (int i = 0; i < fields.size(); i++) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
index 39f1793..1e48002 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
@@ -24,6 +24,7 @@
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
@@ -79,8 +80,12 @@ public TableDesc(
   /**
    * Return a deserializer object corresponding to the tableDesc.
    */
   public Deserializer getDeserializer() throws Exception {
+    return getDeserializer(null);
+  }
+
+  public Deserializer getDeserializer(Configuration conf) throws Exception {
     Deserializer de = getDeserializerClass().newInstance();
-    SerDeUtils.initializeSerDe(de, null, properties, null);
+    SerDeUtils.initializeSerDe(de, conf, properties, null);
     return de;
   }
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/AbstractFieldRewriter.java b/serde/src/java/org/apache/hadoop/hive/serde2/AbstractFieldRewriter.java
index d6a0fcf..b1799fa 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/AbstractFieldRewriter.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/AbstractFieldRewriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.serde2;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 
 import java.io.IOException;
@@ -27,8 +28,8 @@
 public class AbstractFieldRewriter implements FieldRewriter {
 
   @Override
-  public void init(List<String> columnNames, List<TypeInfo> columnTypes, Properties properties)
-      throws IOException {
+  public void init(List<String> columnNames, List<TypeInfo> columnTypes, Properties properties,
+      Configuration conf) throws IOException {
   }
 
   @Override
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/FieldRewriter.java b/serde/src/java/org/apache/hadoop/hive/serde2/FieldRewriter.java
index e75bf6b..92a5542 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/FieldRewriter.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/FieldRewriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.serde2;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 
 import java.io.IOException;
@@ -32,8 +33,8 @@
  */
 public interface FieldRewriter {
 
-  void init(List<String> columnNames, List<TypeInfo> columnTypes, Properties properties)
-      throws IOException;
+  void init(List<String> columnNames, List<TypeInfo> columnTypes, Properties properties,
+      Configuration conf) throws IOException;
 
   void encode(int index, ByteStream.Input input, ByteStream.Output output)
       throws IOException;
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/crypto/CryptoConstants.java b/serde/src/java/org/apache/hadoop/hive/serde2/crypto/CryptoConstants.java
new file mode 100644
index 0000000..22f13b5
--- /dev/null
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/crypto/CryptoConstants.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.crypto;
+
+import org.apache.hadoop.crypto.key.KeyProviderFactory;
+
+public class CryptoConstants {
+  // The key length, in bits
+  public static final int KEY_LENGTH = 128;
+
+  // The IV length, in bits
+  public static final int IV_LENGTH = 128;
+
+  // The key names of the encrypted columns, separated by ','. Set via TBLPROPERTIES,
+  // e.g. TBLPROPERTIES('hive.encrypt.keynames'='hive.k1,hive.k2')
+  public static final String HIVE_ENCRYPT_KEYNAMES = "hive.encrypt.keynames";
+
+  // The IV of the encrypted columns; generated randomly (IV_LENGTH bits) and stored in TBLPROPERTIES
+  public static final String HIVE_ENCRYPT_IV = "hive.encrypt.iv";
+
+  // The KMS URI should look like "kms://http@localhost:16000/kms"
+  public static final String KMS_URI = KeyProviderFactory.KEY_PROVIDER_PATH;
+}
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/crypto/CryptoRewriter.java b/serde/src/java/org/apache/hadoop/hive/serde2/crypto/CryptoRewriter.java
new file mode 100644
index 0000000..78a19fb
--- /dev/null
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/crypto/CryptoRewriter.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.serde2.crypto;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CryptoCodec;
+import org.apache.hadoop.crypto.CryptoInputStream;
+import org.apache.hadoop.crypto.CryptoOutputStream;
+import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
+import org.apache.hadoop.hive.serde2.AbstractFieldRewriter;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+
+public class CryptoRewriter extends AbstractFieldRewriter {
+  private CryptoCodec codec;
+  private byte[] keyBytes;
+  private byte[] ivBytes;
+  private boolean isEncode = true;
+  private static final int bufferSize = 4096;
+  private static final Log LOG = LogFactory.getLog(CryptoRewriter.class);
+
+  @Override
+  public void init(List<String> columnNames, List<TypeInfo> columnTypes, Properties properties,
+      Configuration conf) throws IOException {
+    String keyNames = properties.getProperty(CryptoConstants.HIVE_ENCRYPT_KEYNAMES);
+
+    if (keyNames == null || keyNames.isEmpty()) {
+      isEncode = false;
+      LOG.warn("Please set " + CryptoConstants.HIVE_ENCRYPT_KEYNAMES);
+      return;
+    }
+
+    String kmsUri = conf.get(CryptoConstants.KMS_URI);
+    if (kmsUri == null) {
+      isEncode = false;
+      LOG.warn("Please set " + CryptoConstants.KMS_URI);
+      return;
+    }
+
+    try {
+      // currently all columns share one key
+      KeyVersion kv = HiveSerdeKeyManagement.getFirstKey(conf, kmsUri, keyNames);
+      codec = CryptoCodec.getInstance(conf);
+      keyBytes = kv.getMaterial();
+      ivBytes = CryptoUtil.decodeBytes(properties.getProperty(CryptoConstants.HIVE_ENCRYPT_IV));
+    } catch (IOException e) {
+      // if the current user has no permission to access the key,
+      // skip encryption/decryption
+      isEncode = false;
+    }
+  }
+
+  @Override
+  public void encode(int index, ByteStream.Input input, ByteStream.Output output)
+      throws IOException {
+    if (!isEncode) {
+      output.write(input.toBytes());
+      return;
+    }
+
+    byte[] data = input.toBytes();
+    int dataLen = data.length;
+
+    // 1. Encrypt the data
+    DataOutputBuffer encryptedDataBuffer = new DataOutputBuffer();
+    CryptoOutputStream out = new CryptoOutputStream(encryptedDataBuffer,
+        codec, bufferSize, keyBytes, ivBytes);
+    out.write(data, 0, dataLen);
+    out.flush();
+    out.close();
+
+    // 2. Wrap the ciphertext with Base64
+    byte[] encryptedData = CryptoUtil.copyBytes(encryptedDataBuffer.getData(),
+        0, encryptedDataBuffer.getLength());
+    byte[] wrappedBytes = Base64.encodeBase64(encryptedData);
+
+    // 3. Write to output
+    // Encrypted block format:
+    // +----------------------------------+
+    // | 1-byte original length (max 255) |
+    // +----------------------------------+
+    // | encrypted block data ...         |
| + // +--------------------------+ + output.write(dataLen); + output.write(wrappedBytes); + LOG.info("Finished encrypting data"); + } + + @Override + public void decode(int index, ByteStream.Input input, ByteStream.Output output) + throws IOException { + if (!isEncode) { + output.write(input.toBytes()); + return; + } + + // 1.1 first read length of origin data + int dataLen = input.read(); + // 1.2 read remaining data bytes + byte[] wrappedBytes = input.toBytes(); + // 1.3 Unwrap data through Base64 + byte[] unwrappedBytes = Base64.decodeBase64(wrappedBytes); + + // 2. Decrypt data + DataInputBuffer decryptedDataBuffer = new DataInputBuffer(); + decryptedDataBuffer.reset(unwrappedBytes, 0, unwrappedBytes.length); + CryptoInputStream in = new CryptoInputStream(decryptedDataBuffer, + codec, bufferSize, keyBytes, ivBytes); + DataInputStream dataIn = new DataInputStream(new BufferedInputStream(in)); + byte[] decryptedData = new byte[dataLen]; + dataIn.readFully(decryptedData); + dataIn.close(); + + // 3. Write to output + output.write(decryptedData); + LOG.info("Finished decrypting data"); + } +} diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/crypto/CryptoUtil.java b/serde/src/java/org/apache/hadoop/hive/serde2/crypto/CryptoUtil.java new file mode 100644 index 0000000..c544ec5 --- /dev/null +++ b/serde/src/java/org/apache/hadoop/hive/serde2/crypto/CryptoUtil.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.serde2.crypto; + +import java.io.UnsupportedEncodingException; +import java.nio.charset.Charset; +import java.security.GeneralSecurityException; +import java.security.SecureRandom; +import java.util.Arrays; + +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; + +public class CryptoUtil { + + /** + * Generate a number of secure, random bytes suitable for cryptographic use. + * + * @param conf + * @param length + */ + public static byte[] randomBytes(Configuration conf, int length) { + final String secureRandomAlg = conf.get( + CommonConfigurationKeysPublic.HADOOP_SECURITY_JAVA_SECURE_RANDOM_ALGORITHM_KEY, + CommonConfigurationKeysPublic.HADOOP_SECURITY_JAVA_SECURE_RANDOM_ALGORITHM_DEFAULT); + SecureRandom random; + try { + random = SecureRandom.getInstance(secureRandomAlg); + } catch (GeneralSecurityException e) { + random = new SecureRandom(); + } + byte[] result = new byte[length / 8]; + random.nextBytes(result); + return result; + } + + /** + * Wrap data through Base64 + * + * @param bytes + * @return wrapped string of given bytes + */ + public static String encodeBytes(byte[] bytes) throws UnsupportedEncodingException { + return bytes == null ? 
+        null : new String(Base64.encodeBase64(bytes), Charset.forName("UTF-8"));
+  }
+
+  /**
+   * Base64-decode the given string.
+   *
+   * @param str
+   * @return the decoded bytes of the given string
+   */
+  public static byte[] decodeBytes(String str) throws UnsupportedEncodingException {
+    return Base64.decodeBase64(str.getBytes(Charset.forName("UTF-8")));
+  }
+
+  /**
+   * Copy the len - pos bytes starting at b[pos], i.e. b[pos] .. b[len - 1].
+   *
+   * @param b
+   * @param pos start index (inclusive)
+   * @param len end index (exclusive)
+   */
+  public static byte[] copyBytes(byte[] b, int pos, int len) {
+    return Arrays.copyOfRange(b, pos, len);
+  }
+
+}
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/crypto/HiveSerdeKeyManagement.java b/serde/src/java/org/apache/hadoop/hive/serde2/crypto/HiveSerdeKeyManagement.java
new file mode 100644
index 0000000..da6140b
--- /dev/null
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/crypto/HiveSerdeKeyManagement.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.crypto;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
+import org.apache.hadoop.crypto.key.KeyProviderFactory;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+
+public class HiveSerdeKeyManagement {
+  private static final Log LOG = LogFactory.getLog(HiveSerdeKeyManagement.class);
+
+  /**
+   * Set up the table for encryption if encryption properties are present.
+   *
+   * @param conf
+   * @param tblProps
+   */
+  public static void setupTableForEncryption(Configuration conf, Map<String, String> tblProps)
+      throws IOException {
+    if (tblProps == null) {
+      return;
+    }
+
+    String keyNames = tblProps.get(CryptoConstants.HIVE_ENCRYPT_KEYNAMES);
+    if (keyNames == null || keyNames.isEmpty()) {
+      return;
+    }
+
+    String kmsUri = conf.get(CryptoConstants.KMS_URI);
+    if (kmsUri == null) {
+      LOG.warn("Please set " + CryptoConstants.KMS_URI + " if you want to enable encryption");
+      return;
+    }
+
+    // 1. create the keys via KMS
+    createKeys(conf, kmsUri, keyNames);
+
+    // 2. generate the IV and store it in the table properties
+    byte[] ivBytes = CryptoUtil.randomBytes(conf, CryptoConstants.IV_LENGTH);
+    tblProps.put(CryptoConstants.HIVE_ENCRYPT_IV, CryptoUtil.encodeBytes(ivBytes));
+  }
+
+  /**
+   * Create keys for the given key names on the KMS server.
+   *
+   * @param conf
+   * @param kmsUri
+   * @param keyNames
+   */
+  public static void createKeys(Configuration conf, String kmsUri, String keyNames)
+      throws IOException {
+    try {
+      KeyProvider kp = getKeyProvider(conf, kmsUri);
+      for (String keyName : getKeyNames(keyNames)) {
+        KeyVersion kv = kp.getCurrentKey(keyName);
+        if (kv == null) {
+          kv = kp.createKey(keyName, new KeyProvider.Options(conf));
+        }
+      }
+    } catch (AuthorizationException e) {
+      throw new IOException("Current user has no permission to get/create key", e);
+    } catch (NoSuchAlgorithmException e) {
+      throw new IOException("No such algorithm when creating key", e);
+    }
+  }
+
+  /**
+   * Get the first key of the given key names from the KMS server.
+   *
+   * @param conf
+   * @param kmsUri
+   * @param keyNames
+   */
+  public static KeyVersion getFirstKey(Configuration conf, String kmsUri, String keyNames)
+      throws IOException {
+    KeyProvider kp = getKeyProvider(conf, kmsUri);
+    String keyName = getKeyNames(keyNames).get(0);
+    KeyVersion kv = kp.getCurrentKey(keyName);
+    if (kv == null) {
+      throw new IOException("Cannot get the key for ser/deser");
+    }
+    return kv;
+  }
+
+  /**
+   * Get the list of keys for the given key names from the KMS server.
+   *
+   * @param conf
+   * @param kmsUri
+   * @param keyNames
+   */
+  public static List<KeyVersion> getKeys(Configuration conf, String kmsUri, String keyNames)
+      throws IOException {
+    List<KeyVersion> kvs = new ArrayList<KeyVersion>();
+    KeyProvider kp = getKeyProvider(conf, kmsUri);
+    for (String keyName : getKeyNames(keyNames)) {
+      kvs.add(kp.getCurrentKey(keyName));
+    }
+    return kvs;
+  }
+
+  private static KeyProvider getKeyProvider(Configuration conf, String kmsUri)
+      throws IOException {
+    try {
+      URI uri = new URI(kmsUri);
+      return KeyProviderFactory.get(uri, conf);
+    } catch (URISyntaxException e) {
+      throw new IOException("Bad configuration of " + KeyProviderFactory.KEY_PROVIDER_PATH
+          + " at " + kmsUri, e);
+    }
+  }
+
+  private static List<String> getKeyNames(String keyNames) {
+    List<String> keys = new ArrayList<String>();
+    for (String key : keyNames.split(",")) {
+      keys.add(key.trim());
+    }
+    return keys;
+  }
+}
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java b/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java
index edea98d..5dca32c 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazySimpleSerDe.java
@@ -390,7 +390,7 @@ private static FieldRewriter createRewriter(String encoderClass, Properties prop
     try {
       FieldRewriter rewriter =
           (FieldRewriter) ReflectionUtils.newInstance(Class.forName(encoderClass), job);
-      rewriter.init(parameters.columnNames, parameters.columnTypes, properties);
+      rewriter.init(parameters.columnNames, parameters.columnTypes, properties, job);
      return rewriter;
    } catch (Exception e) {
      throw new SerDeException(e);
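
For reference, a minimal standalone sketch of the per-field round trip that CryptoRewriter performs: encrypt with Hadoop's CryptoCodec, wrap with Base64, then unwrap and decrypt. The class name CryptoRoundTripSketch and the sample plaintext are hypothetical; the sketch assumes Hadoop 2.6+ crypto classes plus this patch's CryptoUtil and CryptoConstants on the classpath, and substitutes locally generated random bytes for the KMS-managed key material. In a real deployment the table is created with TBLPROPERTIES('hive.encrypt.keynames'='hive.k1') and the KMS is configured via hadoop.security.key.provider.path, e.g. "kms://http@localhost:16000/kms".

import java.io.DataInputStream;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoCodec;
import org.apache.hadoop.crypto.CryptoInputStream;
import org.apache.hadoop.crypto.CryptoOutputStream;
import org.apache.hadoop.hive.serde2.crypto.CryptoConstants;
import org.apache.hadoop.hive.serde2.crypto.CryptoUtil;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class CryptoRoundTripSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // AES/CTR/NoPadding by default; the 128-bit key and IV match CryptoConstants
    CryptoCodec codec = CryptoCodec.getInstance(conf);
    byte[] key = CryptoUtil.randomBytes(conf, CryptoConstants.KEY_LENGTH); // 16 bytes
    byte[] iv = CryptoUtil.randomBytes(conf, CryptoConstants.IV_LENGTH);   // 16 bytes
    byte[] plain = "123-45-6789".getBytes("UTF-8");

    // Encrypt and Base64-wrap, mirroring CryptoRewriter.encode
    DataOutputBuffer enc = new DataOutputBuffer();
    CryptoOutputStream cos = new CryptoOutputStream(enc, codec, 4096, key, iv);
    cos.write(plain);
    cos.close();
    byte[] wrapped = Base64.encodeBase64(
        CryptoUtil.copyBytes(enc.getData(), 0, enc.getLength()));

    // Unwrap and decrypt, mirroring CryptoRewriter.decode
    byte[] unwrapped = Base64.decodeBase64(wrapped);
    DataInputBuffer din = new DataInputBuffer();
    din.reset(unwrapped, 0, unwrapped.length);
    DataInputStream dis = new DataInputStream(
        new CryptoInputStream(din, codec, 4096, key, iv));
    byte[] out = new byte[plain.length]; // CTR mode preserves length
    dis.readFully(out);
    dis.close();

    System.out.println(new String(out, "UTF-8")); // prints 123-45-6789
  }
}

Because CTR mode is length-preserving and the rewriter stores the original length in a single byte, a field written through this path must fit in 255 bytes; the Base64 wrapping then inflates the stored ciphertext by roughly one third.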