diff --git src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java new file mode 100644 index 0000000..0e23822 --- /dev/null +++ src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.mapreduce; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.data.HCatRecord; +import org.apache.hcatalog.data.schema.HCatSchema; + +public abstract class HCatBaseInputFormat extends InputFormat { + + /** + * Gets the schema for the HowlRecord data returned by HowlInputFormat. If an output schema + * has been set on the job via setOutputSchema, that schema is returned; otherwise the + * table schema is returned. + * + * @param context + * the job context object + * @return the consolidated output schema + * @throws Exception if the job information has not been initialized + */ + public static HCatSchema getOutputSchema(JobContext context) throws Exception { + String os = context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA); + if (os == null) { + return getTableSchema(context); + } else { + return (HCatSchema) HCatUtil.deserialize(os); + } + } + + /** + * Set the schema for the HowlRecord data returned by HowlInputFormat. + * @param job the job object + * @param hcatSchema the schema to use as the consolidated schema + */ + public static void setOutputSchema(Job job,HCatSchema hcatSchema) throws Exception { + job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA, HCatUtil.serialize(hcatSchema)); + } + + + /** + * Logically split the set of input files for the job.
Returns the + * underlying InputFormat's splits + * @param jobContext the job context object + * @return the splits, an HowlInputSplit wrapper over the storage + * driver InputSplits + * @throws IOException or InterruptedException + */ + @Override + public List getSplits(JobContext jobContext) + throws IOException, InterruptedException { + + //Get the job info from the configuration, + //throws exception if not initialized + JobInfo jobInfo; + try { + jobInfo = getJobInfo(jobContext); + } catch (Exception e) { + throw new IOException(e); + } + + List splits = new ArrayList(); + List partitionInfoList = jobInfo.getPartitions(); + if(partitionInfoList == null ) { + //No partitions match the specified partition filter + return splits; + } + + //For each matching partition, call getSplits on the underlying InputFormat + for(PartInfo partitionInfo : partitionInfoList) { + Job localJob = new Job(jobContext.getConfiguration()); + HCatInputStorageDriver storageDriver; + try { + storageDriver = getInputDriverInstance(partitionInfo.getInputStorageDriverClass()); + } catch (Exception e) { + throw new IOException(e); + } + + //Pass all required information to the storage driver + initStorageDriver(storageDriver, localJob, partitionInfo, jobInfo.getTableSchema()); + + //Get the input format for the storage driver + InputFormat inputFormat = + storageDriver.getInputFormat(partitionInfo.getInputStorageDriverProperties()); + + //Call getSplit on the storage drivers InputFormat, create an + //HCatSplit for each underlying split + List baseSplits = inputFormat.getSplits(localJob); + + for(InputSplit split : baseSplits) { + splits.add(new HCatSplit( + partitionInfo, + split, + jobInfo.getTableSchema())); + } + } + + return splits; + } + + /** + * Create the RecordReader for the given InputSplit. Returns the underlying + * RecordReader if the required operations are supported and schema matches + * with HowlTable schema. Returns an HowlRecordReader if operations need to + * be implemented in Howl. + * @param split the split + * @param taskContext the task attempt context + * @return the record reader instance, either an HowlRecordReader(later) or + * the underlying storage driver's RecordReader + * @throws IOException or InterruptedException + */ + @Override + public RecordReader createRecordReader(InputSplit split, + TaskAttemptContext taskContext) throws IOException, InterruptedException { + + HCatSplit howlSplit = (HCatSplit) split; + PartInfo partitionInfo = howlSplit.getPartitionInfo(); + + //If running through a Pig job, the JobInfo will not be available in the + //backend process context (since HowlLoader works on a copy of the JobContext and does + //not call HowlInputFormat.setInput in the backend process). + //So this function should NOT attempt to read the JobInfo. 
+ + HCatInputStorageDriver storageDriver; + try { + storageDriver = getInputDriverInstance(partitionInfo.getInputStorageDriverClass()); + } catch (Exception e) { + throw new IOException(e); + } + + //Pass all required information to the storage driver + initStorageDriver(storageDriver, taskContext, partitionInfo, howlSplit.getTableSchema()); + + //Get the input format for the storage driver + InputFormat inputFormat = + storageDriver.getInputFormat(partitionInfo.getInputStorageDriverProperties()); + + //Create the underlying input format's record reader and an Howl wrapper + RecordReader recordReader = + inputFormat.createRecordReader(howlSplit.getBaseSplit(), taskContext); + + return new HCatRecordReader(storageDriver,recordReader); + } + + /** + * Gets the HowlTable schema for the table specified in the HowlInputFormat.setInput call + * on the specified job context. This information is available only after HowlInputFormat.setInput + * has been called for a JobContext. + * @param context the context + * @return the table schema + * @throws Exception if HowlInputFormat.setInput has not been called for the current context + */ + public static HCatSchema getTableSchema(JobContext context) throws Exception { + JobInfo jobInfo = getJobInfo(context); + return jobInfo.getTableSchema(); + } + + /** + * Gets the JobInfo object by reading the Configuration and deserializing + * the string. If JobInfo is not present in the configuration, throws an + * exception since that means HowlInputFormat.setInput has not been called. + * @param jobContext the job context + * @return the JobInfo object + * @throws Exception the exception + */ + private static JobInfo getJobInfo(JobContext jobContext) throws Exception { + String jobString = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO); + if( jobString == null ) { + throw new Exception("job information not found in JobContext. HowlInputFormat.setInput() not called?"); + } + + return (JobInfo) HCatUtil.deserialize(jobString); + } + + + /** + * Initializes the storage driver instance. Passes on the required + * schema information, path info and arguments for the supported + * features to the storage driver. + * @param storageDriver the storage driver + * @param context the job context + * @param partitionInfo the partition info + * @param tableSchema the table level schema + * @throws IOException Signals that an I/O exception has occurred. + */ + private void initStorageDriver(HCatInputStorageDriver storageDriver, + JobContext context, PartInfo partitionInfo, + HCatSchema tableSchema) throws IOException { + + storageDriver.setInputPath(context, partitionInfo.getLocation()); + + if( partitionInfo.getPartitionSchema() != null ) { + storageDriver.setOriginalSchema(context, partitionInfo.getPartitionSchema()); + } + + storageDriver.setPartitionValues(context, partitionInfo.getPartitionValues()); + + //Set the output schema. Use the schema given by user if set, otherwise use the + //table level schema + HCatSchema outputSchema = null; + String outputSchemaString = context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA); + if( outputSchemaString != null ) { + outputSchema = (HCatSchema) HCatUtil.deserialize(outputSchemaString); + } else { + outputSchema = tableSchema; + } + + storageDriver.setOutputSchema(context, outputSchema); + + storageDriver.initialize(context, partitionInfo.getInputStorageDriverProperties()); + } + + /** + * Gets the input driver instance.
+ * @param inputStorageDriverClass the input storage driver classname + * @return the input driver instance + * @throws Exception + */ + @SuppressWarnings("unchecked") + private HCatInputStorageDriver getInputDriverInstance( + String inputStorageDriverClass) throws Exception { + try { + Class driverClass = + (Class) + Class.forName(inputStorageDriverClass); + return driverClass.newInstance(); + } catch(Exception e) { + throw new Exception("error creating storage driver " + + inputStorageDriverClass, e); + } + } + +} diff --git src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputCommitter.java src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputCommitter.java new file mode 100644 index 0000000..b10b734 --- /dev/null +++ src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputCommitter.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.mapreduce; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus.State; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hcatalog.common.HCatException; + +public abstract class HCatBaseOutputCommitter extends OutputCommitter { + + /** The underlying output committer */ + protected final OutputCommitter baseCommitter; + + public HCatBaseOutputCommitter(OutputCommitter baseCommitter) { + this.baseCommitter = baseCommitter; + } + + @Override + public void abortTask(TaskAttemptContext context) throws IOException { + baseCommitter.abortTask(context); + } + + @Override + public void commitTask(TaskAttemptContext context) throws IOException { + baseCommitter.commitTask(context); + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext context) throws IOException { + return baseCommitter.needsTaskCommit(context); + } + + @Override + public void setupJob(JobContext context) throws IOException { + if( baseCommitter != null ) { + baseCommitter.setupJob(context); + } + } + + @Override + public void setupTask(TaskAttemptContext context) throws IOException { + baseCommitter.setupTask(context); + } + + @Override + public void abortJob(JobContext jobContext, State state) throws IOException { + if(baseCommitter != null) { + baseCommitter.abortJob(jobContext, state); + } + OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext); + + doAbortJob(jobContext, jobInfo); + + Path src = new Path(jobInfo.getLocation()); + FileSystem fs = src.getFileSystem(jobContext.getConfiguration()); + fs.delete(src, true); + } + + protected void doAbortJob(JobContext jobContext, OutputJobInfo jobInfo) 
throws HCatException { + } + + public static final String SUCCEEDED_FILE_NAME = "_SUCCESS"; + static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = + "mapreduce.fileoutputcommitter.marksuccessfuljobs"; + + private static boolean getOutputDirMarking(Configuration conf) { + return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, + false); + } + + @Override + public void commitJob(JobContext jobContext) throws IOException { + if(baseCommitter != null) { + baseCommitter.commitJob(jobContext); + } + // create _SUCCESS FILE if so requested. + OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext); + if(getOutputDirMarking(jobContext.getConfiguration())) { + Path outputPath = new Path(jobInfo.getLocation()); + if (outputPath != null) { + FileSystem fileSys = outputPath.getFileSystem(jobContext.getConfiguration()); + // create a file in the folder to mark it + if (fileSys.exists(outputPath)) { + Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME); + if(!fileSys.exists(filePath)) { // may have been created by baseCommitter.commitJob() + fileSys.create(filePath).close(); + } + } + } + } + cleanupJob(jobContext); + } +} diff --git src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java new file mode 100644 index 0000000..6a992b2 --- /dev/null +++ src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.mapreduce; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hcatalog.common.ErrorType; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.common.HCatException; +import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.data.HCatRecord; +import org.apache.hcatalog.data.schema.HCatSchema; + +public abstract class HCatBaseOutputFormat extends OutputFormat, HCatRecord> { + + /** + * Gets the table schema for the table specified in the HowlOutputFormat.setOutput call + * on the specified job context. + * @param context the context + * @return the table schema + * @throws IOException if HowlOutputFromat.setOutput has not been called for the passed context + */ + public static HCatSchema getTableSchema(JobContext context) throws IOException { + OutputJobInfo jobInfo = getJobInfo(context); + return jobInfo.getTableSchema(); + } + + /** + * Check for validity of the output-specification for the job. 
+ * @param context information about the job + * @throws IOException when output should not be attempted + */ + @Override + public void checkOutputSpecs(JobContext context + ) throws IOException, InterruptedException { + OutputFormat, ? super Writable> outputFormat = getOutputFormat(context); + outputFormat.checkOutputSpecs(context); + } + + /** + * Gets the output format instance. + * @param context the job context + * @return the output format instance + * @throws IOException + */ + protected OutputFormat, ? super Writable> getOutputFormat(JobContext context) throws IOException { + OutputJobInfo jobInfo = getJobInfo(context); + HCatOutputStorageDriver driver = getOutputDriverInstance(context, jobInfo); + + OutputFormat, ? super Writable> outputFormat = + driver.getOutputFormat(); + return outputFormat; + } + + /** + * Gets the HowlOuputJobInfo object by reading the Configuration and deserializing + * the string. If JobInfo is not present in the configuration, throws an + * exception since that means HowlOutputFormat.setOutput has not been called. + * @param jobContext the job context + * @return the OutputJobInfo object + * @throws IOException the IO exception + */ + static OutputJobInfo getJobInfo(JobContext jobContext) throws IOException { + String jobString = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO); + if( jobString == null ) { + throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED); + } + + return (OutputJobInfo) HCatUtil.deserialize(jobString); + } + + /** + * Gets the output storage driver instance. + * @param jobContext the job context + * @param jobInfo the output job info + * @return the output driver instance + * @throws IOException + */ + @SuppressWarnings("unchecked") + static HCatOutputStorageDriver getOutputDriverInstance( + JobContext jobContext, OutputJobInfo jobInfo) throws IOException { + try { + Class driverClass = + (Class) + Class.forName(jobInfo.getStorerInfo().getOutputSDClass()); + HCatOutputStorageDriver driver = driverClass.newInstance(); + + //Initialize the storage driver + driver.setSchema(jobContext, jobInfo.getOutputSchema()); + driver.setPartitionValues(jobContext, jobInfo.getTableInfo().getPartitionValues()); + driver.setOutputPath(jobContext, jobInfo.getLocation()); + + driver.initialize(jobContext, jobInfo.getStorerInfo().getProperties()); + + return driver; + } catch(Exception e) { + throw new HCatException(ErrorType.ERROR_INIT_STORAGE_DRIVER, e); + } + } + + protected static void setPartDetails(OutputJobInfo jobInfo, final HCatSchema schema, + Map partMap) throws HCatException, IOException { + List posOfPartCols = new ArrayList(); + + // If partition columns occur in data, we want to remove them. + // So, find out positions of partition columns in schema provided by user. + // We also need to update the output Schema with these deletions. + + // Note that, output storage drivers never sees partition columns in data + // or schema. 
+ + HCatSchema schemaWithoutParts = new HCatSchema(schema.getFields()); + for(String partKey : partMap.keySet()){ + Integer idx; + if((idx = schema.getPosition(partKey)) != null){ + posOfPartCols.add(idx); + schemaWithoutParts.remove(schema.get(partKey)); + } + } + HCatUtil.validatePartitionSchema(jobInfo.getTable(), schemaWithoutParts); + jobInfo.setPosOfPartCols(posOfPartCols); + jobInfo.setOutputSchema(schemaWithoutParts); + } +} diff --git src/java/org/apache/hcatalog/mapreduce/HCatEximInputFormat.java src/java/org/apache/hcatalog/mapreduce/HCatEximInputFormat.java new file mode 100644 index 0000000..515707a --- /dev/null +++ src/java/org/apache/hcatalog/mapreduce/HCatEximInputFormat.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.mapreduce; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.data.schema.HCatSchema; +import org.apache.hcatalog.data.schema.HCatSchemaUtils; + +/** The InputFormat to use to read data from Howl */ +public class HCatEximInputFormat extends HCatBaseInputFormat { + + /** + * Set the input to use for the Job. This queries the metadata file with + * the specified partition predicates, gets the matching partitions, puts + * the information in the conf object. 
The inputInfo object is updated with + * information needed in the client context + * + * @param job + * the job object + * @param inputInfo + * the table input info + * @return two howl schemas, for the table columns and the partition keys + * @throws IOException + * the exception in communicating with the metadata server + */ + public static List setInput(Job job, + String location, + Map partitionFilter) throws IOException { + FileSystem fs; + try { + fs = FileSystem.get(new URI(location), job.getConfiguration()); + } catch (URISyntaxException e) { + throw new IOException(e); + } + Path fromPath = new Path(location); + Path metadataPath = new Path(fromPath, "_metadata"); + try { + Map.Entry> tp = EximUtil + .readMetaData(fs, metadataPath); + org.apache.hadoop.hive.metastore.api.Table table = tp.getKey(); + HCatTableInfo inputInfo = HCatTableInfo.getInputTableInfo(null, + null, table.getDbName(), table.getTableName()); + List partCols = table.getPartitionKeys(); + List partInfoList = null; + if (partCols.size() > 0) { + List partColNames = new ArrayList(partCols.size()); + for (FieldSchema fsc : partCols) { + partColNames.add(fsc.getName()); + } + List partitions = tp.getValue(); + partInfoList = filterPartitions(partitionFilter, partitions, table.getPartitionKeys()); + } else { + partInfoList = new ArrayList(1); + HCatSchema schema = new HCatSchema(HCatUtil.getHCatFieldSchemaList(table.getSd().getCols())); + Map parameters = table.getParameters(); + String inputStorageDriverClass = null; + if (parameters.containsKey(HCatConstants.HCAT_ISD_CLASS)){ + inputStorageDriverClass = parameters.get(HCatConstants.HCAT_ISD_CLASS); + }else{ + throw new IOException("No input storage driver classname found, cannot read partition"); + } + Properties howlProperties = new Properties(); + for (String key : parameters.keySet()){ + if (key.startsWith(InitializeInput.HCAT_KEY_PREFIX)){ + howlProperties.put(key, parameters.get(key)); + } + } + PartInfo partInfo = new PartInfo(schema, inputStorageDriverClass, location + "/data", howlProperties); + partInfoList.add(partInfo); + } + JobInfo howlJobInfo = new JobInfo(inputInfo, + HCatUtil.getTableSchemaWithPtnCols(table), partInfoList); + job.getConfiguration().set( + HCatConstants.HCAT_KEY_JOB_INFO, + HCatUtil.serialize(howlJobInfo)); + List rv = new ArrayList(2); + rv.add(HCatSchemaUtils.getHCatSchema(table.getSd().getCols())); + rv.add(HCatSchemaUtils.getHCatSchema(partCols)); + return rv; + } catch(SemanticException e) { + throw new IOException(e); + } + } + + private static List filterPartitions(Map partitionFilter, + List partitions, List partCols) throws IOException { + List partInfos = new LinkedList(); + for (Partition partition : partitions) { + boolean matches = true; + List partVals = partition.getValues(); + assert partCols.size() == partVals.size(); + Map partSpec = EximUtil.makePartSpec(partCols, partVals); + if (partitionFilter != null) { + for (Map.Entry constraint : partitionFilter.entrySet()) { + String partVal = partSpec.get(constraint.getKey()); + if ((partVal == null) || !partVal.equals(constraint.getValue())) { + matches = false; + break; + } + } + } + if (matches) { + PartInfo partInfo = InitializeInput.extractPartInfo(partition.getSd(), + partition.getParameters()); + partInfo.setPartitionValues(partSpec); + partInfos.add(partInfo); + } + } + return partInfos; + } +} diff --git src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java new file mode 100644 index 
0000000..877320e --- /dev/null +++ src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.mapreduce; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hcatalog.common.ErrorType; +import org.apache.hcatalog.common.HCatException; + +public class HCatEximOutputCommitter extends HCatBaseOutputCommitter { + + private static final Log LOG = LogFactory.getLog(HCatEximOutputCommitter.class); + + public HCatEximOutputCommitter(OutputCommitter baseCommitter) { + super(baseCommitter); + } + + @Override + public void cleanupJob(JobContext jobContext) throws IOException { + LOG.info("HowlEximOutputCommitter.cleanup invoked; m.o.d : " + + jobContext.getConfiguration().get("mapred.output.dir")); + if (baseCommitter != null) { + LOG.info("baseCommitter.class = " + baseCommitter.getClass().getName()); + baseCommitter.cleanupJob(jobContext); + } + + OutputJobInfo jobInfo = HCatBaseOutputFormat.getJobInfo(jobContext); + Configuration conf = jobContext.getConfiguration(); + FileSystem fs; + try { + fs = FileSystem.get(new URI(jobInfo.getTable().getSd().getLocation()), conf); + } catch (URISyntaxException e) { + throw new IOException(e); + } + doCleanup(jobInfo, fs); + } + + private static void doCleanup(OutputJobInfo jobInfo, FileSystem fs) throws IOException, + HCatException { + try { + Table ttable = jobInfo.getTable(); + org.apache.hadoop.hive.ql.metadata.Table table = new org.apache.hadoop.hive.ql.metadata.Table( + ttable); + StorageDescriptor tblSD = ttable.getSd(); + Path tblPath = new Path(tblSD.getLocation()); + Path path = new Path(tblPath, "_metadata"); + List tpartitions = null; + try { + Map.Entry> rv = EximUtil + .readMetaData(fs, path); + tpartitions = rv.getValue(); + } catch (IOException e) { + } + List partitions = + new ArrayList(); + if (tpartitions 
!= null) { + for (Partition tpartition : tpartitions) { + partitions.add(new org.apache.hadoop.hive.ql.metadata.Partition(table, tpartition)); + } + } + if (!table.getPartitionKeys().isEmpty()) { + Map partitionValues = jobInfo.getTableInfo().getPartitionValues(); + org.apache.hadoop.hive.ql.metadata.Partition partition = + new org.apache.hadoop.hive.ql.metadata.Partition(table, + partitionValues, + new Path(tblPath, Warehouse.makePartPath(partitionValues))); + partition.getTPartition().setParameters(table.getParameters()); + partitions.add(partition); + } + EximUtil.createExportDump(fs, path, (table), partitions); + } catch (SemanticException e) { + throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e); + } catch (HiveException e) { + throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e); + } catch (MetaException e) { + throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e); + } + } +} diff --git src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java new file mode 100644 index 0000000..ca2f1cb --- /dev/null +++ src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hcatalog.mapreduce; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.io.RCFileInputFormat; +import org.apache.hadoop.hive.ql.io.RCFileOutputFormat; +import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hcatalog.common.ErrorType; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.common.HCatException; +import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.data.HCatRecord; +import org.apache.hcatalog.data.schema.HCatFieldSchema; +import org.apache.hcatalog.data.schema.HCatSchema; +import org.apache.hcatalog.data.schema.HCatSchemaUtils; +import org.apache.hcatalog.rcfile.RCFileInputDriver; +import org.apache.hcatalog.rcfile.RCFileOutputDriver; + +/** + * The OutputFormat to use to write data to Howl without a howl server. This can then + * be imported into a howl instance, or used with a HowlEximInputFormat. As in + * HowlOutputFormat, the key value is ignored and + * and should be given as null. The value is the HowlRecord to write. + */ +public class HCatEximOutputFormat extends HCatBaseOutputFormat { + + private static final Log LOG = LogFactory.getLog(HCatEximOutputFormat.class); + + /** + * Get the record writer for the job. Uses the Table's default OutputStorageDriver + * to get the record writer. + * + * @param context + * the information about the current task. + * @return a RecordWriter to write the output for the job. + * @throws IOException + */ + @Override + public RecordWriter, HCatRecord> + getRecordWriter(TaskAttemptContext context + ) throws IOException, InterruptedException { + HCatRecordWriter rw = new HCatRecordWriter(context); + return rw; + } + + /** + * Get the output committer for this output format. This is responsible + * for ensuring the output is committed correctly. + * @param context the task context + * @return an output committer + * @throws IOException + * @throws InterruptedException + */ + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { + OutputFormat, ? 
super Writable> outputFormat = getOutputFormat(context); + return new HCatEximOutputCommitter(outputFormat.getOutputCommitter(context)); + } + + public static void setOutput(Job job, String dbname, String tablename, String location, + HCatSchema partitionSchema, List partitionValues, HCatSchema columnSchema) throws HCatException { + setOutput(job, dbname, tablename, location, partitionSchema, partitionValues, columnSchema, + RCFileInputDriver.class.getName(), + RCFileOutputDriver.class.getName(), + RCFileInputFormat.class.getName(), + RCFileOutputFormat.class.getName(), + ColumnarSerDe.class.getName()); + } + + @SuppressWarnings("unchecked") + public static void setOutput(Job job, String dbname, String tablename, String location, + HCatSchema partitionSchema, + List partitionValues, + HCatSchema columnSchema, + String isdname, String osdname, + String ifname, String ofname, + String serializationLib) throws HCatException { + Map partSpec = new TreeMap(); + List partKeys = null; + if (partitionSchema != null) { + partKeys = partitionSchema.getFields(); + if (partKeys.size() != partitionValues.size()) { + throw new IllegalArgumentException("Partition key size differs from partition value size"); + } + for (int i = 0; i < partKeys.size(); ++i) { + HCatFieldSchema partKey = partKeys.get(i); + if (partKey.getType() != HCatFieldSchema.Type.STRING) { + throw new IllegalArgumentException("Partition key type string is only supported"); + } + partSpec.put(partKey.getName(), partitionValues.get(i)); + } + } + StorerInfo storerInfo = new StorerInfo(isdname, osdname, new Properties()); + HCatTableInfo outputInfo = HCatTableInfo.getOutputTableInfo(null, null, dbname, tablename, + partSpec); + org.apache.hadoop.hive.ql.metadata.Table tbl = new + org.apache.hadoop.hive.ql.metadata.Table(dbname, tablename); + Table table = tbl.getTTable(); + table.getParameters().put(HCatConstants.HCAT_ISD_CLASS, isdname); + table.getParameters().put(HCatConstants.HCAT_OSD_CLASS, osdname); + try { + String partname = null; + if ((partKeys != null) && !partKeys.isEmpty()) { + table.setPartitionKeys(HCatSchemaUtils.getFieldSchemas(partKeys)); + partname = Warehouse.makePartPath(partSpec); + } else { + partname = "data"; + } + StorageDescriptor sd = table.getSd(); + sd.setLocation(location); + String dataLocation = location + "/" + partname; + OutputJobInfo jobInfo = new OutputJobInfo(outputInfo, + columnSchema, columnSchema, storerInfo, dataLocation, table); + setPartDetails(jobInfo, columnSchema, partSpec); + sd.setCols(HCatUtil.getFieldSchemaList(jobInfo.getOutputSchema().getFields())); + sd.setInputFormat(ifname); + sd.setOutputFormat(ofname); + SerDeInfo serdeInfo = sd.getSerdeInfo(); + serdeInfo.setSerializationLib(serializationLib); + Configuration conf = job.getConfiguration(); + conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(jobInfo)); + } catch (IOException e) { + throw new HCatException(ErrorType.ERROR_SET_OUTPUT, e); + } catch (MetaException e) { + throw new HCatException(ErrorType.ERROR_SET_OUTPUT, e); + } + } +} diff --git src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java index c12a72e..2219e03 100644 --- src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java +++ src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java @@ -19,23 +19,11 @@ package org.apache.hcatalog.mapreduce; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import org.apache.hadoop.io.WritableComparable; -import 
org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hcatalog.common.HCatConstants; -import org.apache.hcatalog.common.HCatUtil; -import org.apache.hcatalog.data.HCatRecord; -import org.apache.hcatalog.data.schema.HCatSchema; /** The InputFormat to use to read data from Howl */ -public class HCatInputFormat extends InputFormat { +public class HCatInputFormat extends HCatBaseInputFormat { /** * Set the input to use for the Job. This queries the metadata server with @@ -55,206 +43,5 @@ public class HCatInputFormat extends InputFormat } } - /** - * Set the schema for the HowlRecord data returned by HowlInputFormat. - * @param job the job object - * @param hcatSchema the schema to use as the consolidated schema - */ - public static void setOutputSchema(Job job,HCatSchema hcatSchema) throws Exception { - job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA, HCatUtil.serialize(hcatSchema)); - } - - - /** - * Logically split the set of input files for the job. Returns the - * underlying InputFormat's splits - * @param jobContext the job context object - * @return the splits, an HowlInputSplit wrapper over the storage - * driver InputSplits - * @throws IOException or InterruptedException - */ - @Override - public List getSplits(JobContext jobContext) - throws IOException, InterruptedException { - - //Get the job info from the configuration, - //throws exception if not initialized - JobInfo jobInfo; - try { - jobInfo = getJobInfo(jobContext); - } catch (Exception e) { - throw new IOException(e); - } - - List splits = new ArrayList(); - List partitionInfoList = jobInfo.getPartitions(); - if(partitionInfoList == null ) { - //No partitions match the specified partition filter - return splits; - } - - //For each matching partition, call getSplits on the underlying InputFormat - for(PartInfo partitionInfo : partitionInfoList) { - Job localJob = new Job(jobContext.getConfiguration()); - HCatInputStorageDriver storageDriver; - try { - storageDriver = getInputDriverInstance(partitionInfo.getInputStorageDriverClass()); - } catch (Exception e) { - throw new IOException(e); - } - - //Pass all required information to the storage driver - initStorageDriver(storageDriver, localJob, partitionInfo, jobInfo.getTableSchema()); - - //Get the input format for the storage driver - InputFormat inputFormat = - storageDriver.getInputFormat(partitionInfo.getInputStorageDriverProperties()); - - //Call getSplit on the storage drivers InputFormat, create an - //HCatSplit for each underlying split - List baseSplits = inputFormat.getSplits(localJob); - - for(InputSplit split : baseSplits) { - splits.add(new HCatSplit( - partitionInfo, - split, - jobInfo.getTableSchema())); - } - } - - return splits; - } - - /** - * Create the RecordReader for the given InputSplit. Returns the underlying - * RecordReader if the required operations are supported and schema matches - * with HowlTable schema. Returns an HowlRecordReader if operations need to - * be implemented in Howl. 
- * @param split the split - * @param taskContext the task attempt context - * @return the record reader instance, either an HowlRecordReader(later) or - * the underlying storage driver's RecordReader - * @throws IOException or InterruptedException - */ - @Override - public RecordReader createRecordReader(InputSplit split, - TaskAttemptContext taskContext) throws IOException, InterruptedException { - - HCatSplit howlSplit = (HCatSplit) split; - PartInfo partitionInfo = howlSplit.getPartitionInfo(); - - //If running through a Pig job, the JobInfo will not be available in the - //backend process context (since HowlLoader works on a copy of the JobContext and does - //not call HowlInputFormat.setInput in the backend process). - //So this function should NOT attempt to read the JobInfo. - - HCatInputStorageDriver storageDriver; - try { - storageDriver = getInputDriverInstance(partitionInfo.getInputStorageDriverClass()); - } catch (Exception e) { - throw new IOException(e); - } - - //Pass all required information to the storage driver - initStorageDriver(storageDriver, taskContext, partitionInfo, howlSplit.getTableSchema()); - - //Get the input format for the storage driver - InputFormat inputFormat = - storageDriver.getInputFormat(partitionInfo.getInputStorageDriverProperties()); - - //Create the underlying input formats record record and an Howl wrapper - RecordReader recordReader = - inputFormat.createRecordReader(howlSplit.getBaseSplit(), taskContext); - - return new HCatRecordReader(storageDriver,recordReader); - } - - /** - * Gets the HowlTable schema for the table specified in the HowlInputFormat.setInput call - * on the specified job context. This information is available only after HowlInputFormat.setInput - * has been called for a JobContext. - * @param context the context - * @return the table schema - * @throws Exception if HowlInputFromat.setInput has not been called for the current context - */ - public static HCatSchema getTableSchema(JobContext context) throws Exception { - JobInfo jobInfo = getJobInfo(context); - return jobInfo.getTableSchema(); - } - - /** - * Gets the JobInfo object by reading the Configuration and deserializing - * the string. If JobInfo is not present in the configuration, throws an - * exception since that means HowlInputFormat.setInput has not been called. - * @param jobContext the job context - * @return the JobInfo object - * @throws Exception the exception - */ - private static JobInfo getJobInfo(JobContext jobContext) throws Exception { - String jobString = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO); - if( jobString == null ) { - throw new Exception("job information not found in JobContext. HowlInputFormat.setInput() not called?"); - } - - return (JobInfo) HCatUtil.deserialize(jobString); - } - - - /** - * Initializes the storage driver instance. Passes on the required - * schema information, path info and arguments for the supported - * features to the storage driver. - * @param storageDriver the storage driver - * @param context the job context - * @param partitionInfo the partition info - * @param tableSchema the table level schema - * @throws IOException Signals that an I/O exception has occurred. 
- */ - private void initStorageDriver(HCatInputStorageDriver storageDriver, - JobContext context, PartInfo partitionInfo, - HCatSchema tableSchema) throws IOException { - - storageDriver.setInputPath(context, partitionInfo.getLocation()); - - if( partitionInfo.getPartitionSchema() != null ) { - storageDriver.setOriginalSchema(context, partitionInfo.getPartitionSchema()); - } - - storageDriver.setPartitionValues(context, partitionInfo.getPartitionValues()); - - //Set the output schema. Use the schema given by user if set, otherwise use the - //table level schema - HCatSchema outputSchema = null; - String outputSchemaString = context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA); - if( outputSchemaString != null ) { - outputSchema = (HCatSchema) HCatUtil.deserialize(outputSchemaString); - } else { - outputSchema = tableSchema; - } - - storageDriver.setOutputSchema(context, outputSchema); - - storageDriver.initialize(context, partitionInfo.getInputStorageDriverProperties()); - } - - /** - * Gets the input driver instance. - * @param inputStorageDriverClass the input storage driver classname - * @return the input driver instance - * @throws Exception - */ - @SuppressWarnings("unchecked") - private HCatInputStorageDriver getInputDriverInstance( - String inputStorageDriverClass) throws Exception { - try { - Class driverClass = - (Class) - Class.forName(inputStorageDriverClass); - return driverClass.newInstance(); - } catch(Exception e) { - throw new Exception("error creating storage driver " + - inputStorageDriverClass, e); - } - } } diff --git src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java index 8c77570..2a4e4a1 100644 --- src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java +++ src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java @@ -64,7 +64,7 @@ import org.apache.thrift.TException; /** The OutputFormat to use to write data to Howl. The key value is ignored and * and should be given as null. The value is the HowlRecord to write.*/ -public class HCatOutputFormat extends OutputFormat, HCatRecord> { +public class HCatOutputFormat extends HCatBaseOutputFormat { /** The directory under which data is initially written for a non partitioned table */ protected static final String TEMP_DIR_NAME = "_TEMP"; @@ -283,42 +283,11 @@ public class HCatOutputFormat extends OutputFormat, HCatRe OutputJobInfo jobInfo = getJobInfo(job); Map partMap = jobInfo.getTableInfo().getPartitionValues(); - List posOfPartCols = new ArrayList(); - - // If partition columns occur in data, we want to remove them. - // So, find out positions of partition columns in schema provided by user. - // We also need to update the output Schema with these deletions. - - // Note that, output storage drivers never sees partition columns in data - // or schema. 
- - HCatSchema schemaWithoutParts = new HCatSchema(schema.getFields()); - for(String partKey : partMap.keySet()){ - Integer idx; - if((idx = schema.getPosition(partKey)) != null){ - posOfPartCols.add(idx); - schemaWithoutParts.remove(schema.get(partKey)); - } - } - HCatUtil.validatePartitionSchema(jobInfo.getTable(), schemaWithoutParts); - jobInfo.setPosOfPartCols(posOfPartCols); - jobInfo.setOutputSchema(schemaWithoutParts); + setPartDetails(jobInfo, schema, partMap); job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(jobInfo)); } /** - * Gets the table schema for the table specified in the HowlOutputFormat.setOutput call - * on the specified job context. - * @param context the context - * @return the table schema - * @throws IOException if HowlOutputFromat.setOutput has not been called for the passed context - */ - public static HCatSchema getTableSchema(JobContext context) throws IOException { - OutputJobInfo jobInfo = getJobInfo(context); - return jobInfo.getTableSchema(); - } - - /** * Get the record writer for the job. Uses the Table's default OutputStorageDriver * to get the record writer. * @param context the information about the current task. @@ -349,18 +318,6 @@ public class HCatOutputFormat extends OutputFormat, HCatRe } /** - * Check for validity of the output-specification for the job. - * @param context information about the job - * @throws IOException when output should not be attempted - */ - @Override - public void checkOutputSpecs(JobContext context - ) throws IOException, InterruptedException { - OutputFormat, ? super Writable> outputFormat = getOutputFormat(context); - outputFormat.checkOutputSpecs(context); - } - - /** * Get the output committer for this output format. This is responsible * for ensuring the output is committed correctly. * @param context the task context @@ -375,68 +332,6 @@ public class HCatOutputFormat extends OutputFormat, HCatRe return new HCatOutputCommitter(outputFormat.getOutputCommitter(context)); } - - /** - * Gets the output format instance. - * @param context the job context - * @return the output format instance - * @throws IOException - */ - private OutputFormat, ? super Writable> getOutputFormat(JobContext context) throws IOException { - OutputJobInfo jobInfo = getJobInfo(context); - HCatOutputStorageDriver driver = getOutputDriverInstance(context, jobInfo); - - OutputFormat, ? super Writable> outputFormat = - driver.getOutputFormat(); - return outputFormat; - } - - /** - * Gets the HowlOuputJobInfo object by reading the Configuration and deserializing - * the string. If JobInfo is not present in the configuration, throws an - * exception since that means HowlOutputFormat.setOutput has not been called. - * @param jobContext the job context - * @return the OutputJobInfo object - * @throws IOException the IO exception - */ - static OutputJobInfo getJobInfo(JobContext jobContext) throws IOException { - String jobString = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO); - if( jobString == null ) { - throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED); - } - - return (OutputJobInfo) HCatUtil.deserialize(jobString); - } - - /** - * Gets the output storage driver instance. 
- * @param jobContext the job context - * @param jobInfo the output job info - * @return the output driver instance - * @throws IOException - */ - @SuppressWarnings("unchecked") - static HCatOutputStorageDriver getOutputDriverInstance( - JobContext jobContext, OutputJobInfo jobInfo) throws IOException { - try { - Class driverClass = - (Class) - Class.forName(jobInfo.getStorerInfo().getOutputSDClass()); - HCatOutputStorageDriver driver = driverClass.newInstance(); - - //Initialize the storage driver - driver.setSchema(jobContext, jobInfo.getOutputSchema()); - driver.setPartitionValues(jobContext, jobInfo.getTableInfo().getPartitionValues()); - driver.setOutputPath(jobContext, jobInfo.getLocation()); - - driver.initialize(jobContext, jobInfo.getStorerInfo().getProperties()); - - return driver; - } catch(Exception e) { - throw new HCatException(ErrorType.ERROR_INIT_STORAGE_DRIVER, e); - } - } - static HiveMetaStoreClient createHiveClient(String url, Configuration conf) throws IOException, MetaException { HiveConf hiveConf = new HiveConf(HCatOutputFormat.class); diff --git src/java/org/apache/hcatalog/mapreduce/InitializeInput.java src/java/org/apache/hcatalog/mapreduce/InitializeInput.java index 75d50ab..6d1e797 100644 --- src/java/org/apache/hcatalog/mapreduce/InitializeInput.java +++ src/java/org/apache/hcatalog/mapreduce/InitializeInput.java @@ -47,7 +47,7 @@ import org.apache.hcatalog.data.schema.HCatSchema; public class InitializeInput { /** The prefix for keys used for storage driver arguments */ - private static final String HCAT_KEY_PREFIX = "hcat."; + static final String HCAT_KEY_PREFIX = "hcat."; private static final HiveConf hiveConf = new HiveConf(HCatInputFormat.class); private static HiveMetaStoreClient createHiveMetaClient(Configuration conf, HCatTableInfo inputInfo) throws Exception { @@ -146,7 +146,7 @@ public class InitializeInput { return ptnKeyValues; } - private static PartInfo extractPartInfo(StorageDescriptor sd, Map parameters) throws IOException{ + static PartInfo extractPartInfo(StorageDescriptor sd, Map parameters) throws IOException{ HCatSchema schema = HCatUtil.extractSchemaFromStorageDescriptor(sd); String inputStorageDriverClass = null; Properties howlProperties = new Properties(); diff --git src/java/org/apache/hcatalog/pig/HCatBaseLoader.java src/java/org/apache/hcatalog/pig/HCatBaseLoader.java new file mode 100644 index 0000000..31ff9da --- /dev/null +++ src/java/org/apache/hcatalog/pig/HCatBaseLoader.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hcatalog.pig; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hcatalog.data.HCatRecord; +import org.apache.hcatalog.data.schema.HCatSchema; +import org.apache.pig.LoadFunc; +import org.apache.pig.LoadMetadata; +import org.apache.pig.LoadPushDown; +import org.apache.pig.PigException; +import org.apache.pig.ResourceStatistics; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.impl.util.UDFContext; + +/** + * Base class for HCatLoader and HCatEximLoader + */ + +public abstract class HCatBaseLoader extends LoadFunc implements LoadMetadata, LoadPushDown { + + protected static final String PRUNE_PROJECTION_INFO = "prune.projection.info"; + + private RecordReader reader; + protected String signature; + + HCatSchema outputSchema = null; + + + @Override + public Tuple getNext() throws IOException { + try { + HCatRecord hr = (HCatRecord) (reader.nextKeyValue() ? reader.getCurrentValue() : null); + Tuple t = PigHCatUtil.transformToTuple(hr,outputSchema); + // TODO : we were discussing an iter interface, and also a LazyTuple + // change this when plans for that solidifies. + return t; + } catch (ExecException e) { + int errCode = 6018; + String errMsg = "Error while reading input"; + throw new ExecException(errMsg, errCode, + PigException.REMOTE_ENVIRONMENT, e); + } catch (Exception eOther){ + int errCode = 6018; + String errMsg = "Error converting read value to tuple"; + throw new ExecException(errMsg, errCode, + PigException.REMOTE_ENVIRONMENT, eOther); + } + + } + + @Override + public void prepareToRead(RecordReader reader, PigSplit arg1) throws IOException { + this.reader = reader; + } + + @Override + public ResourceStatistics getStatistics(String location, Job job) throws IOException { + // statistics not implemented currently + return null; + } + + @Override + public List getFeatures() { + return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION); + } + + @Override + public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldsInfo) throws FrontendException { + // Store the required fields information in the UDFContext so that we + // can retrieve it later. + storeInUDFContext(signature, PRUNE_PROJECTION_INFO, requiredFieldsInfo); + + // Howl will always prune columns based on what we ask of it - so the + // response is true + return new RequiredFieldResponse(true); + } + + @Override + public void setUDFContextSignature(String signature) { + this.signature = signature; + } + + + // helper methods + protected void storeInUDFContext(String signature, String key, Object value) { + UDFContext udfContext = UDFContext.getUDFContext(); + Properties props = udfContext.getUDFProperties( + this.getClass(), new String[] {signature}); + props.put(key, value); + } + +} diff --git src/java/org/apache/hcatalog/pig/HCatBaseStorer.java src/java/org/apache/hcatalog/pig/HCatBaseStorer.java new file mode 100644 index 0000000..398b27e --- /dev/null +++ src/java/org/apache/hcatalog/pig/HCatBaseStorer.java @@ -0,0 +1,438 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.pig; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hcatalog.common.HCatException; +import org.apache.hcatalog.data.DefaultHCatRecord; +import org.apache.hcatalog.data.HCatRecord; +import org.apache.hcatalog.data.schema.HCatFieldSchema; +import org.apache.hcatalog.data.schema.HCatFieldSchema.Type; +import org.apache.hcatalog.data.schema.HCatSchema; +import org.apache.pig.ResourceSchema; +import org.apache.pig.ResourceStatistics; +import org.apache.pig.StoreFunc; +import org.apache.pig.StoreMetadata; +import org.apache.pig.backend.BackendException; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.impl.logicalLayer.parser.ParseException; +import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema; +import org.apache.pig.impl.util.ObjectSerializer; +import org.apache.pig.impl.util.UDFContext; +import org.apache.pig.impl.util.Utils; + +/** + * Base class for HCatStorer and HCatEximStorer + * + */ + +public abstract class HCatBaseStorer extends StoreFunc implements StoreMetadata { + + /** + * + */ + protected static final String COMPUTED_OUTPUT_SCHEMA = "howl.output.schema"; + protected final Map partitions; + protected Schema pigSchema; + private RecordWriter, HCatRecord> writer; + protected HCatSchema computedSchema; + protected static final String PIG_SCHEMA = "howl.pig.store.schema"; + protected String sign; + + public HCatBaseStorer(String partSpecs, String schema) throws ParseException, FrontendException { + + partitions = new HashMap(); + if(partSpecs != null && !partSpecs.trim().isEmpty()){ + String[] partKVPs = partSpecs.split(","); + for(String partKVP : partKVPs){ + String[] partKV = partKVP.split("="); + if(partKV.length == 2) { + partitions.put(partKV[0].trim(), partKV[1].trim()); + } else { + throw new FrontendException("Invalid partition column specification. "+partSpecs, PigHCatUtil.PIG_EXCEPTION_CODE); + } + } + } + + if(schema != null) { + pigSchema = Utils.getSchemaFromString(schema); + } + + } + + @Override + public void checkSchema(ResourceSchema resourceSchema) throws IOException { + + /* Schema provided by user and the schema computed by Pig + * at the time of calling store must match. 
+ */ + Schema runtimeSchema = Schema.getPigSchema(resourceSchema); + if(pigSchema != null){ + if(! Schema.equals(runtimeSchema, pigSchema, false, true) ){ + throw new FrontendException("Schema provided in store statement doesn't match with the Schema" + + "returned by Pig run-time. Schema provided in HowlStorer: "+pigSchema.toString()+ " Schema received from Pig runtime: "+runtimeSchema.toString(), PigHCatUtil.PIG_EXCEPTION_CODE); + } + } else { + pigSchema = runtimeSchema; + } + UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign}).setProperty(PIG_SCHEMA,ObjectSerializer.serialize(pigSchema)); + } + + /** Constructs HCatSchema from pigSchema. Passed tableSchema is the existing + * schema of the table in metastore. + */ + protected HCatSchema convertPigSchemaToHCatSchema(Schema pigSchema, HCatSchema tableSchema) throws FrontendException{ + + List fieldSchemas = new ArrayList(pigSchema.size()); + for(FieldSchema fSchema : pigSchema.getFields()){ + byte type = fSchema.type; + HCatFieldSchema howlFSchema; + + try { + + // Find out if we need to throw away the tuple or not. + if(type == DataType.BAG && removeTupleFromBag(tableSchema, fSchema)){ + List arrFields = new ArrayList(1); + arrFields.add(getHowlFSFromPigFS(fSchema.schema.getField(0).schema.getField(0), tableSchema)); + howlFSchema = new HCatFieldSchema(fSchema.alias, Type.ARRAY, new HCatSchema(arrFields), null); + } + else{ + howlFSchema = getHowlFSFromPigFS(fSchema, tableSchema); + } + fieldSchemas.add(howlFSchema); + } catch (HCatException he){ + throw new FrontendException(he.getMessage(),PigHCatUtil.PIG_EXCEPTION_CODE,he); + } + } + + return new HCatSchema(fieldSchemas); + } + + private void validateUnNested(Schema innerSchema) throws FrontendException{ + + for(FieldSchema innerField : innerSchema.getFields()){ + validateAlias(innerField.alias); + if(DataType.isComplex(innerField.type)) { + throw new FrontendException("Complex types cannot be nested. "+innerField, PigHCatUtil.PIG_EXCEPTION_CODE); + } + } + } + + private boolean removeTupleFromBag(HCatSchema tableSchema, FieldSchema bagFieldSchema) throws HCatException{ + + String colName = bagFieldSchema.alias; + for(HCatFieldSchema field : tableSchema.getFields()){ + if(colName.equalsIgnoreCase(field.getName())){ + return (field.getArrayElementSchema().get(0).getType() == Type.STRUCT) ? false : true; + } + } + // Column was not found in table schema. Its a new column + List tupSchema = bagFieldSchema.schema.getFields(); + return (tupSchema.size() == 1 && tupSchema.get(0).schema == null) ? 
true : false; + } + + + private HCatFieldSchema getHowlFSFromPigFS(FieldSchema fSchema, HCatSchema hcatTblSchema) throws FrontendException, HCatException{ + + byte type = fSchema.type; + switch(type){ + + case DataType.CHARARRAY: + case DataType.BIGCHARARRAY: + return new HCatFieldSchema(fSchema.alias, Type.STRING, null); + + case DataType.INTEGER: + return new HCatFieldSchema(fSchema.alias, Type.INT, null); + + case DataType.LONG: + return new HCatFieldSchema(fSchema.alias, Type.BIGINT, null); + + case DataType.FLOAT: + return new HCatFieldSchema(fSchema.alias, Type.FLOAT, null); + + case DataType.DOUBLE: + return new HCatFieldSchema(fSchema.alias, Type.DOUBLE, null); + + case DataType.BAG: + Schema bagSchema = fSchema.schema; + List arrFields = new ArrayList(1); + arrFields.add(getHowlFSFromPigFS(bagSchema.getField(0), hcatTblSchema)); + return new HCatFieldSchema(fSchema.alias, Type.ARRAY, new HCatSchema(arrFields), ""); + + case DataType.TUPLE: + List fieldNames = new ArrayList(); + List howlFSs = new ArrayList(); + for( FieldSchema fieldSchema : fSchema.schema.getFields()){ + fieldNames.add( fieldSchema.alias); + howlFSs.add(getHowlFSFromPigFS(fieldSchema, hcatTblSchema)); + } + return new HCatFieldSchema(fSchema.alias, Type.STRUCT, new HCatSchema(howlFSs), ""); + + case DataType.MAP:{ + // Pig's schema contain no type information about map's keys and + // values. So, if its a new column assume if its existing + // return whatever is contained in the existing column. + HCatFieldSchema mapField = getTableCol(fSchema.alias, hcatTblSchema); + HCatFieldSchema valFS; + List valFSList = new ArrayList(1); + + if(mapField != null){ + Type mapValType = mapField.getMapValueSchema().get(0).getType(); + + switch(mapValType){ + case STRING: + case BIGINT: + case INT: + case FLOAT: + case DOUBLE: + valFS = new HCatFieldSchema(fSchema.alias, mapValType, null); + break; + default: + throw new FrontendException("Only pig primitive types are supported as map value types.", PigHCatUtil.PIG_EXCEPTION_CODE); + } + valFSList.add(valFS); + return new HCatFieldSchema(fSchema.alias,Type.MAP,Type.STRING, new HCatSchema(valFSList),""); + } + + // Column not found in target table. Its a new column. Its schema is map + valFS = new HCatFieldSchema(fSchema.alias, Type.STRING, ""); + valFSList.add(valFS); + return new HCatFieldSchema(fSchema.alias,Type.MAP,Type.STRING, new HCatSchema(valFSList),""); + } + + default: + throw new FrontendException("Unsupported type: "+type+" in Pig's schema", PigHCatUtil.PIG_EXCEPTION_CODE); + } + } + + @Override + public void prepareToWrite(RecordWriter writer) throws IOException { + this.writer = writer; + computedSchema = (HCatSchema)ObjectSerializer.deserialize(UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign}).getProperty(COMPUTED_OUTPUT_SCHEMA)); + } + + @Override + public void putNext(Tuple tuple) throws IOException { + + List outgoing = new ArrayList(tuple.size()); + + int i = 0; + for(HCatFieldSchema fSchema : computedSchema.getFields()){ + outgoing.add(getJavaObj(tuple.get(i++), fSchema)); + } + try { + writer.write(null, new DefaultHCatRecord(outgoing)); + } catch (InterruptedException e) { + throw new BackendException("Error while writing tuple: "+tuple, PigHCatUtil.PIG_EXCEPTION_CODE, e); + } + } + + private Object getJavaObj(Object pigObj, HCatFieldSchema howlFS) throws ExecException, HCatException{ + + // The real work-horse. Spend time and energy in this method if there is + // need to keep HowlStorer lean and go fast. 
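getHowlFSFromPigFS above maps Pig primitives onto HCatalog types one for one (chararray and bigchararray to string, integer to int, long to bigint, float and double unchanged) and recurses into bags, tuples and maps for the complex cases. The primitive half of that mapping, condensed into a standalone sketch:

import org.apache.hcatalog.data.schema.HCatFieldSchema.Type;
import org.apache.pig.data.DataType;

public class PigToHCatTypeSketch {
  static Type primitive(byte pigType) {
    switch (pigType) {
      case DataType.CHARARRAY:
      case DataType.BIGCHARARRAY: return Type.STRING;
      case DataType.INTEGER:      return Type.INT;
      case DataType.LONG:         return Type.BIGINT;
      case DataType.FLOAT:        return Type.FLOAT;
      case DataType.DOUBLE:       return Type.DOUBLE;
      default:
        // bags, tuples and maps are handled recursively in the method above
        throw new IllegalArgumentException("Unsupported Pig type: " + DataType.findTypeName(pigType));
    }
  }
}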
+ Type type = howlFS.getType(); + + switch(type){ + + case STRUCT: + // Unwrap the tuple. + return ((Tuple)pigObj).getAll(); + // Tuple innerTup = (Tuple)pigObj; + // + // List innerList = new ArrayList(innerTup.size()); + // int i = 0; + // for(HowlTypeInfo structFieldTypeInfo : typeInfo.getAllStructFieldTypeInfos()){ + // innerList.add(getJavaObj(innerTup.get(i++), structFieldTypeInfo)); + // } + // return innerList; + case ARRAY: + // Unwrap the bag. + DataBag pigBag = (DataBag)pigObj; + HCatFieldSchema tupFS = howlFS.getArrayElementSchema().get(0); + boolean needTuple = tupFS.getType() == Type.STRUCT; + List bagContents = new ArrayList((int)pigBag.size()); + Iterator bagItr = pigBag.iterator(); + + while(bagItr.hasNext()){ + // If there is only one element in tuple contained in bag, we throw away the tuple. + bagContents.add(needTuple ? getJavaObj(bagItr.next(), tupFS) : bagItr.next().get(0)); + + } + return bagContents; + + // case MAP: + // Map pigMap = (Map)pigObj; + // Map typeMap = new HashMap(); + // for(Entry entry: pigMap.entrySet()){ + // typeMap.put(entry.getKey(), new Long(entry.getValue().toString())); + // } + // return typeMap; + default: + return pigObj; + } + } + + @Override + public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException { + + // Need to necessarily override this method since default impl assumes HDFS + // based location string. + return location; + } + + @Override + public void setStoreFuncUDFContextSignature(String signature) { + sign = signature; + } + + + protected void doSchemaValidations(Schema pigSchema, HCatSchema tblSchema) throws FrontendException, HCatException{ + + // Iterate through all the elements in Pig Schema and do validations as + // dictated by semantics, consult HCatSchema of table when need be. + + for(FieldSchema pigField : pigSchema.getFields()){ + byte type = pigField.type; + String alias = pigField.alias; + validateAlias(alias); + HCatFieldSchema howlField = getTableCol(alias, tblSchema); + + if(DataType.isComplex(type)){ + switch(type){ + + case DataType.MAP: + if(howlField != null){ + if(howlField.getMapKeyType() != Type.STRING){ + throw new FrontendException("Key Type of map must be String "+howlField, PigHCatUtil.PIG_EXCEPTION_CODE); + } + if(howlField.getMapValueSchema().get(0).isComplex()){ + throw new FrontendException("Value type of map cannot be complex" + howlField, PigHCatUtil.PIG_EXCEPTION_CODE); + } + } + break; + + case DataType.BAG: + // Only map is allowed as complex type in tuples inside bag. + for(FieldSchema innerField : pigField.schema.getField(0).schema.getFields()){ + if(innerField.type == DataType.BAG || innerField.type == DataType.TUPLE) { + throw new FrontendException("Complex types cannot be nested. "+innerField, PigHCatUtil.PIG_EXCEPTION_CODE); + } + validateAlias(innerField.alias); + } + if(howlField != null){ + // Do the same validation for HCatSchema. 
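The ARRAY branch of getJavaObj above decides, per bag, whether to keep Pig's tuple wrapper: the tuple survives only when the HCat array element type is STRUCT; otherwise the single field inside each tuple is written directly. A small self-contained illustration of that rule (the bag contents and the element type are made up for the example):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class BagUnwrapSketch {
  public static void main(String[] args) throws Exception {
    TupleFactory tf = TupleFactory.getInstance();
    DataBag bag = BagFactory.getInstance().newDefaultBag();
    bag.add(tf.newTuple(Arrays.<Object>asList(1)));
    bag.add(tf.newTuple(Arrays.<Object>asList(2)));

    boolean needTuple = false;  // pretend the array element type is INT, not STRUCT
    List<Object> contents = new ArrayList<Object>();
    for (Iterator<Tuple> it = bag.iterator(); it.hasNext();) {
      Tuple t = it.next();
      contents.add(needTuple ? t.getAll() : t.get(0));
    }
    System.out.println(contents);  // [1, 2] -- the one-field tuple wrappers are dropped
  }
}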
+ HCatFieldSchema arrayFieldScehma = howlField.getArrayElementSchema().get(0); + Type hType = arrayFieldScehma.getType(); + if(hType == Type.STRUCT){ + for(HCatFieldSchema structFieldInBag : arrayFieldScehma.getStructSubSchema().getFields()){ + if(structFieldInBag.getType() == Type.STRUCT || structFieldInBag.getType() == Type.ARRAY){ + throw new FrontendException("Nested Complex types not allowed "+ howlField, PigHCatUtil.PIG_EXCEPTION_CODE); + } + } + } + if(hType == Type.MAP){ + if(arrayFieldScehma.getMapKeyType() != Type.STRING){ + throw new FrontendException("Key Type of map must be String "+howlField, PigHCatUtil.PIG_EXCEPTION_CODE); + } + if(arrayFieldScehma.getMapValueSchema().get(0).isComplex()){ + throw new FrontendException("Value type of map cannot be complex "+howlField, PigHCatUtil.PIG_EXCEPTION_CODE); + } + } + if(hType == Type.ARRAY) { + throw new FrontendException("Arrays cannot contain array within it. "+howlField, PigHCatUtil.PIG_EXCEPTION_CODE); + } + } + break; + + case DataType.TUPLE: + validateUnNested(pigField.schema); + if(howlField != null){ + for(HCatFieldSchema structFieldSchema : howlField.getStructSubSchema().getFields()){ + if(structFieldSchema.isComplex()){ + throw new FrontendException("Nested Complex types are not allowed."+howlField, PigHCatUtil.PIG_EXCEPTION_CODE); + } + } + } + break; + + default: + throw new FrontendException("Internal Error.", PigHCatUtil.PIG_EXCEPTION_CODE); + } + } + } + + for(HCatFieldSchema howlField : tblSchema.getFields()){ + + // We dont do type promotion/demotion. + Type hType = howlField.getType(); + switch(hType){ + case SMALLINT: + case TINYINT: + case BOOLEAN: + throw new FrontendException("Incompatible type found in howl table schema: "+howlField, PigHCatUtil.PIG_EXCEPTION_CODE); + } + } + } + + private void validateAlias(String alias) throws FrontendException{ + if(alias == null) { + throw new FrontendException("Column name for a field is not specified. Please provide the full schema as an argument to HCatStorer.", PigHCatUtil.PIG_EXCEPTION_CODE); + } + if(alias.matches(".*[A-Z]+.*")) { + throw new FrontendException("Column names should all be in lowercase. Invalid name found: "+alias, PigHCatUtil.PIG_EXCEPTION_CODE); + } + } + + // Finds column by name in HCatSchema, if not found returns null. + private HCatFieldSchema getTableCol(String alias, HCatSchema tblSchema){ + + for(HCatFieldSchema howlField : tblSchema.getFields()){ + if(howlField.getName().equalsIgnoreCase(alias)){ + return howlField; + } + } + // Its a new column + return null; + } + + @Override + public void cleanupOnFailure(String location, Job job) throws IOException { + // No-op. + } + + @Override + public void storeStatistics(ResourceStatistics stats, String arg1, Job job) throws IOException { + } +} diff --git src/java/org/apache/hcatalog/pig/HCatEximLoader.java src/java/org/apache/hcatalog/pig/HCatEximLoader.java new file mode 100644 index 0000000..a36f808 --- /dev/null +++ src/java/org/apache/hcatalog/pig/HCatEximLoader.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.pig; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hcatalog.data.schema.HCatFieldSchema; +import org.apache.hcatalog.data.schema.HCatSchema; +import org.apache.hcatalog.mapreduce.HCatBaseInputFormat; +import org.apache.hcatalog.mapreduce.HCatEximInputFormat; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.pig.Expression; +import org.apache.pig.LoadFunc; +import org.apache.pig.ResourceSchema; +import org.apache.pig.impl.util.UDFContext; + +/** + * Pig {@link LoadFunc} to read data/metadata from hcatalog exported location + */ + +public class HCatEximLoader extends HCatBaseLoader { + + private static final Log LOG = LogFactory.getLog(HCatEximLoader.class); + + private HCatSchema tableSchema; + private HCatSchema partitionSchema; + private HCatEximInputFormat inputFormat; + + public HCatEximLoader() { + LOG.debug("HCatEximLoader ctored"); + } + + @Override + public ResourceSchema getSchema(String location, Job job) throws IOException { + LOG.debug("getSchema with location :" + location); + if (tableSchema == null) { + List rv = HCatEximInputFormat.setInput(job, location, null); + tableSchema = rv.get(0); + partitionSchema = rv.get(1); + } + LOG.debug("getSchema got schema :" + tableSchema.toString()); + List colsPlusPartKeys = new ArrayList(); + colsPlusPartKeys.addAll(tableSchema.getFields()); + colsPlusPartKeys.addAll(partitionSchema.getFields()); + outputSchema = new HCatSchema(colsPlusPartKeys); + return PigHCatUtil.getResourceSchema(outputSchema); + } + + @Override + public String[] getPartitionKeys(String location, Job job) throws IOException { + LOG.warn("getPartitionKeys with location :" + location); + /* + if (tableSchema == null) { + List rv = HCatEximInputFormat.setInput(job, location, null); + tableSchema = rv.get(0); + partitionSchema = rv.get(1); + } + return partitionSchema.getFieldNames().toArray(new String[0]); + */ + return null; + } + + @Override + public void setPartitionFilter(Expression partitionFilter) throws IOException { + LOG.debug("setPartitionFilter with filter :" + partitionFilter.toString()); + } + + @Override + public void setLocation(String location, Job job) throws IOException { + LOG.debug("setLocation with location :" + location); + List rv = HCatEximInputFormat.setInput(job, location, null); + tableSchema = rv.get(0); + partitionSchema = rv.get(1); + List colsPlusPartKeys = new ArrayList(); + colsPlusPartKeys.addAll(tableSchema.getFields()); + colsPlusPartKeys.addAll(partitionSchema.getFields()); + outputSchema = new HCatSchema(colsPlusPartKeys); + UDFContext udfContext = UDFContext.getUDFContext(); + Properties props = udfContext.getUDFProperties(this.getClass(), + new String[] {signature}); + RequiredFieldList requiredFieldsInfo = + (RequiredFieldList) props.get(PRUNE_PROJECTION_INFO); + if (requiredFieldsInfo != null) { + ArrayList fcols = new ArrayList(); + for 
(RequiredField rf : requiredFieldsInfo.getFields()) { + fcols.add(tableSchema.getFields().get(rf.getIndex())); + } + outputSchema = new HCatSchema(fcols); + try { + HCatBaseInputFormat.setOutputSchema(job, outputSchema); + } catch (Exception e) { + throw new IOException(e); + } + } + } + + + @Override + public InputFormat getInputFormat() throws IOException { + if (inputFormat == null) { + inputFormat = new HCatEximInputFormat(); + } + return inputFormat; + } + +} diff --git src/java/org/apache/hcatalog/pig/HCatEximStorer.java src/java/org/apache/hcatalog/pig/HCatEximStorer.java new file mode 100644 index 0000000..74f030f --- /dev/null +++ src/java/org/apache/hcatalog/pig/HCatEximStorer.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.pig; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.common.HCatException; +import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.data.schema.HCatFieldSchema; +import org.apache.hcatalog.data.schema.HCatSchema; +import org.apache.hcatalog.mapreduce.HCatEximOutputCommitter; +import org.apache.hcatalog.mapreduce.HCatEximOutputFormat; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.pig.ResourceSchema; +import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.impl.logicalLayer.parser.ParseException; +import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.impl.util.ObjectSerializer; +import org.apache.pig.impl.util.UDFContext; + +/** + * HCatEximStorer. 
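In HCatEximLoader.setLocation above, a projection pushed down by Pig is applied by index: each RequiredField points into the table schema, and the narrowed HCatSchema is handed to HCatBaseInputFormat.setOutputSchema so only those columns are materialized per record. The same step, extracted into a standalone helper (the job, schema and index array are supplied by the caller in this sketch):

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatBaseInputFormat;

public class ProjectionApplySketch {
  static void applyProjection(Job job, HCatSchema tableSchema, int[] requiredIndices)
      throws Exception {
    List<HCatFieldSchema> fcols = new ArrayList<HCatFieldSchema>();
    for (int idx : requiredIndices) {
      fcols.add(tableSchema.getFields().get(idx));  // same lookup as rf.getIndex() above
    }
    HCatBaseInputFormat.setOutputSchema(job, new HCatSchema(fcols));
  }
}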
+ * + */ + +public class HCatEximStorer extends HCatBaseStorer { + + private static final Log LOG = LogFactory.getLog(HCatEximStorer.class); + + private final String outputLocation; + + public HCatEximStorer(String outputLocation) throws FrontendException, ParseException { + this(outputLocation, null, null); + } + + public HCatEximStorer(String outputLocation, String partitionSpec) throws FrontendException, + ParseException { + this(outputLocation, partitionSpec, null); + } + + public HCatEximStorer(String outputLocation, String partitionSpec, String schema) + throws FrontendException, ParseException { + super(partitionSpec, schema); + this.outputLocation = outputLocation; + LOG.debug("HCatEximStorer called"); + } + + @Override + public OutputFormat getOutputFormat() throws IOException { + LOG.debug("getOutputFormat called"); + return new HCatEximOutputFormat(); + } + + @Override + public void setStoreLocation(String location, Job job) throws IOException { + LOG.debug("setStoreLocation called with :" + location); + String[] userStr = location.split("\\."); + String dbname = MetaStoreUtils.DEFAULT_DATABASE_NAME; + String tablename = null; + if (userStr.length == 2) { + dbname = userStr[0]; + tablename = userStr[1]; + } else { + tablename = userStr[0]; + } + Properties p = UDFContext.getUDFContext() + .getUDFProperties(this.getClass(), new String[] {sign}); + Configuration config = job.getConfiguration(); + if (!HCatUtil.checkJobContextIfRunningFromBackend(job)) { + Schema schema = (Schema) ObjectSerializer.deserialize(p.getProperty(PIG_SCHEMA)); + if (schema != null) { + pigSchema = schema; + } + if (pigSchema == null) { + throw new FrontendException("Schema for data cannot be determined.", + PigHCatUtil.PIG_EXCEPTION_CODE); + } + HCatSchema hcatTblSchema = new HCatSchema(new ArrayList()); + try { + doSchemaValidations(pigSchema, hcatTblSchema); + } catch (HCatException he) { + throw new FrontendException(he.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, he); + } + + List hcatFields = new ArrayList(); + List partVals = new ArrayList(); + for (String key : partitions.keySet()) { + hcatFields.add(new HCatFieldSchema(key, HCatFieldSchema.Type.STRING, "")); + partVals.add(partitions.get(key)); + } + + HCatSchema outputSchema = convertPigSchemaToHCatSchema(pigSchema, + hcatTblSchema); + LOG.debug("Pig Schema '" + pigSchema.toString() + "' was converted to HCatSchema '" + + outputSchema); + HCatEximOutputFormat.setOutput(job, + dbname, tablename, + outputLocation, + new HCatSchema(hcatFields), + partVals, + outputSchema); + p.setProperty(COMPUTED_OUTPUT_SCHEMA, ObjectSerializer.serialize(outputSchema)); + p.setProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO, + config.get(HCatConstants.HCAT_KEY_OUTPUT_INFO)); + if (config.get(HCatConstants.HCAT_KEY_HIVE_CONF) != null) { + p.setProperty(HCatConstants.HCAT_KEY_HIVE_CONF, + config.get(HCatConstants.HCAT_KEY_HIVE_CONF)); + } + } else { + config.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, + p.getProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO)); + if (p.getProperty(HCatConstants.HCAT_KEY_HIVE_CONF) != null) { + config.set(HCatConstants.HCAT_KEY_HIVE_CONF, + p.getProperty(HCatConstants.HCAT_KEY_HIVE_CONF)); + } + } + } + + @Override + public void storeSchema(ResourceSchema schema, String arg1, Job job) throws IOException { + if( job.getConfiguration().get("mapred.job.tracker", "").equalsIgnoreCase("local") ) { + //In local mode, mapreduce will not call HCatOutputCommitter.cleanupJob. + //Calling it from here so that the partition publish happens. 
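HCatEximStorer.setStoreLocation above treats the store location as a table reference rather than a filesystem path: a "dbname.tablename" string names the database explicitly, while a bare "tablename" falls back to the metastore's default database. That parse, in isolation (the table reference in main is hypothetical):

import org.apache.hadoop.hive.metastore.MetaStoreUtils;

public class TableRefParseSketch {
  static String[] parse(String location) {
    String[] userStr = location.split("\\.");
    String dbname = MetaStoreUtils.DEFAULT_DATABASE_NAME;
    String tablename;
    if (userStr.length == 2) {
      dbname = userStr[0];
      tablename = userStr[1];
    } else {
      tablename = userStr[0];
    }
    return new String[] {dbname, tablename};
  }

  public static void main(String[] args) {
    String[] parsed = parse("hr.employees");          // hypothetical "db.table" reference
    System.out.println(parsed[0] + "." + parsed[1]);  // hr.employees
  }
}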
+ //This call needs to be removed after MAPREDUCE-1447 is fixed. + new HCatEximOutputCommitter(null).cleanupJob(job); + } + } +} diff --git src/java/org/apache/hcatalog/pig/HCatLoader.java src/java/org/apache/hcatalog/pig/HCatLoader.java index a532f93..c3776e5 100644 --- src/java/org/apache/hcatalog/pig/HCatLoader.java +++ src/java/org/apache/hcatalog/pig/HCatLoader.java @@ -18,7 +18,6 @@ package org.apache.hcatalog.pig; import java.io.IOException; -import java.util.Arrays; import java.util.List; import java.util.Properties; @@ -27,48 +26,34 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hcatalog.common.HCatConstants; import org.apache.hcatalog.common.HCatUtil; -import org.apache.hcatalog.data.HCatRecord; import org.apache.hcatalog.data.Pair; import org.apache.hcatalog.data.schema.HCatSchema; import org.apache.hcatalog.mapreduce.HCatInputFormat; import org.apache.hcatalog.mapreduce.HCatTableInfo; import org.apache.pig.Expression; +import org.apache.pig.Expression.BinaryExpression; import org.apache.pig.LoadFunc; -import org.apache.pig.LoadMetadata; -import org.apache.pig.LoadPushDown; import org.apache.pig.PigException; import org.apache.pig.ResourceSchema; -import org.apache.pig.ResourceStatistics; -import org.apache.pig.Expression.BinaryExpression; -import org.apache.pig.backend.executionengine.ExecException; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; -import org.apache.pig.data.Tuple; -import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.util.UDFContext; /** * Pig {@link LoadFunc} to read data from Howl */ -public class HCatLoader extends LoadFunc implements LoadMetadata, LoadPushDown { +public class HCatLoader extends HCatBaseLoader { - private static final String PRUNE_PROJECTION_INFO = "prune.projection.info"; private static final String PARTITION_FILTER = "partition.filter"; // for future use private HCatInputFormat howlInputFormat = null; - private RecordReader reader; private String dbName; private String tableName; private String howlServerUri; - private String signature; private String partitionFilterString; private final PigHCatUtil phutil = new PigHCatUtil(); - HCatSchema outputSchema = null; - @Override public InputFormat getInputFormat() throws IOException { if(howlInputFormat == null) { @@ -78,34 +63,6 @@ public class HCatLoader extends LoadFunc implements LoadMetadata, LoadPushDown { } @Override - public Tuple getNext() throws IOException { - try { - HCatRecord hr = (HCatRecord) (reader.nextKeyValue() ? reader.getCurrentValue() : null); - Tuple t = PigHCatUtil.transformToTuple(hr,outputSchema); - // TODO : we were discussing an iter interface, and also a LazyTuple - // change this when plans for that solidifies. 
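The HCatLoader hunk above removes the code that moved verbatim into HCatBaseLoader, so a concrete loader now only supplies the source-specific wiring. As a hypothetical illustration (the class name and empty bodies below are placeholders, not part of this patch), a minimal HCatBaseLoader subclass is left with roughly these methods to implement:

import java.io.IOException;

import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.pig.HCatBaseLoader;
import org.apache.pig.Expression;
import org.apache.pig.ResourceSchema;

public class SkeletonLoader extends HCatBaseLoader {
  // Tuple conversion, statistics, and projection push-down are inherited
  // from HCatBaseLoader; only the pieces below remain source-specific.

  @Override
  public InputFormat getInputFormat() throws IOException {
    return null;  // a real loader returns its HCat input format here
  }

  @Override
  public void setLocation(String location, Job job) throws IOException {
    // resolve 'location' to table metadata and configure the job
  }

  @Override
  public ResourceSchema getSchema(String location, Job job) throws IOException {
    return null;  // a real loader derives this from the table schema
  }

  @Override
  public String[] getPartitionKeys(String location, Job job) throws IOException {
    return null;
  }

  @Override
  public void setPartitionFilter(Expression partitionFilter) throws IOException {
  }
}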
- return t; - } catch (ExecException e) { - int errCode = 6018; - String errMsg = "Error while reading input"; - throw new ExecException(errMsg, errCode, - PigException.REMOTE_ENVIRONMENT, e); - } catch (Exception eOther){ - int errCode = 6018; - String errMsg = "Error converting read value to tuple"; - throw new ExecException(errMsg, errCode, - PigException.REMOTE_ENVIRONMENT, eOther); - } - - } - - @SuppressWarnings("unchecked") - @Override - public void prepareToRead(RecordReader reader, PigSplit arg1) throws IOException { - this.reader = reader; - } - - @Override public String relativeToAbsolutePath(String location, Path curDir) throws IOException { return location; } @@ -207,12 +164,6 @@ public class HCatLoader extends LoadFunc implements LoadMetadata, LoadPushDown { } @Override - public ResourceStatistics getStatistics(String location, Job job) throws IOException { - // statistics not implemented currently - return null; - } - - @Override public void setPartitionFilter(Expression partitionFilter) throws IOException { // convert the partition filter expression into a string expected by // howl and pass it in setLocation() @@ -224,37 +175,6 @@ public class HCatLoader extends LoadFunc implements LoadMetadata, LoadPushDown { PARTITION_FILTER, partitionFilterString); } - @Override - public List getFeatures() { - return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION); - } - - @Override - public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldsInfo) throws FrontendException { - // Store the required fields information in the UDFContext so that we - // can retrieve it later. - storeInUDFContext(signature, PRUNE_PROJECTION_INFO, requiredFieldsInfo); - - // Howl will always prune columns based on what we ask of it - so the - // response is true - return new RequiredFieldResponse(true); - } - - @Override - public void setUDFContextSignature(String signature) { - this.signature = signature; - } - - - // helper methods - private void storeInUDFContext(String signature, String key, Object value) { - UDFContext udfContext = UDFContext.getUDFContext(); - Properties props = udfContext.getUDFProperties( - this.getClass(), new String[] {signature}); - props.put(key, value); - } - - private String getPartitionFilterString() { if(partitionFilterString == null) { Properties props = UDFContext.getUDFContext().getUDFProperties( diff --git src/java/org/apache/hcatalog/pig/HCatStorer.java src/java/org/apache/hcatalog/pig/HCatStorer.java index 87e2a51..0778167 100644 --- src/java/org/apache/hcatalog/pig/HCatStorer.java +++ src/java/org/apache/hcatalog/pig/HCatStorer.java @@ -19,85 +19,39 @@ package org.apache.hcatalog.pig; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; import java.util.Properties; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hcatalog.common.HCatConstants; import org.apache.hcatalog.common.HCatException; import org.apache.hcatalog.common.HCatUtil; -import org.apache.hcatalog.data.DefaultHCatRecord; -import org.apache.hcatalog.data.HCatRecord; -import org.apache.hcatalog.data.schema.HCatFieldSchema; import org.apache.hcatalog.data.schema.HCatSchema; -import org.apache.hcatalog.data.schema.HCatFieldSchema.Type; import 
org.apache.hcatalog.mapreduce.HCatOutputCommitter; import org.apache.hcatalog.mapreduce.HCatOutputFormat; import org.apache.hcatalog.mapreduce.HCatTableInfo; import org.apache.pig.PigException; import org.apache.pig.ResourceSchema; -import org.apache.pig.ResourceStatistics; -import org.apache.pig.StoreFunc; -import org.apache.pig.StoreMetadata; -import org.apache.pig.backend.BackendException; -import org.apache.pig.backend.executionengine.ExecException; -import org.apache.pig.data.DataBag; -import org.apache.pig.data.DataType; -import org.apache.pig.data.Tuple; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.logicalLayer.parser.ParseException; import org.apache.pig.impl.logicalLayer.schema.Schema; -import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema; import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.UDFContext; -import org.apache.pig.impl.util.Utils; /** * HowlStorer. * */ -public class HCatStorer extends StoreFunc implements StoreMetadata { +public class HCatStorer extends HCatBaseStorer { /** * */ - private static final String COMPUTED_OUTPUT_SCHEMA = "howl.output.schema"; - private final Map partitions; - private Schema pigSchema; - private RecordWriter, HCatRecord> writer; - private HCatSchema computedSchema; - private static final String PIG_SCHEMA = "howl.pig.store.schema"; - private String sign; public HCatStorer(String partSpecs, String schema) throws ParseException, FrontendException { - - partitions = new HashMap(); - if(partSpecs != null && !partSpecs.trim().isEmpty()){ - String[] partKVPs = partSpecs.split(","); - for(String partKVP : partKVPs){ - String[] partKV = partKVP.split("="); - if(partKV.length == 2) { - partitions.put(partKV[0].trim(), partKV[1].trim()); - } else { - throw new FrontendException("Invalid partition column specification. "+partSpecs, PigHCatUtil.PIG_EXCEPTION_CODE); - } - } - } - - if(schema != null) { - pigSchema = Utils.getSchemaFromString(schema); - } - + super(partSpecs, schema); } public HCatStorer(String partSpecs) throws ParseException, FrontendException { @@ -109,353 +63,11 @@ public class HCatStorer extends StoreFunc implements StoreMetadata { } @Override - public void checkSchema(ResourceSchema resourceSchema) throws IOException { - - /* Schema provided by user and the schema computed by Pig - * at the time of calling store must match. - */ - Schema runtimeSchema = Schema.getPigSchema(resourceSchema); - if(pigSchema != null){ - if(! Schema.equals(runtimeSchema, pigSchema, false, true) ){ - throw new FrontendException("Schema provided in store statement doesn't match with the Schema" + - "returned by Pig run-time. Schema provided in HowlStorer: "+pigSchema.toString()+ " Schema received from Pig runtime: "+runtimeSchema.toString(), PigHCatUtil.PIG_EXCEPTION_CODE); - } - } else { - pigSchema = runtimeSchema; - } - UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign}).setProperty(PIG_SCHEMA,ObjectSerializer.serialize(pigSchema)); - } - - /** Constructs HCatSchema from pigSchema. Passed tableSchema is the existing - * schema of the table in metastore. - */ - private HCatSchema convertPigSchemaToHCatSchema(Schema pigSchema, HCatSchema tableSchema) throws FrontendException{ - - List fieldSchemas = new ArrayList(pigSchema.size()); - for(FieldSchema fSchema : pigSchema.getFields()){ - byte type = fSchema.type; - HCatFieldSchema howlFSchema; - - try { - - // Find out if we need to throw away the tuple or not. 
- if(type == DataType.BAG && removeTupleFromBag(tableSchema, fSchema)){ - List arrFields = new ArrayList(1); - arrFields.add(getHowlFSFromPigFS(fSchema.schema.getField(0).schema.getField(0))); - howlFSchema = new HCatFieldSchema(fSchema.alias, Type.ARRAY, new HCatSchema(arrFields), null); - } - else{ - howlFSchema = getHowlFSFromPigFS(fSchema); - } - fieldSchemas.add(howlFSchema); - } catch (HCatException he){ - throw new FrontendException(he.getMessage(),PigHCatUtil.PIG_EXCEPTION_CODE,he); - } - } - - return new HCatSchema(fieldSchemas); - } - - private void validateUnNested(Schema innerSchema) throws FrontendException{ - - for(FieldSchema innerField : innerSchema.getFields()){ - validateAlias(innerField.alias); - if(DataType.isComplex(innerField.type)) { - throw new FrontendException("Complex types cannot be nested. "+innerField, PigHCatUtil.PIG_EXCEPTION_CODE); - } - } - } - - private boolean removeTupleFromBag(HCatSchema tableSchema, FieldSchema bagFieldSchema) throws HCatException{ - - String colName = bagFieldSchema.alias; - for(HCatFieldSchema field : tableSchema.getFields()){ - if(colName.equalsIgnoreCase(field.getName())){ - return (field.getArrayElementSchema().get(0).getType() == Type.STRUCT) ? false : true; - } - } - // Column was not found in table schema. Its a new column - List tupSchema = bagFieldSchema.schema.getFields(); - return (tupSchema.size() == 1 && tupSchema.get(0).schema == null) ? true : false; - } - - - private HCatFieldSchema getHowlFSFromPigFS(FieldSchema fSchema) throws FrontendException, HCatException{ - - byte type = fSchema.type; - switch(type){ - - case DataType.CHARARRAY: - case DataType.BIGCHARARRAY: - return new HCatFieldSchema(fSchema.alias, Type.STRING, null); - - case DataType.INTEGER: - return new HCatFieldSchema(fSchema.alias, Type.INT, null); - - case DataType.LONG: - return new HCatFieldSchema(fSchema.alias, Type.BIGINT, null); - - case DataType.FLOAT: - return new HCatFieldSchema(fSchema.alias, Type.FLOAT, null); - - case DataType.DOUBLE: - return new HCatFieldSchema(fSchema.alias, Type.DOUBLE, null); - - case DataType.BAG: - Schema bagSchema = fSchema.schema; - List arrFields = new ArrayList(1); - arrFields.add(getHowlFSFromPigFS(bagSchema.getField(0))); - return new HCatFieldSchema(fSchema.alias, Type.ARRAY, new HCatSchema(arrFields), ""); - - case DataType.TUPLE: - List fieldNames = new ArrayList(); - List howlFSs = new ArrayList(); - for( FieldSchema fieldSchema : fSchema.schema.getFields()){ - fieldNames.add( fieldSchema.alias); - howlFSs.add(getHowlFSFromPigFS(fieldSchema)); - } - return new HCatFieldSchema(fSchema.alias, Type.STRUCT, new HCatSchema(howlFSs), ""); - - case DataType.MAP:{ - // Pig's schema contain no type information about map's keys and - // values. So, if its a new column assume if its existing - // return whatever is contained in the existing column. 
- HCatFieldSchema mapField = getTableCol(fSchema.alias, howlTblSchema); - HCatFieldSchema valFS; - List valFSList = new ArrayList(1); - - if(mapField != null){ - Type mapValType = mapField.getMapValueSchema().get(0).getType(); - - switch(mapValType){ - case STRING: - case BIGINT: - case INT: - case FLOAT: - case DOUBLE: - valFS = new HCatFieldSchema(fSchema.alias, mapValType, null); - break; - default: - throw new FrontendException("Only pig primitive types are supported as map value types.", PigHCatUtil.PIG_EXCEPTION_CODE); - } - valFSList.add(valFS); - return new HCatFieldSchema(fSchema.alias,Type.MAP,Type.STRING, new HCatSchema(valFSList),""); - } - - // Column not found in target table. Its a new column. Its schema is map - valFS = new HCatFieldSchema(fSchema.alias, Type.STRING, ""); - valFSList.add(valFS); - return new HCatFieldSchema(fSchema.alias,Type.MAP,Type.STRING, new HCatSchema(valFSList),""); - } - - default: - throw new FrontendException("Unsupported type: "+type+" in Pig's schema", PigHCatUtil.PIG_EXCEPTION_CODE); - } - } - - @Override public OutputFormat getOutputFormat() throws IOException { return new HCatOutputFormat(); } @Override - public void prepareToWrite(RecordWriter writer) throws IOException { - this.writer = writer; - computedSchema = (HCatSchema)ObjectSerializer.deserialize(UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign}).getProperty(COMPUTED_OUTPUT_SCHEMA)); - } - - @Override - public void putNext(Tuple tuple) throws IOException { - - List outgoing = new ArrayList(tuple.size()); - - int i = 0; - for(HCatFieldSchema fSchema : computedSchema.getFields()){ - outgoing.add(getJavaObj(tuple.get(i++), fSchema)); - } - try { - writer.write(null, new DefaultHCatRecord(outgoing)); - } catch (InterruptedException e) { - throw new BackendException("Error while writing tuple: "+tuple, PigHCatUtil.PIG_EXCEPTION_CODE, e); - } - } - - private Object getJavaObj(Object pigObj, HCatFieldSchema howlFS) throws ExecException, HCatException{ - - // The real work-horse. Spend time and energy in this method if there is - // need to keep HowlStorer lean and go fast. - Type type = howlFS.getType(); - - switch(type){ - - case STRUCT: - // Unwrap the tuple. - return ((Tuple)pigObj).getAll(); - // Tuple innerTup = (Tuple)pigObj; - // - // List innerList = new ArrayList(innerTup.size()); - // int i = 0; - // for(HowlTypeInfo structFieldTypeInfo : typeInfo.getAllStructFieldTypeInfos()){ - // innerList.add(getJavaObj(innerTup.get(i++), structFieldTypeInfo)); - // } - // return innerList; - case ARRAY: - // Unwrap the bag. - DataBag pigBag = (DataBag)pigObj; - HCatFieldSchema tupFS = howlFS.getArrayElementSchema().get(0); - boolean needTuple = tupFS.getType() == Type.STRUCT; - List bagContents = new ArrayList((int)pigBag.size()); - Iterator bagItr = pigBag.iterator(); - - while(bagItr.hasNext()){ - // If there is only one element in tuple contained in bag, we throw away the tuple. - bagContents.add(needTuple ? 
getJavaObj(bagItr.next(), tupFS) : bagItr.next().get(0)); - - } - return bagContents; - - // case MAP: - // Map pigMap = (Map)pigObj; - // Map typeMap = new HashMap(); - // for(Entry entry: pigMap.entrySet()){ - // typeMap.put(entry.getKey(), new Long(entry.getValue().toString())); - // } - // return typeMap; - default: - return pigObj; - } - } - - @Override - public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException { - - // Need to necessarily override this method since default impl assumes HDFS - // based location string. - return location; - } - - @Override - public void setStoreFuncUDFContextSignature(String signature) { - sign = signature; - } - - - private void doSchemaValidations(Schema pigSchema, HCatSchema tblSchema) throws FrontendException, HCatException{ - - // Iterate through all the elements in Pig Schema and do validations as - // dictated by semantics, consult HCatSchema of table when need be. - - for(FieldSchema pigField : pigSchema.getFields()){ - byte type = pigField.type; - String alias = pigField.alias; - validateAlias(alias); - HCatFieldSchema howlField = getTableCol(alias, tblSchema); - - if(DataType.isComplex(type)){ - switch(type){ - - case DataType.MAP: - if(howlField != null){ - if(howlField.getMapKeyType() != Type.STRING){ - throw new FrontendException("Key Type of map must be String "+howlField, PigHCatUtil.PIG_EXCEPTION_CODE); - } - if(howlField.getMapValueSchema().get(0).isComplex()){ - throw new FrontendException("Value type of map cannot be complex" + howlField, PigHCatUtil.PIG_EXCEPTION_CODE); - } - } - break; - - case DataType.BAG: - // Only map is allowed as complex type in tuples inside bag. - for(FieldSchema innerField : pigField.schema.getField(0).schema.getFields()){ - if(innerField.type == DataType.BAG || innerField.type == DataType.TUPLE) { - throw new FrontendException("Complex types cannot be nested. "+innerField, PigHCatUtil.PIG_EXCEPTION_CODE); - } - validateAlias(innerField.alias); - } - if(howlField != null){ - // Do the same validation for HCatSchema. - HCatFieldSchema arrayFieldScehma = howlField.getArrayElementSchema().get(0); - Type hType = arrayFieldScehma.getType(); - if(hType == Type.STRUCT){ - for(HCatFieldSchema structFieldInBag : arrayFieldScehma.getStructSubSchema().getFields()){ - if(structFieldInBag.getType() == Type.STRUCT || structFieldInBag.getType() == Type.ARRAY){ - throw new FrontendException("Nested Complex types not allowed "+ howlField, PigHCatUtil.PIG_EXCEPTION_CODE); - } - } - } - if(hType == Type.MAP){ - if(arrayFieldScehma.getMapKeyType() != Type.STRING){ - throw new FrontendException("Key Type of map must be String "+howlField, PigHCatUtil.PIG_EXCEPTION_CODE); - } - if(arrayFieldScehma.getMapValueSchema().get(0).isComplex()){ - throw new FrontendException("Value type of map cannot be complex "+howlField, PigHCatUtil.PIG_EXCEPTION_CODE); - } - } - if(hType == Type.ARRAY) { - throw new FrontendException("Arrays cannot contain array within it. 
"+howlField, PigHCatUtil.PIG_EXCEPTION_CODE); - } - } - break; - - case DataType.TUPLE: - validateUnNested(pigField.schema); - if(howlField != null){ - for(HCatFieldSchema structFieldSchema : howlField.getStructSubSchema().getFields()){ - if(structFieldSchema.isComplex()){ - throw new FrontendException("Nested Complex types are not allowed."+howlField, PigHCatUtil.PIG_EXCEPTION_CODE); - } - } - } - break; - - default: - throw new FrontendException("Internal Error.", PigHCatUtil.PIG_EXCEPTION_CODE); - } - } - } - - for(HCatFieldSchema howlField : tblSchema.getFields()){ - - // We dont do type promotion/demotion. - Type hType = howlField.getType(); - switch(hType){ - case SMALLINT: - case TINYINT: - case BOOLEAN: - throw new FrontendException("Incompatible type found in howl table schema: "+howlField, PigHCatUtil.PIG_EXCEPTION_CODE); - } - } - } - - private void validateAlias(String alias) throws FrontendException{ - if(alias == null) { - throw new FrontendException("Column name for a field is not specified. Please provide the full schema as an argument to HCatStorer.", PigHCatUtil.PIG_EXCEPTION_CODE); - } - if(alias.matches(".*[A-Z]+.*")) { - throw new FrontendException("Column names should all be in lowercase. Invalid name found: "+alias, PigHCatUtil.PIG_EXCEPTION_CODE); - } - } - - // Finds column by name in HCatSchema, if not found returns null. - private HCatFieldSchema getTableCol(String alias, HCatSchema tblSchema){ - - for(HCatFieldSchema howlField : tblSchema.getFields()){ - if(howlField.getName().equalsIgnoreCase(alias)){ - return howlField; - } - } - // Its a new column - return null; - } - HCatSchema howlTblSchema; - - @Override - public void cleanupOnFailure(String location, Job job) throws IOException { - // No-op. - } - - @Override public void setStoreLocation(String location, Job job) throws IOException { Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign}); @@ -489,7 +101,7 @@ public class HCatStorer extends StoreFunc implements StoreMetadata { // information passed to HCatOutputFormat was not right throw new PigException(he.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, he); } - howlTblSchema = HCatOutputFormat.getTableSchema(job); + HCatSchema howlTblSchema = HCatOutputFormat.getTableSchema(job); try{ doSchemaValidations(pigSchema, howlTblSchema); } catch(HCatException he){ @@ -528,8 +140,4 @@ public class HCatStorer extends StoreFunc implements StoreMetadata { new HCatOutputCommitter(null).cleanupJob(job); } } - - @Override - public void storeStatistics(ResourceStatistics stats, String arg1, Job job) throws IOException { - } } diff --git src/test/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java src/test/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java new file mode 100644 index 0000000..61ae4ac --- /dev/null +++ src/test/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java @@ -0,0 +1,428 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.mapreduce; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hcatalog.data.DefaultHCatRecord; +import org.apache.hcatalog.data.HCatRecord; +import org.apache.hcatalog.data.schema.HCatFieldSchema; +import org.apache.hcatalog.data.schema.HCatSchema; +import org.apache.hcatalog.data.schema.HCatSchemaUtils; +import org.apache.hcatalog.mapreduce.TestHCatEximInputFormat.TestImport.EmpDetails; + +/** + * + * TestHCatEximInputFormat. tests primarily HCatEximInputFormat but + * also HCatEximOutputFormat. + * + */ +public class TestHCatEximInputFormat extends TestCase { + + public static class TestExport extends + org.apache.hadoop.mapreduce.Mapper { + + private HCatSchema recordSchema; + + @Override + protected void setup(Context context) throws IOException, + InterruptedException { + super.setup(context); + recordSchema = HCatEximOutputFormat.getTableSchema(context); + } + + @Override + public void map(LongWritable key, Text value, Context context) + throws IOException, InterruptedException { + String[] cols = value.toString().split(","); + HCatRecord record = new DefaultHCatRecord(recordSchema.size()); + record.setInteger("emp_id", recordSchema, Integer.parseInt(cols[0])); + record.setString("emp_name", recordSchema, cols[1]); + record.setString("emp_dob", recordSchema, cols[2]); + record.setString("emp_sex", recordSchema, cols[3]); + context.write(key, record); + } + } + + public static class TestImport extends + org.apache.hadoop.mapreduce.Mapper< + org.apache.hadoop.io.LongWritable, HCatRecord, + org.apache.hadoop.io.Text, + org.apache.hadoop.io.Text> { + + private HCatSchema recordSchema; + + public static class EmpDetails { + public String emp_name; + public String emp_dob; + public String emp_sex; + public String emp_country; + public String emp_state; + } + + public static Map empRecords = new TreeMap(); + + @Override + protected void setup(Context context) throws IOException, + InterruptedException { + super.setup(context); + try { + recordSchema = HCatBaseInputFormat.getOutputSchema(context); + } catch (Exception e) { + throw new IOException("Error getting outputschema from job configuration", e); + } + System.out.println("RecordSchema : " + recordSchema.toString()); + } + + @Override + public void map(LongWritable key, HCatRecord value, Context context) + throws IOException, InterruptedException { + EmpDetails empDetails = new EmpDetails(); 
+ Integer emp_id = value.getInteger("emp_id", recordSchema); + String emp_name = value.getString("emp_name", recordSchema); + empDetails.emp_name = emp_name; + if (recordSchema.getPosition("emp_dob") != null) { + empDetails.emp_dob = value.getString("emp_dob", recordSchema); + } + if (recordSchema.getPosition("emp_sex") != null) { + empDetails.emp_sex = value.getString("emp_sex", recordSchema); + } + if (recordSchema.getPosition("emp_country") != null) { + empDetails.emp_country = value.getString("emp_country", recordSchema); + } + if (recordSchema.getPosition("emp_state") != null) { + empDetails.emp_state = value.getString("emp_state", recordSchema); + } + empRecords.put(emp_id, empDetails); + } + } + + private static final String dbName = "hcatEximOutputFormatTestDB"; + private static final String tblName = "hcatEximOutputFormatTestTable"; + Configuration conf; + Job job; + List columns; + HCatSchema schema; + FileSystem fs; + Path inputLocation; + Path outputLocation; + private HCatSchema partSchema; + + + @Override + protected void setUp() throws Exception { + System.out.println("Setup started"); + super.setUp(); + conf = new Configuration(); + job = new Job(conf, "test eximinputformat"); + columns = new ArrayList(); + columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_id", + Constants.INT_TYPE_NAME, ""))); + columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_name", + Constants.STRING_TYPE_NAME, ""))); + columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_dob", + Constants.STRING_TYPE_NAME, ""))); + columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_sex", + Constants.STRING_TYPE_NAME, ""))); + schema = new HCatSchema(columns); + + fs = new LocalFileSystem(); + fs.initialize(fs.getWorkingDirectory().toUri(), new Configuration()); + inputLocation = new Path(fs.getWorkingDirectory(), "tmp/exports"); + outputLocation = new Path(fs.getWorkingDirectory(), "tmp/data"); + + job.setJarByClass(this.getClass()); + job.setNumReduceTasks(0); + System.out.println("Setup done"); + } + + private void setupMRExport(String[] records) throws IOException { + if (fs.exists(outputLocation)) { + fs.delete(outputLocation, true); + } + FSDataOutputStream ds = fs.create(outputLocation, true); + for (String record : records) { + ds.writeBytes(record); + } + ds.close(); + job.setInputFormatClass(TextInputFormat.class); + job.setOutputFormatClass(HCatEximOutputFormat.class); + TextInputFormat.setInputPaths(job, outputLocation); + job.setMapperClass(TestExport.class); + } + + private void setupMRImport() throws IOException { + if (fs.exists(outputLocation)) { + fs.delete(outputLocation, true); + } + job.setInputFormatClass(HCatEximInputFormat.class); + job.setOutputFormatClass(TextOutputFormat.class); + TextOutputFormat.setOutputPath(job, outputLocation); + job.setMapperClass(TestImport.class); + TestImport.empRecords.clear(); + } + + + @Override + protected void tearDown() throws Exception { + System.out.println("Teardown started"); + super.tearDown(); + // fs.delete(inputLocation, true); + // fs.delete(outputLocation, true); + System.out.println("Teardown done"); + } + + + private void runNonPartExport() throws IOException, InterruptedException, ClassNotFoundException { + if (fs.exists(inputLocation)) { + fs.delete(inputLocation, true); + } + setupMRExport(new String[] { + "237,Krishna,01/01/1990,M,IN,TN\n", + "238,Kalpana,01/01/2000,F,IN,KA\n", + "239,Satya,01/01/2001,M,US,TN\n", + "240,Kavya,01/01/2002,F,US,KA\n" + + }); + 
HCatEximOutputFormat.setOutput( + job, + dbName, + tblName, + inputLocation.toString(), + null, + null, + schema); + + job.waitForCompletion(true); + HCatEximOutputCommitter committer = new HCatEximOutputCommitter(null); + committer.cleanupJob(job); + } + + private void runPartExport(String record, String country, String state) throws IOException, InterruptedException, ClassNotFoundException { + setupMRExport(new String[] {record}); + List partValues = new ArrayList(2); + partValues.add(country); + partValues.add(state); + HCatEximOutputFormat.setOutput( + job, + dbName, + tblName, + inputLocation.toString(), + partSchema , + partValues , + schema); + + job.waitForCompletion(true); + HCatEximOutputCommitter committer = new HCatEximOutputCommitter(null); + committer.cleanupJob(job); + } + + public void testNonPart() throws Exception { + try { + runNonPartExport(); + setUp(); + setupMRImport(); + HCatEximInputFormat.setInput(job, "tmp/exports", null); + job.waitForCompletion(true); + + assertEquals(4, TestImport.empRecords.size()); + assertEmpDetail(TestImport.empRecords.get(237), "Krishna", "01/01/1990", "M", null, null); + assertEmpDetail(TestImport.empRecords.get(238), "Kalpana", "01/01/2000", "F", null, null); + assertEmpDetail(TestImport.empRecords.get(239), "Satya", "01/01/2001", "M", null, null); + assertEmpDetail(TestImport.empRecords.get(240), "Kavya", "01/01/2002", "F", null, null); + } catch (Exception e) { + System.out.println("Test failed with " + e.getMessage()); + e.printStackTrace(); + throw e; + } + } + + public void testNonPartProjection() throws Exception { + try { + + runNonPartExport(); + setUp(); + setupMRImport(); + HCatEximInputFormat.setInput(job, "tmp/exports", null); + + List readColumns = new ArrayList(); + readColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_id", + Constants.INT_TYPE_NAME, ""))); + readColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_name", + Constants.STRING_TYPE_NAME, ""))); + + HCatEximInputFormat.setOutputSchema(job, new HCatSchema(readColumns)); + job.waitForCompletion(true); + + assertEquals(4, TestImport.empRecords.size()); + assertEmpDetail(TestImport.empRecords.get(237), "Krishna", null, null, null, null); + assertEmpDetail(TestImport.empRecords.get(238), "Kalpana", null, null, null, null); + assertEmpDetail(TestImport.empRecords.get(239), "Satya", null, null, null, null); + assertEmpDetail(TestImport.empRecords.get(240), "Kavya", null, null, null, null); + } catch (Exception e) { + System.out.println("Test failed with " + e.getMessage()); + e.printStackTrace(); + throw e; + } + } + + public void testPart() throws Exception { + try { + if (fs.exists(inputLocation)) { + fs.delete(inputLocation, true); + } + + List partKeys = new ArrayList(2); + partKeys.add(new HCatFieldSchema("emp_country", HCatFieldSchema.Type.STRING, "")); + partKeys.add(new HCatFieldSchema("emp_state", HCatFieldSchema.Type.STRING, "")); + partSchema = new HCatSchema(partKeys); + + runPartExport("237,Krishna,01/01/1990,M,IN,TN", "in", "tn"); + setUp(); + runPartExport("238,Kalpana,01/01/2000,F,IN,KA\n", "in", "ka"); + setUp(); + runPartExport("239,Satya,01/01/2001,M,US,TN\n", "us", "tn"); + setUp(); + runPartExport("240,Kavya,01/01/2002,F,US,KA\n", "us", "ka"); + + setUp(); + setupMRImport(); + HCatEximInputFormat.setInput(job, "tmp/exports", null); + job.waitForCompletion(true); + + assertEquals(4, TestImport.empRecords.size()); + assertEmpDetail(TestImport.empRecords.get(237), "Krishna", "01/01/1990", "M", "in", "tn"); + 
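Besides projection, the exim input path also supports partition selection, which testPartSelection exercises further down: a key/value map passed to HCatEximInputFormat.setInput restricts the import to matching exported partitions (in the test, only the emp_state = "ka" rows come back). The configuration step on its own, as a sketch using the test's filter value:

import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.mapreduce.HCatEximInputFormat;

public class PartitionFilterSketch {
  static void configure(Job job) throws Exception {
    Map<String, String> filter = new TreeMap<String, String>();
    filter.put("emp_state", "ka");  // keep only partitions whose emp_state is "ka"
    HCatEximInputFormat.setInput(job, "tmp/exports", filter);
  }
}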
assertEmpDetail(TestImport.empRecords.get(238), "Kalpana", "01/01/2000", "F", "in", "ka"); + assertEmpDetail(TestImport.empRecords.get(239), "Satya", "01/01/2001", "M", "us", "tn"); + assertEmpDetail(TestImport.empRecords.get(240), "Kavya", "01/01/2002", "F", "us", "ka"); + } catch (Exception e) { + System.out.println("Test failed with " + e.getMessage()); + e.printStackTrace(); + throw e; + } + } + + public void testPartWithPartCols() throws Exception { + try { + if (fs.exists(inputLocation)) { + fs.delete(inputLocation, true); + } + + List partKeys = new ArrayList(2); + partKeys.add(new HCatFieldSchema("emp_country", HCatFieldSchema.Type.STRING, "")); + partKeys.add(new HCatFieldSchema("emp_state", HCatFieldSchema.Type.STRING, "")); + partSchema = new HCatSchema(partKeys); + + runPartExport("237,Krishna,01/01/1990,M,IN,TN", "in", "tn"); + setUp(); + runPartExport("238,Kalpana,01/01/2000,F,IN,KA\n", "in", "ka"); + setUp(); + runPartExport("239,Satya,01/01/2001,M,US,TN\n", "us", "tn"); + setUp(); + runPartExport("240,Kavya,01/01/2002,F,US,KA\n", "us", "ka"); + + setUp(); + setupMRImport(); + HCatEximInputFormat.setInput(job, "tmp/exports", null); + + List colsPlusPartKeys = new ArrayList(); + colsPlusPartKeys.addAll(columns); + colsPlusPartKeys.addAll(partKeys); + + HCatBaseInputFormat.setOutputSchema(job, new HCatSchema(colsPlusPartKeys)); + job.waitForCompletion(true); + + assertEquals(4, TestImport.empRecords.size()); + assertEmpDetail(TestImport.empRecords.get(237), "Krishna", "01/01/1990", "M", "in", "tn"); + assertEmpDetail(TestImport.empRecords.get(238), "Kalpana", "01/01/2000", "F", "in", "ka"); + assertEmpDetail(TestImport.empRecords.get(239), "Satya", "01/01/2001", "M", "us", "tn"); + assertEmpDetail(TestImport.empRecords.get(240), "Kavya", "01/01/2002", "F", "us", "ka"); + } catch (Exception e) { + System.out.println("Test failed with " + e.getMessage()); + e.printStackTrace(); + throw e; + } + } + + + public void testPartSelection() throws Exception { + try { + if (fs.exists(inputLocation)) { + fs.delete(inputLocation, true); + } + + List partKeys = new ArrayList(2); + partKeys.add(new HCatFieldSchema("emp_country", HCatFieldSchema.Type.STRING, "")); + partKeys.add(new HCatFieldSchema("emp_state", HCatFieldSchema.Type.STRING, "")); + partSchema = new HCatSchema(partKeys); + + runPartExport("237,Krishna,01/01/1990,M,IN,TN", "in", "tn"); + setUp(); + runPartExport("238,Kalpana,01/01/2000,F,IN,KA\n", "in", "ka"); + setUp(); + runPartExport("239,Satya,01/01/2001,M,US,TN\n", "us", "tn"); + setUp(); + runPartExport("240,Kavya,01/01/2002,F,US,KA\n", "us", "ka"); + + setUp(); + setupMRImport(); + Map filter = new TreeMap(); + filter.put("emp_state", "ka"); + HCatEximInputFormat.setInput(job, "tmp/exports", filter); + job.waitForCompletion(true); + + assertEquals(2, TestImport.empRecords.size()); + assertEmpDetail(TestImport.empRecords.get(238), "Kalpana", "01/01/2000", "F", "in", "ka"); + assertEmpDetail(TestImport.empRecords.get(240), "Kavya", "01/01/2002", "F", "us", "ka"); + } catch (Exception e) { + System.out.println("Test failed with " + e.getMessage()); + e.printStackTrace(); + throw e; + } + } + + + private void assertEmpDetail(EmpDetails empDetails, String name, String dob, String mf, String country, String state) { + assertNotNull(empDetails); + assertEquals(name, empDetails.emp_name); + assertEquals(dob, empDetails.emp_dob); + assertEquals(mf, empDetails.emp_sex); + assertEquals(country, empDetails.emp_country); + assertEquals(state, empDetails.emp_state); + } + +} diff 
--git src/test/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java src/test/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java new file mode 100644 index 0000000..b2e1184 --- /dev/null +++ src/test/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java @@ -0,0 +1,260 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.mapreduce; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.data.DefaultHCatRecord; +import org.apache.hcatalog.data.HCatRecord; +import org.apache.hcatalog.data.schema.HCatFieldSchema; +import org.apache.hcatalog.data.schema.HCatSchema; +import org.apache.hcatalog.data.schema.HCatSchemaUtils; + +/** + * + * TestHCatEximOutputFormat. Some basic testing here. More testing done via + * TestHCatEximInputFormat + * + */ +public class TestHCatEximOutputFormat extends TestCase { + + public static class TestMap extends + Mapper { + + private HCatSchema recordSchema; + + @Override + protected void setup(Context context) throws IOException, + InterruptedException { + super.setup(context); + recordSchema = HCatEximOutputFormat.getTableSchema(context); + System.out.println("TestMap/setup called"); + } + + @Override + public void map(LongWritable key, Text value, Context context) + throws IOException, InterruptedException { + String[] cols = value.toString().split(","); + HCatRecord record = new DefaultHCatRecord(recordSchema.size()); + System.out.println("TestMap/map called. Cols[0]:" + cols[0]); + System.out.println("TestMap/map called. Cols[1]:" + cols[1]); + System.out.println("TestMap/map called. Cols[2]:" + cols[2]); + System.out.println("TestMap/map called. 
Cols[3]:" + cols[3]); + record.setInteger("emp_id", recordSchema, Integer.parseInt(cols[0])); + record.setString("emp_name", recordSchema, cols[1]); + record.setString("emp_dob", recordSchema, cols[2]); + record.setString("emp_sex", recordSchema, cols[3]); + context.write(key, record); + } + } + + + private static final String dbName = "hcatEximOutputFormatTestDB"; + private static final String tblName = "hcatEximOutputFormatTestTable"; + Configuration conf; + Job job; + List columns; + HCatSchema schema; + FileSystem fs; + Path outputLocation; + Path dataLocation; + + public void testNonPart() throws Exception { + try { + HCatEximOutputFormat.setOutput( + job, + dbName, + tblName, + outputLocation.toString(), + null, + null, + schema); + + job.waitForCompletion(true); + HCatEximOutputCommitter committer = new HCatEximOutputCommitter(null); + committer.cleanupJob(job); + + Path metadataPath = new Path(outputLocation, "_metadata"); + Map.Entry> rv = EximUtil.readMetaData(fs, metadataPath); + Table table = rv.getKey(); + List partitions = rv.getValue(); + + assertEquals(dbName, table.getDbName()); + assertEquals(tblName, table.getTableName()); + assertTrue(EximUtil.schemaCompare(table.getSd().getCols(), + HCatUtil.getFieldSchemaList(columns))); + assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver", + table.getParameters().get(HCatConstants.HCAT_ISD_CLASS)); + assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver", + table.getParameters().get(HCatConstants.HCAT_OSD_CLASS)); + assertEquals("org.apache.hadoop.hive.ql.io.RCFileInputFormat", + table.getSd().getInputFormat()); + assertEquals("org.apache.hadoop.hive.ql.io.RCFileOutputFormat", + table.getSd().getOutputFormat()); + assertEquals("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe", + table.getSd().getSerdeInfo().getSerializationLib()); + assertEquals(0, table.getPartitionKeys().size()); + + assertEquals(0, partitions.size()); + } catch (Exception e) { + System.out.println("Test failed with " + e.getMessage()); + e.printStackTrace(); + throw e; + } + + } + + public void testPart() throws Exception { + try { + List partKeys = new ArrayList(); + partKeys.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_country", + Constants.STRING_TYPE_NAME, ""))); + partKeys.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_state", + Constants.STRING_TYPE_NAME, ""))); + HCatSchema partitionSchema = new HCatSchema(partKeys); + + List partitionVals = new ArrayList(); + partitionVals.add("IN"); + partitionVals.add("TN"); + + HCatEximOutputFormat.setOutput( + job, + dbName, + tblName, + outputLocation.toString(), + partitionSchema, + partitionVals, + schema); + + job.waitForCompletion(true); + HCatEximOutputCommitter committer = new HCatEximOutputCommitter(null); + committer.cleanupJob(job); + Path metadataPath = new Path(outputLocation, "_metadata"); + Map.Entry> rv = EximUtil.readMetaData(fs, metadataPath); + Table table = rv.getKey(); + List partitions = rv.getValue(); + + assertEquals(dbName, table.getDbName()); + assertEquals(tblName, table.getTableName()); + assertTrue(EximUtil.schemaCompare(table.getSd().getCols(), + HCatUtil.getFieldSchemaList(columns))); + assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver", + table.getParameters().get(HCatConstants.HCAT_ISD_CLASS)); + assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver", + table.getParameters().get(HCatConstants.HCAT_OSD_CLASS)); + assertEquals("org.apache.hadoop.hive.ql.io.RCFileInputFormat", + table.getSd().getInputFormat()); + 
assertEquals("org.apache.hadoop.hive.ql.io.RCFileOutputFormat", + table.getSd().getOutputFormat()); + assertEquals("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe", + table.getSd().getSerdeInfo().getSerializationLib()); + assertEquals(2, table.getPartitionKeys().size()); + List partSchema = table.getPartitionKeys(); + assertEquals("emp_country", partSchema.get(0).getName()); + assertEquals("emp_state", partSchema.get(1).getName()); + + assertEquals(1, partitions.size()); + Partition partition = partitions.get(0); + assertEquals("IN", partition.getValues().get(0)); + assertEquals("TN", partition.getValues().get(1)); + assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver", + partition.getParameters().get(HCatConstants.HCAT_ISD_CLASS)); + assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver", + partition.getParameters().get(HCatConstants.HCAT_OSD_CLASS)); + } catch (Exception e) { + System.out.println("Test failed with " + e.getMessage()); + e.printStackTrace(); + throw e; + } + } + + @Override + protected void setUp() throws Exception { + System.out.println("Setup started"); + super.setUp(); + conf = new Configuration(); + job = new Job(conf, "test eximoutputformat"); + columns = new ArrayList(); + columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_id", + Constants.INT_TYPE_NAME, ""))); + columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_name", + Constants.STRING_TYPE_NAME, ""))); + columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_dob", + Constants.STRING_TYPE_NAME, ""))); + columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_sex", + Constants.STRING_TYPE_NAME, ""))); + schema = new HCatSchema(columns); + + fs = new LocalFileSystem(); + fs.initialize(fs.getWorkingDirectory().toUri(), new Configuration()); + outputLocation = new Path(fs.getWorkingDirectory(), "tmp/exports"); + if (fs.exists(outputLocation)) { + fs.delete(outputLocation, true); + } + dataLocation = new Path(fs.getWorkingDirectory(), "tmp/data"); + if (fs.exists(dataLocation)) { + fs.delete(dataLocation, true); + } + FSDataOutputStream ds = fs.create(dataLocation, true); + ds.writeBytes("237,Krishna,01/01/1990,M,IN,TN\n"); + ds.writeBytes("238,Kalpana,01/01/2000,F,IN,KA\n"); + ds.writeBytes("239,Satya,01/01/2001,M,US,TN\n"); + ds.writeBytes("240,Kavya,01/01/2002,F,US,KA\n"); + ds.close(); + + job.setInputFormatClass(TextInputFormat.class); + job.setOutputFormatClass(HCatEximOutputFormat.class); + TextInputFormat.setInputPaths(job, dataLocation); + job.setJarByClass(this.getClass()); + job.setMapperClass(TestMap.class); + job.setNumReduceTasks(0); + System.out.println("Setup done"); + } + + @Override + protected void tearDown() throws Exception { + System.out.println("Teardown started"); + super.tearDown(); + fs.delete(dataLocation, true); + fs.delete(outputLocation, true); + System.out.println("Teardown done"); + } +} diff --git src/test/org/apache/hcatalog/pig/TestHCatEximLoader.java src/test/org/apache/hcatalog/pig/TestHCatEximLoader.java new file mode 100644 index 0000000..7691fb9 --- /dev/null +++ src/test/org/apache/hcatalog/pig/TestHCatEximLoader.java @@ -0,0 +1,351 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.pig; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; +import java.util.TreeMap; + +import junit.framework.TestCase; + +import org.apache.hcatalog.MiniCluster; +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.util.UDFContext; + +/** + * + * TestHCatEximLoader. Assumes Exim storer is working well + * + */ +public class TestHCatEximLoader extends TestCase { + + private static final String NONPART_TABLE = "junit_unparted"; + private static final String PARTITIONED_TABLE = "junit_parted"; + private static MiniCluster cluster = MiniCluster.buildCluster(); + + private static final String dataLocation = "/tmp/data"; + private static String fqdataLocation; + private static final String exportLocation = "/tmp/export"; + private static String fqexportLocation; + + private static Properties props; + + private void cleanup() throws IOException { + MiniCluster.deleteFile(cluster, dataLocation); + MiniCluster.deleteFile(cluster, exportLocation); + } + + @Override + protected void setUp() throws Exception { + props = new Properties(); + props.setProperty("fs.default.name", cluster.getProperties().getProperty("fs.default.name")); + System.out.println("Filesystem class : " + cluster.getFileSystem().getClass().getName() + + ", fs.default.name : " + props.getProperty("fs.default.name")); + fqdataLocation = cluster.getProperties().getProperty("fs.default.name") + dataLocation; + fqexportLocation = cluster.getProperties().getProperty("fs.default.name") + exportLocation; + System.out.println("FQ Data Location :" + fqdataLocation); + System.out.println("FQ Export Location :" + fqexportLocation); + cleanup(); + } + + @Override + protected void tearDown() throws Exception { + cleanup(); + } + + private void populateDataFile() throws IOException { + MiniCluster.deleteFile(cluster, dataLocation); + String[] input = new String[] { + "237,Krishna,01/01/1990,M,IN,TN", + "238,Kalpana,01/01/2000,F,IN,KA", + "239,Satya,01/01/2001,M,US,TN", + "240,Kavya,01/01/2002,F,US,KA" + }; + MiniCluster.createInputFile(cluster, dataLocation, input); + } + + private static class EmpDetail { + String name; + String dob; + String mf; + String country; + String state; + } + + private void assertEmpDetail(Tuple t, Map eds) throws ExecException { + assertNotNull(t); + assertEquals(6, t.size()); + + assertTrue(t.get(0).getClass() == Integer.class); + assertTrue(t.get(1).getClass() == String.class); + assertTrue(t.get(2).getClass() == String.class); + assertTrue(t.get(3).getClass() == String.class); + assertTrue(t.get(4).getClass() == String.class); + assertTrue(t.get(5).getClass() == String.class); + + EmpDetail ed = eds.remove(t.get(0)); + assertNotNull(ed); + + assertEquals(ed.name, t.get(1)); + assertEquals(ed.dob, t.get(2)); + 
assertEquals(ed.mf, t.get(3)); + assertEquals(ed.country, t.get(4)); + assertEquals(ed.state, t.get(5)); + } + + private void addEmpDetail(Map empDetails, int id, String name, + String dob, String mf, String country, String state) { + EmpDetail ed = new EmpDetail(); + ed.name = name; + ed.dob = dob; + ed.mf = mf; + ed.country = country; + ed.state = state; + empDetails.put(id, ed); + } + + + + private void assertEmpDetail(Tuple t, Integer id, String name, String dob, String mf) + throws ExecException { + assertNotNull(t); + assertEquals(4, t.size()); + assertTrue(t.get(0).getClass() == Integer.class); + assertTrue(t.get(1).getClass() == String.class); + assertTrue(t.get(2).getClass() == String.class); + assertTrue(t.get(3).getClass() == String.class); + + assertEquals(id, t.get(0)); + assertEquals(name, t.get(1)); + assertEquals(dob, t.get(2)); + assertEquals(mf, t.get(3)); + } + + private void assertEmpDetail(Tuple t, String mf, String name) + throws ExecException { + assertNotNull(t); + assertEquals(2, t.size()); + assertTrue(t.get(0).getClass() == String.class); + assertTrue(t.get(1).getClass() == String.class); + + assertEquals(mf, t.get(0)); + assertEquals(name, t.get(1)); + } + + + + public void testLoadNonPartTable() throws Exception { + populateDataFile(); + { + PigServer server = new PigServer(ExecType.LOCAL, props); + UDFContext.getUDFContext().setClientSystemProps(); + server.setBatchOn(); + server + .registerQuery("A = load '" + + fqdataLocation + + "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray);"); + server.registerQuery("store A into '" + NONPART_TABLE + + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "');"); + server.executeBatch(); + } + { + PigServer server = new PigServer(ExecType.LOCAL, props); + UDFContext.getUDFContext().setClientSystemProps(); + + server + .registerQuery("A = load '" + + fqexportLocation + + "' using org.apache.hcatalog.pig.HCatEximLoader();"); + Iterator XIter = server.openIterator("A"); + assertTrue(XIter.hasNext()); + Tuple t = XIter.next(); + assertEmpDetail(t, 237, "Krishna", "01/01/1990", "M"); + assertTrue(XIter.hasNext()); + t = XIter.next(); + assertEmpDetail(t, 238, "Kalpana", "01/01/2000", "F"); + assertTrue(XIter.hasNext()); + t = XIter.next(); + assertEmpDetail(t, 239, "Satya", "01/01/2001", "M"); + assertTrue(XIter.hasNext()); + t = XIter.next(); + assertEmpDetail(t, 240, "Kavya", "01/01/2002", "F"); + assertFalse(XIter.hasNext()); + } + } + + public void testLoadNonPartProjection() throws Exception { + populateDataFile(); + { + PigServer server = new PigServer(ExecType.LOCAL, props); + UDFContext.getUDFContext().setClientSystemProps(); + server.setBatchOn(); + server + .registerQuery("A = load '" + + fqdataLocation + + "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray);"); + server.registerQuery("store A into '" + NONPART_TABLE + + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "');"); + server.executeBatch(); + } + { + PigServer server = new PigServer(ExecType.LOCAL, props); + UDFContext.getUDFContext().setClientSystemProps(); + + server + .registerQuery("A = load '" + + fqexportLocation + + "' using org.apache.hcatalog.pig.HCatEximLoader();"); + server.registerQuery("B = foreach A generate emp_sex, emp_name;"); + + Iterator XIter = server.openIterator("B"); + assertTrue(XIter.hasNext()); + Tuple t = XIter.next(); + assertEmpDetail(t, "M", "Krishna"); + assertTrue(XIter.hasNext()); + t = 
XIter.next(); + assertEmpDetail(t, "F", "Kalpana"); + assertTrue(XIter.hasNext()); + t = XIter.next(); + assertEmpDetail(t, "M", "Satya"); + assertTrue(XIter.hasNext()); + t = XIter.next(); + assertEmpDetail(t, "F", "Kavya"); + assertFalse(XIter.hasNext()); + } + } + + + public void testLoadMultiPartTable() throws Exception { + { + populateDataFile(); + PigServer server = new PigServer(ExecType.LOCAL, props); + UDFContext.getUDFContext().setClientSystemProps(); + server.setBatchOn(); + server + .registerQuery("A = load '" + + fqdataLocation + + "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray, emp_country:chararray, emp_state:chararray);" + ); + server.registerQuery("INTN = FILTER A BY emp_country == 'IN' AND emp_state == 'TN';"); + server.registerQuery("INKA = FILTER A BY emp_country == 'IN' AND emp_state == 'KA';"); + server.registerQuery("USTN = FILTER A BY emp_country == 'US' AND emp_state == 'TN';"); + server.registerQuery("USKA = FILTER A BY emp_country == 'US' AND emp_state == 'KA';"); + server.registerQuery("store INTN into '" + PARTITIONED_TABLE + + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + + "', 'emp_country=in,emp_state=tn');"); + server.registerQuery("store INKA into '" + PARTITIONED_TABLE + + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + + "', 'emp_country=in,emp_state=ka');"); + server.registerQuery("store USTN into '" + PARTITIONED_TABLE + + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + + "', 'emp_country=us,emp_state=tn');"); + server.registerQuery("store USKA into '" + PARTITIONED_TABLE + + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + + "', 'emp_country=us,emp_state=ka');"); + server.executeBatch(); + } + { + PigServer server = new PigServer(ExecType.LOCAL, props); + UDFContext.getUDFContext().setClientSystemProps(); + + server + .registerQuery("A = load '" + + fqexportLocation + + "' using org.apache.hcatalog.pig.HCatEximLoader() " + //+ "as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray, emp_country:chararray, emp_state:chararray);"); + + ";"); + + Iterator XIter = server.openIterator("A"); + + Map empDetails = new TreeMap(); + addEmpDetail(empDetails, 237, "Krishna", "01/01/1990", "M", "in", "tn"); + addEmpDetail(empDetails, 238, "Kalpana", "01/01/2000", "F", "in", "ka"); + addEmpDetail(empDetails, 239, "Satya", "01/01/2001", "M", "us", "tn"); + addEmpDetail(empDetails, 240, "Kavya", "01/01/2002", "F", "us", "ka"); + + while(XIter.hasNext()) { + Tuple t = XIter.next(); + assertNotSame(0, empDetails.size()); + assertEmpDetail(t, empDetails); + } + assertEquals(0, empDetails.size()); + } + } + + public void testLoadMultiPartFilter() throws Exception { + { + populateDataFile(); + PigServer server = new PigServer(ExecType.LOCAL, props); + UDFContext.getUDFContext().setClientSystemProps(); + server.setBatchOn(); + server + .registerQuery("A = load '" + + fqdataLocation + + "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray, emp_country:chararray, emp_state:chararray);" + ); + server.registerQuery("INTN = FILTER A BY emp_country == 'IN' AND emp_state == 'TN';"); + server.registerQuery("INKA = FILTER A BY emp_country == 'IN' AND emp_state == 'KA';"); + server.registerQuery("USTN = FILTER A BY emp_country == 'US' AND emp_state == 'TN';"); + server.registerQuery("USKA = FILTER A BY emp_country == 'US' AND emp_state == 'KA';"); + 
server.registerQuery("store INTN into '" + PARTITIONED_TABLE + + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + + "', 'emp_country=in,emp_state=tn');"); + server.registerQuery("store INKA into '" + PARTITIONED_TABLE + + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + + "', 'emp_country=in,emp_state=ka');"); + server.registerQuery("store USTN into '" + PARTITIONED_TABLE + + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + + "', 'emp_country=us,emp_state=tn');"); + server.registerQuery("store USKA into '" + PARTITIONED_TABLE + + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + + "', 'emp_country=us,emp_state=ka');"); + server.executeBatch(); + } + { + PigServer server = new PigServer(ExecType.LOCAL, props); + UDFContext.getUDFContext().setClientSystemProps(); + + server + .registerQuery("A = load '" + + fqexportLocation + + "' using org.apache.hcatalog.pig.HCatEximLoader() " + + ";"); + server.registerQuery("B = filter A by emp_state == 'ka';"); + + Iterator XIter = server.openIterator("B"); + + Map empDetails = new TreeMap(); + addEmpDetail(empDetails, 238, "Kalpana", "01/01/2000", "F", "in", "ka"); + addEmpDetail(empDetails, 240, "Kavya", "01/01/2002", "F", "us", "ka"); + + while(XIter.hasNext()) { + Tuple t = XIter.next(); + assertNotSame(0, empDetails.size()); + assertEmpDetail(t, empDetails); + } + assertEquals(0, empDetails.size()); + } + } + + +} diff --git src/test/org/apache/hcatalog/pig/TestHCatEximStorer.java src/test/org/apache/hcatalog/pig/TestHCatEximStorer.java new file mode 100644 index 0000000..2d2f0c4 --- /dev/null +++ src/test/org/apache/hcatalog/pig/TestHCatEximStorer.java @@ -0,0 +1,336 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hcatalog.pig; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.TreeSet; + +import junit.framework.TestCase; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hcatalog.MiniCluster; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.data.schema.HCatFieldSchema; +import org.apache.hcatalog.data.schema.HCatSchemaUtils; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.impl.util.UDFContext; + +public class TestHCatEximStorer extends TestCase { + + private static final String NONPART_TABLE = "junit_unparted"; + private static final String PARTITIONED_TABLE = "junit_parted"; + private static MiniCluster cluster = MiniCluster.buildCluster(); + + private static final String dataLocation = "/tmp/data"; + private static String fqdataLocation; + private static final String exportLocation = "/tmp/export"; + private static String fqexportLocation; + + private static Properties props; + + private void cleanup() throws IOException { + MiniCluster.deleteFile(cluster, dataLocation); + MiniCluster.deleteFile(cluster, exportLocation); + } + + @Override + protected void setUp() throws Exception { + props = new Properties(); + props.setProperty("fs.default.name", cluster.getProperties().getProperty("fs.default.name")); + System.out.println("Filesystem class : " + cluster.getFileSystem().getClass().getName() + ", fs.default.name : " + props.getProperty("fs.default.name")); + fqdataLocation = cluster.getProperties().getProperty("fs.default.name") + dataLocation; + fqexportLocation = cluster.getProperties().getProperty("fs.default.name") + exportLocation; + System.out.println("FQ Data Location :" + fqdataLocation); + System.out.println("FQ Export Location :" + fqexportLocation); + cleanup(); + } + + @Override + protected void tearDown() throws Exception { + cleanup(); + } + + private void populateDataFile() throws IOException { + MiniCluster.deleteFile(cluster, dataLocation); + String[] input = new String[] { + "237,Krishna,01/01/1990,M,IN,TN", + "238,Kalpana,01/01/2000,F,IN,KA", + "239,Satya,01/01/2001,M,US,TN", + "240,Kavya,01/01/2002,F,US,KA" + }; + MiniCluster.createInputFile(cluster, dataLocation, input); + } + + public void testStoreNonPartTable() throws Exception { + populateDataFile(); + PigServer server = new PigServer(ExecType.LOCAL, props); + UDFContext.getUDFContext().setClientSystemProps(); + server.setBatchOn(); + server.registerQuery("A = load '" + fqdataLocation + "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray);"); + server.registerQuery("store A into '" + NONPART_TABLE + + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "');"); + server.executeBatch(); + + FileSystem fs = cluster.getFileSystem(); + + System.out.println("Filesystem class : " + cluster.getFileSystem().getClass().getName() + ", fs.default.name : " + props.getProperty("fs.default.name")); + + Map.Entry> metadata = 
EximUtil.readMetaData(fs, new Path(exportLocation, "_metadata")); + Table table = metadata.getKey(); + List partitions = metadata.getValue(); + + List columns = new ArrayList(); + columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_id", + Constants.INT_TYPE_NAME, ""))); + columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_name", + Constants.STRING_TYPE_NAME, ""))); + columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_dob", + Constants.STRING_TYPE_NAME, ""))); + columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_sex", + Constants.STRING_TYPE_NAME, ""))); + + + assertEquals("default", table.getDbName()); + assertEquals(NONPART_TABLE, table.getTableName()); + assertTrue(EximUtil.schemaCompare(table.getSd().getCols(), + HCatUtil.getFieldSchemaList(columns))); + assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver", + table.getParameters().get(HCatConstants.HCAT_ISD_CLASS)); + assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver", + table.getParameters().get(HCatConstants.HCAT_OSD_CLASS)); + assertEquals("org.apache.hadoop.hive.ql.io.RCFileInputFormat", + table.getSd().getInputFormat()); + assertEquals("org.apache.hadoop.hive.ql.io.RCFileOutputFormat", + table.getSd().getOutputFormat()); + assertEquals("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe", + table.getSd().getSerdeInfo().getSerializationLib()); + assertEquals(0, table.getPartitionKeys().size()); + + assertEquals(0, partitions.size()); + } + + public void testStorePartTable() throws Exception { + populateDataFile(); + PigServer server = new PigServer(ExecType.LOCAL, props); + UDFContext.getUDFContext().setClientSystemProps(); + server.setBatchOn(); + server.registerQuery("A = load '" + fqdataLocation + "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray);"); + server.registerQuery("store A into '" + PARTITIONED_TABLE + + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "', 'emp_country=in,emp_state=tn');"); + server.executeBatch(); + + FileSystem fs = cluster.getFileSystem(); + + System.out.println("Filesystem class : " + cluster.getFileSystem().getClass().getName() + ", fs.default.name : " + props.getProperty("fs.default.name")); + + Map.Entry> metadata = EximUtil.readMetaData(fs, new Path(exportLocation, "_metadata")); + Table table = metadata.getKey(); + List partitions = metadata.getValue(); + + List columns = new ArrayList(); + columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_id", + Constants.INT_TYPE_NAME, ""))); + columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_name", + Constants.STRING_TYPE_NAME, ""))); + columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_dob", + Constants.STRING_TYPE_NAME, ""))); + columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_sex", + Constants.STRING_TYPE_NAME, ""))); + + + assertEquals("default", table.getDbName()); + assertEquals(PARTITIONED_TABLE, table.getTableName()); + assertTrue(EximUtil.schemaCompare(table.getSd().getCols(), + HCatUtil.getFieldSchemaList(columns))); + assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver", + table.getParameters().get(HCatConstants.HCAT_ISD_CLASS)); + assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver", + table.getParameters().get(HCatConstants.HCAT_OSD_CLASS)); + assertEquals("org.apache.hadoop.hive.ql.io.RCFileInputFormat", + table.getSd().getInputFormat()); + assertEquals("org.apache.hadoop.hive.ql.io.RCFileOutputFormat", 
+ table.getSd().getOutputFormat()); + assertEquals("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe", + table.getSd().getSerdeInfo().getSerializationLib()); + assertEquals(2, table.getPartitionKeys().size()); + List partSchema = table.getPartitionKeys(); + assertEquals("emp_country", partSchema.get(0).getName()); + assertEquals("emp_state", partSchema.get(1).getName()); + + assertEquals(1, partitions.size()); + Partition partition = partitions.get(0); + assertEquals("in", partition.getValues().get(0)); + assertEquals("tn", partition.getValues().get(1)); + assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver", + partition.getParameters().get(HCatConstants.HCAT_ISD_CLASS)); + assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver", + partition.getParameters().get(HCatConstants.HCAT_OSD_CLASS)); + } + + public void testStoreNonPartCompatSchemaTable() throws Exception { + populateDataFile(); + PigServer server = new PigServer(ExecType.LOCAL, props); + UDFContext.getUDFContext().setClientSystemProps(); + server.setBatchOn(); + server.registerQuery("A = load '" + fqdataLocation + "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray);"); + server.registerQuery("store A into '" + NONPART_TABLE + + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "', '', 'id:int, name:chararray, dob:chararray, sex:chararray');"); + server.executeBatch(); + + FileSystem fs = cluster.getFileSystem(); + + System.out.println("Filesystem class : " + cluster.getFileSystem().getClass().getName() + ", fs.default.name : " + props.getProperty("fs.default.name")); + + Map.Entry> metadata = EximUtil.readMetaData(fs, new Path(exportLocation, "_metadata")); + Table table = metadata.getKey(); + List partitions = metadata.getValue(); + + List columns = new ArrayList(); + columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("id", + Constants.INT_TYPE_NAME, ""))); + columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("name", + Constants.STRING_TYPE_NAME, ""))); + columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("dob", + Constants.STRING_TYPE_NAME, ""))); + columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("sex", + Constants.STRING_TYPE_NAME, ""))); + + + assertEquals("default", table.getDbName()); + assertEquals(NONPART_TABLE, table.getTableName()); + assertTrue(EximUtil.schemaCompare(table.getSd().getCols(), + HCatUtil.getFieldSchemaList(columns))); + assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver", + table.getParameters().get(HCatConstants.HCAT_ISD_CLASS)); + assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver", + table.getParameters().get(HCatConstants.HCAT_OSD_CLASS)); + assertEquals("org.apache.hadoop.hive.ql.io.RCFileInputFormat", + table.getSd().getInputFormat()); + assertEquals("org.apache.hadoop.hive.ql.io.RCFileOutputFormat", + table.getSd().getOutputFormat()); + assertEquals("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe", + table.getSd().getSerdeInfo().getSerializationLib()); + assertEquals(0, table.getPartitionKeys().size()); + + assertEquals(0, partitions.size()); + } + + public void testStoreNonPartNonCompatSchemaTable() throws Exception { + populateDataFile(); + PigServer server = new PigServer(ExecType.LOCAL, props); + UDFContext.getUDFContext().setClientSystemProps(); + server.setBatchOn(); + server.registerQuery("A = load '" + fqdataLocation + "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray);"); + 
server.registerQuery("store A into '" + NONPART_TABLE + + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "', '', 'id:int, name:chararray, dob:chararray, sex:int');"); + try { + server.executeBatch(); + fail("Expected exception not thrown"); + } catch (FrontendException e) { + } + } + + public void testStoreMultiPartTable() throws Exception { + populateDataFile(); + PigServer server = new PigServer(ExecType.LOCAL, props); + UDFContext.getUDFContext().setClientSystemProps(); + server.setBatchOn(); + server.registerQuery("A = load '" + fqdataLocation + "' using PigStorage(',') as (emp_id:int, emp_name:chararray, emp_dob:chararray, emp_sex:chararray, emp_country:chararray, emp_state:chararray);"); + server.registerQuery("INTN = FILTER A BY emp_country == 'IN' AND emp_state == 'TN';"); + server.registerQuery("INKA = FILTER A BY emp_country == 'IN' AND emp_state == 'KA';"); + server.registerQuery("USTN = FILTER A BY emp_country == 'US' AND emp_state == 'TN';"); + server.registerQuery("USKA = FILTER A BY emp_country == 'US' AND emp_state == 'KA';"); + server.registerQuery("store INTN into '" + PARTITIONED_TABLE + + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "', 'emp_country=in,emp_state=tn');"); + server.registerQuery("store INKA into '" + PARTITIONED_TABLE + + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "', 'emp_country=in,emp_state=ka');"); + server.registerQuery("store USTN into '" + PARTITIONED_TABLE + + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "', 'emp_country=us,emp_state=tn');"); + server.registerQuery("store USKA into '" + PARTITIONED_TABLE + + "' using org.apache.hcatalog.pig.HCatEximStorer('" + fqexportLocation + "', 'emp_country=us,emp_state=ka');"); + server.executeBatch(); + + FileSystem fs = cluster.getFileSystem(); + + System.out.println("Filesystem class : " + cluster.getFileSystem().getClass().getName() + ", fs.default.name : " + props.getProperty("fs.default.name")); + + Map.Entry> metadata = EximUtil.readMetaData(fs, new Path(exportLocation, "_metadata")); + Table table = metadata.getKey(); + List partitions = metadata.getValue(); + + List columns = new ArrayList(); + columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_id", + Constants.INT_TYPE_NAME, ""))); + columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_name", + Constants.STRING_TYPE_NAME, ""))); + columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_dob", + Constants.STRING_TYPE_NAME, ""))); + columns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("emp_sex", + Constants.STRING_TYPE_NAME, ""))); + + + assertEquals("default", table.getDbName()); + assertEquals(PARTITIONED_TABLE, table.getTableName()); + assertTrue(EximUtil.schemaCompare(table.getSd().getCols(), + HCatUtil.getFieldSchemaList(columns))); + assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver", + table.getParameters().get(HCatConstants.HCAT_ISD_CLASS)); + assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver", + table.getParameters().get(HCatConstants.HCAT_OSD_CLASS)); + assertEquals("org.apache.hadoop.hive.ql.io.RCFileInputFormat", + table.getSd().getInputFormat()); + assertEquals("org.apache.hadoop.hive.ql.io.RCFileOutputFormat", + table.getSd().getOutputFormat()); + assertEquals("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe", + table.getSd().getSerdeInfo().getSerializationLib()); + assertEquals(2, table.getPartitionKeys().size()); + List partSchema = 
table.getPartitionKeys(); + assertEquals("emp_country", partSchema.get(0).getName()); + assertEquals("emp_state", partSchema.get(1).getName()); + + assertEquals(4, partitions.size()); + Set<String> parts = new TreeSet<String>(); + parts.add("in,tn"); + parts.add("in,ka"); + parts.add("us,tn"); + parts.add("us,ka"); + + for (Partition partition : partitions) { + assertEquals("org.apache.hcatalog.rcfile.RCFileInputDriver", + partition.getParameters().get(HCatConstants.HCAT_ISD_CLASS)); + assertEquals("org.apache.hcatalog.rcfile.RCFileOutputDriver", + partition.getParameters().get(HCatConstants.HCAT_OSD_CLASS)); + assertTrue(parts.remove(partition.getValues().get(0) + "," + partition.getValues().get(1))); + } + assertEquals(0, parts.size()); + } +}
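The tests above exercise the full export/import round trip; reduced to its essentials, the export leg looks like the sketch below. It is illustrative only: it reuses the TestMap mapper, the emp_* schema built in TestHCatEximOutputFormat.setUp(), and the tmp/data and tmp/exports paths from that test, while the bare snippet form (no enclosing class or exception handling, and the free-standing 'schema' variable) is an addition here, not part of the patch.

    // Non-partitioned export, modeled on TestHCatEximOutputFormat.testNonPart().
    // 'schema' is assumed to be the four-column emp_* HCatSchema from setUp().
    Configuration conf = new Configuration();
    Job job = new Job(conf, "hcat exim export sketch");
    job.setJarByClass(TestHCatEximOutputFormat.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(HCatEximOutputFormat.class);
    job.setMapperClass(TestHCatEximOutputFormat.TestMap.class);
    job.setNumReduceTasks(0);
    TextInputFormat.setInputPaths(job, new Path("tmp/data"));

    // Passing null for both the partition schema and the partition values
    // requests a non-partitioned export into tmp/exports.
    HCatEximOutputFormat.setOutput(job, "hcatEximOutputFormatTestDB",
        "hcatEximOutputFormatTestTable", "tmp/exports", null, null, schema);

    job.waitForCompletion(true);
    new HCatEximOutputCommitter(null).cleanupJob(job);

The import leg mirrors this: HCatEximInputFormat.setInput(job, "tmp/exports", filter) points a job at the exported _metadata and data files, the optional filter map restricts which partitions are read (as in testPartSelection), and HCatBaseInputFormat.setOutputSchema(job, ...) controls which columns the returned records carry (as in testNonPartProjection and testPartWithPartCols).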