diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java index 13848b6..a00762f 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java @@ -19,12 +19,17 @@ import org.apache.avro.Schema; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.util.Utf8; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.mapred.JobConf; @@ -56,23 +61,35 @@ */ public static Schema determineSchemaOrThrowException(Properties properties) throws IOException, AvroSerdeException { - String schemaString = properties.getProperty(SCHEMA_LITERAL); - if(schemaString != null && !schemaString.equals(SCHEMA_NONE)) + String schemaString = properties.getProperty(SCHEMA_LITERAL); + if (schemaString != null && !schemaString.equals(SCHEMA_NONE)) return Schema.parse(schemaString); + Configuration conf = new Configuration(); + // Try pulling directly from URL schemaString = properties.getProperty(SCHEMA_URL); - if(schemaString == null || schemaString.equals(SCHEMA_NONE)) - throw new AvroSerdeException(EXCEPTION_MESSAGE); + if (schemaString != null && !schemaString.equals(SCHEMA_NONE)) { + try { + if(schemaString.toLowerCase().startsWith("hdfs://")) { + return getSchemaFromHDFS(schemaString, conf); + } else { + return Schema.parse(new URL(schemaString).openStream()); + } + } catch(IOException ioe) { + throw new AvroSerdeException("Unable to read schema from HDFS: " + schemaString, ioe); + } + } - try { - if(schemaString.toLowerCase().startsWith("hdfs://")) - return getSchemaFromHDFS(schemaString, new Configuration()); - } catch(IOException ioe) { - throw new AvroSerdeException("Unable to read schema from HDFS: " + schemaString, ioe); + schemaString = properties.getProperty("location"); + if(schemaString != null) { + final Schema schema = getSchemaFromAvroFile(schemaString, conf); + if (schema != null) { + return schema; + } } - return Schema.parse(new URL(schemaString).openStream()); + throw new AvroSerdeException(EXCEPTION_MESSAGE); } /** @@ -110,6 +127,46 @@ protected static Schema getSchemaFromHDFS(String schemaHDFSUrl, } } + + // Protected for testing and so we can pass in a conf for testing. + protected static Schema getSchemaFromAvroFile(String schemaHDFSUrl, + Configuration conf) throws IOException { + FileSystem fs = FileSystem.get(conf); + final Path path = new Path(schemaHDFSUrl); + + FileStatus[] matchedFiles = null; + final FileStatus fileStatus = fs.getFileStatus(path); + if (fileStatus.isDir()) { + matchedFiles = fs.listStatus(path, AVRO_PATH_FILTER); + } else if (AVRO_PATH_FILTER.accept(path)) { + matchedFiles = new FileStatus[] { fileStatus }; + } + + if (matchedFiles == null || matchedFiles.length == 0) { + LOG.warn("No *.avro file found at path: " + schemaHDFSUrl); + return null; + } + + for (FileStatus file : matchedFiles) { + DataFileStream r = null; + try { + r = new DataFileStream(fs.open(file.getPath()), new GenericDatumReader()); + final Schema schema = r.getSchema(); + LOG.debug("Schema read from avro file: " + file.getPath()); + return schema; + } catch (Exception e) { + LOG.warn("Ignoring avro file: " + file.getPath().getName() + " because of exception", e); + } finally { + if(r != null) { + r.close(); + } + } + } + + LOG.warn("No valid schema found in any avro file at location: " + schemaHDFSUrl); + return null; + } + /** * Determine if an Avro schema is of type Union[T, NULL]. Avro supports nullable * types via a union of type T and null. This is a very common use case. @@ -146,4 +203,13 @@ public static boolean insideMRJob(JobConf job) { && (HiveConf.getVar(job, HiveConf.ConfVars.PLAN) != null) && (!HiveConf.getVar(job, HiveConf.ConfVars.PLAN).isEmpty()); } + + /** ignore hdfs files with suffix other than .avro */ + protected static final PathFilter AVRO_PATH_FILTER = new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().endsWith(".avro"); + } + }; + }