Index: shims/src/crypto/java/org/apache/hadoop/hive/crypto/shims/KeyMeta.java
===================================================================
--- shims/src/crypto/java/org/apache/hadoop/hive/crypto/shims/KeyMeta.java (revision 0)
+++ shims/src/crypto/java/org/apache/hadoop/hive/crypto/shims/KeyMeta.java (revision 0)
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.crypto.shims;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.cryptocontext.provider.KeyContext;
+
+public class KeyMeta implements Writable {
+  private KeyContext keyContext;
+  private String keyProviderType;
+  private String keyProviderClass;
+  private String keyProviderParameters;
+
+  public KeyMeta() {
+
+  }
+
+  public KeyMeta(KeyContext keyContext) {
+    super();
+    this.keyContext = keyContext;
+  }
+
+  public KeyMeta(KeyContext keyContext, String keyProviderType, String keyProviderClass,
+      String keyProviderParameters) {
+    super();
+    this.keyContext = keyContext;
+    this.keyProviderType = keyProviderType;
+    this.keyProviderClass = keyProviderClass;
+    this.keyProviderParameters = keyProviderParameters;
+  }
+
+  public KeyContext getKeyContext() {
+    return keyContext;
+  }
+
+  public void setKeyContext(KeyContext keyContext) {
+    this.keyContext = keyContext;
+  }
+
+  public String getKeyProviderType() {
+    return keyProviderType;
+  }
+
+  public void setKeyProviderType(String keyProviderType) {
+    this.keyProviderType = keyProviderType;
+  }
+
+  public String getKeyProviderClass() {
+    return keyProviderClass;
+  }
+
+  public void setKeyProviderClass(String keyProviderClass) {
+    this.keyProviderClass = keyProviderClass;
+  }
+
+  public String getKeyProviderParameters() {
+    return keyProviderParameters;
+  }
+
+  public void setKeyProviderParameters(String keyProviderParameters) {
+    this.keyProviderParameters = keyProviderParameters;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    keyContext.write(out);
+
+    if(keyProviderType != null) {
+      out.writeBoolean(true);
+      Text.writeString(out, keyProviderType);
+    } else {
+      out.writeBoolean(false);
+    }
+
+    if(keyProviderClass != null) {
+      out.writeBoolean(true);
+      Text.writeString(out, keyProviderClass);
+    } else {
+      out.writeBoolean(false);
+    }
+
+    if(keyProviderParameters != null) {
+      out.writeBoolean(true);
+      Text.writeString(out, keyProviderParameters);
+    } else {
+      out.writeBoolean(false);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    keyContext = new KeyContext();
+    keyContext.readFields(in);
+
+    if(in.readBoolean())
+      keyProviderType = Text.readString(in);
+    else
+      keyProviderType = null;
+
+    if(in.readBoolean())
+      keyProviderClass = Text.readString(in);
+    else
+      keyProviderClass = null;
+
+    if(in.readBoolean())
+      keyProviderParameters = Text.readString(in);
+    else
+      keyProviderParameters = null;
+  }
+}
Index: shims/src/crypto/java/org/apache/hadoop/hive/crypto/shims/ObjectSerializer.java
===================================================================
--- shims/src/crypto/java/org/apache/hadoop/hive/crypto/shims/ObjectSerializer.java (revision 0)
+++ shims/src/crypto/java/org/apache/hadoop/hive/crypto/shims/ObjectSerializer.java (revision 0)
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.crypto.shims;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.Charset;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+import org.apache.commons.codec.binary.Base64;
+
+public class ObjectSerializer {
+
+  public static String serialize(Serializable obj) throws IOException {
+    if (obj == null)
+      return "";
+    try {
+      ByteArrayOutputStream serialObj = new ByteArrayOutputStream();
+      Deflater def = new Deflater(Deflater.BEST_COMPRESSION);
+      ObjectOutputStream objStream = new ObjectOutputStream(new DeflaterOutputStream(
+          serialObj, def));
+      objStream.writeObject(obj);
+      objStream.close();
+      return encodeBytes(serialObj.toByteArray());
+    } catch (Exception e) {
+      throw new IOException("Serialization error: " + e.getMessage(), e);
+    }
+  }
+
+  public static Object deserialize(String str) throws IOException {
+    if (str == null || str.length() == 0)
+      return null;
+    try {
+      ByteArrayInputStream serialObj = new ByteArrayInputStream(decodeBytes(str));
+      ObjectInputStream objStream = new ObjectInputStream(new InflaterInputStream(serialObj));
+      return objStream.readObject();
+    } catch (Exception e) {
+      throw new IOException("Deserialization error: " + e.getMessage(), e);
+    }
+  }
+
+  public static String encodeBytes(byte[] bytes) throws UnsupportedEncodingException {
+    return bytes == null ?
+        null : new String(Base64.encodeBase64(bytes), Charset.forName("UTF-8"));
+  }
+
+  public static byte[] decodeBytes(String str) throws UnsupportedEncodingException {
+    return Base64.decodeBase64(str.getBytes(Charset.forName("UTF-8")));
+  }
+}
Index: shims/src/crypto/java/org/apache/hadoop/hive/crypto/shims/HiveCryptoShims.java
===================================================================
--- shims/src/crypto/java/org/apache/hadoop/hive/crypto/shims/HiveCryptoShims.java (revision 0)
+++ shims/src/crypto/java/org/apache/hadoop/hive/crypto/shims/HiveCryptoShims.java (revision 0)
@@ -0,0 +1,1316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.crypto.shims;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.security.KeyStore;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.crypto.Cipher;
+import javax.crypto.KeyGenerator;
+import javax.crypto.SecretKey;
+import javax.crypto.spec.SecretKeySpec;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.io.IOPrepareCache;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
+import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import 
org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.crypto.CryptoCodec; +import org.apache.hadoop.io.crypto.CryptoContext; +import org.apache.hadoop.io.crypto.CryptoException; +import org.apache.hadoop.io.crypto.Decryptor; +import org.apache.hadoop.io.crypto.Encryptor; +import org.apache.hadoop.io.crypto.Key; +import org.apache.hadoop.io.crypto.Key.KeyType; +import org.apache.hadoop.io.crypto.KeyProvider; +import org.apache.hadoop.io.crypto.aes.AESCodec; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.cryptocontext.CryptoContextHelper; +import org.apache.hadoop.mapreduce.cryptocontext.provider.FileMatches; +import org.apache.hadoop.mapreduce.cryptocontext.provider.KeyContext; +import org.apache.hadoop.mapreduce.cryptocontext.provider.KeyProviderCryptoContextProvider; +import org.apache.hadoop.mapreduce.cryptocontext.provider.KeyProviderCryptoContextProvider.KeyProviderConfig; +import org.apache.hadoop.util.ReflectionUtils; + + +public class HiveCryptoShims { + public static final Log LOG = LogFactory.getLog(HiveCryptoShims.class); + + private static final String TMPFILE = ".intermediatefile."; + + private static final String BACKUP_PREFIX = "hive.backup."; + private static final String DEFAULT_KEY_PROVIDER_CLASS = "org.apache.hadoop.io.crypto.KeyStoreKeyProvider"; + + public static final String KEY_PROVIDER_LOCAL = "local"; + public static final String KEY_PROVIDER_REMOTE = "remote"; + + public static final String HIVE_ENCRYPT_MAP_OUTPUT = "hive.encrypt.map.output"; + public static final String HIVE_ENCRYPT_MAP_OUTPUT_CODEC_CLASS = "hive.encrypt.map.output.codec"; + + public static final String HIVE_ENCRYPT_TMPFILE = "hive.encrypt.tmpfile"; + + public static final String HIVE_ENCRYPT_TMPFILE_KEY_NAME = "hive.encrypt.tmpfile.keyName"; + public static final String HIVE_ENCRYPT_TMPFILE_KEY_PROVIDER_TYPE = "hive.encrypt.tmpfile.keyProviderType"; + public static final String HIVE_ENCRYPT_TMPFILE_KEY_PROVIDER_CLASS = "hive.encrypt.tmpfile.keyProvider"; + public static final String HIVE_ENCRYPT_TMPFILE_KEY_PROVIDER_PARAMETERS = "hive.encrypt.tmpfile.keyProviderParameters"; + + public static final String COMPRESS = "mapred.output.compress"; + public static final String COMPRESS_CODEC = "mapred.output.compression.codec"; + public static final String OUTPUT_CODEC = "avro.output.codec"; + + public static final String HIVE_ENCRYPT_KEY_PROVIDER_TYPE = "hive.encrypt.keyProviderType"; + public static final String HIVE_ENCRYPT_KEY_PROVIDER_CLASS = "hive.encrypt.keyProvider"; + public static final String HIVE_ENCRYPT_KEY_PROVIDER_PARAMETERS = "hive.encrypt.keyProviderParameters"; + + public static final String HIVE_ENCRYPT_KEY_RESOLVER_CLASS = "hive.encrypt.keyResolver"; + + public static final String HIVE_ENCRYPT_ENABLE = "hive.encrypt.enable"; + public static final String HIVE_ENCRYPT_CODEC_CLASS = "hive.encrypt.codec"; + public static final String HIVE_ENCRYPT_KEY_NAME = "hive.encrypt.keyName"; + public static final String HIVE_ENCRYPT_RAW_KEY = "hive.encrypt.rawKey"; + public static final String HIVE_ENCRYPT_KEY_MANAGED = "hive.encrypt.key.managed"; + + public static final String KEYSTORE_TYPE = "JCEKS"; + + protected Configuration conf; + protected HiveCryptoContext context; + protected HiveKeyResolver keyResolver; + + public HiveCryptoShims(Configuration conf) throws CryptoException { + this(conf, true); + } + + public HiveCryptoShims(Configuration 
conf, boolean resolvingKey) throws CryptoException { + this.conf = conf; + this.context = new HiveCryptoContext(); + + context.init(conf); + + if(resolvingKey) { + // Only load the key resolver when resolving key is needed + keyResolver = getKeyResolver(conf); + } + } + + public void load() throws IOException { + context.load(); + } + + public static void setupTableForEncryption(Configuration conf, CreateTableDesc crtTbl) throws HiveException { + Map tblProps = crtTbl.getTblProps(); + if(tblProps == null) + return; + + String encryption = tblProps.get(HIVE_ENCRYPT_ENABLE); + if(!Boolean.parseBoolean(encryption)) + return; + + // encryption is enabled + // check either keyName is specified + String keyName = tblProps.get(HIVE_ENCRYPT_KEY_NAME); + if(keyName == null || + keyName.isEmpty()) { + // no key name is specified, by default auto key + Key key = generateKey(); + + TwoTiredKey twoTiredKey = new TwoTiredKey(conf, key); + byte[] wrappedKey = twoTiredKey.getWrappedKey(); + try { + String encodedKey = ObjectSerializer.encodeBytes(wrappedKey); + tblProps.put(HIVE_ENCRYPT_RAW_KEY, encodedKey); + } catch(UnsupportedEncodingException e) { + throw new HiveException(e); + } + } else { + // key name specified, check the key mode + String keyManaged = tblProps.get(HIVE_ENCRYPT_KEY_MANAGED); + if(keyManaged != null && + keyManaged.equals("true")) { + // retrieve the key if managed + String keyProviderClass = tblProps.get(HIVE_ENCRYPT_KEY_PROVIDER_CLASS); + String keyProviderParameters = tblProps.get(HIVE_ENCRYPT_KEY_PROVIDER_PARAMETERS); + if((keyProviderClass == null || keyProviderClass.isEmpty()) && + (keyProviderParameters == null || keyProviderParameters.isEmpty())) { + // Only use global when both keyProvider and its parameters is not specified + keyProviderClass = getKeyProviderClass(conf); + keyProviderParameters = getKeyProviderParameters(conf); + } + + Key key = retrieveKey(conf, keyName, keyProviderClass, keyProviderParameters); + TwoTiredKey twoTiredKey = new TwoTiredKey(conf, key); + byte[] wrappedKey = twoTiredKey.getWrappedKey(); + try { + String encodedKey = ObjectSerializer.encodeBytes(wrappedKey); + tblProps.put(HIVE_ENCRYPT_RAW_KEY, encodedKey); + + // Remove the key name for this case + tblProps.remove(HIVE_ENCRYPT_KEY_NAME); + } catch(UnsupportedEncodingException e) { + throw new HiveException(e); + } + } + } + } + + public static void setupMapRedJob(JobConf job, MapredWork work) throws HiveException { + try { + HiveCryptoShims helper = new HiveCryptoShims(job, true); + helper.setupMapRedJob(work); + } catch(CryptoException e) { + throw new HiveException(e); + } + } + + public static void setupMapRedJob(JobConf job, MapWork work) throws HiveException { + try { + HiveCryptoShims helper = new HiveCryptoShims(job, true); + helper.setupMapRedJob(work); + } catch(CryptoException e) { + throw new HiveException(e); + } + } + + public static void setupFetchJob(JobConf job, FetchWork work) throws HiveException { + try { + HiveCryptoShims helper = new HiveCryptoShims(job, true); + helper.setupFetchJob(work); + } catch(CryptoException e) { + throw new HiveException(e); + } + } + + public static void setupMapRedLocalJob(JobConf job, MapredLocalWork work) throws HiveException { + try { + HiveCryptoShims helper = new HiveCryptoShims(job, true); + helper.setupMapRedLocalJob(work); + } catch(CryptoException e) { + throw new HiveException(e); + } + } + + public static void setInputCryptoContext(Configuration conf, Path file) throws IOException { + try { + HiveCryptoShims helper = new 
HiveCryptoShims(conf, false); + helper.load(); + + // call helper to set the crypto context for a file input format + helper.setInputCryptoContext(file); + } catch(CryptoException e) { + throw new IOException(e); + } + } + + public static void setInputCryptoContext(Configuration conf, TableDesc tableDesc, Path file) throws IOException { + try { + HiveCryptoShims helper = new HiveCryptoShims(conf, false); + helper.load(); + + // call helper to set the crypto context for a file input format + if(tableDesc != null) + helper.setInputCryptoContext(tableDesc, file); + else if(file != null) + helper.setInputCryptoContext(file); + else + LOG.info("Crypto context is not set because there is no table desc or file context."); + } catch(CryptoException e) { + throw new IOException(e); + } + } + + public static boolean setOutputCryptoContext(Configuration conf, TableDesc tableDesc, + Path outputPath) throws IOException { + try { + HiveCryptoShims helper = new HiveCryptoShims(conf, false); + helper.load(); + + // call helper to set the crypto context for output file + return helper.setOutputCryptoContextForTable(conf, tableDesc, outputPath); + } catch(CryptoException e) { + throw new IOException(e); + } + } + + public static boolean hasOutputCryptoContext(Configuration conf, TableDesc tableDesc) + throws HiveException { + try { + HiveCryptoShims helper = new HiveCryptoShims(conf, false); + helper.load(); + + // call helper to set the crypto context for output file + return helper.hasOutputCryptoContextForTable(conf, tableDesc); + } catch(IOException e) { + throw new HiveException(e); + } catch(CryptoException e) { + throw new HiveException(e); + } + } + + public static void setInputCryptoContext(CompressionCodec codec, + Configuration conf, Path file) throws IOException { + if(codec == null) + return; + + if(codec instanceof CryptoCodec && + conf instanceof JobConf) { + CryptoContextHelper.resetInputCryptoContext((CryptoCodec) codec, + (JobConf)conf, file); + } + } + + public static void setOutputCryptoContext(CompressionCodec codec, + Configuration conf, Path file) throws IOException { + if(codec == null) + return; + + if(codec instanceof CryptoCodec && + conf instanceof JobConf) { + CryptoContextHelper.resetOutputCryptoContext((CryptoCodec) codec, + (JobConf)conf, file); + } + } + + public static Object getOutputCompressContext(Configuration conf, Path file, String codec) + throws IOException { + if(codec == null || + codec.isEmpty()) + return null; + + if(!(conf instanceof JobConf)) + return null; + + return (Object) CryptoContextHelper.getOutputCryptoContext( + (JobConf)conf, file); + } + + public static Object getInputCompressContext(Configuration conf, Path file) + throws IOException { + if(!(conf instanceof JobConf)) + return null; + + return (Object) CryptoContextHelper.getInputCryptoContext( + (JobConf)conf, file); + } + + public static void setCryptoContext(CompressionCodec codec, Object codecContext) { + if(codec instanceof CryptoCodec) { + CryptoCodec cryptCodec = (CryptoCodec)codec; + + if(codecContext == null || + codecContext instanceof CryptoContext) { + cryptCodec.setCryptoContext((CryptoContext)codecContext); + } + } + } + + public static void setCryptoContext(CompressionCodec codec, Compressor compressor) { + // reset to the codec context from codec + if(compressor instanceof Encryptor && + codec instanceof CryptoCodec ) { + Encryptor encryptor = (Encryptor) compressor; + CryptoCodec cryptoCodec = (CryptoCodec)codec; + + encryptor.setCryptoContext(cryptoCodec.getCryptoContext()); + } 
+ } + + public static void setCryptoContext(CompressionCodec codec, Decompressor decompressor) { + // reset to the codec context from codec + if(decompressor instanceof Decryptor && + codec instanceof CryptoCodec ) { + Decryptor decryptor = (Decryptor) decompressor; + CryptoCodec cryptoCodec = (CryptoCodec)codec; + + decryptor.setCryptoContext(cryptoCodec.getCryptoContext()); + } + } + + public static void setFileInputFormatCodec(String codecClassName, Configuration conf) { + // the codec for inputs matches by extension + if(codecClassName == null || + codecClassName.isEmpty()) + codecClassName = AESCodec.class.getName(); + + List> oldCodecs = + CompressionCodecFactory.getCodecClasses(conf); + + @SuppressWarnings("rawtypes") + List codecs = new ArrayList(); + for(Class codec : oldCodecs) + codecs.add(codec); + + Class codecClass = + getClassByName(conf, codecClassName, null, CompressionCodec.class); + codecs.add(codecClass); + + CompressionCodecFactory.setCodecClasses(conf, codecs); + } + + public static void setFileOutputFormatCodec(String codecClassName, + Configuration conf) { + if(!(conf instanceof JobConf)) + return; + + if(codecClassName == null || + codecClassName.isEmpty()) + codecClassName = AESCodec.class.getName(); + + Class codecClass = + getClassByName(conf, codecClassName, null, CompressionCodec.class); + + FileOutputFormat.setOutputCompressorClass((JobConf)conf, codecClass); + } + + public static Object getAvroReaderCodecContext(Configuration conf, Path path) + throws IOException { + return AvroCodecContext.getReaderCodecContext(conf, path); + } + + public static DataFileReader createAvroReader(SeekableInput sin, + GenericDatumReader reader, + Object codecContext) throws IOException { + return AvroCodecContext.createReader(sin, reader, codecContext); + } + + public static void setAvroWriterCodecContext(DataFileWriter writer, + Configuration conf, Path path) throws IOException { + AvroCodecContext.setWriterCodecContext(writer, conf, path); + } + + protected void setupMapRedJob(MapredWork work) throws HiveException { + setupMapRedJob(work.getMapWork(), work.getAllOperators()); + } + + protected void setupMapRedJob(MapWork work) throws HiveException { + setupMapRedJob(work, work.getAllOperators()); + } + + protected void setupMapRedJob(MapWork mapWork, List> opList) throws HiveException { + if(!(conf instanceof JobConf)) { + return; + } + + JobConf jobConf = (JobConf)conf; + + // for input we backup + backupConf(jobConf, "io.compression.codecs"); + + // for store we backup the original properties of COMPRESS and COMPRESS_CODEC + backupConf(jobConf, COMPRESS); + backupConf(jobConf, COMPRESS_CODEC); + backupConf(jobConf, OUTPUT_CODEC); + + // add the crypto context for input tables + setupInputCryptoContext(jobConf, mapWork); + + // setup the output tables for FileSinkOperator + setupOutputCryptoContext(jobConf, opList); + + // setup temp file key meta + setupTmpFileCryptoContext(jobConf); + + context.setupJob(); + + //set up the codec for intermediate output if needed + if(jobConf.getBoolean(HIVE_ENCRYPT_MAP_OUTPUT, false)) { + setMapOutputCryptoContext(jobConf); + } + + cleanUp(jobConf); + } + + protected void setupFetchJob(FetchWork work) throws HiveException { + if(!(conf instanceof JobConf)) { + return; + } + + JobConf jobConf = (JobConf)conf; + + // for input we backup + backupConf(jobConf, "io.compression.codecs"); + + // add the crypto context for input tables + setupInputCryptoContext(jobConf, work); + + // setup temp file key meta + setupTmpFileCryptoContext(jobConf); + 
+ context.setupJob(false); + } + + protected void setupMapRedLocalJob(MapredLocalWork work) throws HiveException { + if(!(conf instanceof JobConf)) { + return; + } + + JobConf jobConf = (JobConf)conf; + + // for input we backup + backupConf(jobConf, "io.compression.codecs"); + + // add the crypto context for input tables + setupInputCryptoContext(jobConf, work); + + // setup temp file key meta + setupTmpFileCryptoContext(jobConf); + + context.setupJob(false); + } + + protected void addTableCryptoContext(TableDesc tableDesc) + throws HiveException { + // IMPROVE + // if the table already been processed, don't process it again + String id = getContextId(tableDesc); + KeyMeta keyMeta = context.getKey(id); + if(keyMeta != null) { + // ready added + return; + } + + keyMeta = getTableKeyMeta(conf, tableDesc); + if(keyMeta == null) + return; + + // add to context + context.addKey(id, keyMeta); + } + + protected static boolean isEncryptTmpFile(Configuration conf) { + if(conf.getBoolean(HIVE_ENCRYPT_TMPFILE, false)) + return true; + + return false; + } + + protected void setupInputCryptoContext(JobConf job, MapWork work) + throws HiveException { + Map pathToPartitionInfo = work.getPathToPartitionInfo(); + Collection parts = pathToPartitionInfo.values(); + for(PartitionDesc part : parts) { + TableDesc tableDesc = part.getTableDesc(); + if(tableDesc == null) + continue; + + addTableCryptoContext(tableDesc); + } + } + + protected void setupOutputCryptoContext(Configuration conf, List> opList) + throws HiveException { + for (Operator op : opList) { + if (op instanceof FileSinkOperator) { + FileSinkOperator fsop = (FileSinkOperator) op; + FileSinkDesc fsDesc = fsop.getConf(); + TableDesc tableDesc = fsDesc.getTableInfo(); + if(tableDesc != null) { + addTableCryptoContext(tableDesc); + } + } + } + } + + protected void setupInputCryptoContext(JobConf job, FetchWork work) + throws HiveException { + TableDesc tblDesc = work.getTblDesc(); + if(tblDesc != null) { + addTableCryptoContext(tblDesc); + } + + Collection parts = work.getPartDesc(); + if(parts != null) { + for(PartitionDesc part : parts) { + TableDesc tableDesc = part.getTableDesc(); + if(tableDesc == null) + continue; + + addTableCryptoContext(tableDesc); + } + } + } + + protected void setupInputCryptoContext(JobConf job, MapredLocalWork work) + throws HiveException { + for (Map.Entry entry : work.getAliasToFetchWork().entrySet()) { + FetchWork fetchWork = entry.getValue(); + setupInputCryptoContext(job, fetchWork); + } + } + + protected void setupTmpFileCryptoContext(Configuration conf) throws HiveException { + if(!isEncryptTmpFile(conf)) + return; + + String keyName = conf.get(HIVE_ENCRYPT_TMPFILE_KEY_NAME); + if(keyName == null || + keyName.isEmpty()) { + throw new HiveException("No key name specified for tmp file encryption."); + } + + String keyProviderType = conf.get(HIVE_ENCRYPT_TMPFILE_KEY_PROVIDER_TYPE); + String keyProviderClass = conf.get(HIVE_ENCRYPT_TMPFILE_KEY_PROVIDER_CLASS); + String keyProviderParameters = conf.get(HIVE_ENCRYPT_TMPFILE_KEY_PROVIDER_PARAMETERS); + + KeyMeta keyMeta = localizeKey(conf, keyName, + keyProviderType, keyProviderClass, keyProviderParameters); + if(keyMeta == null) + return; + + // save it to load context + String id = TMPFILE; + context.addKey(id, keyMeta); + } + + protected CryptoContext getTemFileCryptoContext(Configuration conf) throws IOException { + String id = TMPFILE; + KeyMeta keyMeta = context.getKey(id); + if(keyMeta == null) + return null; + + // retrieve the key if needed + Key key = 
convertKey(keyMeta, conf); + + // set the codec for output + return new CryptoContext(key, null, null); + } + + protected void setMapOutputCryptoContext(Configuration conf) throws HiveException { + //try to get the key + if(!(conf instanceof JobConf)) + return; + + String keyName = conf.get(HIVE_ENCRYPT_TMPFILE_KEY_NAME); + if(keyName == null || + keyName.isEmpty()) { + throw new HiveException("No key name specified for tmp file encryption."); + } + + String keyProviderType = conf.get(HIVE_ENCRYPT_TMPFILE_KEY_PROVIDER_TYPE); + String keyProviderClass = conf.get(HIVE_ENCRYPT_TMPFILE_KEY_PROVIDER_CLASS); + String keyProviderParameters = conf.get(HIVE_ENCRYPT_TMPFILE_KEY_PROVIDER_PARAMETERS); + + boolean clientSide = true; + if(KEY_PROVIDER_REMOTE.equals(keyProviderType)) + clientSide = false; + + if(keyProviderClass == null || + keyProviderClass.isEmpty()) { + // use the default + keyProviderClass = DEFAULT_KEY_PROVIDER_CLASS; + } + + try { + String codecClassName = conf.get(HIVE_ENCRYPT_MAP_OUTPUT_CODEC_CLASS); + setMapOutputCodec((JobConf)conf, codecClassName); + + // set the map ouput key provider + KeyProviderConfig keyProviderConfig = + new KeyProviderConfig(keyProviderClass, keyProviderParameters); + KeyContext keyContext = KeyContext.refer(keyName, KeyType.SYMMETRIC_KEY, + null, 0); + FileMatches fileMatches = new FileMatches(keyContext); + + KeyProviderCryptoContextProvider.setMapOutputCryptoContextProvider((JobConf)conf, + fileMatches, clientSide, keyProviderConfig); + } catch(IOException e) { + throw new HiveException(e); + } catch(CryptoException e) { + throw new HiveException(e); + } + } + + protected KeyMeta getTableKeyMeta(Configuration conf, TableDesc tableDesc) + throws HiveException { + KeyMeta keyMeta = getKeyMetaForTable(conf, tableDesc); + if(keyMeta != null) { + return localizeKey(conf, keyMeta); + } + + return matchTableKey(conf, tableDesc); + } + + protected String getContextId(TableDesc tableDesc) { + String tableName = tableDesc.getTableName(); + if(tableName == null) + tableName = ""; + + return tableName; + } + + protected KeyMeta getKeyMetaForTable(Configuration conf, TableDesc tableDesc) + throws HiveException { + Properties properties = tableDesc.getProperties(); + if(properties == null) + return null; + + // Check whether the table is set with encryption to true + String encryption = properties.getProperty(HIVE_ENCRYPT_ENABLE); + if(!Boolean.parseBoolean(encryption)) + return null; + + try { + String keyName = properties.getProperty(HIVE_ENCRYPT_KEY_NAME); + if(keyName != null && + !keyName.isEmpty()) { + //key name specified + KeyContext keyContext = KeyContext.refer(keyName, KeyType.SYMMETRIC_KEY, + null, 0); + KeyMeta keyMeta = new KeyMeta(keyContext, + properties.getProperty(HIVE_ENCRYPT_KEY_PROVIDER_TYPE), + properties.getProperty(HIVE_ENCRYPT_KEY_PROVIDER_CLASS), + properties.getProperty(HIVE_ENCRYPT_KEY_PROVIDER_PARAMETERS)); + + return localizeKey(conf, keyMeta); + } else { + // try raw key + String encodedRawKey = properties.getProperty(HIVE_ENCRYPT_RAW_KEY); + if(encodedRawKey == null || + encodedRawKey.isEmpty()) + return null; + + // decode and decrypt with the cluster master key + Key key = decodeRawKey(conf, encodedRawKey); + KeyContext keyContext = KeyContext.fromKey(key); + return new KeyMeta(keyContext); + } + } catch(CryptoException e) { + throw new HiveException(e); + } + } + + protected Key decodeRawKey(Configuration conf, String encodedRawKey) throws HiveException { + try { + byte[] encryptedKeyBytes = 
ObjectSerializer.decodeBytes(encodedRawKey); + TwoTiredKey twoTiredKey = new TwoTiredKey(conf, encryptedKeyBytes); + return twoTiredKey.getKey(); + } catch(UnsupportedEncodingException e) { + throw new HiveException(e); + } + } + + protected static Key generateKey() throws HiveException { + try { + KeyGenerator keyGen = KeyGenerator.getInstance("AES"); + int maxKeyLength = Cipher.getMaxAllowedKeyLength("AES"); + int keyLength = 128; + if (maxKeyLength >=256) + keyLength = 256; + + // cryptograpphic secure random + SecureRandom random = new SecureRandom(); + keyGen.init(keyLength, random); + + SecretKey secretKey = keyGen.generateKey(); + byte[] rawKey = secretKey.getEncoded(); + + Key key = new Key(Key.KeyType.SYMMETRIC_KEY, Key.AES, keyLength, rawKey); + return key; + } catch(NoSuchAlgorithmException e) { + throw new HiveException(e); + } + } + + protected String getTableCodec(Configuration conf, + TableDesc tableDesc, KeyMeta keyMeta) throws IOException{ + // try first from table properties + String codec = getCodecForTable(conf, tableDesc); + if(codec != null && + !codec.isEmpty()) { + return codec; + } + + // Get from the global settings + return conf.get(HIVE_ENCRYPT_CODEC_CLASS); + } + + protected String getCodecForTable(Configuration conf, TableDesc tableDesc) { + // get codec properties from the table properties + Properties properties = tableDesc.getProperties(); + if(properties == null) + return null; + + return properties.getProperty(HIVE_ENCRYPT_CODEC_CLASS); + } + + protected KeyMeta matchTableKey(Configuration conf, TableDesc tableDesc) throws HiveException { + if(keyResolver == null) + return null; + + KeyMeta keyMeta = keyResolver.resovleKey(tableDesc); + if(keyMeta == null) + return null; + + return localizeKey(conf, keyMeta); + } + + protected static HiveKeyResolver getKeyResolver(Configuration conf) throws CryptoException { + String keyResolverClassName = conf.get(HIVE_ENCRYPT_KEY_RESOLVER_CLASS); + if(keyResolverClassName == null || + keyResolverClassName.isEmpty()) + return null; + + return getKeyResolver(conf, keyResolverClassName); + } + + protected static HiveKeyResolver getKeyResolver(Configuration conf, String keyResolverClassName) + throws CryptoException { + Class keyResolverClass = + getClassByName(conf, keyResolverClassName, null, HiveKeyResolver.class); + + if(keyResolverClass == null) + throw new CryptoException("Key resolver class cannot be found."); + + HiveKeyResolver keyResolver = + (HiveKeyResolver) ReflectionUtils.newInstance(keyResolverClass, conf); + if(keyResolver == null) + throw new CryptoException("Failed to instance key resolver: " + keyResolverClassName); + + keyResolver.init(conf); + return keyResolver; + } + + protected KeyMeta localizeKey(Configuration conf, String keyName) throws HiveException { + String keyProviderType = getKeyProviderType(conf); + String keyProviderClass = getKeyProviderClass(conf); + String keyProviderParameters = getKeyProviderParameters(conf); + + return localizeKey(conf, keyName, + keyProviderType, keyProviderClass, keyProviderParameters); + } + + protected KeyMeta localizeKey(Configuration conf, String keyName, + String keyProviderType, String keyProviderClass, String keyProviderParameters) throws HiveException { + + if(!KEY_PROVIDER_REMOTE.equals(keyProviderType)) { + //local, retrieve the key + Key key = retrieveKey(conf, keyName, keyProviderClass, keyProviderParameters); + if(key == null) + throw new HiveException("Failed to retrieve key '" + keyName + "' using " + keyProviderClass); + + KeyContext keyContext = 
KeyContext.fromKey(key); + return new KeyMeta(keyContext); + } else { + //remote, make a reference key context for the name + try { + KeyContext keyContext = KeyContext.refer(keyName, KeyType.SYMMETRIC_KEY, + null, 0); + return new KeyMeta(keyContext, + keyProviderType, keyProviderClass, keyProviderParameters); + } catch(CryptoException e) { + throw new HiveException(e); + } + } + } + + protected KeyMeta localizeKey(Configuration conf, KeyMeta keyMeta) throws HiveException { + // Check key provider override + String keyProviderType = keyMeta.getKeyProviderType(); + String keyProviderClass = keyMeta.getKeyProviderClass(); + String keyProviderParameters = keyMeta.getKeyProviderParameters(); + if((keyProviderClass == null || + keyProviderClass.isEmpty()) && + (keyProviderParameters == null || + keyProviderParameters.isEmpty())) { + // Only use global when both keyProvider and its parameters is not specified + keyProviderType = getKeyProviderType(conf); + keyProviderClass = getKeyProviderClass(conf); + keyProviderParameters = getKeyProviderParameters(conf); + } + + try { + KeyContext keyContext = keyMeta.getKeyContext(); + if(keyContext.getType() == KeyContext.KeyContextType.REFERENCE) { + String keyName = keyContext.toReference(); + return localizeKey(conf, keyName, + keyProviderType, keyProviderClass, keyProviderParameters); + } else { + // raw key + return new KeyMeta(keyContext); + } + } catch(CryptoException e) { + throw new HiveException(e); + } + } + + protected static String getKeyProviderType(Configuration conf) { + return conf.get(HIVE_ENCRYPT_KEY_PROVIDER_TYPE); + } + + protected static String getKeyProviderClass(Configuration conf) { + return conf.get(HIVE_ENCRYPT_KEY_PROVIDER_CLASS); + } + + protected static String getKeyProviderParameters(Configuration conf) { + return conf.get(HIVE_ENCRYPT_KEY_PROVIDER_PARAMETERS); + } + + protected static Key retrieveKey(Configuration conf, KeyMeta keyMeta) + throws HiveException { + KeyContext keyContext = keyMeta.getKeyContext(); + if(keyContext.getType() != KeyContext.KeyContextType.REFERENCE) { + return null; + } + + try { + String keyName = keyContext.toReference(); + return retrieveKey(conf, keyName, + keyMeta.getKeyProviderClass(), keyMeta.getKeyProviderParameters()); + } catch(CryptoException e) { + throw new HiveException(e); + } + } + + public static Key retrieveKey(Configuration conf, + String keyName, String keyProviderClass, String keyProviderParameters ) + throws HiveException { + if(keyName == null || + keyName.isEmpty()) + return null; + + if(keyProviderClass == null || + keyProviderClass.isEmpty()) { + // use the default + keyProviderClass = DEFAULT_KEY_PROVIDER_CLASS; + } + + try { + // create a new instance of KeyProvider + KeyProvider keyProvider = getKeyProvider(conf, + keyProviderClass, keyProviderParameters); + String[] keyNames = {keyName}; + Key[] keys = keyProvider.getKeys(keyNames); + if(keys == null || + keys.length != 1) { + // failed to retrieve the key + throw new CryptoException("Failed to retrieve the key."); + } + + Key key = keys[0]; + + //set the cryptographic length according to the raw key + if(key.getCryptographicLength() == 0 && + key.getRawKey() != null) { + key.setCryptographicLength(key.getRawKey().length * 8); + } + + return key; + } catch(CryptoException e) { + throw new HiveException(e); + } + } + + public static KeyProvider getKeyProvider(Configuration conf, + String keyProviderClassName, String keyProviderParameters) throws CryptoException { + if(keyProviderClassName == null || + 
keyProviderClassName.isEmpty()) + return null; + + Class keyProviderClass = + getClassByName(conf, keyProviderClassName, null, KeyProvider.class); + + if(keyProviderClass == null) + throw new CryptoException("Key provider class cannot be found."); + + KeyProvider keyProvider = + (KeyProvider) ReflectionUtils.newInstance(keyProviderClass, conf); + + if(keyProvider == null) + throw new CryptoException("Failed to instance key provider: " + keyProviderClassName); + + keyProvider.init(keyProviderParameters); + return keyProvider; + } + + public static Class getClassByName(Configuration conf, String className, + Class defaultValue, + Class xface) { + try { + Class theClass = conf.getClassByName(className); + if (theClass != null && !xface.isAssignableFrom(theClass)) + throw new RuntimeException(theClass+" not "+xface.getName()); + else if (theClass != null) + return theClass.asSubclass(xface); + else + return defaultValue; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static void exportKeyStore(List tbls, String location, + String password, Configuration conf) throws Throwable { + KeyStore keyStore = KeyStore.getInstance(KEYSTORE_TYPE); + char[] passwordChars = password.toCharArray(); + + File chkFileExist = new File(location); + if(!chkFileExist.exists()) { + keyStore.load(null, passwordChars); + } else { + keyStore.load(new FileInputStream(chkFileExist), passwordChars); + } + + SecretKey sk; + KeyStore.SecretKeyEntry skEntry; + + for (Table tbl : tbls) { + if(keyStore.containsAlias(tbl.getTableName())) { + throw new IllegalArgumentException("the alias name "+ + tbl.getTableName()+" already exist"); + } + + String encryption = tbl.getProperty(HIVE_ENCRYPT_ENABLE); + if (!Boolean.parseBoolean(encryption)) + continue; + String encodedKey = tbl.getProperty(HIVE_ENCRYPT_RAW_KEY); + if(encodedKey == null) + continue; + byte[] aesEncodedKey = ObjectSerializer.decodeBytes(encodedKey); + + TwoTiredKey ttk = new TwoTiredKey(conf, aesEncodedKey); + byte[] rawKey = ttk.getKey().getRawKey(); + sk = new SecretKeySpec(rawKey, 0, rawKey.length, "AES"); + skEntry = new KeyStore.SecretKeyEntry(sk); + keyStore.setEntry(tbl.getTableName(), skEntry, + new KeyStore.PasswordProtection(passwordChars)); + } + + FileOutputStream fos = null; + + try { + fos = new FileOutputStream(location); + keyStore.store(fos, passwordChars); + } finally { + fos.close(); + } + } + + protected void setInputCryptoContext(Path file) throws IOException { + TableDesc tableDesc = getTableDescForFile(conf, file); + if(tableDesc == null) { + LOG.info("No table desc is found for file: " + file); + return; + } + + setInputCryptoContext(tableDesc, file); + } + + protected void setInputCryptoContext(TableDesc tableDesc, Path file) throws IOException { + LOG.info("Trying to set runtime crypto context for input file: " + file + + " from table: " + tableDesc.getTableName()); + if(!(conf instanceof JobConf)) + return; + + JobConf jobConf = (JobConf) conf; + + String id = getContextId(tableDesc); + KeyMeta keyMeta = context.getKey(id); + if(keyMeta != null) { + // retrieve the key if needed + Key key = convertKey(keyMeta, jobConf); + + //set the codec for inputs + String codec = getTableCodec(jobConf, tableDesc, keyMeta); + setFileInputFormatCodec(codec, jobConf); + + try { + // set the input key provider + KeyProviderConfig keyProviderConfig = + new KeyProviderConfig(DirectKeyProvider.class.getName(), null); + + KeyContext encodedKey = getEncodedKeyContext(key); + FileMatches fileMatches = new FileMatches(encodedKey); 
+ + KeyProviderCryptoContextProvider.setInputCryptoContextProvider(jobConf, + fileMatches, true, keyProviderConfig); + } catch(CryptoException e) { + throw new IOException(e); + } + + LOG.info("Successfully set crypto context for file: " + file + " from table: " + id); + } else { + // no key provided + // reset the input key provider properties + restoreConf(jobConf, "io.compression.codecs", ""); + } + } + + protected boolean hasOutputCryptoContextForTable(Configuration conf, + TableDesc tableDesc) throws IOException { + if(!(conf instanceof JobConf)) + return false; + + String id = getContextId(tableDesc); + KeyMeta keyMeta = context.getKey(id); + if(keyMeta != null) + return true; + + return false; + } + + protected boolean setOutputCryptoContextForTable(Configuration conf, + TableDesc tableDesc, Path outputPath) throws IOException { + LOG.info("Trying to set runtime crypto context for output: " + outputPath); + if(!(conf instanceof JobConf)) + return false; + + JobConf jobConf = (JobConf) conf; + + String id = getContextId(tableDesc); + KeyMeta keyMeta = context.getKey(id); + if(keyMeta != null) { + // retrieve the key if needed + Key key = convertKey(keyMeta, jobConf); + + // set the codec for output + String codec = getTableCodec(jobConf, tableDesc, keyMeta); + setFileOutputFormatCodec(codec, jobConf); + + // set the code for avro file + setAvroOutputFormatCodec(codec, jobConf); + + try { + // set the output key provider + KeyProviderConfig keyProviderConfig = + new KeyProviderConfig(DirectKeyProvider.class.getName(), null); + + KeyContext encodedKey = getEncodedKeyContext(key); + FileMatches fileMatches = new FileMatches(encodedKey); + + KeyProviderCryptoContextProvider.setOutputCryptoContextProvider(jobConf, + fileMatches, true, keyProviderConfig); + } catch(CryptoException e) { + throw new IOException(e); + } + + LOG.info("Successfully set crypto context for ouput: " + outputPath + " from table: " + id); + return true; + } else { + // no key provided + // reset the output key provider properties + restoreConf(jobConf, COMPRESS, "false"); + restoreConf(jobConf, COMPRESS_CODEC, ""); + restoreConf(jobConf, OUTPUT_CODEC, ""); + return false; + } + } + + protected static void setAvroOutputFormatCodec(String codecName, + Configuration conf) { + if(!(conf instanceof JobConf)) + return; + + if(codecName == null || + codecName.isEmpty()) + codecName = "aes"; + + conf.setBoolean(COMPRESS, true); + conf.set(OUTPUT_CODEC, codecName); + } + + protected void setMapOutputCodec(JobConf jobConf, + String codecClassName) throws IOException { + if(codecClassName == null || + codecClassName.isEmpty()) + codecClassName = AESCodec.class.getName(); + + Class codecClass = + getClassByName(jobConf, codecClassName, null, CompressionCodec.class); + + jobConf.setMapOutputCompressorClass(codecClass); + } + + protected TableDesc getTableDescForFile(Configuration conf, Path path) throws IOException { + Map pathToPartitionInfo = Utilities + .getMapRedWork(conf).getMapWork().getPathToPartitionInfo(); + + PartitionDesc part = HiveFileFormatUtils + .getPartitionDescFromPathRecursively(pathToPartitionInfo, + path, IOPrepareCache.get().getPartitionDescMap()); + return part.getTableDesc(); + } + + protected static void backupConf(Configuration conf, String key) { + String v = conf.get(key); + if(v != null) { + String backupKey = BACKUP_PREFIX + key; + conf.set(backupKey, v); + } + } + + protected static void copyConf(Configuration from, Configuration to, String key) { + String v = from.get(key); + if(v != null) { + 
to.set(key, v); + } + } + + protected static void restoreConf(Configuration conf, String key, String defaultValue) { + String backupKey = BACKUP_PREFIX + key; + String v = conf.get(backupKey); + if(v != null) { + conf.set(key, v); + } else { + //original has no such key + //check whether we have the key now + v = conf.get(key); + if(v != null && + defaultValue != null) { + //the value was changed, but there is no original value + conf.set(key, defaultValue); + } + } + } + + protected void cleanUp(Configuration conf) { + cleanConf(conf, HIVE_ENCRYPT_TMPFILE_KEY_PROVIDER_PARAMETERS); + cleanConf(conf, HIVE_ENCRYPT_KEY_PROVIDER_PARAMETERS); + } + + protected static void cleanConf(Configuration conf, String key) { + String v = conf.get(key); + if(v != null) { + conf.set(key, ""); + } + } + + protected Key convertKey(KeyMeta keyMeta, Configuration conf) throws IOException{ + try { + KeyContext keyContext = keyMeta.getKeyContext(); + if(keyContext.getType() == KeyContext.KeyContextType.RAWKEY) { + return keyContext.toKey(); + } + + LOG.info("Retrieving key at remote side."); + Key key = retrieveKey(conf, keyMeta); + if(key == null) + throw new IOException("Failed to get the key."); + + return key; + } catch(CryptoException e) { + throw new IOException(e); + } catch(HiveException e) { + throw new IOException(e); + } + } + + protected KeyContext getEncodedKeyContext(Key key) throws CryptoException { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(outputStream); + + try { + key.write(out); + } catch(IOException e) { + throw new CryptoException(e); + } + + byte[] bytes = outputStream.toByteArray(); + + try { + String encodedKey = ObjectSerializer.encodeBytes(bytes); + KeyContext encodedKeyContext = KeyContext.refer( + encodedKey, key.getKeyType(), + key.getCryptographicAlgorithm(), key.getCryptographicLength()); + return encodedKeyContext; + } catch(UnsupportedEncodingException e) { + throw new CryptoException(e); + } + } + + public static class DirectKeyProvider implements KeyProvider { + private Configuration conf; + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + @Override + public Key[] getKeys(String[] keyNames) throws CryptoException { + if(keyNames == null) + return null; + + Key[] rawKeys= new Key[keyNames.length]; + + try { + for (int i = 0; i < keyNames.length; i++) { + byte[] keyBytes = ObjectSerializer.decodeBytes(keyNames[i]); + ByteArrayInputStream inputStream = new ByteArrayInputStream(keyBytes); + DataInputStream in = new DataInputStream(inputStream); + + Key key = new Key(); + key.readFields(in); + rawKeys[i] = key; + } + } catch(UnsupportedEncodingException e) { + throw new CryptoException("Error happened when decoding the key string.", e); + } catch(IOException e) { + throw new CryptoException("Error happened when decoding the key bytes.", e); + } + + return rawKeys; + } + + @Override + public void init(String parameters) throws CryptoException { + + } + } +} Index: shims/src/crypto/java/org/apache/hadoop/hive/crypto/shims/HiveCryptoContext.java =================================================================== --- shims/src/crypto/java/org/apache/hadoop/hive/crypto/shims/HiveCryptoContext.java (revision 0) +++ shims/src/crypto/java/org/apache/hadoop/hive/crypto/shims/HiveCryptoContext.java (revision 0) @@ -0,0 +1,347 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.crypto.shims; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.io.crypto.CryptoException; +import org.apache.hadoop.mapreduce.cryptocontext.provider.KeyProviderCryptoContextProvider; +import org.apache.hadoop.mapreduce.cryptocontext.provider.SecretsProtector; +import org.apache.hadoop.util.ReflectionUtils; + +public class HiveCryptoContext { + public static final Log LOG = LogFactory.getLog(HiveCryptoContext.class); + + public static final String HIVE_MAPRED_CRYPTO_CONTEXT_SECRETS = "hive.mapred.crypto.context.secrets"; + private KeyMap keyMap; + private Configuration conf; + + public HiveCryptoContext() { + + } + + public void init(Configuration conf) { + this.conf = conf; + keyMap = new KeyMap(); + } + + public Configuration getConf() { + return conf; + } + + public void load() throws IOException { + // load the persistent context + byte[] secrets = getSecrets(conf); + if(secrets == null || + secrets.length == 0) { + // no secrets data + return; + } + + try { + secrets = unwrapSecretsData(secrets, conf); + } catch(CryptoException e) { + throw new IOException("Failed to unwrap the secrets data.", e); + } + + keyMap = KeyMap.from(secrets); + } + + public void addKey(String id, KeyMeta keyMeta) { + keyMap.addKey(id, keyMeta); + } + + public KeyMeta getKey(String id) { + return keyMap.getKey(id); + } + + public void setupJob() throws HiveException { + setupJob(true); + } + + public void setupJob(boolean protectSecrets) throws HiveException { + byte[] secrets = new byte[0]; + + if(!keyMap.isEmpty()) { + try { + secrets = keyMap.toBytes(); + } catch(CryptoException e) { + throw new HiveException(e); + } + } + + try { + secrets = wrapSecretsData(secrets, conf, protectSecrets); + } catch(CryptoException e) { + throw new HiveException("Failed to wrap the secrets data.", e); + } + + try { + setSecrets(conf, secrets); + } catch(IOException e) { + throw new HiveException(e); + } + } + + public static void copyConf(Configuration from, Configuration to) throws IOException { + byte[] secrets = getSecrets(from); + if(secrets == null || + secrets.length == 0) + return; + + setSecrets(to, secrets); + } + + 
protected static byte[] getSecrets(Configuration conf) throws IOException { + byte[] secrets = null; + + // get the secrets directly from configuration key + String secretsString = conf.get(HIVE_MAPRED_CRYPTO_CONTEXT_SECRETS); + if(secretsString != null && + !secretsString.isEmpty()) { + try { + secrets = ObjectSerializer.decodeBytes(secretsString); + } catch(UnsupportedEncodingException e) { + throw new IOException(e); + } + } + + return secrets; + } + + protected static void setSecrets(Configuration conf, byte[] secrets) throws IOException { + // store in the conf + String secretsString = ""; + + if(secrets.length > 0) { + try { + secretsString = ObjectSerializer.encodeBytes(secrets); + } catch(UnsupportedEncodingException e) { + throw new IOException(e); + } + } + + conf.set(HIVE_MAPRED_CRYPTO_CONTEXT_SECRETS, secretsString); + } + + protected static byte[] wrapSecretsData(byte[] data, Configuration conf, boolean protectSecrets) + throws CryptoException { + if(data == null || + data.length == 0) + return data; + + ByteArrayOutputStream os = new ByteArrayOutputStream(); + + // protect secrets only when needed + SecretsProtector secretsProtector = null; + if(protectSecrets) + secretsProtector = getSecretsProtector(conf); + + try { + if(secretsProtector == null) { + LOG.info("Wrap secrets data without protection."); + os.write(0); + os.write(data); + } else { + LOG.info("Wrap secrets data with protection."); + os.write(1); + + //encrypted data + byte[] encryptedData = secretsProtector.wrap(data, 0, data.length); + os.write(encryptedData); + } + } catch(IOException e) { + throw new CryptoException(e); + } + + return os.toByteArray(); + } + + protected static byte[] unwrapSecretsData(byte[] data, Configuration conf) + throws CryptoException { + if(data == null || + data.length == 0) + return data; + + ByteArrayOutputStream os = new ByteArrayOutputStream(); + boolean encrypted = (data[0] == 0) ? false : true; + try { + if(encrypted) { + LOG.info("Unwrap secrets data with protection."); + SecretsProtector secretsProtector = getSecretsProtector(conf); + byte[] decryptedData = secretsProtector.unwrap(data, 1, data.length - 1); + os.write(decryptedData); + } else { + LOG.info("Unwrap secrets data without protection."); + os.write(data, 1, data.length - 1); + } + } catch(IOException e) { + throw new CryptoException(e); + } + + return os.toByteArray(); + } + + protected static SecretsProtector getSecretsProtector(Configuration conf) + throws CryptoException { + String secretsProtectorClassName = conf.get( + KeyProviderCryptoContextProvider.MAPRED_CRYPTO_SECRETS_PROTECTOR_CLASS); + if(secretsProtectorClassName == null || + secretsProtectorClassName.isEmpty()) + return null; + + Class secretsProtectorClass = + HiveCryptoShims.getClassByName(conf, secretsProtectorClassName, null, SecretsProtector.class); + if(secretsProtectorClass == null) + return null; + + SecretsProtector scretsProtector = + (SecretsProtector) ReflectionUtils.newInstance(secretsProtectorClass, conf); + return scretsProtector; + } + + /** + * KeyMap class defines a set of of keys that can be retrieved by an unique id. + */ + public static class KeyMap implements Writable { + private Map map = new HashMap(); + + /** + * A new key map object. + */ + public KeyMap() { + } + + /** + * Returns whether the key map is empty or not. + */ + public boolean isEmpty() { + return map.isEmpty(); + } + + /** + * Clear the key map + */ + public void clear() { + map.clear(); + } + + /** + * Add a key map. 
+ * + * @param id The id for the key + * @param keyContext The KeyContextof the match + */ + public void addKey(String id, KeyMeta keyMeta) { + if(id == null || + keyMeta == null) + return; + + map.put(id, keyMeta); + } + + /** + * The function to do the match with a file and returns + * the KeyContext that matches. The default key context will be used + * if none of the rules match. + */ + public KeyMeta getKey(String id) { + if(id == null) + return null; + + return map.get(id); + } + + @Override + public void write(DataOutput out) throws IOException { + // write out the maps + WritableUtils.writeVInt(out, map.size()); + Set> entries = map.entrySet(); + for(Entry entry : entries) { + Text id = new Text(entry.getKey()); + id.write(out); + entry.getValue().write(out); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + //read the maps + int size = WritableUtils.readVInt(in); + for(int i=0; i matches = new ArrayList(); + + /** + * A match entry class which associates the regular expression + * with a KeyMeta + */ + public static class Match implements Writable { + private Text regex; + private KeyMeta keyMeta; + + private Pattern pattern; + + public Match() { + } + + public Match(Text regex, KeyMeta keyMeta) { + super(); + this.regex = regex; + this.keyMeta = keyMeta; + + init(); + } + + private void init() { + pattern = Pattern.compile(regex.toString()); + } + + public Text getRegex() { + return regex; + } + + public KeyMeta getKeyMeta() { + return keyMeta; + } + + public boolean isValid() { + if(pattern == null || + keyMeta == null) + return false; + + return true; + } + + public boolean matches(String name) { + if(pattern == null) + return false; + + Matcher matcher = pattern.matcher(name); + return matcher.matches(); + } + + @Override + public void write(DataOutput out) throws IOException { + regex.write(out); + keyMeta.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + regex = new Text(); + regex.readFields(in); + + keyMeta = new KeyMeta(); + keyMeta.readFields(in); + + init(); + } + } + + /** + * A new file matches object. + */ + public KeyMapping() { + } + + /** + * A new file matches object with default key meta. + */ + public KeyMapping(KeyMeta defaultKey) { + this.defaultKeyMeta = defaultKey; + } + + /** + * Return the default key meta. + */ + public KeyMeta getDefaultKeyMeta() { + return defaultKeyMeta; + } + + /** + * Set the default key meta. + */ + public void setDefaultKeyMeta(KeyMeta defaultKeyMeta) { + this.defaultKeyMeta = defaultKeyMeta; + } + + /** + * Add a new match. + * + * @param regex The regular expression of the match + * @param keyMeta The KeyMetaof the match + */ + public void addMatch(String regex, KeyMeta keyMeta) { + if(regex == null || + keyMeta == null) + return; + + Match match = new Match(new Text(regex), keyMeta); + matches.add(match); + } + + /** + * Add a new match. + * + * @param match The match to be added + */ + public void addMatch(Match match) { + if(match == null || + !match.isValid()) + return; + + matches.add(match); + } + + /** + * Return the list of the matches. + */ + public List getMatches() { + return matches; + } + + /** + * The function to do the match with a file and returns + * the KeyMeta that matches. The default key meta will be used + * if none of the rules match. 
+ */ + public KeyMeta matches(String fileName) { + if(fileName == null || + matches == null || + matches.isEmpty()) + return null; + + for(Match match : matches) { + if(match.matches(fileName)) + return match.getKeyMeta(); + } + + return null; + } + + @Override + public void write(DataOutput out) throws IOException { + //write the defaut key meta + if(defaultKeyMeta == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + defaultKeyMeta.write(out); + } + + // write out the matches + WritableUtils.writeVInt(out, matches.size()); + for(Match match : matches) { + match.write(out); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + //read default key meta + defaultKeyMeta = null; + + boolean defaultKeyPresent = in.readBoolean(); + if(defaultKeyPresent) { + defaultKeyMeta = new KeyMeta(); + defaultKeyMeta.readFields(in); + } + + //read the matches + int size = WritableUtils.readVInt(in); + for(int i=0; i writer, + Configuration conf, Path path) throws IOException { + if(!(conf instanceof JobConf)) + return; + + // The codec context is default to null + CodecContext codecContext = null; + final CryptoContext cryptoContext = + CryptoContextHelper.getOutputCryptoContext((JobConf)conf, path); + if(cryptoContext != null) { + codecContext = new AvroCodecContext(cryptoContext); + } + + writer.setCodecContext(codecContext); + } + + public static Object getReaderCodecContext(Configuration conf, Path path) + throws IOException { + if(!(conf instanceof JobConf)) + return null; + + // The codec context is default to null + final CryptoContext cryptoContext = + CryptoContextHelper.getInputCryptoContext((JobConf)conf, path); + if(cryptoContext == null) + return null; + + return new AvroCodecContext(cryptoContext); + } + + public static DataFileReader createReader(SeekableInput sin, GenericDatumReader reader, + Object codecContext) throws IOException { + return new DataFileReader(sin, reader, (CodecContext)codecContext); + } +} \ No newline at end of file Index: shims/src/no-crypto/java/org/apache/hadoop/hive/crypto/shims/HiveCryptoShims.java =================================================================== --- shims/src/no-crypto/java/org/apache/hadoop/hive/crypto/shims/HiveCryptoShims.java (revision 0) +++ shims/src/no-crypto/java/org/apache/hadoop/hive/crypto/shims/HiveCryptoShims.java (revision 0) @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.crypto.shims; + +import java.io.IOException; + +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.file.SeekableInput; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.plan.CreateTableDesc; +import org.apache.hadoop.hive.ql.plan.FetchWork; +import org.apache.hadoop.hive.ql.plan.MapredLocalWork; +import org.apache.hadoop.hive.ql.plan.MapredWork; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.mapred.JobConf; +import java.util.List; + + +public class HiveCryptoShims { + public static void setupTableForEncryption(Configuration conf, CreateTableDesc crtTbl) throws HiveException { + // Do nothing without crypto + } + + public static void setupMapRedJob(JobConf job, MapredWork work) throws HiveException { + // Do nothing without crypto + } + + public static void setupMapRedJob(JobConf job, MapWork work) throws HiveException { + // Do nothing without crypto + } + + public static void setupFetchJob(JobConf job, FetchWork work) throws HiveException { + // Do nothing without crypto + } + + public static void setupMapRedLocalJob(JobConf job, MapredLocalWork work) throws HiveException { + // Do nothing without crypto + } + + public static void setInputCryptoContext(Configuration conf, Path file) throws IOException { + // Do nothing without crypto + } + + public static void setInputCryptoContext(Configuration conf, TableDesc tableDesc, Path file) throws IOException { + // Do nothing without crypto + } + + public static boolean setOutputCryptoContext(Configuration conf, TableDesc tableDesc, + Path outputPath) throws IOException { + // Do nothing without crypto + return false; + } + + public static boolean hasOutputCryptoContext(Configuration conf, TableDesc tableDesc) + throws HiveException { + // Do nothing without crypto + return false; + } + + public static void setInputCryptoContext(CompressionCodec codec, + Configuration conf, Path file) throws IOException { + // Do nothing without crypto + } + + public static void setOutputCryptoContext(CompressionCodec codec, + Configuration conf, Path file) throws IOException { + // Do nothing without crypto + } + + public static Object getOutputCompressContext(Configuration conf, Path file, String codec) + throws IOException { + // Do nothing without crypto + return null; + } + + public static Object getInputCompressContext(Configuration conf, Path file) + throws IOException { + // Do nothing without crypto + return null; + } + + public static void setCryptoContext(CompressionCodec codec, Object codecContext) { + // Do nothing without crypto + } + + public static void setCryptoContext(CompressionCodec codec, Compressor compressor) { + // Do nothing without crypto + } + + public static void setCryptoContext(CompressionCodec codec, Decompressor decompressor) { + // Do nothing without crypto + } + + public static void setFileInputFormatCodec(String codecClassName, Configuration conf) { + // Do nothing without crypto + } + + public static void 
setFileOutputFormatCodec(String codecClassName, + Configuration conf) { + // Do nothing without crypto + } + + public static Object getAvroReaderCodecContext(Configuration conf, Path path) + throws IOException { + // Do nothing without crypto + return null; + } + + public static DataFileReader createAvroReader(SeekableInput sin, + GenericDatumReader reader, + Object codecContext) throws IOException { + // Do nothing without crypto + return null; + } + + public static void setAvroWriterCodecContext(DataFileWriter writer, + Configuration conf, Path path) throws IOException { + // Do nothing without crypto + } + + public static void exportKeyStore(List
tableDescs, String location, + String password, Configuration conf) throws Throwable{ + + } +} Index: build-common.xml =================================================================== --- build-common.xml (revision 1525799) +++ build-common.xml (working copy) @@ -191,7 +191,7 @@ - + Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (revision 1525799) +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (working copy) @@ -519,7 +519,7 @@ } MergeWork work = new MergeWork(inputDirs, finalName, - hasDynamicPartitions, fsInputDesc.getDynPartCtx()); + hasDynamicPartitions, tblDesc, fsInputDesc.getDynPartCtx()); LinkedHashMap> pathToAliases = new LinkedHashMap>(); pathToAliases.put(inputDir, (ArrayList) inputDirs.clone()); Index: ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java (revision 1525799) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java (working copy) @@ -411,7 +411,9 @@ .getColumnNamesFromFieldSchema(fieldSchemas), serdeConstants.LIST_COLUMN_TYPES, MetaStoreUtils .getColumnTypesFromFieldSchema(fieldSchemas), - serdeConstants.ESCAPE_CHAR, "\\")); + serdeConstants.ESCAPE_CHAR, "\\", + org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_NAME, + ".intermediatefile.")); } /** Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java (revision 1525799) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java (working copy) @@ -26,6 +26,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.crypto.shims.HiveCryptoShims; import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.QueryPlan; @@ -76,6 +77,10 @@ HiveInputFormat.pushFilters(job, ts); } sink = work.getSink(); + + // setup the crypto context for fetch task + HiveCryptoShims.setupFetchJob(job, work); + fetch = new FetchOperator(work, job, source, getVirtualColumns(source)); source.initialize(conf, new ObjectInspector[]{fetch.getOutputObjectInspector()}); Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (revision 1525799) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (working copy) @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.crypto.shims.HiveCryptoShims; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.HiveKey; @@ -433,6 +434,14 @@ } private void createBucketFiles(FSPaths fsp) throws HiveException { + // Check whether table is encrypted + boolean isEncrypted = HiveCryptoShims.hasOutputCryptoContext(jc, + conf.getTableInfo()); + if(isEncrypted) { + isCompressed = true; + conf.setCompressed(isCompressed); + } + try { int filesIdx = 0; Set seenBuckets = new HashSet(); @@ 
-481,12 +490,21 @@ } else { fsp.finalPaths[filesIdx] = fsp.outPaths[filesIdx] = specPath; } + + JobConf jc_output = jc; + if(isCompressed) + jc_output = new JobConf(jc); + + // set output crypto context for outputs + HiveCryptoShims.setOutputCryptoContext(jc_output, + conf.getTableInfo(), fsp.outPaths[filesIdx]); + try { // The reason to keep these instead of using // OutputFormat.getRecordWriter() is that // getRecordWriter does not give us enough control over the file name that // we create. - String extension = Utilities.getFileExtension(jc, isCompressed, + String extension = Utilities.getFileExtension(jc_output, isCompressed, hiveOutputFormat); if (!bDynParts && !this.isSkewedStoredAsSubDirectories) { fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, parent, extension); @@ -510,11 +528,11 @@ } } - Utilities.copyTableJobPropertiesToConf(conf.getTableInfo(), jc); + Utilities.copyTableJobPropertiesToConf(conf.getTableInfo(), jc_output); // only create bucket files only if no dynamic partitions, // buckets of dynamic partitions will be created for each newly created partition fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter( - jc, conf.getTableInfo(), outputClass, conf, fsp.outPaths[filesIdx], + jc_output, conf.getTableInfo(), outputClass, conf, fsp.outPaths[filesIdx], reporter); // increment the CREATED_FILES counter if (reporter != null) { Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (revision 1525799) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (working copy) @@ -35,6 +35,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.crypto.shims.HiveCryptoShims; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader; import org.apache.hadoop.hive.ql.io.HiveInputFormat; @@ -354,6 +355,7 @@ } private RecordReader getRecordReader() throws Exception { + TableDesc tableDesc = null; if (currPath == null) { getNextPath(); if (currPath == null) { @@ -374,6 +376,8 @@ } else { partDesc = new PartitionDesc(currTbl, null); } + + tableDesc = partDesc.getTableDesc(); Class formatter = partDesc.getInputFileFormatClass(); inputFormat = getInputFormatFromCache(formatter, job); @@ -416,6 +420,9 @@ if (currPart != null) { getRowInspectorFromPartition(currPart, outputOI); } + + // set the crypto context for reader + HiveCryptoShims.setInputCryptoContext(job, tableDesc, null); } if (splitNum >= inputSplits.length) { Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (revision 1525799) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (working copy) @@ -95,6 +95,7 @@ import org.apache.hadoop.hive.common.HiveInterruptCallback; import org.apache.hadoop.hive.common.HiveInterruptUtils; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.crypto.shims.HiveCryptoShims; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Order; @@ -1130,6 +1131,10 @@ Class codecClass = FileOutputFormat.getOutputCompressorClass(jc, DefaultCodec.class); CompressionCodec codec = (CompressionCodec) 
ReflectionUtils.newInstance(codecClass, jc); + + // Set the crypto context for output codec if needed + HiveCryptoShims.setOutputCryptoContext(codec, jc, null); + return codec.createOutputStream(out); } else { return (out); @@ -1231,6 +1236,10 @@ codecClass = FileOutputFormat.getOutputCompressorClass(jc, DefaultCodec.class); codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, jc); } + + // Set the crypto context for output codec if needed + HiveCryptoShims.setOutputCryptoContext(codec, jc, file); + return (SequenceFile.createWriter(fs, jc, file, keyClass, valClass, compressionType, codec)); } @@ -1254,6 +1263,9 @@ if (isCompressed) { codecClass = FileOutputFormat.getOutputCompressorClass(jc, DefaultCodec.class); codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, jc); + + // Set the crypto context for output codec if needed + HiveCryptoShims.setOutputCryptoContext(codec, jc, file); } return new RCFile.Writer(fs, jc, file, null, codec); } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (revision 1525799) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (working copy) @@ -56,6 +56,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.crypto.shims.HiveCryptoShims; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.ProtectMode; import org.apache.hadoop.hive.metastore.TableType; @@ -3523,6 +3524,9 @@ Table tbl = db.newTable(crtTbl.getTableName()); if (crtTbl.getTblProps() != null) { + //set up table for properties if encryption is needed + HiveCryptoShims.setupTableForEncryption(conf, crtTbl); + tbl.getTTable().getParameters().putAll(crtTbl.getTblProps()); } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (revision 1525799) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (working copy) @@ -44,6 +44,7 @@ import org.apache.hadoop.hive.common.LogUtils.LogInitializationException; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.crypto.shims.HiveCryptoShims; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.ErrorMsg; @@ -421,6 +422,8 @@ Utilities.createTmpDirs(job, mWork); Utilities.createTmpDirs(job, rWork); + HiveCryptoShims.setupMapRedJob(job, work); + // Finally SUBMIT the JOB! 
rj = jc.submitJob(job); // replace it back Index: ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java (revision 1525799) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java (working copy) @@ -40,6 +40,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.io.CachingPrintStream; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.crypto.shims.HiveCryptoShims; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.QueryPlan; @@ -299,6 +300,8 @@ execContext.setLocalWork(work); boolean inputFileChangeSenstive = work.getInputFileChangeSensitive(); try { + // setup the crypto context for fetch task + HiveCryptoShims.setupMapRedLocalJob(job, work); initializeOperators(fetchOpJobConfMap); // for each big table's bucket, call the start forward Index: ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (revision 1525799) +++ ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (working copy) @@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.crypto.shims.HiveCryptoShims; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable; @@ -1339,13 +1340,25 @@ /** Create a new RCFile reader. */ public Reader(FileSystem fs, Path file, Configuration conf) throws IOException { + this(fs, file, conf, null); + } + + /** Create a new RCFile reader. */ + public Reader(FileSystem fs, Path file, Configuration conf, + Object codecContext) throws IOException { this(fs, file, conf.getInt("io.file.buffer.size", 4096), conf, 0, fs - .getFileStatus(file).getLen()); + .getFileStatus(file).getLen(), codecContext); } + + /** Create a new RCFile reader. */ + public Reader(FileSystem fs, Path file, int bufferSize, Configuration conf, + long start, long length) throws IOException { + this(fs, file, bufferSize, conf, start, length, null); + } /** Create a new RCFile reader. 
*/ public Reader(FileSystem fs, Path file, int bufferSize, Configuration conf, - long start, long length) throws IOException { + long start, long length, Object codecContext) throws IOException { tolerateCorruptions = conf.getBoolean( TOLERATE_CORRUPTIONS_CONF_STR, false); conf.setInt("io.file.buffer.size", bufferSize); @@ -1357,10 +1370,10 @@ try { if (start > 0) { seek(0); - init(); + init(codecContext); seek(start); } else { - init(); + init(codecContext); } succeed = true; } finally { @@ -1457,7 +1470,7 @@ return fs.open(file, bufferSize); } - private void init() throws IOException { + private void init(Object codecContext) throws IOException { byte[] magic = new byte[MAGIC.length]; in.readFully(magic); @@ -1515,6 +1528,10 @@ throw new IllegalArgumentException( "Unknown codec: " + codecClassname, cnfe); } + + //set the crypto context if needed + HiveCryptoShims.setCryptoContext(codec, codecContext); + keyDecompressor = CodecPool.getDecompressor(codec); } Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (revision 1525799) +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (working copy) @@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.crypto.shims.HiveCryptoShims; import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -267,7 +268,6 @@ boolean isCompressed = conf.getCompressed(); JobConf jc_output = jc; if (isCompressed) { - jc_output = new JobConf(jc); String codecStr = conf.getCompressCodec(); if (codecStr != null && !codecStr.trim().equals("")) { Class codec = (Class) Class @@ -280,6 +280,7 @@ SequenceFileOutputFormat.setOutputCompressionType(jc, style); } } + return getRecordWriter(jc_output, hiveOutputFormat, outputClass, isCompressed, tableInfo.getProperties(), outPath, reporter); } catch (Exception e) { Index: ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java (revision 1525799) +++ ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java (working copy) @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.crypto.shims.HiveCryptoShims; import org.apache.hadoop.hive.ql.io.RCFile.KeyBuffer; import org.apache.hadoop.hive.ql.io.RCFile.Reader; import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; @@ -97,7 +98,11 @@ Path path = split.getPath(); FileSystem fs = path.getFileSystem(conf); - this.in = new RCFile.Reader(fs, path, conf); + + // set the crypto context for the reader + Object codecContext = HiveCryptoShims.getInputCompressContext(conf, path); + + this.in = new RCFile.Reader(fs, path, conf, codecContext); this.end = split.getStart() + split.getLength(); this.conf = conf; this.split = split; Index: ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java (revision 1525799) +++ 
ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java (working copy) @@ -24,6 +24,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.crypto.shims.HiveCryptoShims; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable; @@ -69,6 +70,10 @@ CodecFactory factory = codecName.equals(DEFLATE_CODEC) ? CodecFactory.deflateCodec(level) : CodecFactory.fromString(codecName); + + // set writer codec context if needed + HiveCryptoShims.setAvroWriterCodecContext(dfw, jobConf, path); + dfw.setCodec(factory); } Index: ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java (revision 1525799) +++ ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java (working copy) @@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.crypto.shims.HiveCryptoShims; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.PartitionDesc; @@ -79,7 +80,15 @@ gdr.setExpected(latest); } - this.reader = new DataFileReader(new FsInput(split.getPath(), job), gdr); + // set codec context if needed + Object codecContext = HiveCryptoShims.getAvroReaderCodecContext(job, split.getPath()); + if(codecContext != null) { + this.reader = HiveCryptoShims.createAvroReader( + new FsInput(split.getPath(), job), gdr, codecContext); + } else { + this.reader = new DataFileReader(new FsInput(split.getPath(), job), gdr); + } + this.reader.sync(split.getStart()); this.start = reader.tell(); this.stop = split.getStart() + split.getLength(); Index: ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java (revision 1525799) +++ ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java (working copy) @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.crypto.shims.HiveCryptoShims; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; import org.apache.hadoop.io.Writable; @@ -83,6 +84,10 @@ Class codecClass = getOutputCompressorClass(job, DefaultCodec.class); codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, job); } + + // Set the crypto context for output codec if needed + HiveCryptoShims.setOutputCryptoContext(codec, job, file); + final RCFile.Writer out = new RCFile.Writer(fs, job, file, progress, codec); return new RecordWriter() { Index: ql/src/java/org/apache/hadoop/hive/ql/io/CodecPool.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/CodecPool.java (revision 1525799) +++ ql/src/java/org/apache/hadoop/hive/ql/io/CodecPool.java (working copy) @@ -24,6 +24,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.crypto.shims.HiveCryptoShims; import 
org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.Compressor; import org.apache.hadoop.io.compress.Decompressor; @@ -103,6 +104,8 @@ compressor = codec.createCompressor(); LOG.info("Got brand-new compressor"); } else { + // reset the crypto context of recycled compressor + HiveCryptoShims.setCryptoContext(codec, compressor); LOG.debug("Got recycled compressor"); } return compressor; @@ -125,6 +128,8 @@ decompressor = codec.createDecompressor(); LOG.info("Got brand-new decompressor"); } else { + // reset the crypto context of recycled decompressor + HiveCryptoShims.setCryptoContext(codec, decompressor); LOG.debug("Got recycled decompressor"); } return decompressor; Index: ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java (revision 1525799) +++ ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java (working copy) @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.crypto.shims.HiveCryptoShims; import org.apache.hadoop.hive.ql.exec.mr.ExecMapper; import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.CombineHiveInputSplit; import org.apache.hadoop.hive.shims.HadoopShims.InputSplitShim; @@ -61,6 +62,10 @@ FileSplit fsplit = new FileSplit(hsplit.getPaths()[partition], hsplit .getStartOffsets()[partition], hsplit.getLengths()[partition], hsplit .getLocations()); + + // setup the input file crypto context + HiveCryptoShims.setInputCryptoContext(jobConf, + hsplit.getPaths()[partition]); this.setRecordReader(inputFormat.getRecordReader(fsplit, jobConf, reporter)); Index: ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeRecordReader.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeRecordReader.java (revision 1525799) +++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeRecordReader.java (working copy) @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.crypto.shims.HiveCryptoShims; import org.apache.hadoop.hive.ql.io.RCFile; import org.apache.hadoop.hive.ql.io.RCFile.Reader; import org.apache.hadoop.mapred.FileSplit; @@ -43,7 +44,11 @@ throws IOException { path = split.getPath(); FileSystem fs = path.getFileSystem(conf); - this.in = new RCFile.Reader(fs, path, conf); + + // set the crypto context for the reader + Object codecContext = HiveCryptoShims.getInputCompressContext(conf, path); + + this.in = new RCFile.Reader(fs, path, conf, codecContext); this.end = split.getStart() + split.getLength(); this.conf = conf; Index: ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java (revision 1525799) +++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java (working copy) @@ -55,17 +55,18 @@ } public MergeWork(List inputPaths, String outputDir) { - this(inputPaths, outputDir, false, null); + this(inputPaths, outputDir, false, null, null); } public MergeWork(List inputPaths, String outputDir, - boolean hasDynamicPartitions, DynamicPartitionCtx dynPartCtx) { + boolean hasDynamicPartitions, TableDesc tableDesc, 
DynamicPartitionCtx dynPartCtx) { super(); this.inputPaths = inputPaths; this.outputDir = outputDir; this.hasDynamicPartitions = hasDynamicPartitions; this.dynPartCtx = dynPartCtx; PartitionDesc partDesc = new PartitionDesc(); + partDesc.setTableDesc(tableDesc); partDesc.setInputFileFormatClass(RCFileBlockMergeInputFormat.class); if(this.getPathToPartitionInfo() == null) { this.setPathToPartitionInfo(new LinkedHashMap()); Index: ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java (revision 1525799) +++ ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java (working copy) @@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.crypto.shims.HiveCryptoShims; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.QueryPlan; @@ -209,6 +210,8 @@ // make this client wait if job trcker is not behaving well. Throttle.checkJobTracker(job, LOG); + + HiveCryptoShims.setupMapRedJob(job, work); // Finally SUBMIT the JOB! rj = jc.submitJob(job); Index: ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (revision 1525799) +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (working copy) @@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.crypto.shims.HiveCryptoShims; import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; @@ -41,6 +42,7 @@ import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.shims.ShimLoader; @@ -230,6 +232,11 @@ pushProjectionsAndFilters(job, inputFormatClass, hsplit.getPath() .toString(), hsplit.getPath().toUri().getPath(), nonNative); + + // setup the input file crypto context + TableDesc tableDesc = (part != null) ? part.getTableDesc() : null; + HiveCryptoShims.setInputCryptoContext(job, tableDesc, + hsplit.getPath()); InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job); RecordReader innerReader = null; Index: ql/build.xml =================================================================== --- ql/build.xml (revision 1525799) +++ ql/build.xml (working copy) @@ -37,6 +37,15 @@ + + + + + + + + + @@ -190,7 +199,7 @@
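Note for reviewers: the wrapSecretsData/unwrapSecretsData helpers in the crypto shim earlier in this patch frame the job secrets with a single flag byte, 0 meaning the payload is stored in the clear and 1 meaning it was wrapped by a secrets protector. The following is a minimal, self-contained sketch of that framing only; the Protector interface and the method names are hypothetical stand-ins for illustration and are not the SecretsProtector API used by the patch.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;

public class SecretsFramingSketch {

  /** Hypothetical stand-in for the secrets protector assumed by the shim. */
  interface Protector {
    byte[] wrap(byte[] data, int offset, int length);
    byte[] unwrap(byte[] data, int offset, int length);
  }

  /** Frame the secret bytes: flag byte 0 = plain, 1 = wrapped by the protector. */
  static byte[] wrapSecrets(byte[] data, Protector protector) throws IOException {
    if (data == null || data.length == 0) {
      return data;
    }
    ByteArrayOutputStream os = new ByteArrayOutputStream();
    if (protector == null) {
      os.write(0);                                   // flag: stored in the clear
      os.write(data);
    } else {
      os.write(1);                                   // flag: protected payload follows
      os.write(protector.wrap(data, 0, data.length));
    }
    return os.toByteArray();
  }

  /** Reverse the framing: inspect the flag byte, then unwrap or copy the payload. */
  static byte[] unwrapSecrets(byte[] data, Protector protector) {
    if (data == null || data.length == 0) {
      return data;
    }
    boolean encrypted = data[0] != 0;
    if (encrypted) {
      return protector.unwrap(data, 1, data.length - 1);
    }
    return Arrays.copyOfRange(data, 1, data.length);
  }
}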
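Similarly, the KeyMapping class added in the shim resolves a file name to key metadata by walking an ordered list of regular expression rules and falling back to a default entry when nothing matches. The sketch below illustrates that lookup under the same assumption; KeyInfo and every name here are hypothetical and do not reuse the KeyMeta/KeyMapping API from the patch.

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class KeyRuleLookupSketch {

  /** Hypothetical stand-in for the key metadata carried by each rule. */
  static class KeyInfo {
    final String keyId;
    KeyInfo(String keyId) { this.keyId = keyId; }
  }

  private final List<Pattern> patterns = new ArrayList<Pattern>();
  private final List<KeyInfo> keys = new ArrayList<KeyInfo>();
  private final KeyInfo defaultKey;

  KeyRuleLookupSketch(KeyInfo defaultKey) {
    this.defaultKey = defaultKey;
  }

  /** Register a rule: file names matching the regex use the given key. */
  void addRule(String regex, KeyInfo key) {
    patterns.add(Pattern.compile(regex));
    keys.add(key);
  }

  /** Return the key of the first matching rule, or the default if none match. */
  KeyInfo resolve(String fileName) {
    for (int i = 0; i < patterns.size(); i++) {
      if (patterns.get(i).matcher(fileName).matches()) {
        return keys.get(i);
      }
    }
    return defaultKey;
  }

  public static void main(String[] args) {
    KeyRuleLookupSketch mapping = new KeyRuleLookupSketch(new KeyInfo("default-key"));
    mapping.addRule(".*\\.rc", new KeyInfo("rcfile-key"));
    System.out.println(mapping.resolve("part-00000.rc").keyId);    // rcfile-key
    System.out.println(mapping.resolve("part-00000.avro").keyId);  // default-key
  }
}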