Index: src/test/org/apache/hcatalog/mapreduce/TestSequenceFileReadWrite.java
===================================================================
--- src/test/org/apache/hcatalog/mapreduce/TestSequenceFileReadWrite.java	(revision 0)
+++ src/test/org/apache/hcatalog/mapreduce/TestSequenceFileReadWrite.java	(working copy)
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hcatalog.MiniCluster;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+import org.junit.Test;
+
+public class TestSequenceFileReadWrite {
+
+    private static MiniCluster cluster = MiniCluster.buildCluster();
+    private static Driver driver;
+    private static Properties props;
+    private static PigServer server;
+    private static final String basicFile = "/tmp/basic.input.data";
+    private static String fullFileNameBasic;
+    private static String[] input;
+    private static HiveConf hiveConf;
+
+    public void Initialize() throws Exception {
+        hiveConf = new HiveConf(this.getClass());
+        hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+        hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+        hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname,
+                "false");
+        driver = new Driver(hiveConf);
+        SessionState.start(new CliSessionState(hiveConf));
+        props = new Properties();
+        props.setProperty("fs.default.name", cluster.getProperties()
+                .getProperty("fs.default.name"));
+        fullFileNameBasic = cluster.getProperties().getProperty(
+                "fs.default.name")
+                + basicFile;
+
+        int numRows = 3;
+        input = new String[numRows];
+        for (int i = 0; i < numRows; i++) {
+            String col1 = "a" + i;
+            String col2 = "b" + i;
+            input[i] = i + "," + col1 + "," + col2;
+        }
+        MiniCluster.deleteFile(cluster, basicFile);
+        MiniCluster.createInputFile(cluster, basicFile, input);
+        server = new PigServer(ExecType.LOCAL, props);
+    }
+
+    @Test
+    public void testSequenceTableWriteRead() throws Exception {
+        Initialize();
+        String createTable = "CREATE TABLE demo_table(a0 int, a1 String, a2 String) STORED AS SEQUENCEFILE";
+        driver.run("drop table demo_table");
+        int retCode1 = driver.run(createTable).getResponseCode();
+        assertTrue(retCode1 == 0);
+
+        UDFContext.getUDFContext().setClientSystemProps();
+        server.setBatchOn();
+        server.registerQuery("A = load '"
+                + fullFileNameBasic
+                + "' using PigStorage(',') as (a0:int,a1:chararray,a2:chararray);");
+        server.registerQuery("store A into 'demo_table' using org.apache.hcatalog.pig.HCatStorer();");
+        server.executeBatch();
+
+        server.registerQuery("B = load 'demo_table' using org.apache.hcatalog.pig.HCatLoader();");
+        Iterator<Tuple> XIter = server.openIterator("B");
+        int numTuplesRead = 0;
+        while (XIter.hasNext()) {
+            Tuple t = XIter.next();
+            assertEquals(3, t.size());
+            assertEquals(t.get(0).toString(), "" + numTuplesRead);
+            assertEquals(t.get(1).toString(), "a" + numTuplesRead);
+            assertEquals(t.get(2).toString(), "b" + numTuplesRead);
+            numTuplesRead++;
+        }
+        assertEquals(input.length, numTuplesRead);
+    }
+
+    @Test
+    public void testTextTableWriteRead() throws Exception {
+        Initialize();
+        String createTable = "CREATE TABLE demo_table_1(a0 int, a1 String, a2 String) STORED AS TEXTFILE";
+        driver.run("drop table demo_table_1");
+        int retCode1 = driver.run(createTable).getResponseCode();
+        assertTrue(retCode1 == 0);
+
+        UDFContext.getUDFContext().setClientSystemProps();
+        server.setBatchOn();
+        server.registerQuery("A = load '"
+                + fullFileNameBasic
+                + "' using PigStorage(',') as (a0:int,a1:chararray,a2:chararray);");
+        server.registerQuery("store A into 'demo_table_1' using org.apache.hcatalog.pig.HCatStorer();");
+        server.executeBatch();
+
+        server.registerQuery("B = load 'demo_table_1' using org.apache.hcatalog.pig.HCatLoader();");
+        Iterator<Tuple> XIter = server.openIterator("B");
+        int numTuplesRead = 0;
+        while (XIter.hasNext()) {
+            Tuple t = XIter.next();
+            assertEquals(3, t.size());
+            assertEquals(t.get(0).toString(), "" + numTuplesRead);
+            assertEquals(t.get(1).toString(), "a" + numTuplesRead);
+            assertEquals(t.get(2).toString(), "b" + numTuplesRead);
+            numTuplesRead++;
+        }
+        assertEquals(input.length, numTuplesRead);
+    }
+
+    @Test
+    public void testSequenceTableWriteReadMR() throws Exception {
+        Initialize();
+        String createTable = "CREATE TABLE demo_table_2(a0 int, a1 String, a2 String) STORED AS SEQUENCEFILE";
+        driver.run("drop table demo_table_2");
+        int retCode1 = driver.run(createTable).getResponseCode();
+        assertTrue(retCode1 == 0);
+
+        Configuration conf = new Configuration();
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
+                HCatUtil.serialize(hiveConf.getAllProperties()));
+        Job job = new Job(conf, "Write-hcat-seq-table");
+        job.setJarByClass(TestSequenceFileReadWrite.class);
+
+        job.setMapperClass(Map.class);
+        job.setOutputKeyClass(NullWritable.class);
+        job.setOutputValueClass(DefaultHCatRecord.class);
+        job.setInputFormatClass(TextInputFormat.class);
+        TextInputFormat.setInputPaths(job, fullFileNameBasic);
+
+        HCatOutputFormat.setOutput(job, OutputJobInfo.create(
+                MetaStoreUtils.DEFAULT_DATABASE_NAME, "demo_table_2", null));
+        job.setOutputFormatClass(HCatOutputFormat.class);
+        HCatOutputFormat.setSchema(job, getSchema());
+        job.setNumReduceTasks(0);
+        assertTrue(job.waitForCompletion(true));
+        new FileOutputCommitterContainer(job, null).cleanupJob(job);
+        assertTrue(job.isSuccessful());
+
+        UDFContext.getUDFContext().setClientSystemProps();
+        server.setBatchOn();
+        server.registerQuery("C = load 'default.demo_table_2' using org.apache.hcatalog.pig.HCatLoader();");
+        server.executeBatch();
+        Iterator<Tuple> XIter = server.openIterator("C");
+        int numTuplesRead = 0;
+        while (XIter.hasNext()) {
+            Tuple t = XIter.next();
+            assertEquals(3, t.size());
+            assertEquals(t.get(0).toString(), "" + numTuplesRead);
+            assertEquals(t.get(1).toString(), "a" + numTuplesRead);
+            assertEquals(t.get(2).toString(), "b" + numTuplesRead);
+            numTuplesRead++;
+        }
+        assertEquals(input.length, numTuplesRead);
+    }
+
+
+    public static class Map extends Mapper<LongWritable, Text, NullWritable, DefaultHCatRecord> {
+
+        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+            String[] cols = value.toString().split(",");
+            DefaultHCatRecord record = new DefaultHCatRecord(3);
+            record.set(0, Integer.parseInt(cols[0]));
+            record.set(1, cols[1]);
+            record.set(2, cols[2]);
+            context.write(NullWritable.get(), record);
+        }
+    }
+
+    private HCatSchema getSchema() throws HCatException {
+        HCatSchema schema = new HCatSchema(new ArrayList<HCatFieldSchema>());
+        schema.append(new HCatFieldSchema("a0", HCatFieldSchema.Type.INT,
+                ""));
+        schema.append(new HCatFieldSchema("a1",
+                HCatFieldSchema.Type.STRING, ""));
+        schema.append(new HCatFieldSchema("a2",
+                HCatFieldSchema.Type.STRING, ""));
+        return schema;
+    }
+
+}
\ No newline at end of file
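For quick reference, the MapReduce-based write in testSequenceTableWriteReadMR reduces to the wiring below. This is condensed from the test body above; the helper name runHCatWriteJob is only illustrative, it assumes it lives inside TestSequenceFileReadWrite (so Map and getSchema() refer to the members above), and demo_table_2 must already exist as a SEQUENCEFILE table.

    // Sketch, condensed from testSequenceTableWriteReadMR(): a map-only job that
    // writes DefaultHCatRecords into an existing HCatalog-managed table.
    private boolean runHCatWriteJob(Configuration conf, String inputPath) throws Exception {
        Job job = new Job(conf, "Write-hcat-seq-table");
        job.setJarByClass(TestSequenceFileReadWrite.class);
        job.setMapperClass(Map.class);                   // emits (NullWritable, DefaultHCatRecord)
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, inputPath);
        job.setOutputFormatClass(HCatOutputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(DefaultHCatRecord.class);
        job.setNumReduceTasks(0);
        HCatOutputFormat.setOutput(job, OutputJobInfo.create(
                MetaStoreUtils.DEFAULT_DATABASE_NAME, "demo_table_2", null));
        HCatOutputFormat.setSchema(job, getSchema());    // a0:int, a1:string, a2:string
        return job.waitForCompletion(true);
    }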
Index: src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java	(revision 1300370)
+++ src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java	(working copy)
@@ -18,11 +18,18 @@
 
 package org.apache.hcatalog.mapreduce;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.HCatMapRedUtil;
@@ -38,12 +45,6 @@
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 /**
  * Part of the FileOutput*Container classes
  * See {@link FileOutputFormatContainer} for more information
@@ -246,7 +247,7 @@
 
         //The key given by user is ignored
         try {
-            localWriter.write(null, localSerDe.serialize(value.getAll(), localObjectInspector));
+            localWriter.write(NullWritable.get(), localSerDe.serialize(value.getAll(), localObjectInspector));
        } catch (SerDeException e) {
            throw new IOException("Failed to serialize object",e);
        }
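The NullWritable.get() change above matters because a SequenceFile-backed table goes through a SequenceFile writer, which records the key class and serializes the key on every append, so the key must be a concrete (if empty) Writable instance; a null key fails at that point, which is presumably why the old code only went unnoticed with RCFile-based tables that ignore the key. A minimal sketch of that behaviour, assuming a local filesystem and a throwaway output path of my own choosing:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    // Illustrative only, not part of the patch: shows why the record writer needs
    // NullWritable.get() rather than null when the underlying file is a SequenceFile.
    public class NullKeyDemo {
        public static void main(String[] args) throws IOException {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.getLocal(conf);
            Path out = new Path("/tmp/nullkey-demo.seq");            // hypothetical path
            SequenceFile.Writer writer = SequenceFile.createWriter(
                    fs, conf, out, NullWritable.class, Text.class);
            writer.append(NullWritable.get(), new Text("0,a0,b0"));  // works
            // writer.append(null, new Text("0,a0,b0"));             // fails: the writer
            //                                                       // checks the key's class
            writer.close();
        }
    }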
Index: src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java	(revision 1300370)
+++ src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java	(working copy)
@@ -18,11 +18,13 @@
 
 package org.apache.hcatalog.mapreduce;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.ql.io.RCFile;
@@ -31,18 +33,13 @@
 import org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider;
 import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
 import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatUtil;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
 /**
  * This class is used to encapsulate the InputFormat, OutputFormat and SerDe
  * artifacts of tables which don't define a SerDe. This StorageHandler assumes
@@ -95,13 +92,13 @@
     }
 
     @Override
-    public void configureInputJobProperties(TableDesc tableDesc, 
+    public void configureInputJobProperties(TableDesc tableDesc,
            Map<String, String> jobProperties) {
     }
 
     @Override
-    public void configureOutputJobProperties(TableDesc tableDesc, 
+    public void configureOutputJobProperties(TableDesc tableDesc,
            Map<String, String> jobProperties) {
        try {
            OutputJobInfo jobInfo = (OutputJobInfo)
@@ -114,7 +111,7 @@
            // For dynamic partitioned writes without all keyvalues specified,
            // we create a temp dir for the associated write job
            if (dynHash != null){
-                parentPath = new Path(parentPath, 
+                parentPath = new Path(parentPath,
                                      DYNTEMP_DIR_NAME+dynHash).toString();
            }
 
@@ -129,7 +126,7 @@
            List<String> values = new ArrayList<String>();
 
            //sort the cols and vals
-            for(String name: 
+            for(String name:
                    jobInfo.getTableInfo().
                          getPartitionColumns().getFieldNames()) {
                String value = jobInfo.getPartitionValues().get(name);
@@ -145,7 +142,7 @@
            jobInfo.setLocation(new Path(parentPath,outputLocation).toString());
 
            //only set output dir if partition is fully materialized
-            if(jobInfo.getPartitionValues().size() 
+            if(jobInfo.getPartitionValues().size()
                    == jobInfo.getTableInfo().getPartitionColumns().size()) {
                jobProperties.put("mapred.output.dir", jobInfo.getLocation());
            }
@@ -156,6 +153,12 @@
                    jobInfo.getOutputSchema().getFields().size()));
            jobProperties.put(HCatConstants.HCAT_KEY_OUTPUT_INFO,
                              HCatUtil.serialize(jobInfo));
+
+            // This is required for writing null as key for file based tables.
+            jobProperties.put("mapred.output.key.class", NullWritable.class.getName());
+            SerDe sd = (SerDe)ReflectionUtils.newInstance(this.serDeClass, conf);
+            jobProperties.put("mapred.output.value.class", sd.getSerializedClass().getName());
+
        } catch (IOException e) {
            throw new IllegalStateException("Failed to set output path",e);
        }
@@ -179,7 +182,7 @@
     }
 
     @Override
-    public HiveAuthorizationProvider getAuthorizationProvider() 
+    public HiveAuthorizationProvider getAuthorizationProvider()
            throws HiveException {
        return new DefaultHiveAuthorizationProvider();
    }
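On the FosterStorageHandler side, mapred.output.key.class and mapred.output.value.class are the standard Hadoop job properties that the old-API JobConf setters populate; setting them here tells the underlying file output format what classes the HCat record writer will actually hand it. A small sketch of that equivalence, with Text standing in for whatever the table's SerDe reports via getSerializedClass() (the class name OutputClassConfigSketch is only illustrative):

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.SequenceFileOutputFormat;

    // Illustrative only: the two properties added by configureOutputJobProperties()
    // are the same ones these JobConf setters write.
    public class OutputClassConfigSketch {
        public static void main(String[] args) {
            JobConf conf = new JobConf();
            conf.setOutputFormat(SequenceFileOutputFormat.class);
            conf.setOutputKeyClass(NullWritable.class);    // sets mapred.output.key.class
            conf.setOutputValueClass(Text.class);          // sets mapred.output.value.class
            System.out.println(conf.get("mapred.output.key.class"));
            System.out.println(conf.get("mapred.output.value.class"));
        }
    }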