Index: hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseLoader.java
===================================================================
--- hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseLoader.java (revision 1398101)
+++ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseLoader.java (working copy)
@@ -46,7 +46,7 @@
  * Base class for HCatLoader and HCatEximLoader
  */
 
-public abstract class HCatBaseLoader extends LoadFunc implements LoadMetadata, LoadPushDown {
+abstract class HCatBaseLoader extends LoadFunc implements LoadMetadata, LoadPushDown {
 
     protected static final String PRUNE_PROJECTION_INFO = "prune.projection.info";
 
Index: hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java
===================================================================
--- hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java (revision 1398101)
+++ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java (working copy)
@@ -58,7 +58,7 @@
  *
  */
 
-public abstract class HCatBaseStorer extends StoreFunc implements StoreMetadata {
+abstract class HCatBaseStorer extends StoreFunc implements StoreMetadata {
 
     private static final List<Type> SUPPORTED_INTEGER_CONVERSIONS = Lists.newArrayList(Type.TINYINT, Type.SMALLINT, Type.INT);
 
Index: hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java
===================================================================
--- hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java (revision 1398101)
+++ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java (working copy)
@@ -58,7 +58,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PigHCatUtil {
+class PigHCatUtil {
 
     private static final Logger LOG = LoggerFactory.getLogger(PigHCatUtil.class);
 
Index: hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/PigStorageInputDriver.java.broken
===================================================================
--- hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/PigStorageInputDriver.java.broken (revision 1398101)
+++ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/PigStorageInputDriver.java.broken (working copy)
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hcatalog.pig.drivers;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.pig.builtin.PigStorage;
-
-public class PigStorageInputDriver extends LoadFuncBasedInputDriver {
-
-    public static final String delim = "hcat.pigstorage.delim";
-
-    @Override
-    public void initialize(JobContext context, Properties storageDriverArgs) throws IOException {
-
-        lf = storageDriverArgs.containsKey(delim) ?
-            new PigStorage(storageDriverArgs.getProperty(delim)) : new PigStorage();
-        super.initialize(context, storageDriverArgs);
-    }
-}
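Note: the first three hunks narrow HCatBaseLoader, HCatBaseStorer, and PigHCatUtil from public to package-private, leaving HCatLoader and HCatStorer as the only supported entry points into org.apache.hcatalog.pig; the remaining hunks delete the storage-driver adapter classes outright, starting with the already-disabled PigStorageInputDriver.java.broken above. A minimal sketch of what the visibility change means for caller code (the subclass below is hypothetical, not part of this patch):

    import org.apache.hcatalog.pig.HCatLoader;

    // Still compiles after the patch: HCatLoader remains public.
    public class MyLoader extends HCatLoader {
        // By contrast, "extends HCatBaseLoader" from outside the
        // org.apache.hcatalog.pig package now fails to compile, since
        // HCatBaseLoader is no longer visible outside its own package.
    }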
Index: hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputFormat.java
===================================================================
--- hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputFormat.java (revision 1398101)
+++ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputFormat.java (working copy)
@@ -1,170 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hcatalog.pig.drivers;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hcatalog.common.HCatConstants;
-import org.apache.hcatalog.common.HCatUtil;
-import org.apache.hcatalog.mapreduce.OutputJobInfo;
-import org.apache.hcatalog.pig.PigHCatUtil;
-import org.apache.pig.ResourceSchema;
-import org.apache.pig.StoreFuncInterface;
-import org.apache.pig.StoreMetadata;
-import org.apache.pig.data.Tuple;
-
-public class StoreFuncBasedOutputFormat extends
-    OutputFormat<BytesWritable, Tuple> {
-
-    private final StoreFuncInterface storeFunc;
-
-    public StoreFuncBasedOutputFormat(StoreFuncInterface storeFunc) {
-
-        this.storeFunc = storeFunc;
-    }
-
-    @Override
-    public void checkOutputSpecs(JobContext jobContext) throws IOException,
-        InterruptedException {
-        OutputFormat outputFormat = storeFunc.getOutputFormat();
-        outputFormat.checkOutputSpecs(jobContext);
-    }
-
-    @Override
-    public OutputCommitter getOutputCommitter(TaskAttemptContext ctx)
-        throws IOException, InterruptedException {
-        String serializedJobInfo = ctx.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
-        OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(serializedJobInfo);
-        ResourceSchema rs = PigHCatUtil.getResourceSchema(outputJobInfo.getOutputSchema());
-        String location = outputJobInfo.getLocation();
-        OutputFormat outputFormat = storeFunc.getOutputFormat();
-        return new StoreFuncBasedOutputCommitter(storeFunc, outputFormat.getOutputCommitter(ctx), location, rs);
-    }
-
-    @Override
-    public RecordWriter<BytesWritable, Tuple> getRecordWriter(
-        TaskAttemptContext ctx) throws IOException, InterruptedException {
-        RecordWriter<BytesWritable, Tuple> writer = storeFunc.getOutputFormat().getRecordWriter(ctx);
-        String serializedJobInfo = ctx.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
-        OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(serializedJobInfo);
-        ResourceSchema rs = PigHCatUtil.getResourceSchema(outputJobInfo.getOutputSchema());
-        String location = outputJobInfo.getLocation();
-        return new StoreFuncBasedRecordWriter(writer, storeFunc, location, rs);
-    }
-
-    static class StoreFuncBasedRecordWriter extends RecordWriter<BytesWritable, Tuple> {
-        private final RecordWriter<BytesWritable, Tuple> writer;
-        private final StoreFuncInterface storeFunc;
-        private final ResourceSchema schema;
-        private final String location;
-
-        public StoreFuncBasedRecordWriter(RecordWriter<BytesWritable, Tuple> writer, StoreFuncInterface sf, String location, ResourceSchema rs) throws IOException {
-            this.writer = writer;
-            this.storeFunc = sf;
-            this.schema = rs;
-            this.location = location;
-            storeFunc.prepareToWrite(writer);
-        }
-
-        @Override
-        public void close(TaskAttemptContext ctx) throws IOException,
-            InterruptedException {
-            writer.close(ctx);
-        }
-
-        @Override
-        public void write(BytesWritable key, Tuple value) throws IOException,
-            InterruptedException {
-            storeFunc.putNext(value);
-        }
-    }
-
-    static class StoreFuncBasedOutputCommitter extends OutputCommitter {
-        StoreFuncInterface sf;
-        OutputCommitter wrappedOutputCommitter;
-        String location;
-        ResourceSchema rs;
-
-        public StoreFuncBasedOutputCommitter(StoreFuncInterface sf, OutputCommitter outputCommitter, String location, ResourceSchema rs) {
-            this.sf = sf;
-            this.wrappedOutputCommitter = outputCommitter;
-            this.location = location;
-            this.rs = rs;
-        }
-
-        @Override
-        public void abortTask(TaskAttemptContext context) throws IOException {
-            wrappedOutputCommitter.abortTask(context);
-        }
-
-        @Override
-        public void commitTask(TaskAttemptContext context) throws IOException {
-            wrappedOutputCommitter.commitTask(context);
-        }
-
-        @Override
-        public boolean needsTaskCommit(TaskAttemptContext context)
-            throws IOException {
-            return wrappedOutputCommitter.needsTaskCommit(context);
-        }
-
-        @Override
-        public void setupJob(JobContext context) throws IOException {
-            wrappedOutputCommitter.setupJob(context);
-        }
-
-        @Override
-        public void setupTask(TaskAttemptContext context) throws IOException {
-            wrappedOutputCommitter.setupTask(context);
-        }
-
-        public void commitJob(JobContext context) throws IOException {
-            wrappedOutputCommitter.commitJob(context);
-            if (sf instanceof StoreMetadata) {
-                if (rs != null) {
-                    ((StoreMetadata) sf).storeSchema(
-                        rs, location, new Job(context.getConfiguration()));
-                }
-            }
-        }
-
-        @Override
-        public void cleanupJob(JobContext context) throws IOException {
-            wrappedOutputCommitter.cleanupJob(context);
-            if (sf instanceof StoreMetadata) {
-                if (rs != null) {
-                    ((StoreMetadata) sf).storeSchema(
-                        rs, location, new Job(context.getConfiguration()));
-                }
-            }
-        }
-
-        public void abortJob(JobContext context, JobStatus.State state) throws IOException {
-            wrappedOutputCommitter.abortJob(context, state);
-        }
-    }
-}
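Note: the deleted StoreFuncBasedOutputFormat adapted a Pig StoreFuncInterface to Hadoop's OutputFormat contract. The non-obvious piece is its committer, which delegates the whole lifecycle to the wrapped committer and then, once the job commits, lets a StoreMetadata-aware StoreFunc persist its schema. A reduced sketch of that hook (class and field names are illustrative, not part of any surviving API):

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.pig.ResourceSchema;
    import org.apache.pig.StoreFuncInterface;
    import org.apache.pig.StoreMetadata;

    class SchemaStoringCommitter extends OutputCommitter {
        private final OutputCommitter wrapped;
        private final StoreFuncInterface sf;
        private final ResourceSchema schema;  // may be null if no schema was recorded
        private final String location;

        SchemaStoringCommitter(OutputCommitter wrapped, StoreFuncInterface sf,
                               ResourceSchema schema, String location) {
            this.wrapped = wrapped;
            this.sf = sf;
            this.schema = schema;
            this.location = location;
        }

        // The task-side lifecycle is pure delegation.
        @Override public void setupJob(JobContext c) throws IOException { wrapped.setupJob(c); }
        @Override public void setupTask(TaskAttemptContext c) throws IOException { wrapped.setupTask(c); }
        @Override public boolean needsTaskCommit(TaskAttemptContext c) throws IOException { return wrapped.needsTaskCommit(c); }
        @Override public void commitTask(TaskAttemptContext c) throws IOException { wrapped.commitTask(c); }
        @Override public void abortTask(TaskAttemptContext c) throws IOException { wrapped.abortTask(c); }

        // After a successful commit, give the StoreFunc a chance to store
        // its schema alongside the data.
        @Override
        public void commitJob(JobContext c) throws IOException {
            wrapped.commitJob(c);
            if (sf instanceof StoreMetadata && schema != null) {
                ((StoreMetadata) sf).storeSchema(schema, location, new Job(c.getConfiguration()));
            }
        }
    }

The deleted class ran the same storeSchema() call from both commitJob() and the deprecated cleanupJob(), so the hook fired regardless of which commit path the Hadoop version in use invoked.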
Index: hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java
===================================================================
--- hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java (revision 1398101)
+++ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java (working copy)
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hcatalog.pig.drivers;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.pig.LoadCaster;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.LoadMetadata;
-import org.apache.pig.ResourceSchema;
-import org.apache.pig.ResourceSchema.ResourceFieldSchema;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-
-/**
- * based on {@link org.apache.pig.builtin.PigStorage}
- */
-public class LoadFuncBasedInputFormat extends InputFormat<BytesWritable, Tuple> {
-
-    private final LoadFunc loadFunc;
-    private static ResourceFieldSchema[] fields;
-
-    public LoadFuncBasedInputFormat(LoadFunc loadFunc, ResourceSchema dataSchema, String location, Configuration conf) throws IOException {
-
-        this.loadFunc = loadFunc;
-        fields = dataSchema.getFields();
-
-        // Simulate the frontend call sequence for LoadFunc, in case LoadFunc need to store something into UDFContext (as JsonLoader does)
-        if (loadFunc instanceof LoadMetadata) {
-            ((LoadMetadata) loadFunc).getSchema(location, new Job(conf));
-        }
-    }
-
-    @Override
-    public RecordReader<BytesWritable, Tuple> createRecordReader(
-        InputSplit split, TaskAttemptContext taskContext) throws IOException,
-        InterruptedException {
-        RecordReader<BytesWritable, Tuple> reader = loadFunc.getInputFormat().createRecordReader(split, taskContext);
-        return new LoadFuncBasedRecordReader(reader, loadFunc);
-    }
-
-    @Override
-    public List<InputSplit> getSplits(JobContext jobContext) throws IOException,
-        InterruptedException {
-        try {
-            InputFormat<BytesWritable, Tuple> inpFormat = loadFunc.getInputFormat();
-            return inpFormat.getSplits(jobContext);
-
-        } catch (InterruptedException e) {
-            throw new IOException(e);
-        }
-    }
-
-    static class LoadFuncBasedRecordReader extends RecordReader<BytesWritable, Tuple> {
-
-        private Tuple tupleFromDisk;
-        private final RecordReader<BytesWritable, Tuple> reader;
-        private final LoadFunc loadFunc;
-        private final LoadCaster caster;
-
-        /**
-         * @param reader
-         * @param loadFunc
-         * @throws IOException
-         */
-        public LoadFuncBasedRecordReader(RecordReader<BytesWritable, Tuple> reader, LoadFunc loadFunc) throws IOException {
-            this.reader = reader;
-            this.loadFunc = loadFunc;
-            this.caster = loadFunc.getLoadCaster();
-        }
-
-        @Override
-        public void close() throws IOException {
-            reader.close();
-        }
-
-        @Override
-        public BytesWritable getCurrentKey() throws IOException,
-            InterruptedException {
-            return null;
-        }
-
-        @Override
-        public Tuple getCurrentValue() throws IOException, InterruptedException {
-
-            for (int i = 0; i < tupleFromDisk.size(); i++) {
-
-                Object data = tupleFromDisk.get(i);
-
-                // We will do conversion for bytes only for now
-                if (data instanceof DataByteArray) {
-
-                    DataByteArray dba = (DataByteArray) data;
-
-                    if (dba == null) {
-                        // PigStorage will insert nulls for empty fields.
-                        tupleFromDisk.set(i, null);
-                        continue;
-                    }
-
-                    switch (fields[i].getType()) {
-
-                    case DataType.CHARARRAY:
-                        tupleFromDisk.set(i, caster.bytesToCharArray(dba.get()));
-                        break;
-
-                    case DataType.INTEGER:
-                        tupleFromDisk.set(i, caster.bytesToInteger(dba.get()));
-                        break;
-
-                    case DataType.FLOAT:
-                        tupleFromDisk.set(i, caster.bytesToFloat(dba.get()));
-                        break;
-
-                    case DataType.LONG:
-                        tupleFromDisk.set(i, caster.bytesToLong(dba.get()));
-                        break;
-
-                    case DataType.DOUBLE:
-                        tupleFromDisk.set(i, caster.bytesToDouble(dba.get()));
-                        break;
-
-                    case DataType.MAP:
-                        tupleFromDisk.set(i, caster.bytesToMap(dba.get()));
-                        break;
-
-                    case DataType.BAG:
-                        tupleFromDisk.set(i, caster.bytesToBag(dba.get(), fields[i]));
-                        break;
-
-                    case DataType.TUPLE:
-                        tupleFromDisk.set(i, caster.bytesToTuple(dba.get(), fields[i]));
-                        break;
-
-                    default:
-                        throw new IOException("Unknown Pig type in data: " + fields[i].getType());
-                    }
-                }
-            }
-
-            return tupleFromDisk;
-        }
-
-
-        @Override
-        public void initialize(InputSplit split, TaskAttemptContext ctx)
-            throws IOException, InterruptedException {
-
-            reader.initialize(split, ctx);
-            loadFunc.prepareToRead(reader, null);
-        }
-
-        @Override
-        public boolean nextKeyValue() throws IOException, InterruptedException {
-
-            // even if we don't need any data from disk, we will need to call
-            // getNext() on pigStorage() so we know how many rows to emit in our
-            // final output - getNext() will eventually return null when it has
-            // read all disk data and we will know to stop emitting final output
-            tupleFromDisk = loadFunc.getNext();
-            return tupleFromDisk != null;
-        }
-
-        @Override
-        public float getProgress() throws IOException, InterruptedException {
-            return 0;
-        }
-
-    }
-}
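Note: the heart of the deleted LoadFuncBasedInputFormat is getCurrentValue(), which uses the LoadFunc's LoadCaster to convert raw DataByteArray bytes into the Pig type declared for each field. A standalone illustration of that conversion, assuming Pig is on the classpath (Utf8StorageConverter is the LoadCaster that PigStorage returns):

    import org.apache.pig.LoadCaster;
    import org.apache.pig.builtin.Utf8StorageConverter;

    public class CasterDemo {
        public static void main(String[] args) throws Exception {
            LoadCaster caster = new Utf8StorageConverter();
            byte[] raw = "42".getBytes("UTF-8");
            // The deleted reader picked one of these per fields[i].getType().
            Integer asInt = caster.bytesToInteger(raw);        // 42
            Double asDouble = caster.bytesToDouble(raw);       // 42.0
            String asChararray = caster.bytesToCharArray(raw); // "42"
            System.out.println(asInt + " " + asDouble + " " + asChararray);
        }
    }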
Index: hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputDriver.java.broken
===================================================================
--- hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputDriver.java.broken (revision 1398101)
+++ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputDriver.java.broken (working copy)
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hcatalog.pig.drivers;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hcatalog.common.ErrorType;
-import org.apache.hcatalog.common.HCatConstants;
-import org.apache.hcatalog.common.HCatException;
-import org.apache.hcatalog.common.HCatUtil;
-import org.apache.hcatalog.data.HCatRecord;
-import org.apache.hcatalog.data.schema.HCatSchema;
-import org.apache.hcatalog.mapreduce.FileOutputStorageDriver;
-import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
-import org.apache.hcatalog.mapreduce.OutputJobInfo;
-import org.apache.hcatalog.pig.HCatLoader;
-import org.apache.hcatalog.pig.HCatStorer;
-import org.apache.hcatalog.pig.PigHCatUtil;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.StoreFunc;
-import org.apache.pig.StoreFuncInterface;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.data.DefaultTupleFactory;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-
-public class StoreFuncBasedOutputDriver extends FileOutputStorageDriver {
-
-    protected StoreFuncInterface sf;
-    private TupleFactory factory = TupleFactory.getInstance();
-    private HCatSchema schema;
-    private String location;
-
-    @Override
-    public void initialize(JobContext jobContext, Properties hcatProperties) throws IOException {
-        String storerString = hcatProperties.getProperty(HCatConstants.HCAT_PIG_STORER);
-        if (storerString==null) {
-            throw new HCatException(ErrorType.ERROR_INIT_STORER, "Don't know how to instantiate storer, " + HCatConstants.HCAT_PIG_STORER + " property is not defined for table ");
-        }
-        String storerArgs = hcatProperties.getProperty(HCatConstants.HCAT_PIG_STORER_ARGS);
-
-        String[] args;
-        if (storerArgs!=null) {
-            String delimit = hcatProperties.getProperty(HCatConstants.HCAT_PIG_ARGS_DELIMIT);
-            if (delimit==null) {
-                delimit = HCatConstants.HCAT_PIG_ARGS_DELIMIT_DEFAULT;
-            }
-            args = storerArgs.split(delimit);
-        } else {
-            args = new String[0];
-        }
-
-        try {
-            Class storerClass = Class.forName(storerString);
-
-            Constructor[] constructors = storerClass.getConstructors();
-            for (Constructor constructor : constructors) {
-                if (constructor.getParameterTypes().length==args.length) {
-                    sf = (StoreFuncInterface)constructor.newInstance(args);
-                    break;
-                }
-            }
-        } catch (Exception e) {
-            throw new HCatException(ErrorType.ERROR_INIT_STORER, "Cannot instantiate " + storerString, e);
-        }
-
-        if (sf==null) {
-            throw new HCatException(ErrorType.ERROR_INIT_STORER, "Cannot instantiate " + storerString + " with construct args " + storerArgs);
-        }
-
-        super.initialize(jobContext, hcatProperties);
-
-        Job job = new Job(jobContext.getConfiguration());
-        String innerSignature = jobContext.getConfiguration().get(HCatStorer.INNER_SIGNATURE);
-
-        // Set signature before invoking StoreFunc methods,
-        // see comments in LoadFuncBasedInputDriver.initialize
-        sf.setStoreFuncUDFContextSignature(innerSignature);
-        sf.checkSchema(PigHCatUtil.getResourceSchema(schema));
-
-        sf.setStoreLocation(location, job);
-        ConfigurationUtil.mergeConf(jobContext.getConfiguration(),
-            job.getConfiguration());
-    }
-
-    @Override
-    public OutputFormat<? extends WritableComparable<?>, ? extends Writable> getOutputFormat()
-        throws IOException {
-        StoreFuncBasedOutputFormat outputFormat = new StoreFuncBasedOutputFormat(sf);
-        return outputFormat;
-    }
-
-    @Override
-    public void setOutputPath(JobContext jobContext, String location)
-        throws IOException {
-        this.location = location;
-    }
-
-    @Override
-    public void setSchema(JobContext jobContext, HCatSchema schema)
-        throws IOException {
-        this.schema = schema;
-    }
-
-    @Override
-    public void setPartitionValues(JobContext jobContext,
-        Map<String, String> partitionValues) throws IOException {
-        // Doing nothing, partition keys are not stored along with the data, so ignore it
-    }
-
-    @Override
-    public WritableComparable<?> generateKey(HCatRecord value)
-        throws IOException {
-        return null;
-    }
-
-    @Override
-    public Writable convertValue(HCatRecord value) throws IOException {
-        Tuple t = factory.newTupleNoCopy(value.getAll());
-        return t;
-    }
-
-}
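Note: both deleted drivers instantiate the configured Pig class reflectively, choosing the first public constructor whose arity matches the split argument list and passing every argument as a String (which holds for funcs such as PigStorage(",")). The lookup, reduced to a sketch (the helper name is made up):

    import java.lang.reflect.Constructor;

    final class ByArityInstantiator {
        // First public constructor with a matching parameter count wins;
        // every parameter is assumed to be a String, as the drivers assumed.
        static Object newInstance(String className, String[] args) throws Exception {
            Class<?> clazz = Class.forName(className);
            for (Constructor<?> ctor : clazz.getConstructors()) {
                if (ctor.getParameterTypes().length == args.length) {
                    return ctor.newInstance((Object[]) args);
                }
            }
            throw new IllegalArgumentException(
                "No " + args.length + "-arg constructor in " + className);
        }

        public static void main(String[] unused) throws Exception {
            // Equivalent to: new org.apache.pig.builtin.PigStorage(",")
            Object storage = newInstance("org.apache.pig.builtin.PigStorage", new String[] {","});
            System.out.println(storage.getClass().getName());
        }
    }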
Index: hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputDriver.java.broken
===================================================================
--- hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputDriver.java.broken (revision 1398101)
+++ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputDriver.java.broken (working copy)
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hcatalog.pig.drivers;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hcatalog.common.ErrorType;
-import org.apache.hcatalog.common.HCatConstants;
-import org.apache.hcatalog.common.HCatException;
-import org.apache.hcatalog.data.DefaultHCatRecord;
-import org.apache.hcatalog.data.HCatRecord;
-import org.apache.hcatalog.data.schema.HCatSchema;
-import org.apache.hcatalog.mapreduce.HCatInputStorageDriver;
-import org.apache.hcatalog.pig.HCatLoader;
-import org.apache.hcatalog.pig.PigHCatUtil;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.data.Tuple;
-
-
-/**
- * This is a base class which wraps a Load func in HCatInputStorageDriver.
- * If you already have a LoadFunc, then this class along with LoadFuncBasedInputFormat
- * is doing all the heavy lifting. For a new HCat Input Storage Driver just extend it
- * and override the initialize(). {@link PigStorageInputDriver} illustrates
- * that well.
- */
-public class LoadFuncBasedInputDriver extends HCatInputStorageDriver {
-
-    private LoadFuncBasedInputFormat inputFormat;
-    private HCatSchema dataSchema;
-    private Map<String, String> partVals;
-    private List<String> desiredColNames;
-    protected LoadFunc lf;
-
-    @Override
-    public HCatRecord convertToHCatRecord(WritableComparable baseKey, Writable baseValue)
-        throws IOException {
-
-        List<Object> data = ((Tuple) baseValue).getAll();
-        List<Object> hcatRecord = new ArrayList<Object>(desiredColNames.size());
-
-        /* Iterate through columns asked for in output schema, look them up in
-         * original data schema. If found, put it. Else look up in partition columns
-         * if found, put it. Else, its a new column, so need to put null. Map lookup
-         * on partition map will return null, if column is not found.
-         */
-        for (String colName : desiredColNames) {
-            Integer idx = dataSchema.getPosition(colName);
-            hcatRecord.add(idx != null ? data.get(idx) : partVals.get(colName));
-        }
-        return new DefaultHCatRecord(hcatRecord);
-    }
-
-    @Override
-    public InputFormat<? extends WritableComparable, ? extends Writable> getInputFormat(
-        Properties hcatProperties) {
-
-        return inputFormat;
-    }
-
-    @Override
-    public void setOriginalSchema(JobContext jobContext, HCatSchema hcatSchema) throws IOException {
-
-        dataSchema = hcatSchema;
-    }
-
-    @Override
-    public void setOutputSchema(JobContext jobContext, HCatSchema hcatSchema) throws IOException {
-
-        desiredColNames = hcatSchema.getFieldNames();
-    }
-
-    @Override
-    public void setPartitionValues(JobContext jobContext, Map<String, String> partitionValues)
-        throws IOException {
-
-        partVals = partitionValues;
-    }
-
-    @Override
-    public void initialize(JobContext context, Properties storageDriverArgs) throws IOException {
-
-        String loaderString = storageDriverArgs.getProperty(HCatConstants.HCAT_PIG_LOADER);
-        if (loaderString==null) {
-            throw new HCatException(ErrorType.ERROR_INIT_LOADER, "Don't know how to instantiate loader, " + HCatConstants.HCAT_PIG_LOADER + " property is not defined for table ");
-        }
-        String loaderArgs = storageDriverArgs.getProperty(HCatConstants.HCAT_PIG_LOADER_ARGS);
-
-        String[] args;
-        if (loaderArgs!=null) {
-            String delimit = storageDriverArgs.getProperty(HCatConstants.HCAT_PIG_ARGS_DELIMIT);
-            if (delimit==null) {
-                delimit = HCatConstants.HCAT_PIG_ARGS_DELIMIT_DEFAULT;
-            }
-            args = loaderArgs.split(delimit);
-        } else {
-            args = new String[0];
-        }
-
-        try {
-            Class loaderClass = Class.forName(loaderString);
-
-            Constructor[] constructors = loaderClass.getConstructors();
-            for (Constructor constructor : constructors) {
-                if (constructor.getParameterTypes().length==args.length) {
-                    lf = (LoadFunc)constructor.newInstance(args);
-                    break;
-                }
-            }
-        } catch (Exception e) {
-            throw new HCatException(ErrorType.ERROR_INIT_LOADER, "Cannot instantiate " + loaderString, e);
-        }
-
-        if (lf==null) {
-            throw new HCatException(ErrorType.ERROR_INIT_LOADER, "Cannot instantiate " + loaderString + " with construct args " + loaderArgs);
-        }
-
-        // Need to set the right signature in setLocation. The original signature is used by HCatLoader
-        // and it does use this signature to access UDFContext, so we need to invent a new signature for
-        // the wrapped loader.
-        // As for PigStorage/JsonStorage, set signature right before setLocation seems to be good enough,
-        // we may need to set signature more aggressively if we support more loaders
-        String innerSignature = context.getConfiguration().get(HCatLoader.INNER_SIGNATURE);
-        lf.setUDFContextSignature(innerSignature);
-        lf.setLocation(location, new Job(context.getConfiguration()));
-        inputFormat = new LoadFuncBasedInputFormat(lf, PigHCatUtil.getResourceSchema(dataSchema), location, context.getConfiguration());
-    }
-
-    private String location;
-
-    @Override
-    public void setInputPath(JobContext jobContext, String location) throws IOException {
-
-        this.location = location;
-        super.setInputPath(jobContext, location);
-    }
-}
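Note: the comment block in the deleted initialize() captures the one subtlety worth remembering: the wrapped LoadFunc must receive its own UDFContext signature before setLocation() is called, so its per-signature state cannot collide with the outer HCatLoader's. The call order, reduced to a sketch (the signature string and path are placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.pig.LoadFunc;
    import org.apache.pig.builtin.PigStorage;

    public class WrappedLoaderSetup {
        public static void main(String[] args) throws Exception {
            LoadFunc lf = new PigStorage(",");
            lf.setUDFContextSignature("hcat-inner-sig-1"); // before setLocation()
            lf.setLocation("/tmp/example-input", new Job(new Configuration()));
        }
    }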