Index: src/test/org/apache/hcatalog/sequencefile/TestSequenceFileReadWrite.java
===================================================================
--- src/test/org/apache/hcatalog/sequencefile/TestSequenceFileReadWrite.java	(revision 0)
+++ src/test/org/apache/hcatalog/sequencefile/TestSequenceFileReadWrite.java	(working copy)
@@ -0,0 +1,112 @@
+package org.apache.hcatalog.sequencefile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Iterator;
+import java.util.Properties;
+
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hcatalog.MiniCluster;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+import org.junit.Test;
+
+public class TestSequenceFileReadWrite {
+
+    private static MiniCluster cluster = MiniCluster.buildCluster();
+    private static Driver driver;
+    private static Properties props;
+    private static PigServer server;
+    private static final String basicFile = "/tmp/basic.input.data";
+    private static String fullFileNameBasic;
+    private static String[] input;
+
+    public void Initialize() throws Exception {
+
+        HiveConf hiveConf = new HiveConf(this.getClass());
+        hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+        hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+        hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+        driver = new Driver(hiveConf);
+        SessionState.start(new CliSessionState(hiveConf));
+        props = new Properties();
+        props.setProperty("fs.default.name", cluster.getProperties().getProperty("fs.default.name"));
+        fullFileNameBasic = cluster.getProperties().getProperty("fs.default.name") + basicFile;
+
+        int numRows = 3;
+        input = new String[numRows];
+        for (int i = 0; i < numRows; i++) {
+            String col1 = "a" + i;
+            String col2 = "b" + i;
+            input[i] = i + "," + col1 + "," + col2;
+        }
+
+        MiniCluster.deleteFile(cluster, basicFile);
+        MiniCluster.createInputFile(cluster, basicFile, input);
+        server = new PigServer(ExecType.LOCAL, props);
+    }
+
+    @Test
+    public void testSequenceTableWriteRead() throws Exception {
+        Initialize();
+        String createTable = "CREATE TABLE demo_table(a0 int, a1 String, a2 String) STORED AS SEQUENCEFILE";
+        driver.run("drop table demo_table");
+        int retCode1 = driver.run(createTable).getResponseCode();
+        assertTrue(retCode1 == 0);
+
+        UDFContext.getUDFContext().setClientSystemProps();
+        server.setBatchOn();
+        server.registerQuery("A = load '" + fullFileNameBasic + "' using PigStorage(',') as (a0:int,a1:chararray,a2:chararray);");
+        server.registerQuery("store A into 'demo_table' using org.apache.hcatalog.pig.HCatStorer();");
+        server.executeBatch();
+
+        server.registerQuery("B = load 'demo_table' using org.apache.hcatalog.pig.HCatLoader();");
+        Iterator<Tuple> XIter = server.openIterator("B");
+        int numTuplesRead = 0;
+        while (XIter.hasNext()) {
+            Tuple t = XIter.next();
+            assertEquals(3, t.size());
+            assertEquals(t.get(0).toString(), "" + numTuplesRead);
+            assertEquals(t.get(1).toString(), "a" + numTuplesRead);
+            assertEquals(t.get(2).toString(), "b" + numTuplesRead);
+            numTuplesRead++;
+        }
+        assertEquals(input.length, numTuplesRead);
+    }
+
+    @Test
+    public void testTextTableWriteRead() throws Exception {
+        Initialize();
+        String createTable = "CREATE TABLE demo_table_1(a0 int, a1 String, a2 String) STORED AS TEXTFILE";
+        driver.run("drop table demo_table_1");
+        int retCode1 = driver.run(createTable).getResponseCode();
+        assertTrue(retCode1 == 0);
+
+        UDFContext.getUDFContext().setClientSystemProps();
+        server.setBatchOn();
+        server.registerQuery("A = load '" + fullFileNameBasic + "' using PigStorage(',') as (a0:int,a1:chararray,a2:chararray);");
+        server.registerQuery("store A into 'demo_table_1' using org.apache.hcatalog.pig.HCatStorer();");
+        server.executeBatch();
+
+        server.registerQuery("B = load 'demo_table_1' using org.apache.hcatalog.pig.HCatLoader();");
+        Iterator<Tuple> XIter = server.openIterator("B");
+        int numTuplesRead = 0;
+        while (XIter.hasNext()) {
+            Tuple t = XIter.next();
+            assertEquals(3, t.size());
+            assertEquals(t.get(0).toString(), "" + numTuplesRead);
+            assertEquals(t.get(1).toString(), "a" + numTuplesRead);
+            assertEquals(t.get(2).toString(), "b" + numTuplesRead);
+            numTuplesRead++;
+        }
+        assertEquals(input.length, numTuplesRead);
+    }
+
+
+}
\ No newline at end of file
Index: src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java	(revision 1299113)
+++ src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java	(working copy)
@@ -18,11 +18,18 @@
 
 package org.apache.hcatalog.mapreduce;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.HCatMapRedUtil;
