Index: src/test/org/apache/hcatalog/pig/TestPigStorageDriver.java
===================================================================
--- src/test/org/apache/hcatalog/pig/TestPigStorageDriver.java (revision 1187028)
+++ src/test/org/apache/hcatalog/pig/TestPigStorageDriver.java (working copy)
@@ -21,7 +21,9 @@
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileWriter;
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.util.Iterator;
 import java.util.Map;

@@ -47,6 +49,7 @@
 import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
 import org.apache.hcatalog.pig.HCatLoader;
 import org.apache.hcatalog.pig.drivers.PigStorageInputDriver;
+import org.apache.hcatalog.pig.drivers.PigStorageOutputDriver;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.data.Tuple;
@@ -100,46 +103,69 @@
     assertNull(resp.getErrorMessage());

     resp = hcatDriver.run("alter table junit_pigstorage partition (b='2010-10-10') set fileformat inputformat '" + RCFileInputFormat.class.getName()
-        +"' outputformat '"+RCFileOutputFormat.class.getName()+"' inputdriver '"+PigStorageInputDriver.class.getName()+"' outputdriver 'non-existent'");
+        +"' outputformat '"+RCFileOutputFormat.class.getName()+"' inputdriver '"+PigStorageInputDriver.class.getName()+"' outputdriver '" + PigStorageOutputDriver.class.getName() + "'");
     assertEquals(0, resp.getResponseCode());
     assertNull(resp.getErrorMessage());

     resp = hcatDriver.run("desc extended junit_pigstorage partition (b='2010-10-10')");
     assertEquals(0, resp.getResponseCode());
     assertNull(resp.getErrorMessage());
+
+    resp = hcatDriver.run("alter table junit_pigstorage set fileformat inputformat '" + RCFileInputFormat.class.getName()
+        +"' outputformat '"+RCFileOutputFormat.class.getName()+"' inputdriver '"+PigStorageInputDriver.class.getName()+"' outputdriver '" + PigStorageOutputDriver.class.getName() + "'");
+    assertEquals(0, resp.getResponseCode());
+    assertNull(resp.getErrorMessage());

     PigServer server = new PigServer(ExecType.LOCAL, hcatConf.getAllProperties());
     UDFContext.getUDFContext().setClientSystemProps();
     server.registerQuery(" a = load 'junit_pigstorage' using "+HCatLoader.class.getName()+";");
     Iterator<Tuple> itr = server.openIterator("a");
-    DataInputStream stream = new DataInputStream(new BufferedInputStream(new FileInputStream(new File(anyExistingFileInCurDir))));
-    while(itr.hasNext()){
-      Tuple t = itr.next();
-      assertEquals(2, t.size());
-      if(t.get(0) != null) {
-        // If underlying data-field is empty. PigStorage inserts null instead
-        // of empty String objects.
-        assertTrue(t.get(0) instanceof String);
-        assertEquals(stream.readLine(), t.get(0));
-      }
-      else{
-        assertTrue(stream.readLine().isEmpty());
-      }
-      assertTrue(t.get(1) instanceof String);
-
-      assertEquals("2010-10-10", t.get(1));
-    }
-    assertEquals(0,stream.available());
-    stream.close();
+    boolean result = compareWithFile(itr, anyExistingFileInCurDir, 2, "2010-10-10");
+    assertTrue(result);
+
+    server.registerQuery("a = load '"+tblPath.toString()+"' using PigStorage() as (a:chararray);");
+    server.store("a", "junit_pigstorage", HCatStorer.class.getName() + "('b=2010-10-11')");
+
+    server.registerQuery("a = load '/tmp/hcat_junit_warehouse/junit_pigstorage/b=2010-10-11' using PigStorage() as (a:chararray);");
+    itr = server.openIterator("a");
+    result = compareWithFile(itr, anyExistingFileInCurDir, 1, "2010-10-11");
+    assertTrue(result);
+
     hcatDriver.run("drop table junit_pigstorage");
   }
+
+  private boolean compareWithFile(Iterator<Tuple> itr, String factFile, int numColumn, String key) throws IOException {
+    DataInputStream stream = new DataInputStream(new BufferedInputStream(new FileInputStream(new File(factFile))));
+    while(itr.hasNext()){
+      Tuple t = itr.next();
+      assertEquals(numColumn, t.size());
+      if(t.get(0) != null) {
+        // If the underlying data field is empty, PigStorage inserts null
+        // instead of an empty String object.
+        assertTrue(t.get(0) instanceof String);
+        assertEquals(stream.readLine(), t.get(0));
+      }
+      else{
+        assertTrue(stream.readLine().isEmpty());
+      }
+
+      if (numColumn > 1) {
+        // The second column must be the partition key.
+        assertTrue(t.get(1) instanceof String);
+        assertEquals(key, t.get(1));
+      }
+    }
+    assertEquals(0, stream.available());
+    stream.close();
+    return true;
+  }

   public void testDelim() throws MetaException, TException, UnknownTableException, NoSuchObjectException, InvalidOperationException, IOException, CommandNeedRetryException{

     hcatDriver.run("drop table junit_pigstorage_delim");

     CommandProcessorResponse resp;
-    String createTable = "create table junit_pigstorage_delim (a string) partitioned by (b string) stored as RCFILE";
+    String createTable = "create table junit_pigstorage_delim (a0 string, a1 string) partitioned by (b string) stored as RCFILE";

     resp = hcatDriver.run(createTable);
@@ -151,7 +177,7 @@
     assertNull(resp.getErrorMessage());

     resp = hcatDriver.run("alter table junit_pigstorage_delim partition (b='2010-10-10') set fileformat inputformat '" + RCFileInputFormat.class.getName()
-        +"' outputformat '"+RCFileOutputFormat.class.getName()+"' inputdriver '"+MyPigStorageDriver.class.getName()+"' outputdriver 'non-existent'");
+        +"' outputformat '"+RCFileOutputFormat.class.getName()+"' inputdriver '"+MyPigStorageDriver.class.getName()+"' outputdriver '" + PigStorageOutputDriver.class.getName() + "'");

     Partition part = msc.getPartition(MetaStoreUtils.DEFAULT_DATABASE_NAME, "junit_pigstorage_delim", "b=2010-10-10");
     Map<String, String> partParms = part.getParameters();
@@ -165,5 +191,35 @@
     try{
       server.openIterator("a");
     }catch(FrontendException fe){}
+
+    resp = hcatDriver.run("alter table junit_pigstorage_delim set fileformat inputformat '" + RCFileInputFormat.class.getName()
+        +"' outputformat '"+RCFileOutputFormat.class.getName()+"' inputdriver '"+MyPigStorageDriver.class.getName()+"' outputdriver '" + PigStorageOutputDriver.class.getName() + "'");
+    assertEquals(0, resp.getResponseCode());
+    assertNull(resp.getErrorMessage());
+    resp = hcatDriver.run("alter table junit_pigstorage_delim set TBLPROPERTIES ('hcat.pigstorage.delim'=':')");
+    assertEquals(0, resp.getResponseCode());
+    assertNull(resp.getErrorMessage());
+
+    File inputFile = File.createTempFile("hcat_test", "");
+    PrintWriter p = new PrintWriter(new FileWriter(inputFile));
+    p.println("1\t2");
+    p.println("3\t4");
+    p.close();
+    server.registerQuery("a = load '"+inputFile.toString()+"' using PigStorage() as (a0:chararray, a1:chararray);");
+    server.store("a", "junit_pigstorage_delim", HCatStorer.class.getName() + "('b=2010-10-11')");
+
+    server.registerQuery("a = load '/tmp/hcat_junit_warehouse/junit_pigstorage_delim/b=2010-10-11' using PigStorage() as (a:chararray);");
+    Iterator<Tuple> itr = server.openIterator("a");
+
+    assertTrue(itr.hasNext());
+    Tuple t = itr.next();
+    assertTrue(t.get(0).equals("1:2"));
+
+    assertTrue(itr.hasNext());
+    t = itr.next();
+    assertTrue(t.get(0).equals("3:4"));
+
+    assertFalse(itr.hasNext());
+    inputFile.delete();
   }
 }
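Note on the new testDelim round trip: the behavior it pins down (load "1\t2" with the default tab delimiter, store through the driver, read back "1:2") is plain PigStorage behavior and can be sanity-checked outside HCatalog. A minimal local-mode sketch, with illustrative paths only:

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class DelimRoundTrip {
      public static void main(String[] args) throws Exception {
        PigServer server = new PigServer(ExecType.LOCAL);
        server.registerQuery("a = load '/tmp/in.txt' using PigStorage() as (a0:chararray, a1:chararray);");
        // Storing with an explicit ':' delimiter mirrors what the table property
        // 'hcat.pigstorage.delim'=':' asks of the new output driver.
        server.store("a", "/tmp/out", "PigStorage(':')");
        // For an input line "1\t2", /tmp/out/part-* now contains "1:2".
      }
    }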
Index: src/java/org/apache/hcatalog/pig/HCatLoader.java
===================================================================
--- src/java/org/apache/hcatalog/pig/HCatLoader.java (revision 1187028)
+++ src/java/org/apache/hcatalog/pig/HCatLoader.java (working copy)
@@ -53,6 +53,10 @@
   private String hcatServerUri;
   private String partitionFilterString;
   private final PigHCatUtil phutil = new PigHCatUtil();
+
+  // Signature for the wrapped loader, see comments in LoadFuncBasedInputDriver.initialize
+  final public static String INNER_SIGNATURE = "hcatloader.inner.signature";
+  final public static String INNER_SIGNATURE_PREFIX = "hcatloader_inner_signature";

   @Override
   public InputFormat getInputFormat() throws IOException {
@@ -70,6 +74,7 @@

   @Override
   public void setLocation(String location, Job job) throws IOException {
+    job.getConfiguration().set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + signature);
     Pair<String, String> dbTablePair = PigHCatUtil.getDBTableNames(location);
     dbName = dbTablePair.first;
     tableName = dbTablePair.second;
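Why HCatLoader needs a second signature: UDFContext properties are keyed by (UDF class, signature), and HCatLoader already uses the signature Pig assigned to it, so the loader it wraps underneath gets a derived one. A runnable sketch of the keying, with a hypothetical signature value and the bucket classes chosen for illustration:

    import java.util.Properties;
    import org.apache.hcatalog.pig.HCatLoader;
    import org.apache.pig.builtin.PigStorage;
    import org.apache.pig.impl.util.UDFContext;

    public class SignatureBuckets {
      public static void main(String[] args) {
        String signature = "a_0"; // hypothetical signature assigned by Pig
        Properties outer = UDFContext.getUDFContext()
            .getUDFProperties(HCatLoader.class, new String[]{signature});
        Properties inner = UDFContext.getUDFContext()
            .getUDFProperties(PigStorage.class,
                new String[]{HCatLoader.INNER_SIGNATURE_PREFIX + "_" + signature});
        // Distinct buckets: state stored by HCatLoader cannot clobber state
        // stored by the wrapped loader, and vice versa.
        System.out.println(outer != inner);
      }
    }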
Index: src/java/org/apache/hcatalog/pig/drivers/PigStorageOutputDriver.java
===================================================================
--- src/java/org/apache/hcatalog/pig/drivers/PigStorageOutputDriver.java (revision 0)
+++ src/java/org/apache/hcatalog/pig/drivers/PigStorageOutputDriver.java (revision 0)
@@ -0,0 +1,22 @@
+package org.apache.hcatalog.pig.drivers;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.pig.builtin.PigStorage;
+
+public class PigStorageOutputDriver extends StoreFuncBasedOutputDriver {
+
+  public PigStorageOutputDriver() {
+    // This is a temporary StoreFunc, used only to check the existence of output files.
+    // We will create another StoreFunc with the right delimiter in initialize.
+    sf = new PigStorage();
+  }
+  @Override
+  public void initialize(JobContext jobContext, Properties storageDriverArgs) throws IOException {
+    sf = storageDriverArgs.containsKey(PigStorageInputDriver.delim) ?
+        new PigStorage(storageDriverArgs.getProperty(PigStorageInputDriver.delim)) : new PigStorage();
+    super.initialize(jobContext, storageDriverArgs);
+  }
+}
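How the delimiter reaches initialize: the TBLPROPERTIES key set in the test arrives in storageDriverArgs, and PigStorageInputDriver.delim is that key's constant. A self-contained toy run of the same selection logic, with the key spelled out on the assumption that it matches the constant:

    import java.util.Properties;
    import org.apache.pig.builtin.PigStorage;

    public class DelimSelection {
      // Assumed to match PigStorageInputDriver.delim, per the TBLPROPERTIES used in the test.
      static final String DELIM_KEY = "hcat.pigstorage.delim";

      public static void main(String[] args) {
        Properties storageDriverArgs = new Properties();
        storageDriverArgs.setProperty(DELIM_KEY, ":");
        // Same ternary as PigStorageOutputDriver.initialize:
        PigStorage sf = storageDriverArgs.containsKey(DELIM_KEY)
            ? new PigStorage(storageDriverArgs.getProperty(DELIM_KEY))
            : new PigStorage();
        System.out.println("storer configured: " + sf.getClass().getName());
      }
    }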
Index: src/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputFormat.java
===================================================================
--- src/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputFormat.java (revision 0)
+++ src/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputFormat.java (revision 0)
@@ -0,0 +1,90 @@
+package org.apache.hcatalog.pig.drivers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hcatalog.pig.PigHCatUtil;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.StoreMetadata;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+public class StoreFuncBasedOutputFormat extends
+    OutputFormat<BytesWritable, Tuple> {
+
+  private final StoreFuncInterface storeFunc;
+
+  public StoreFuncBasedOutputFormat(StoreFuncInterface storeFunc) {
+
+    this.storeFunc = storeFunc;
+  }
+
+  @Override
+  public void checkOutputSpecs(JobContext jobContext) throws IOException,
+      InterruptedException {
+    OutputFormat outputFormat = storeFunc.getOutputFormat();
+    outputFormat.checkOutputSpecs(jobContext);
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext ctx)
+      throws IOException, InterruptedException {
+    OutputFormat outputFormat = storeFunc.getOutputFormat();
+    return outputFormat.getOutputCommitter(ctx);
+  }
+
+  @Override
+  public RecordWriter<BytesWritable, Tuple> getRecordWriter(
+      TaskAttemptContext ctx) throws IOException, InterruptedException {
+    RecordWriter writer = storeFunc.getOutputFormat().getRecordWriter(ctx);
+    String serializedJobInfo = ctx.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+    OutputJobInfo outputJobInfo = (OutputJobInfo)HCatUtil.deserialize(serializedJobInfo);
+    ResourceSchema rs = PigHCatUtil.getResourceSchema(outputJobInfo.getOutputSchema());
+    String location = outputJobInfo.getLocation();
+    return new StoreFuncBasedRecordWriter(writer, storeFunc, location, rs);
+  }
+
+  static class StoreFuncBasedRecordWriter extends RecordWriter<BytesWritable, Tuple> {
+    private final RecordWriter writer;
+    private final StoreFuncInterface storeFunc;
+    private final ResourceSchema schema;
+    private final String location;
+
+    public StoreFuncBasedRecordWriter(RecordWriter writer, StoreFuncInterface sf, String location, ResourceSchema rs) throws IOException {
+      this.writer = writer;
+      this.storeFunc = sf;
+      this.schema = rs;
+      this.location = location;
+      storeFunc.prepareToWrite(writer);
+    }
+
+    @Override
+    public void close(TaskAttemptContext ctx) throws IOException,
+        InterruptedException {
+      if (storeFunc instanceof StoreMetadata) {
+        if (schema != null) {
+          ((StoreMetadata) storeFunc).storeSchema(
+              schema, location, new Job(ctx.getConfiguration()));
+        }
+      }
+      writer.close(ctx);
+    }
+
+    @Override
+    public void write(BytesWritable key, Tuple value) throws IOException,
+        InterruptedException {
+      storeFunc.putNext(value);
+    }
+  }
+}
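The adapter's contract is to replay, inside an HCatalog write, the sequence Pig's own backend performs against a store function. A sketch of that order, with ctx (a TaskAttemptContext whose configuration carries a serialized OutputJobInfo under HCatConstants.HCAT_KEY_OUTPUT_INFO) and someTuple assumed in scope:

    StoreFuncInterface sf = new PigStorage(":");
    StoreFuncBasedOutputFormat of = new StoreFuncBasedOutputFormat(sf);
    // getRecordWriter calls sf.prepareToWrite(writer) exactly once, via the
    // StoreFuncBasedRecordWriter constructor.
    RecordWriter<BytesWritable, Tuple> rw = of.getRecordWriter(ctx);
    rw.write(null, someTuple); // delegates to sf.putNext(someTuple); the key is ignored
    rw.close(ctx);             // calls sf.storeSchema(...) first if sf implements StoreMetadata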
Index: src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java
===================================================================
--- src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java (revision 1187028)
+++ src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java (working copy)
@@ -23,11 +23,13 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.builtin.PigStorage;
@@ -42,10 +44,15 @@
   private final LoadFunc loadFunc;
   private static ResourceFieldSchema[] fields;

-  public LoadFuncBasedInputFormat(LoadFunc loadFunc, ResourceSchema dataSchema) {
+  public LoadFuncBasedInputFormat(LoadFunc loadFunc, ResourceSchema dataSchema, String location) throws IOException {

     this.loadFunc = loadFunc;
     fields = dataSchema.getFields();
+
+    // Simulate the frontend call sequence for the LoadFunc, in case it needs to store something into the UDFContext (as JsonLoader does).
+    if (loadFunc instanceof LoadMetadata) {
+      ((LoadMetadata)loadFunc).getSchema(location, new Job());
+    }
   }

   @Override
@@ -59,7 +66,6 @@

   @Override
   public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException {
-
     try {
       InputFormat inpFormat = loadFunc.getInputFormat();
       return inpFormat.getSplits(jobContext);
@@ -103,51 +109,57 @@

       for(int i = 0; i < tupleFromDisk.size(); i++) {

-        DataByteArray dba = (DataByteArray) tupleFromDisk.get(i);
-
-        if(dba == null) {
-          // PigStorage will insert nulls for empty fields.
-          tupleFromDisk.set(i, null);
-          continue;
-        }
-
-        switch(fields[i].getType()) {
-
-        case DataType.CHARARRAY:
-          tupleFromDisk.set(i, caster.bytesToCharArray(dba.get()));
-          break;
-
-        case DataType.INTEGER:
-          tupleFromDisk.set(i, caster.bytesToInteger(dba.get()));
-          break;
-
-        case DataType.FLOAT:
-          tupleFromDisk.set(i, caster.bytesToFloat(dba.get()));
-          break;
-
-        case DataType.LONG:
-          tupleFromDisk.set(i, caster.bytesToLong(dba.get()));
-          break;
-
-        case DataType.DOUBLE:
-          tupleFromDisk.set(i, caster.bytesToDouble(dba.get()));
-          break;
-
-        case DataType.MAP:
-          tupleFromDisk.set(i, caster.bytesToMap(dba.get()));
-          break;
-
-        case DataType.BAG:
-          tupleFromDisk.set(i, caster.bytesToBag(dba.get(), fields[i]));
-          break;
-
-        case DataType.TUPLE:
-          tupleFromDisk.set(i, caster.bytesToTuple(dba.get(), fields[i]));
-          break;
-
-        default:
-          throw new IOException("Unknown Pig type in data: "+fields[i].getType());
-        }
+        Object data = tupleFromDisk.get(i);
+
+        // We will do the conversion for bytes only, for now.
+        if (data instanceof DataByteArray) {
+
+          DataByteArray dba = (DataByteArray) data;
+
+          if(dba == null) {
+            // PigStorage will insert nulls for empty fields.
+            tupleFromDisk.set(i, null);
+            continue;
+          }
+
+          switch(fields[i].getType()) {
+
+          case DataType.CHARARRAY:
+            tupleFromDisk.set(i, caster.bytesToCharArray(dba.get()));
+            break;
+
+          case DataType.INTEGER:
+            tupleFromDisk.set(i, caster.bytesToInteger(dba.get()));
+            break;
+
+          case DataType.FLOAT:
+            tupleFromDisk.set(i, caster.bytesToFloat(dba.get()));
+            break;
+
+          case DataType.LONG:
+            tupleFromDisk.set(i, caster.bytesToLong(dba.get()));
+            break;
+
+          case DataType.DOUBLE:
+            tupleFromDisk.set(i, caster.bytesToDouble(dba.get()));
+            break;
+
+          case DataType.MAP:
+            tupleFromDisk.set(i, caster.bytesToMap(dba.get()));
+            break;
+
+          case DataType.BAG:
+            tupleFromDisk.set(i, caster.bytesToBag(dba.get(), fields[i]));
+            break;
+
+          case DataType.TUPLE:
+            tupleFromDisk.set(i, caster.bytesToTuple(dba.get(), fields[i]));
+            break;
+
+          default:
+            throw new IOException("Unknown Pig type in data: "+fields[i].getType());
+          }
+        }
       }

       return tupleFromDisk;
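The new instanceof guard is the point of this hunk: PigStorage-style loaders hand back raw DataByteArray fields that still need casting, while a loader like JsonLoader already returns typed objects, which now pass through untouched. The cast path itself runs through the loader's LoadCaster and can be exercised standalone; Utf8StorageConverter is the caster PigStorage supplies:

    import org.apache.pig.LoadCaster;
    import org.apache.pig.builtin.Utf8StorageConverter;

    public class CasterDemo {
      public static void main(String[] args) throws Exception {
        LoadCaster caster = new Utf8StorageConverter();
        byte[] raw = "42".getBytes();
        System.out.println(caster.bytesToInteger(raw));   // 42
        System.out.println(caster.bytesToCharArray(raw)); // 42 (as a String)
      }
    }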
Index: src/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputDriver.java
===================================================================
--- src/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputDriver.java (revision 0)
+++ src/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputDriver.java (revision 0)
@@ -0,0 +1,89 @@
+package org.apache.hcatalog.pig.drivers;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.FileOutputStorageDriver;
+import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hcatalog.pig.HCatLoader;
+import org.apache.hcatalog.pig.HCatStorer;
+import org.apache.hcatalog.pig.PigHCatUtil;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.data.DefaultTupleFactory;
+import org.apache.pig.data.Tuple;
+
+public abstract class StoreFuncBasedOutputDriver extends FileOutputStorageDriver {
+
+  protected StoreFuncInterface sf;
+
+  @Override
+  public void initialize(JobContext jobContext, Properties hcatProperties) throws IOException {
+    super.initialize(jobContext, hcatProperties);
+  }
+
+  @Override
+  public OutputFormat<? extends WritableComparable<?>, ? extends Writable> getOutputFormat()
+      throws IOException {
+    StoreFuncBasedOutputFormat outputFormat = new StoreFuncBasedOutputFormat(sf);
+    return outputFormat;
+  }
+
+  @Override
+  public void setOutputPath(JobContext jobContext, String location)
+      throws IOException {
+    Job job = new Job(jobContext.getConfiguration());
+    // Set the signature before invoking StoreFunc methods,
+    // see comments in LoadFuncBasedInputDriver.initialize.
+    String innerSignature = jobContext.getConfiguration().get(HCatStorer.INNER_SIGNATURE);
+    sf.setStoreFuncUDFContextSignature(innerSignature);
+    sf.setStoreLocation(location, job);
+    ConfigurationUtil.mergeConf(jobContext.getConfiguration(),
+        job.getConfiguration());
+  }
+
+  @Override
+  public void setSchema(JobContext jobContext, HCatSchema schema)
+      throws IOException {
+    // Set the signature before invoking StoreFunc methods,
+    // see comments in LoadFuncBasedInputDriver.initialize.
+    String innerSignature = jobContext.getConfiguration().get(HCatStorer.INNER_SIGNATURE);
+    sf.setStoreFuncUDFContextSignature(innerSignature);
+    sf.checkSchema(PigHCatUtil.getResourceSchema(schema));
+  }
+
+  @Override
+  public void setPartitionValues(JobContext jobContext,
+      Map<String, String> partitionValues) throws IOException {
+    // Do nothing; partition keys are not stored along with the data.
+  }
+
+  @Override
+  public WritableComparable<?> generateKey(HCatRecord value)
+      throws IOException {
+    return null;
+  }
+
+  @Override
+  public Writable convertValue(HCatRecord value) throws IOException {
+    Tuple t = new DefaultTupleFactory().newTuple();
+    for (Object o : value.getAll()) {
+      t.append(o);
+    }
+    return t;
+  }
+
+}
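Given this base class, adapting any other Pig StoreFuncInterface as an HCatalog output driver reduces to assigning sf in a constructor. A hypothetical example, not part of this patch:

    package org.apache.hcatalog.pig.drivers;

    import org.apache.pig.builtin.PigStorage;

    // Hypothetical: comma-delimited text output by reusing PigStorage.
    public class CsvStorageOutputDriver extends StoreFuncBasedOutputDriver {
      public CsvStorageOutputDriver() {
        sf = new PigStorage(",");
      }
    }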
Index: src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputDriver.java
===================================================================
--- src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputDriver.java (revision 1187028)
+++ src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputDriver.java (working copy)
@@ -32,6 +32,7 @@
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.mapreduce.HCatInputStorageDriver;
+import org.apache.hcatalog.pig.HCatLoader;
 import org.apache.hcatalog.pig.PigHCatUtil;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.data.Tuple;
@@ -99,9 +100,15 @@

   @Override
   public void initialize(JobContext context, Properties storageDriverArgs) throws IOException {
-
+    // We need to set the right signature before calling setLocation. The original signature
+    // is used by HCatLoader itself to access the UDFContext, so we need to invent a new
+    // signature for the wrapped loader.
+    // For PigStorage/JsonStorage, setting the signature right before setLocation seems to be
+    // good enough; we may need to set it more aggressively if we support more loaders.
+    String innerSignature = context.getConfiguration().get(HCatLoader.INNER_SIGNATURE);
+    lf.setUDFContextSignature(innerSignature);
     lf.setLocation(location, new Job(context.getConfiguration()));
-    inputFormat = new LoadFuncBasedInputFormat(lf, PigHCatUtil.getResourceSchema(dataSchema));
+    inputFormat = new LoadFuncBasedInputFormat(lf, PigHCatUtil.getResourceSchema(dataSchema), location);
   }

   private String location;
Index: src/java/org/apache/hcatalog/pig/HCatStorer.java
===================================================================
--- src/java/org/apache/hcatalog/pig/HCatStorer.java (revision 1187028)
+++ src/java/org/apache/hcatalog/pig/HCatStorer.java (working copy)
@@ -48,10 +48,11 @@

 public class HCatStorer extends HCatBaseStorer {

-  /**
-   *
-   */
+  // Signature for the wrapped storer, see comments in LoadFuncBasedInputDriver.initialize
+  final public static String INNER_SIGNATURE = "hcatstorer.inner.signature";
+  final public static String INNER_SIGNATURE_PREFIX = "hcatstorer_inner_signature";
+
   public HCatStorer(String partSpecs, String schema) throws Exception {
     super(partSpecs, schema);
   }
@@ -72,6 +73,7 @@

   @Override
   public void setStoreLocation(String location, Job job) throws IOException {
+    job.getConfiguration().set(INNER_SIGNATURE, INNER_SIGNATURE_PREFIX + "_" + sign);
     Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign});

     String[] userStr = location.split("\\.");
Index: src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java
===================================================================
--- src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java (revision 1187028)
+++ src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java (working copy)
@@ -95,7 +95,7 @@
    */
   @SuppressWarnings("unchecked")
   @Override
-  public OutputFormat<? extends WritableComparable<?>, ? super Writable> getOutputFormat() throws IOException {
+  public OutputFormat<? extends WritableComparable<?>, ? extends Writable> getOutputFormat() throws IOException {
     if( outputFormat == null ) {
       outputFormat = new RCFileMapReduceOutputFormat();
     }
Index: src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java (revision 1187028)
+++ src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java (working copy)
@@ -52,7 +52,7 @@
    * @return the OutputFormat instance
    * @throws IOException Signals that an I/O exception has occurred.
    */
-  public abstract OutputFormat<? extends WritableComparable<?>, ? super Writable> getOutputFormat() throws IOException;
+  public abstract OutputFormat<? extends WritableComparable<?>, ? extends Writable> getOutputFormat() throws IOException;

   /**
    * Set the data location for the output.
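The bound change from '? super Writable' to '? extends Writable' in the two files above is what lets StoreFuncBasedOutputDriver return its wrapper at all: StoreFuncBasedOutputFormat is an OutputFormat<BytesWritable, Tuple>, and Pig's Tuple is a subtype of Writable, not a supertype. A compile-time illustration:

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapreduce.OutputFormat;
    import org.apache.pig.data.Tuple;

    public class VarianceDemo {
      static OutputFormat<BytesWritable, Tuple> wrapper() { return null; } // stand-in

      public static void main(String[] args) {
        // Compiles: Tuple extends Writable, so it satisfies the new 'extends' bound.
        OutputFormat<? extends WritableComparable<?>, ? extends Writable> ok = wrapper();
        // Would not compile under the old bound, since Tuple is not a supertype of Writable:
        // OutputFormat<? extends WritableComparable<?>, ? super Writable> bad = wrapper();
      }
    }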
" - + "You may specify it through INPUT/OUTPUT storage drivers."); + inputFormat = org.apache.hadoop.mapred.TextInputFormat.class.getName(); + outputFormat = org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat.class.getName(); + inStorageDriver = org.apache.hcatalog.pig.drivers.PigStorageInputDriver.class.getName(); + outStorageDriver = org.apache.hcatalog.pig.drivers.PigStorageOutputDriver.class.getName(); + break; + case HiveParser.TOK_LIKETABLE: String likeTableName;