diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderEncryption.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderEncryption.java
index 4661397..de5ca9e 100644
--- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderEncryption.java
+++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderEncryption.java
@@ -21,20 +21,25 @@
 import java.io.File;
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.io.IOConstants;
@@ -47,10 +52,18 @@
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hive.hcatalog.HcatTestUtils;
 import org.apache.hive.hcatalog.common.HCatUtil;
+import org.apache.hive.hcatalog.data.HCatRecord;
 import org.apache.hive.hcatalog.data.Pair;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
 import org.apache.pig.data.Tuple;
@@ -87,6 +100,7 @@
   private boolean isEncryptionTestEnabled = true;
   private Driver driver;
   private Map<Integer, Pair<Integer, String>> basicInputData;
+  private static List<HCatRecord> readRecords = new ArrayList<HCatRecord>();
 
   private static final Map<String, Set<String>> DISABLED_STORAGE_FORMATS =
     new HashMap<String, Set<String>>() {{
@@ -300,7 +314,7 @@ private String getKeyProviderURI() {
   }
 
   @Test
-  public void testReadDataFromEncryptedHiveTable() throws IOException {
+  public void testReadDataFromEncryptedHiveTableByPig() throws IOException {
     assumeTrue(isEncryptionTestEnabled);
     assumeTrue(!TestUtil.shouldSkip(storageFormat, DISABLED_STORAGE_FORMATS));
     PigServer server = new PigServer(ExecType.LOCAL);
@@ -322,6 +336,68 @@ public void testReadDataFromEncryptedHiveTable() throws IOException {
     assertEquals("failed with storage format: " + this.storageFormat, basicInputData.size(), numTuplesRead);
   }
 
+  @Test
+  public void testReadDataFromEncryptedHiveTableByHCatMR() throws Exception {
+    assumeTrue(isEncryptionTestEnabled);
+    assumeTrue(!TestUtil.shouldSkip(storageFormat, DISABLED_STORAGE_FORMATS));
+
+    readRecords.clear();
+    Configuration conf = new Configuration();
+    Job job = Job.getInstance(conf, "hcat mapreduce read encryption test");
+    job.setJarByClass(this.getClass());
+    job.setMapperClass(TestHCatLoaderEncryption.MapRead.class);
+
+    // input/output settings
+    job.setInputFormatClass(HCatInputFormat.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+
+    HCatInputFormat.setInput(job, MetaStoreUtils.DEFAULT_DATABASE_NAME, ENCRYPTED_TABLE, null);
+
+    job.setMapOutputKeyClass(BytesWritable.class);
+    job.setMapOutputValueClass(Text.class);
+
+    job.setNumReduceTasks(0);
+
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path path = new Path(TEST_DATA_DIR + "/testHCatMREncryptionOutput");
+    if (fs.exists(path)) {
+      fs.delete(path, true);
+    }
+
+    TextOutputFormat.setOutputPath(job, path);
+
+    assertTrue("HCat MR job failed", job.waitForCompletion(true));
+
+    int numTuplesRead = 0;
+    for (HCatRecord hCatRecord : readRecords) {
+      assertEquals(2, hCatRecord.size());
+      assertNotNull(hCatRecord.get(0));
+      assertNotNull(hCatRecord.get(1));
+      assertTrue(hCatRecord.get(0).getClass() == Integer.class);
+      assertTrue(hCatRecord.get(1).getClass() == String.class);
+      assertEquals(hCatRecord.get(0), basicInputData.get(numTuplesRead).first);
+      assertEquals(hCatRecord.get(1), basicInputData.get(numTuplesRead).second);
+      numTuplesRead++;
+    }
+    assertEquals("failed HCat MR read with storage format: " + this.storageFormat,
+        basicInputData.size(), numTuplesRead);
+  }
+
+  public static class MapRead extends Mapper<WritableComparable, HCatRecord, BytesWritable, Text> {
+
+    @Override
+    public void map(WritableComparable key, HCatRecord value, Context context)
+        throws IOException, InterruptedException {
+      try {
+        readRecords.add(value);
+      } catch (Exception e) {
+        // Print since otherwise exception is lost.
+        e.printStackTrace();
+        throw new IOException(e);
+      }
+    }
+  }
+
   @After
   public void tearDown() throws Exception {
     try {