@@ -38,12 +45,6 @@
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 /**
  * Part of the FileOutput*Container classes
  * See {@link FileOutputFormatContainer} for more information
@@ -72,6 +73,8 @@
     private OutputJobInfo jobInfo;
     private TaskAttemptContext context;
 
+    private final static BytesWritable EMPTY = new BytesWritable(new byte[0]);
+
    /**
     * @param baseWriter RecordWriter to contain
     * @param context current TaskAttemptContext
@@ -246,7 +249,7 @@
 
         //The key given by user is ignored
         try {
-            localWriter.write(null, localSerDe.serialize(value.getAll(), localObjectInspector));
+            localWriter.write(EMPTY, localSerDe.serialize(value.getAll(), localObjectInspector));
         } catch (SerDeException e) {
             throw new IOException("Failed to serialize object",e);
         }
Index: src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java	(revision 1299126)
+++ src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java	(working copy)
@@ -13,6 +13,7 @@
 import org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider;
 import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
 import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
@@ -77,13 +78,13 @@
     }
 
     @Override
-    public void configureInputJobProperties(TableDesc tableDesc, 
+    public void configureInputJobProperties(TableDesc tableDesc,
                                             Map<String, String> jobProperties) {
 
     }
 
     @Override
-    public void configureOutputJobProperties(TableDesc tableDesc, 
+    public void configureOutputJobProperties(TableDesc tableDesc,
                                              Map<String, String> jobProperties) {
         try {
             OutputJobInfo jobInfo = (OutputJobInfo)
@@ -96,7 +97,7 @@
             // For dynamic partitioned writes without all keyvalues specified,
             // we create a temp dir for the associated write job
             if (dynHash != null){
-                parentPath = new Path(parentPath, 
+                parentPath = new Path(parentPath,
                                       DYNTEMP_DIR_NAME+dynHash).toString();
             }
 
@@ -111,7 +112,7 @@
             List<String> values = new ArrayList<String>();
 
             //sort the cols and vals
-            for(String name: 
+            for(String name:
                 jobInfo.getTableInfo().
                     getPartitionColumns().getFieldNames()) {
                 String value = jobInfo.getPartitionValues().get(name);
@@ -127,7 +128,7 @@
             jobInfo.setLocation(new Path(parentPath,outputLocation).toString());
 
             //only set output dir if partition is fully materialized
-            if(jobInfo.getPartitionValues().size() 
+            if(jobInfo.getPartitionValues().size()
                == jobInfo.getTableInfo().getPartitionColumns().size()) {
                 jobProperties.put("mapred.output.dir", jobInfo.getLocation());
             }
@@ -138,6 +139,8 @@
                 jobInfo.getOutputSchema().getFields().size()));
             jobProperties.put(HCatConstants.HCAT_KEY_OUTPUT_INFO,
                               HCatUtil.serialize(jobInfo));
+            // This is required for writing an empty key for file-based tables.
+            jobProperties.put("mapred.output.key.class", BytesWritable.class.getName());
         } catch (IOException e) {
             throw new IllegalStateException("Failed to set output path",e);
         }
@@ -161,7 +164,7 @@
     }
 
     @Override
-    public HiveAuthorizationProvider getAuthorizationProvider() 
+    public HiveAuthorizationProvider getAuthorizationProvider()
             throws HiveException {
         return new DefaultHiveAuthorizationProvider();
     }